# Background Tasks

> Run agent tasks and recipes asynchronously in the background

Execute agent tasks and recipes asynchronously without blocking the main thread. Monitor progress, cancel running tasks, and manage concurrent execution.

## Quick Start

### Agent-Centric Usage

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
from praisonaiagents import Agent
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create background runner
    runner = BackgroundRunner(config=BackgroundConfig(max_concurrent_tasks=3))
    
    # Agent with background task support
    agent = Agent(
        name="AsyncAssistant",
        instructions="You are a research assistant.",
        background=runner
    )
    
    # Submit agent task to run in background
    task = await agent.background.submit_agent(
        agent=agent,
        prompt="Research AI trends in 2025",
        name="research_task"
    )
    
    # Continue with other work while task runs...
    await task.wait(timeout=60.0)
    print(task.result)

asyncio.run(main())
```

### Using Recipe Operations

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
from praisonai import recipe

async def main():
    # Submit recipe as background task
    task = recipe.run_background(
        "my-recipe",
        input={"query": "What is AI?"},
        config={"max_tokens": 1000},
        session_id="session_123",
        timeout_sec=300,
    )

    print(f"Task ID: {task.task_id}")

    # Check status
    status = await task.status()
    print(f"Status: {status}")

    # Wait for completion
    result = await task.wait(timeout=600)
    print(f"Result: {result}")

    # To cancel a task that is still running:
    # await task.cancel()

# `await` is only valid inside a coroutine, so drive it with asyncio.run()
asyncio.run(main())
```

## Features

* **Async Execution**: Run tasks without blocking
* **Concurrency Control**: Limit concurrent tasks
* **Progress Tracking**: Monitor task status
* **Timeout Support**: Set execution time limits
* **Cancellation**: Cancel running tasks
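
To make the concurrency and timeout features concrete, here is a minimal sketch of the underlying technique using only the standard library. It is not the `BackgroundRunner` implementation. The names `run_limited` and `work` are hypothetical, and the sketch assumes a semaphore-based limit, which is one common way to cap parallel tasks.

```python
import asyncio

async def run_limited(coros, max_concurrent, timeout):
    """Run coroutines with at most `max_concurrent` active at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def guarded(coro):
        nonlocal active, peak
        async with semaphore:              # wait for a free slot
            active += 1
            peak = max(peak, active)
            try:
                # wait_for cancels the coroutine if it exceeds the timeout
                return await asyncio.wait_for(coro, timeout=timeout)
            except asyncio.TimeoutError:
                return "timed out"
            finally:
                active -= 1

    results = await asyncio.gather(*(guarded(c) for c in coros))
    return results, peak

async def work(n, delay):
    await asyncio.sleep(delay)
    return n * 2

async def demo():
    jobs = [work(1, 0.01), work(2, 0.01), work(3, 0.01), work(4, 1.0)]
    return await run_limited(jobs, max_concurrent=2, timeout=0.2)

results, peak = asyncio.run(demo())
print(results, peak)
```

The last job exceeds the timeout and is cancelled, while `peak` shows that no more than two jobs ever ran at the same time.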

## Configuration

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents.background import BackgroundConfig

config = BackgroundConfig(
    max_concurrent_tasks=5,    # Max parallel tasks
    default_timeout=300.0,     # 5 minute default timeout
    auto_cleanup=True          # Auto-remove completed tasks
)
```

## Task Status

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents.background import TaskStatus

# Check status
if task.status == TaskStatus.COMPLETED:
    print(f"Result: {task.result}")
elif task.status == TaskStatus.FAILED:
    print(f"Error: {task.error}")
elif task.status == TaskStatus.RUNNING:
    print("Still running...")
```

## CLI Usage

```bash theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Submit a recipe as background task
praisonai background submit --recipe my-recipe

# List tasks
praisonai background list

# Check status
praisonai background status <task_id>

# Cancel task
praisonai background cancel <task_id>

# Clear completed
praisonai background clear
```

## Safe Defaults

| Setting             | Default | Description                                |
| ------------------- | ------- | ------------------------------------------ |
| `timeout_sec`       | 300     | Maximum execution time (5 minutes)         |
| `max_concurrent`    | 5       | Maximum concurrent tasks                   |
| `cleanup_delay_sec` | 3600    | Time before completed tasks are cleaned up |

***

## Low-level API Reference

### BackgroundRunner Direct Usage

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create runner with config
    config = BackgroundConfig(max_concurrent_tasks=3)
    runner = BackgroundRunner(config=config)
    
    # Define a task
    async def my_task(name: str) -> str:
        await asyncio.sleep(2)
        return f"Task {name} completed"
    
    # Submit task
    task = await runner.submit(my_task, args=("example",), name="my_task")
    print(f"Submitted: {task.id[:8]}")
    
    # Wait for completion
    await task.wait(timeout=10.0)
    print(f"Result: {task.result}")

asyncio.run(main())
```

### Submitting Tasks

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Submit async function
task = await runner.submit(
    func=my_async_function,
    args=(arg1, arg2),
    kwargs={"key": "value"},
    name="descriptive_name",
    timeout=60.0
)

# Submit sync function (runs in thread pool)
task = await runner.submit(
    func=my_sync_function,
    args=(arg1,),
    name="sync_task"
)
```
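
The difference between the two submissions above can be sketched with plain `asyncio`: coroutine functions are awaited directly, while blocking functions are handed to a worker thread so they do not stall the event loop. This is an illustration of the general pattern, not the runner's source. `run_callable`, `fetch_async`, and `compute_sync` are hypothetical names.

```python
import asyncio
import inspect
import time

async def run_callable(func, *args, **kwargs):
    """Await coroutine functions directly; push blocking functions to a thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # asyncio.to_thread (Python 3.9+) runs the call in the default thread pool
    return await asyncio.to_thread(func, *args, **kwargs)

async def fetch_async(x):
    await asyncio.sleep(0.01)
    return x + 1

def compute_sync(x):
    time.sleep(0.01)    # blocking call; would stall the loop if run inline
    return x * 10

async def demo():
    return await asyncio.gather(
        run_callable(fetch_async, 1),
        run_callable(compute_sync, 2),
    )

dispatch_results = asyncio.run(demo())
print(dispatch_results)
```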

### Task Management

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# List all tasks
for task in runner.tasks:
    print(f"{task.name}: {task.status.value}")

# Get running tasks
running = runner.running_tasks

# Get pending tasks
pending = runner.pending_tasks

# Clear completed tasks
runner.clear_completed()
```

### Synchronous Job Manager

For simpler use cases, `BackgroundJobManager` lets you start and track jobs from synchronous code:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents.background.job_manager import BackgroundJobManager, JobStatus

# Create manager with auto-background threshold
manager = BackgroundJobManager(auto_background_threshold=5.0)

# Start a job
job_id = manager.start_job(lambda: expensive_computation())

# Check status
status = manager.get_status(job_id)
if status == JobStatus.COMPLETED:
    result = manager.get_result(job_id)
elif status == JobStatus.FAILED:
    error = manager.get_error(job_id)

# List all jobs
for job_id, info in manager.list_jobs().items():
    print(f"{job_id}: {info.status}")

# Cancel a running job
manager.cancel_job(job_id)
```
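
If it helps to see the idea without the library, a synchronous job manager can be approximated with `concurrent.futures`. The sketch below is a hypothetical stand-in (`SimpleJobManager` is not part of PraisonAI) and it assumes that job status can be derived from the state of a `Future`.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor

class SimpleJobManager:
    """Hypothetical stand-in; not the BackgroundJobManager implementation."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._jobs = {}

    def start_job(self, func):
        job_id = uuid.uuid4().hex[:8]
        self._jobs[job_id] = self._pool.submit(func)
        return job_id

    def get_status(self, job_id):
        future = self._jobs[job_id]
        if future.cancelled():
            return "cancelled"
        if not future.done():
            return "running"     # also covers jobs that are still queued
        return "failed" if future.exception() is not None else "completed"

    def get_result(self, job_id, timeout=None):
        return self._jobs[job_id].result(timeout=timeout)

    def shutdown(self):
        self._pool.shutdown(wait=True)

manager = SimpleJobManager()
ok_id = manager.start_job(lambda: 6 * 7)
bad_id = manager.start_job(lambda: 1 / 0)
answer = manager.get_result(ok_id, timeout=5)
manager.shutdown()               # wait for both jobs to finish
statuses = (manager.get_status(ok_id), manager.get_status(bad_id))
print(answer, statuses)
```

Note that with thread-based futures, `Future.cancel()` only succeeds for jobs that have not started running yet, so cancellation of an in-flight job needs cooperation from the job itself.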

## Architecture

The sync wrappers and `ScheduleLoop` connect the otherwise independent `scheduler/` and `background/` modules to each other and to your application code:

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
graph TB
    subgraph Scheduler["scheduler/ module"]
        S1["schedule_add()"]
        S2["FileScheduleStore"]
        S3["ScheduleRunner"]
        S4["ScheduleLoop ✨"]
        S1 --> S2
        S3 --> S2
        S4 -->|"polls"| S3
    end

    subgraph Background["background/ module"]
        B1["BackgroundRunner"]
        B2["BackgroundTask"]
        B3["submit_sync() ✨"]
        B4["submit_agent_sync() ✨"]
        B1 --> B2
        B3 --> B1
        B4 --> B1
    end

    subgraph YourApp["Your Application"]
        C1["Sync Code"]
        C2["on_trigger callback"]
    end

    S4 -->|"fires"| C2
    C2 -->|"can use"| B3
    C1 -->|"direct use"| B3
    C1 -->|"agent tasks"| B4
```

* **`submit_sync()` / `submit_agent_sync()`** — let sync code submit background tasks without asyncio boilerplate
* **`ScheduleLoop`** — polls for due jobs on a daemon thread and fires your callback

## Sync Wrappers

For sync code (scripts, bot handlers, `Agent.start()` callbacks), use the sync-friendly methods that handle asyncio automatically:

### submit\_sync()

Submit any callable from synchronous code. A daemon event loop thread is created lazily on first call.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents.background.runner import BackgroundRunner
import time

runner = BackgroundRunner()

def heavy_computation(data):
    time.sleep(10)
    return {"result": sum(data)}

# Non-blocking — returns immediately
task = runner.submit_sync(
    func=heavy_computation,
    args=([1, 2, 3, 4, 5],),
    name="compute"
)

print(task.status.value)  # e.g. "running"

# Check later
while not task.is_completed:
    time.sleep(1)
print(f"Result: {task.result}")  # {'result': 15}
```

### submit\_agent\_sync()

Submit an Agent task from synchronous code. The method to call on the agent is resolved automatically, in the order `start` → `chat` → `run`.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents import Agent
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(name="researcher", instructions="Research AI trends")
runner = BackgroundRunner()

task = runner.submit_agent_sync(
    agent=agent,
    prompt="What are the top AI trends in 2026?",
    name="research-task"
)
# task.result will contain the agent's response when done
```

| Parameter        | Type             | Required | Description                              |
| ---------------- | ---------------- | -------- | ---------------------------------------- |
| `func` / `agent` | Callable / Agent | Yes      | Function or Agent to execute             |
| `args`           | tuple            | No       | Positional arguments (submit\_sync only) |
| `prompt`         | str              | Yes      | Agent prompt (submit\_agent\_sync only)  |
| `name`           | str              | No       | Human-readable task name                 |
| `timeout`        | float            | No       | Timeout in seconds                       |
| `on_complete`    | Callable         | No       | Callback when task completes             |

***

## ScheduleLoop

`ScheduleLoop` bridges scheduled jobs to actual execution. It runs a daemon thread that polls `get_due_jobs()` and fires your callback.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents.scheduler import ScheduleLoop

def handle_job(job):
    print(f"🔔 Firing: {job.name} — {job.message}")

loop = ScheduleLoop(
    on_trigger=handle_job,
    tick_seconds=30,  # check every 30 seconds
)
loop.start()   # daemon thread — won't block
# loop.stop()  # clean shutdown when needed
```

### Combined Example: Scheduler + Background

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents import Agent
from praisonaiagents.tools import schedule_add, schedule_list, schedule_remove
from praisonaiagents.scheduler import ScheduleLoop
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(
    name="assistant",
    instructions="You can set reminders and schedules.",
    tools=[schedule_add, schedule_list, schedule_remove],
)

runner = BackgroundRunner()

def on_schedule_fire(job):
    task = runner.submit_agent_sync(agent, job.message, name=f"schedule-{job.name}")
    print(f"→ Background task {task.id} started for '{job.name}'")

loop = ScheduleLoop(on_trigger=on_schedule_fire, tick_seconds=30)
loop.start()

# Agent creates schedules that now actually fire!
agent.start("Remind me to check email every morning at 7am")
```

| Parameter      | Type              | Default      | Description                        |
| -------------- | ----------------- | ------------ | ---------------------------------- |
| `on_trigger`   | Callable          | *required*   | Called with each due `ScheduleJob` |
| `store`        | FileScheduleStore | Auto-created | Schedule store to poll             |
| `tick_seconds` | float             | `30.0`       | Poll interval in seconds           |

| Method                   | Description                              |
| ------------------------ | ---------------------------------------- |
| `loop.start()`           | Start polling (no-op if already running) |
| `loop.stop(timeout=5.0)` | Signal stop and wait for thread exit     |
| `loop.is_running`        | Whether the daemon thread is alive       |

<Info>
  **Error handling:** If `on_trigger()` raises an exception, it's logged but not propagated — the loop continues with remaining jobs and future ticks.
</Info>
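
The polling and error-handling behavior described above can be sketched with a daemon thread and a `threading.Event`. `PollingLoop` is a hypothetical stand-in written for illustration. It mirrors the documented `start()`, `stop()`, and `is_running` surface but is not the `ScheduleLoop` source, and it takes the job source as a plain function rather than a store.

```python
import logging
import threading

class PollingLoop:
    """Hypothetical stand-in for a schedule loop; not the ScheduleLoop source."""

    def __init__(self, get_due_jobs, on_trigger, tick_seconds=30.0):
        self._get_due_jobs = get_due_jobs
        self._on_trigger = on_trigger
        self._tick = tick_seconds
        self._stop = threading.Event()
        self._thread = None

    @property
    def is_running(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        if self.is_running:              # no-op if already running
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self, timeout=5.0):
        self._stop.set()                 # wakes the loop immediately
        if self._thread is not None:
            self._thread.join(timeout)

    def _run(self):
        while not self._stop.is_set():
            for job in self._get_due_jobs():
                try:
                    self._on_trigger(job)
                except Exception:
                    # Log and continue so one bad job cannot kill the loop
                    logging.exception("on_trigger failed for %r", job)
            self._stop.wait(self._tick)  # sleep, but return early on stop()

pending = ["bad", "good"]
fired = []
done = threading.Event()

def get_due_jobs():
    jobs = list(pending)
    pending.clear()
    return jobs

def handle(job):
    if job == "bad":
        raise ValueError("boom")
    fired.append(job)
    done.set()

loop = PollingLoop(get_due_jobs, handle, tick_seconds=0.01)
loop.start()
done.wait(timeout=2)
loop.stop()
print(fired, loop.is_running)
```

The failing `"bad"` job is logged and skipped, and the `"good"` job in the same tick still fires.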

***

## Zero Performance Impact

The background module is loaded lazily, so it adds no import or thread overhead until you actually use it:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Only loads when accessed
from praisonaiagents.background import BackgroundRunner

# Sync loop thread only created on first submit_sync() call
import praisonaiagents.background.runner as br
assert br._bg_loop is None  # Not created until needed
```
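
One common way to get this kind of lazy loading in Python is a module-level `__getattr__` (PEP 562). Whether `praisonaiagents` uses exactly this mechanism is an assumption. The sketch below builds a hypothetical `demo_pkg` module object in memory so that it runs on its own, and a real package would define `__getattr__` in its `__init__.py` instead.

```python
import importlib
import types

# Hypothetical package object used only for this demonstration
pkg = types.ModuleType("demo_pkg")
load_log = []

_LAZY = {"dumps": "json"}        # attribute name -> module that provides it

def _lazy_getattr(name):
    """Import the backing module only when the attribute is first accessed."""
    if name in _LAZY:
        module = importlib.import_module(_LAZY[name])
        load_log.append(name)
        value = getattr(module, name)
        setattr(pkg, name, value)        # cache so later lookups skip this hook
        return value
    raise AttributeError(f"module {pkg.__name__!r} has no attribute {name!r}")

pkg.__getattr__ = _lazy_getattr

loaded_before = list(load_log)           # nothing loaded yet
encoded = pkg.dumps({"a": 1})            # first access triggers the import
pkg.dumps                                # cached: the hook is not called again
print(loaded_before, load_log, encoded)
```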

## See Also

* [Background Tasks CLI](/docs/cli/background)
* [Schedule Tools](/docs/tools/schedule-tools)
* [Scheduler CLI](/docs/cli/scheduler)
