Skip to main content
Run an agent on a recurring schedule with async-native execution, cooperative cancellation, and built-in retries.

Quick Start

1

Simple Usage

import asyncio
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

async def main():
    agent = Agent(
        name="NewsChecker",
        instructions="Summarise today's AI news in 3 bullet points.",
    )

    scheduler = AsyncAgentScheduler(agent, task="Check the latest AI news")
    await scheduler.start("hourly", max_retries=3, run_immediately=True)

    # ... run your app ...

    await scheduler.stop()
    print(await scheduler.get_stats())

asyncio.run(main())
2

With Callbacks

import asyncio
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

def on_success(result):
    print(f"Agent completed successfully: {result}")

def on_failure(error):
    print(f"Agent failed: {error}")

async def main():
    agent = Agent(
        name="DataProcessor",
        instructions="Process incoming data efficiently",
    )

    scheduler = AsyncAgentScheduler(
        agent, 
        task="Process latest batch of data",
        on_success=on_success,
        on_failure=on_failure
    )

    # Run every 30 minutes
    await scheduler.start("*/30m", max_retries=3, run_immediately=True)

    # Keep running
    try:
        await asyncio.sleep(3600)  # Run for 1 hour
    finally:
        await scheduler.stop()
        stats = await scheduler.get_stats()
        print(f"Completed {stats['success_count']} successful executions")

asyncio.run(main())

How It Works

The AsyncAgentScheduler uses async-native execution with cooperative cancellation, replacing the old thread-based scheduler.

Schedule Expression Reference

ExpressionIntervalDescription
"daily"86400sEvery 24 hours
"hourly"3600sEvery hour
"*/30m"1800sEvery 30 minutes
"*/1h"3600sEvery 1 hour
"*/5s"5sEvery 5 seconds
"60"60sCustom seconds (plain digits)

Configuration Options

AsyncAgentScheduler Constructor

ParameterTypeDefaultDescription
agentAnyRequiredAgent instance to schedule
taskstrRequiredTask description to execute
configOptional[Dict[str, Any]]NoneOptional configuration dictionary
on_successOptional[Callable[[Any], None]]NoneCallback function on successful execution
on_failureOptional[Callable[[Exception], None]]NoneCallback function on failed execution

start() Method Options

ParameterTypeDefaultDescription
schedule_exprstrRequiredSchedule expression (e.g., “hourly”, ”*/1h”, “3600”)
max_retriesint3Maximum retry attempts on failure
run_immediatelyboolFalseIf True, run agent immediately before starting schedule

get_stats() Response

FieldTypeDescription
is_runningboolWhether scheduler is currently running
execution_countintTotal number of execution attempts
success_countintNumber of successful executions
failure_countintNumber of failed executions
agent_namestrName of the agent being scheduled
taskstrTask description being executed

Common Patterns

Running in FastAPI Application

from contextlib import asynccontextmanager
from fastapi import FastAPI
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

scheduler = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global scheduler
    agent = Agent(name="BackgroundWorker", instructions="Process background tasks")
    scheduler = AsyncAgentScheduler(agent, task="Process pending tasks")
    await scheduler.start("*/5m", max_retries=2)
    
    yield
    
    # Shutdown
    if scheduler:
        await scheduler.stop()

app = FastAPI(lifespan=lifespan)

Error Handling with Logging

import asyncio
import logging
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

logging.basicConfig(level=logging.INFO)

def handle_failure(error):
    logging.error(f"Agent execution failed: {error}")
    # Send alert, write to database, etc.

async def main():
    agent = Agent(name="MonitoringAgent", instructions="Monitor system health")
    scheduler = AsyncAgentScheduler(
        agent, 
        task="Check system status",
        on_failure=handle_failure
    )
    
    await scheduler.start("*/10m")
    
    try:
        await asyncio.sleep(float('inf'))
    except KeyboardInterrupt:
        await scheduler.stop()

asyncio.run(main())

Graceful Shutdown on SIGINT

import asyncio
import signal
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

scheduler = None

def signal_handler():
    if scheduler:
        asyncio.create_task(scheduler.stop())

async def main():
    global scheduler
    
    # Setup signal handling
    for sig in [signal.SIGINT, signal.SIGTERM]:
        signal.signal(sig, lambda s, f: signal_handler())
    
    agent = Agent(name="LongRunningAgent", instructions="Process data continuously")
    scheduler = AsyncAgentScheduler(agent, task="Process data batch")
    
    await scheduler.start("*/15m", run_immediately=True)
    
    try:
        # Keep running until signal
        await asyncio.sleep(float('inf'))
    except KeyboardInterrupt:
        print("Received interrupt, shutting down gracefully...")
    finally:
        if scheduler:
            await scheduler.stop()
            stats = await scheduler.get_stats()
            print(f"Final stats: {stats}")

asyncio.run(main())

Best Practices

The stop() method waits up to 30 seconds for the current execution to complete before canceling. This prevents data corruption and ensures clean shutdown.
# Good
try:
    await scheduler.start("hourly")
    await asyncio.sleep(3600)
finally:
    await scheduler.stop()

# Bad - may interrupt agent mid-execution
await scheduler.start("hourly")
await asyncio.sleep(3600)
# Exit without stopping
Enable run_immediately=True to verify your agent works correctly before waiting for the first scheduled interval.
# Test immediately, then schedule
await scheduler.start("hourly", run_immediately=True)

# Good for smoke tests
await scheduler.start("daily", run_immediately=True)
Success and failure callbacks are called synchronously. Heavy operations should be offloaded to avoid blocking the scheduler.
# Good - lightweight logging
def on_success(result):
    logger.info(f"Agent completed: {result}")

# Bad - heavy database operations
def on_success(result):
    database.save_large_dataset(result)  # Blocks scheduler
    
# Better - offload heavy work
def on_success(result):
    asyncio.create_task(save_to_database(result))
For new code, use AsyncAgentScheduler instead of the legacy AgentScheduler. The async version provides better cancellation, no daemon threads, and fits naturally into async applications.
# New async approach
from praisonai.async_agent_scheduler import AsyncAgentScheduler
scheduler = AsyncAgentScheduler(agent, task)
await scheduler.start("hourly")

# Legacy thread-based (avoid for new code)
from praisonai.scheduler import AgentScheduler
scheduler = AgentScheduler(agent, task)
scheduler.start("hourly")

Migration from Legacy Scheduler: AsyncAgentScheduler replaces the thread-based AgentScheduler for new applications. The sync-looking public CLI (praisonai schedule ...) is unchanged and continues to work as before. Use AsyncAgentScheduler when embedding PraisonAI in async applications like FastAPI, or when you need cooperative cancellation.
Jupyter/Event Loop Compatibility: Starting with PR #1448, PraisonAI no longer calls nest_asyncio.apply() or asyncio.set_event_loop() on your behalf when ACP/LSP is enabled. If you embed PraisonAI inside a Jupyter kernel or another running event loop, either call nest_asyncio.apply() yourself at the top of your notebook, or run PraisonAI from a separate process.

Scheduler CLI

Command-line interface for scheduling agents

Background Tasks

Running agents as background processes