AsyncAgentScheduler is an async-native agent scheduler. It replaces daemon threads with asyncio-based execution and cooperative cancellation.

Quick Start

1. Simple Usage

Create and start an async scheduler with basic configuration.
import asyncio
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

async def main():
    agent = Agent(
        name="NewsChecker", 
        instructions="Summarise the latest AI news."
    )
    
    scheduler = AsyncAgentScheduler(
        agent=agent, 
        task="Summarise top 3 AI stories"
    )
    
    await scheduler.start("hourly", max_retries=3, run_immediately=True)
    
    # Let it run for 2 hours
    await asyncio.sleep(3600 * 2)
    await scheduler.stop()

asyncio.run(main())

2. With Callbacks and Configuration

Add success/failure callbacks and custom configuration.
import asyncio
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

def success_callback(result):
    print(f"✅ Success: {result}")

async def async_failure_callback(exception):
    print(f"❌ Async failure: {exception}")

async def main():
    agent = Agent(
        name="DataProcessor",
        instructions="Process and analyze data efficiently."
    )
    
    scheduler = AsyncAgentScheduler(
        agent=agent,
        task="Process latest data batch",
        config={"timeout": 120},
        on_success=success_callback,
        on_failure=async_failure_callback
    )
    
    await scheduler.start("*/30m", max_retries=5, run_immediately=True)
    
    # Give the first run time to finish, then inspect the stats
    await asyncio.sleep(60)
    stats = await scheduler.get_stats()
    print(f"Stats: {stats}")
    
    await scheduler.stop()

asyncio.run(main())

How It Works

| Phase | Description |
| --- | --- |
| Initialization | Creates async primitives lazily on first use |
| Scheduling | Runs agent at specified intervals with exponential backoff |
| Execution | Uses thread pool for sync agents, direct await for async agents |
| Cancellation | Cooperative cancellation via asyncio.Event |
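
The execution and cancellation phases can be illustrated with plain asyncio. The following is a minimal sketch of the general technique, not the scheduler's actual implementation: `run_once`, `run_periodically`, and the two stand-in agents are hypothetical names introduced here for illustration.

```python
import asyncio
import inspect

async def run_once(fn, *args):
    """Await coroutine functions directly; run blocking callables in a worker thread."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    return await asyncio.to_thread(fn, *args)

async def run_periodically(fn, interval, stop_event, results):
    """Call fn every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        results.append(await run_once(fn, "tick"))
        try:
            # Interruptible sleep: returns as soon as stop_event is set.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request; run again

def sync_agent(task):
    # Stand-in for a blocking (sync) agent call.
    return f"sync:{task}"

async def async_agent(task):
    # Stand-in for an async agent call.
    return f"async:{task}"

async def demo():
    stop_event = asyncio.Event()  # created inside the running loop
    results = []
    worker = asyncio.create_task(
        run_periodically(sync_agent, 0.01, stop_event, results)
    )
    await asyncio.sleep(0.05)
    stop_event.set()  # cooperative: the worker exits at its next check
    await worker
    results.append(await run_once(async_agent, "tick"))
    return results

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Waiting on the event with a timeout, rather than calling `asyncio.sleep`, is what lets a stop request take effect immediately instead of at the end of the current interval.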

Configuration Options

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agent | Any | Required | Agent instance to schedule |
| task | str | Required | Task description to execute |
| config | Dict[str, Any] | {} | Optional configuration |
| on_success | Callable | None | Success callback (sync or async) |
| on_failure | Callable | None | Failure callback (sync or async) |

Start Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| schedule_expr | str | Required | Schedule interval expression |
| max_retries | int | 3 | Total attempts (1 initial + retries) |
| run_immediately | bool | False | Execute immediately before scheduling |
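
This page uses two forms of schedule expression: "hourly" and "*/Nm". As a rough illustration of how such strings could map to an interval, here is a hypothetical parser. It assumes "hourly" means 3600 seconds and "*/Nm" means every N minutes; `parse_interval_seconds` is not part of the library, and the real scheduler may accept other forms or interpret these differently.

```python
import re

def parse_interval_seconds(expr: str) -> int:
    """Hypothetical parser covering only the expression forms used on this page."""
    named = {"hourly": 3600}  # assumption: "hourly" means every 3600 seconds
    if expr in named:
        return named[expr]
    match = re.fullmatch(r"\*/(\d+)m", expr)  # assumption: "*/Nm" means every N minutes
    if match:
        minutes = int(match.group(1))
        if minutes <= 0:
            raise ValueError(f"Interval must be positive: {expr!r}")
        return minutes * 60
    raise ValueError(f"Unsupported schedule expression: {expr!r}")

if __name__ == "__main__":
    for expr in ("hourly", "*/30m", "*/10m"):
        print(expr, parse_interval_seconds(expr))
```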

Common Patterns

Pattern 1: Event Loop Safety

The scheduler is safe to construct outside an event loop:
# Safe to create in sync code
scheduler = AsyncAgentScheduler(agent, task)

async def run_later():
    # Async primitives created here
    await scheduler.start("hourly")
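
To show what lazy creation of async primitives can look like, here is a small sketch under stated assumptions. `LazyStopSignal` and its methods are hypothetical names; the scheduler's internals may differ. The constructor creates no asyncio objects, so it can run in sync code, and the event and lock are created the first time a coroutine needs them.

```python
import asyncio
from typing import Optional

class LazyStopSignal:
    """Illustrative only: defers asyncio primitive creation until first async use."""

    def __init__(self) -> None:
        # No asyncio objects are created here, so this is safe in sync code.
        self._stop_event: Optional[asyncio.Event] = None
        self._lock: Optional[asyncio.Lock] = None

    def _ensure_primitives(self) -> None:
        # Only called from coroutines, so an event loop is running here.
        if self._stop_event is None:
            self._stop_event = asyncio.Event()
            self._lock = asyncio.Lock()

    async def request_stop(self) -> None:
        self._ensure_primitives()
        async with self._lock:
            self._stop_event.set()

    async def stopped(self) -> bool:
        self._ensure_primitives()
        return self._stop_event.is_set()

stop_signal = LazyStopSignal()  # constructed with no event loop running

async def demo() -> bool:
    before = await stop_signal.stopped()
    await stop_signal.request_stop()
    after = await stop_signal.stopped()
    return (not before) and after

if __name__ == "__main__":
    print(asyncio.run(demo()))
```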

Pattern 2: FastAPI Integration

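from contextlib import asynccontextmanager

from fastapi import FastAPI
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

# Illustrative agent; substitute your own
agent = Agent(name="Worker", instructions="Run the background task.")
scheduler = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global scheduler
    # Start the scheduler when the app boots
    scheduler = AsyncAgentScheduler(agent, "Background task")
    await scheduler.start("*/10m")
    try:
        yield
    finally:
        # Stop it on shutdown, even if the app exits with an error
        await scheduler.stop()

app = FastAPI(lifespan=lifespan)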

Pattern 3: Cancellation Handling

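import asyncio

async def graceful_shutdown(scheduler):
    try:
        await scheduler.start("hourly")
        # Park here until the surrounding task is cancelled
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        print("Scheduler cancelled")
        raise  # re-raise so the caller sees the cancellation
    finally:
        await scheduler.stop()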

Best Practices

Construct the scheduler anywhere and start it inside a coroutine. The scheduler creates its async primitives lazily: asyncio.Event and asyncio.Lock are bound to the caller’s loop on first async entry, which prevents “different loop” errors.
# ✅ Good: Constructor safe in sync code
scheduler = AsyncAgentScheduler(agent, task)

async def later():
    # ✅ Good: Async primitives created here
    await scheduler.start("hourly")
Use both sync and async callbacks safely. The safe_call utility handles both types automatically.
def sync_callback(result):
    print(f"Sync: {result}")

async def async_callback(result):
    # log_to_database is a placeholder for your own async function
    await log_to_database(result)

# ✅ Both work seamlessly
scheduler = AsyncAgentScheduler(
    agent=agent,
    task=task,
    on_success=async_callback,
    on_failure=sync_callback
)
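
For readers curious how one helper can accept both callback types, the following sketch shows a common dispatch technique. It is not the library's safe_call: `call_callback` is a hypothetical name, and the real utility may also handle errors or run sync callbacks differently.

```python
import asyncio
import inspect

async def call_callback(callback, value):
    """Hypothetical helper: invoke a sync or async callback with one argument."""
    if callback is None:
        return None
    outcome = callback(value)  # a sync callback runs to completion right here
    if inspect.isawaitable(outcome):
        outcome = await outcome  # an async callback returned a coroutine; await it
    return outcome

async def demo():
    seen = []

    def sync_cb(v):
        seen.append(("sync", v))

    async def async_cb(v):
        seen.append(("async", v))

    await call_callback(sync_cb, 1)
    await call_callback(async_cb, 2)
    await call_callback(None, 3)  # missing callbacks are skipped
    return seen

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch a sync callback runs on the event loop thread, so it should return quickly; long blocking work would stall other tasks.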
Use exponential backoff with jitter. Both sync and async schedulers share the same backoff_delay algorithm for consistency.
# Delay formula: min(max(30, 2 ** attempt), 300) * jitter
# - Floor: ~27s, Cap: 300s
# - Same behavior as sync AgentScheduler
await scheduler.start("hourly", max_retries=5)
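
The delay formula in the comment above can be worked through in a few lines. This is a sketch, not the shared backoff_delay function itself. The jitter range of 0.9 to 1.1 is an assumption inferred from the "~27s" floor (30 * 0.9), and `base_delay` and `backoff_delay_sketch` are hypothetical names.

```python
import random

def base_delay(attempt: int) -> int:
    """Delay before jitter: min(max(30, 2 ** attempt), 300)."""
    return min(max(30, 2 ** attempt), 300)

def backoff_delay_sketch(attempt: int, rng=random) -> float:
    """Apply multiplicative jitter to the base delay."""
    jitter = rng.uniform(0.9, 1.1)  # ASSUMPTION: range inferred from the ~27s floor
    return base_delay(attempt) * jitter

if __name__ == "__main__":
    for attempt in (0, 4, 5, 6, 8, 9, 12):
        print(attempt, base_delay(attempt))
```

Because 2 ** attempt only exceeds 30 at attempt 5, the base delay stays at 30 seconds for the first few attempts, then doubles until it reaches the 300-second cap at attempt 9. In this sketch the cap applies before jitter.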
Always await stop() for proper cleanup and statistics reporting.
try:
    await scheduler.start("hourly")
    await asyncio.sleep(3600)
finally:
    stats = await scheduler.get_stats()
    await scheduler.stop()
    print(f"Final stats: {stats}")

Sync Agent Scheduler

Thread-based scheduler for sync environments

Shared Scheduler Utilities

Common primitives used by both schedulers