# Async Agent Scheduler

Run an agent on a recurring schedule with async-native execution, cooperative cancellation, and built-in retries.

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
graph LR
    subgraph "Async Agent Scheduler"
        A[📋 Agent] --> B[⏰ Scheduler]
        B --> C[🔄 Executor]
        C --> D[✅ Success]
        C --> E[❌ Failure]
        D --> F[📊 Stats]
        E --> F
    end
    
    classDef agent fill:#8B0000,stroke:#7C90A0,color:#fff
    classDef scheduler fill:#F59E0B,stroke:#7C90A0,color:#fff
    classDef executor fill:#189AB4,stroke:#7C90A0,color:#fff
    classDef success fill:#10B981,stroke:#7C90A0,color:#fff
    classDef failure fill:#8B0000,stroke:#7C90A0,color:#fff
    classDef stats fill:#6366F1,stroke:#7C90A0,color:#fff
    
    class A agent
    class B scheduler
    class C executor
    class D success
    class E failure
    class F stats
```

## Quick Start

<Steps>
  <Step title="Simple Usage">
    ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
    import asyncio
    from praisonaiagents import Agent
    from praisonai.async_agent_scheduler import AsyncAgentScheduler

    # Note: this import path is pending deprecation and is planned to move to
    # praisonai.scheduler.async_agent_scheduler (see the import paths note near the end of this page).

    async def main():
        agent = Agent(
            name="NewsChecker",
            instructions="Summarise today's AI news in 3 bullet points.",
        )

        scheduler = AsyncAgentScheduler(agent, task="Check the latest AI news")
        await scheduler.start("hourly", max_retries=3, run_immediately=True)

        # ... run your app ...

        await scheduler.stop()
        print(await scheduler.get_stats())

    asyncio.run(main())
    ```
  </Step>

  <Step title="With Callbacks">
    ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
    import asyncio
    from praisonaiagents import Agent
    from praisonai.async_agent_scheduler import AsyncAgentScheduler

    def on_success(result):
        print(f"Agent completed successfully: {result}")

    def on_failure(error):
        print(f"Agent failed: {error}")

    async def main():
        agent = Agent(
            name="DataProcessor",
            instructions="Process incoming data efficiently",
        )

        scheduler = AsyncAgentScheduler(
            agent, 
            task="Process latest batch of data",
            on_success=on_success,
            on_failure=on_failure
        )

        # Run every 30 minutes
        await scheduler.start("*/30m", max_retries=3, run_immediately=True)

        # Keep running
        try:
            await asyncio.sleep(3600)  # Run for 1 hour
        finally:
            await scheduler.stop()
            stats = await scheduler.get_stats()
            print(f"Completed {stats['success_count']} successful executions")

    asyncio.run(main())
    ```
  </Step>
</Steps>

***

## How It Works

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
sequenceDiagram
    participant User
    participant Scheduler
    participant Agent
    participant Executor
    
    User->>Scheduler: start(schedule_expr)
    Scheduler->>Scheduler: Parse interval
    
    loop Every Interval
        Scheduler->>Executor: execute(task)
        Executor->>Agent: start(task) or astart(task)
        Agent-->>Executor: Result
        alt Success
            Executor-->>Scheduler: Success callback
        else Failure
            Executor-->>Scheduler: Retry with backoff
        end
    end
    
    User->>Scheduler: stop()
    Scheduler->>Scheduler: Cancel gracefully (30s timeout)
```
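
The failure branch in the diagram retries with backoff. The scheduler's exact policy is not documented on this page, so the sketch below assumes exponential backoff and treats `max_retries` as the total number of attempts. `run_with_retries`, `base_delay`, and `flaky_task` are hypothetical names used only for illustration.

```python
import asyncio

async def run_with_retries(execute, max_retries: int = 3, base_delay: float = 1.0):
    """Await `execute()` until it succeeds or `max_retries` attempts are used."""
    for attempt in range(1, max_retries + 1):
        try:
            return await execute()
        except Exception:
            if attempt == max_retries:
                raise  # Out of attempts: surface the last error.
            # Assumed policy: wait base_delay, then 2x, 4x, ... between attempts.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))

attempts = 0

async def flaky_task() -> str:
    """Stand-in for an agent call that fails twice, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(run_with_retries(flaky_task, max_retries=3, base_delay=0.01))
```

With these settings the third attempt succeeds, so `result` is `"ok"` and `attempts` is `3`.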

`AsyncAgentScheduler` runs on asyncio natively and shuts down through cooperative cancellation, replacing the legacy thread-based scheduler for new code.
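
As a rough illustration of what cooperative cancellation means here, the sketch below runs a job on a fixed interval and exits as soon as a stop event is set, without waiting out the rest of the sleep. It is not PraisonAI's implementation: `interval_loop`, `job`, and the timing values are hypothetical.

```python
import asyncio

async def interval_loop(job, interval: float, stop_event: asyncio.Event) -> int:
    """Run `job` every `interval` seconds until `stop_event` is set.

    Returns the number of completed runs.
    """
    runs = 0
    while not stop_event.is_set():
        await job()
        runs += 1
        try:
            # Sleep for one interval, but wake up early if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            continue  # Interval elapsed without a stop request: run again.
    return runs

async def demo() -> int:
    stop_event = asyncio.Event()

    async def job():
        await asyncio.sleep(0)  # Stand-in for an agent call.

    loop_task = asyncio.create_task(interval_loop(job, 0.01, stop_event))
    await asyncio.sleep(0.05)
    stop_event.set()  # Request a cooperative stop.
    return await loop_task

completed_runs = asyncio.run(demo())
```

The loop only checks for a stop request between runs, so a job that is already executing is allowed to finish.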

<Note>
  The scheduler's async primitives (`_stop_event`, `_cancel_event`, `_stats_lock`) are now created lazily inside `_ensure_async_primitives()` and bound to the loop that `start()` runs on. Tests that call `stop()` without first calling `start()` must invoke `scheduler._ensure_async_primitives()` explicitly — see `tests/unit/scheduler/test_async_agent_scheduler.py` in PR #1583 for the canonical pattern.
</Note>
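
The lazy-creation pattern the note describes can be sketched in isolation. This is an illustration, not the scheduler's source: `LazyState` and `ensure_primitives` are hypothetical stand-ins for the real class and its private `_ensure_async_primitives()` method.

```python
import asyncio
from typing import Optional

class LazyState:
    """Defers creating asyncio primitives until they are first needed."""

    def __init__(self) -> None:
        # Nothing loop-related is created at construction time.
        self.stop_event: Optional[asyncio.Event] = None

    def ensure_primitives(self) -> None:
        # Safe to call repeatedly: the event is created only once.
        if self.stop_event is None:
            self.stop_event = asyncio.Event()

    async def stop(self) -> None:
        # Assumes ensure_primitives() has already run, as start() would do.
        self.stop_event.set()

async def stop_without_start() -> bool:
    state = LazyState()
    state.ensure_primitives()  # Needed because start() was never called.
    await state.stop()
    return state.stop_event.is_set()

stopped = asyncio.run(stop_without_start())
```

Creating the event inside the running coroutine keeps it tied to the loop that actually uses it, which matters on Python versions where asyncio primitives bind to a loop at construction.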

***

## Schedule Expression Reference

| Expression | Interval | Description                   |
| ---------- | -------- | ----------------------------- |
| `"daily"`  | 86400s   | Every 24 hours                |
| `"hourly"` | 3600s    | Every hour                    |
| `"*/30m"`  | 1800s    | Every 30 minutes              |
| `"*/1h"`   | 3600s    | Every 1 hour                  |
| `"*/5s"`   | 5s       | Every 5 seconds               |
| `"60"`     | 60s      | Custom seconds (plain digits) |

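The table can be read as a small grammar: two named presets, `*/N` followed by a unit suffix, or a plain number of seconds. The parser below is a sketch inferred only from the rows above. `parse_interval` is a hypothetical helper, and how the real scheduler treats other inputs is not covered here.

```python
import re

# Named presets and unit multipliers taken from the table above.
PRESETS = {"daily": 86400, "hourly": 3600}
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}

def parse_interval(expr: str) -> int:
    """Convert a schedule expression into an interval in seconds."""
    expr = expr.strip()
    if expr in PRESETS:
        return PRESETS[expr]
    if expr.isdecimal():
        return int(expr)  # Plain digits are seconds.
    match = re.fullmatch(r"\*/(\d+)([smh])", expr)
    if match:
        return int(match.group(1)) * UNIT_SECONDS[match.group(2)]
    raise ValueError(f"Unsupported schedule expression: {expr!r}")
```

Under these assumptions `parse_interval("*/30m")` returns `1800`, and anything outside the table raises `ValueError`.
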
***

## Configuration Options

### AsyncAgentScheduler Constructor

| Parameter    | Type                                    | Default  | Description                               |
| ------------ | --------------------------------------- | -------- | ----------------------------------------- |
| `agent`      | `Any`                                   | Required | Agent instance to schedule                |
| `task`       | `str`                                   | Required | Task description to execute               |
| `config`     | `Optional[Dict[str, Any]]`              | `None`   | Optional configuration dictionary         |
| `on_success` | `Optional[Callable[[Any], None]]`       | `None`   | Callback function on successful execution |
| `on_failure` | `Optional[Callable[[Exception], None]]` | `None`   | Callback function on failed execution     |

### start() Method Options

| Parameter         | Type   | Default  | Description                                             |
| ----------------- | ------ | -------- | ------------------------------------------------------- |
| `schedule_expr`   | `str`  | Required | Schedule expression (e.g., `"hourly"`, `"*/1h"`, `"3600"`) |
| `max_retries`     | `int`  | `3`      | Maximum retry attempts on failure                       |
| `run_immediately` | `bool` | `False`  | If True, run agent immediately before starting schedule |

### get\_stats() Response

| Field             | Type   | Description                            |
| ----------------- | ------ | -------------------------------------- |
| `is_running`      | `bool` | Whether scheduler is currently running |
| `execution_count` | `int`  | Total number of execution attempts     |
| `success_count`   | `int`  | Number of successful executions        |
| `failure_count`   | `int`  | Number of failed executions            |
| `agent_name`      | `str`  | Name of the agent being scheduled      |
| `task`            | `str`  | Task description being executed        |

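The counters above are enough to derive simple health metrics. The helper below is a sketch that works on a plain dictionary shaped like the table. `success_rate` is a hypothetical function and the sample values are made up. Since `execution_count` is described as counting attempts, the ratio may be per attempt rather than per scheduled run.

```python
def success_rate(stats: dict) -> float:
    """Return the fraction of successful executions, or 0.0 if none have run."""
    total = stats["execution_count"]
    return stats["success_count"] / total if total else 0.0

# Example dictionary shaped like the table above (values are made up).
sample_stats = {
    "is_running": False,
    "execution_count": 4,
    "success_count": 3,
    "failure_count": 1,
    "agent_name": "NewsChecker",
    "task": "Check the latest AI news",
}
rate = success_rate(sample_stats)
```

For the sample dictionary, `rate` is `0.75`.
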
***

## Common Patterns

### Running in FastAPI Application

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from contextlib import asynccontextmanager
from fastapi import FastAPI
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

scheduler = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global scheduler
    agent = Agent(name="BackgroundWorker", instructions="Process background tasks")
    scheduler = AsyncAgentScheduler(agent, task="Process pending tasks")
    await scheduler.start("*/5m", max_retries=2)
    
    yield
    
    # Shutdown
    if scheduler:
        await scheduler.stop()

app = FastAPI(lifespan=lifespan)
```

### Error Handling with Logging

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
import logging
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

logging.basicConfig(level=logging.INFO)

def handle_failure(error):
    logging.error(f"Agent execution failed: {error}")
    # Send alert, write to database, etc.

async def main():
    agent = Agent(name="MonitoringAgent", instructions="Monitor system health")
    scheduler = AsyncAgentScheduler(
        agent, 
        task="Check system status",
        on_failure=handle_failure
    )
    
    await scheduler.start("*/10m")
    
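    try:
        # Block until this task is cancelled (Ctrl+C under asyncio.run cancels it)
        await asyncio.Event().wait()
    finally:
        # Runs on cancellation too, so the scheduler is always stopped
        await scheduler.stop()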

asyncio.run(main())
```

### Graceful Shutdown on SIGINT

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
import signal
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

async def main():
    # Set by the signal handlers to request shutdown
    shutdown = asyncio.Event()

    # loop.add_signal_handler is the asyncio-safe way to react to signals (Unix only)
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)

    agent = Agent(name="LongRunningAgent", instructions="Process data continuously")
    scheduler = AsyncAgentScheduler(agent, task="Process data batch")

    await scheduler.start("*/15m", run_immediately=True)

    try:
        # Keep running until a signal arrives
        await shutdown.wait()
        print("Received signal, shutting down gracefully...")
    finally:
        # Stop exactly once, then report the final counters
        await scheduler.stop()
        stats = await scheduler.get_stats()
        print(f"Final stats: {stats}")

asyncio.run(main())
```

***

## Best Practices

<AccordionGroup>
  <Accordion title="Always await scheduler.stop() before exiting">
    The `stop()` method waits up to 30 seconds for the in-flight execution to finish before canceling it. Awaiting it before exit gives a running agent the chance to complete its work instead of being cut off midway.

    ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
    # Good
    try:
        await scheduler.start("hourly")
        await asyncio.sleep(3600)
    finally:
        await scheduler.stop()

    # Bad - may interrupt agent mid-execution
    await scheduler.start("hourly")
    await asyncio.sleep(3600)
    # Exit without stopping
    ```
  </Accordion>

  <Accordion title="Use run_immediately=True for testing">
    Enable `run_immediately=True` to verify your agent works correctly before waiting for the first scheduled interval.

    ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
    # Test immediately, then schedule
    await scheduler.start("hourly", run_immediately=True)

    # Good for smoke tests
    await scheduler.start("daily", run_immediately=True)
    ```
  </Accordion>

  <Accordion title="Keep callbacks lightweight">
    Success and failure callbacks are called synchronously, so a slow callback delays the scheduler. Offload heavy work instead of doing it inline.

    ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
    # Good - lightweight logging
    def on_success(result):
        logger.info(f"Agent completed: {result}")

    # Bad - heavy database operations
    def on_success(result):
        database.save_large_dataset(result)  # Blocks scheduler
        
    # Better - offload heavy work
    def on_success(result):
        asyncio.create_task(save_to_database(result))
    ```
  </Accordion>

  <Accordion title="Prefer AsyncAgentScheduler over legacy thread-based scheduler">
    For new code, use `AsyncAgentScheduler` instead of the legacy `AgentScheduler`. The async version provides better cancellation, no daemon threads, and fits naturally into async applications.

    ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
    # New async approach
    from praisonai.async_agent_scheduler import AsyncAgentScheduler
    scheduler = AsyncAgentScheduler(agent, task)
    await scheduler.start("hourly")

    # Legacy thread-based (avoid for new code)
    from praisonai.scheduler import AgentScheduler
    scheduler = AgentScheduler(agent, task)
    scheduler.start("hourly")
    ```
  </Accordion>
</AccordionGroup>
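
For the lightweight-callback advice above, blocking work that has no async API can be pushed onto a worker thread. The sketch assumes the callback is invoked on the event loop thread (otherwise `asyncio.get_running_loop()` raises `RuntimeError`). `blocking_save` and the `saved` list are hypothetical stand-ins for a real database write.

```python
import asyncio
import time

saved = []

def blocking_save(result: str) -> None:
    """Stand-in for a slow, synchronous write."""
    time.sleep(0.05)
    saved.append(result)

def on_success(result: str) -> None:
    # Hand the blocking call to the default thread pool and return at once.
    loop = asyncio.get_running_loop()
    loop.run_in_executor(None, blocking_save, result)

async def demo() -> None:
    on_success("daily report")  # Returns immediately; the loop stays free.
    await asyncio.sleep(0.2)    # Give the worker thread time to finish.

asyncio.run(demo())
```

Errors raised in the worker thread are easy to miss with this fire-and-forget style, so keep and await the returned future if the write must be confirmed.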

***

<Note>
  **Import paths (PR #1552):**

  * **Pending deprecation** (still works, emits `PendingDeprecationWarning` — will move to `praisonai.scheduler.async_agent_scheduler` in a future release): `from praisonai.async_agent_scheduler import AsyncAgentScheduler`
  * **Canonical:** `from praisonai.scheduler import AgentScheduler` (sync scheduler)
  * **Deprecated** (still works, emits `DeprecationWarning`): `from praisonai.agent_scheduler import AgentScheduler`

  **Migration from Legacy Scheduler:** `AsyncAgentScheduler` replaces the thread-based `AgentScheduler` for new applications. The sync-looking public CLI (`praisonai schedule ...`) is unchanged and continues to work as before.
</Note>

<Warning>
  **Jupyter/Event Loop Compatibility:** Starting with [PR #1448](https://github.com/MervinPraison/PraisonAI/pull/1448), PraisonAI no longer calls `nest_asyncio.apply()` or `asyncio.set_event_loop()` on your behalf when ACP/LSP is enabled. If you embed PraisonAI inside a Jupyter kernel or another running event loop, either call `nest_asyncio.apply()` yourself at the top of your notebook, or run PraisonAI from a separate process.
</Warning>

***

## Related

<CardGroup cols={2}>
  <Card title="Scheduler CLI" icon="terminal" href="/docs/cli/scheduler">
    Command-line interface for scheduling agents
  </Card>

  <Card title="Background Tasks" icon="play" href="/docs/features/background-tasks">
    Running agents as background processes
  </Card>
</CardGroup>
