Background Tasks
Execute agent tasks and recipes asynchronously without blocking the main thread. Monitor progress, cancel running tasks, and manage concurrent execution.
Quick Start
Agent-Centric Usage
import asyncio
from praisonaiagents import Agent
from praisonaiagents.background import BackgroundRunner, BackgroundConfig
async def main():
# Create background runner
runner = BackgroundRunner(config=BackgroundConfig(max_concurrent_tasks=3))
# Agent with background task support
agent = Agent(
name="AsyncAssistant",
instructions="You are a research assistant.",
background=runner
)
# Submit agent task to run in background
task = await agent.background.submit_agent(
agent=agent,
prompt="Research AI trends in 2025",
name="research_task"
)
# Continue with other work while task runs...
await task.wait(timeout=60.0)
print(task.result)
asyncio.run(main())
Using Recipe Operations
from praisonai import recipe
# Submit recipe as background task
task = recipe.run_background(
"my-recipe",
input={"query": "What is AI?"},
config={"max_tokens": 1000},
session_id="session_123",
timeout_sec=300,
)
print(f"Task ID: {task.task_id}")
# Check status
status = await task.status()
# Wait for completion
result = await task.wait(timeout=600)
print(f"Result: {result}")
# Cancel if needed
await task.cancel()
Features
- Async Execution: Run tasks without blocking
- Concurrency Control: Limit concurrent tasks
- Progress Tracking: Monitor task status
- Timeout Support: Set execution time limits
- Cancellation: Cancel running tasks
Configuration
from praisonaiagents.background import BackgroundConfig
config = BackgroundConfig(
max_concurrent_tasks=5, # Max parallel tasks
default_timeout=300.0, # 5 minute default timeout
auto_cleanup=True # Auto-remove completed tasks
)
Task Status
from praisonaiagents.background import TaskStatus
# Check status
if task.status == TaskStatus.COMPLETED:
print(f"Result: {task.result}")
elif task.status == TaskStatus.FAILED:
print(f"Error: {task.error}")
elif task.status == TaskStatus.RUNNING:
print("Still running...")
CLI Usage
# Submit a recipe as background task
praisonai background submit --recipe my-recipe
# List tasks
praisonai background list
# Check status
praisonai background status <task_id>
# Cancel task
praisonai background cancel <task_id>
# Clear completed
praisonai background clear
Safe Defaults
| Setting | Default | Description |
|---|
timeout_sec | 300 | Maximum execution time (5 minutes) |
max_concurrent | 5 | Maximum concurrent tasks |
cleanup_delay_sec | 3600 | Time before completed tasks are cleaned up |
Low-level API Reference
BackgroundRunner Direct Usage
import asyncio
from praisonaiagents.background import BackgroundRunner, BackgroundConfig
async def main():
# Create runner with config
config = BackgroundConfig(max_concurrent_tasks=3)
runner = BackgroundRunner(config=config)
# Define a task
async def my_task(name: str) -> str:
await asyncio.sleep(2)
return f"Task {name} completed"
# Submit task
task = await runner.submit(my_task, args=("example",), name="my_task")
print(f"Submitted: {task.id[:8]}")
# Wait for completion
await task.wait(timeout=10.0)
print(f"Result: {task.result}")
asyncio.run(main())
Submitting Tasks
# Submit async function
task = await runner.submit(
func=my_async_function,
args=(arg1, arg2),
kwargs={"key": "value"},
name="descriptive_name",
timeout=60.0
)
# Submit sync function (runs in thread pool)
task = await runner.submit(
func=my_sync_function,
args=(arg1,),
name="sync_task"
)
Task Management
# List all tasks
for task in runner.tasks:
print(f"{task.name}: {task.status.value}")
# Get running tasks
running = runner.running_tasks
# Get pending tasks
pending = runner.pending_tasks
# Clear completed tasks
runner.clear_completed()
Synchronous Job Manager
For simpler use cases, use BackgroundJobManager for synchronous job management:
from praisonaiagents.background.job_manager import BackgroundJobManager, JobStatus
# Create manager with auto-background threshold
manager = BackgroundJobManager(auto_background_threshold=5.0)
# Start a job
job_id = manager.start_job(lambda: expensive_computation())
# Check status
status = manager.get_status(job_id)
if status == JobStatus.COMPLETED:
result = manager.get_result(job_id)
elif status == JobStatus.FAILED:
error = manager.get_error(job_id)
# List all jobs
for job_id, info in manager.list_jobs().items():
print(f"{job_id}: {info.status}")
# Cancel a running job
manager.cancel_job(job_id)
Architecture
The sync wrappers and ScheduleLoop bridge the gap between disconnected modules:
submit_sync() / submit_agent_sync() — let sync code submit background tasks without asyncio boilerplate
ScheduleLoop — polls for due jobs on a daemon thread and fires your callback
Sync Wrappers
For sync code (scripts, bot handlers, Agent.start() callbacks), use the sync-friendly methods that handle asyncio automatically:
submit_sync()
Submit any callable from synchronous code. A daemon event loop thread is created lazily on first call.
from praisonaiagents.background.runner import BackgroundRunner
import time
runner = BackgroundRunner()
def heavy_computation(data):
time.sleep(10)
return {"result": sum(data)}
# Non-blocking — returns immediately
task = runner.submit_sync(
func=heavy_computation,
args=([1, 2, 3, 4, 5],),
name="compute"
)
print(task.status) # "running"
# Check later
while not task.is_completed:
time.sleep(1)
print(f"Result: {task.result}") # {'result': 15}
submit_agent_sync()
Submit an Agent task from synchronous code. Resolves the agent’s callable (start → chat → run) automatically.
from praisonaiagents import Agent
from praisonaiagents.background.runner import BackgroundRunner
agent = Agent(name="researcher", instructions="Research AI trends")
runner = BackgroundRunner()
task = runner.submit_agent_sync(
agent=agent,
prompt="What are the top AI trends in 2026?",
name="research-task"
)
# task.result will contain the agent's response when done
| Parameter | Type | Required | Description |
|---|
func / agent | Callable / Agent | Yes | Function or Agent to execute |
args | tuple | No | Positional arguments (submit_sync only) |
prompt | str | Yes | Agent prompt (submit_agent_sync only) |
name | str | No | Human-readable task name |
timeout | float | No | Timeout in seconds |
on_complete | Callable | No | Callback when task completes |
ScheduleLoop
ScheduleLoop bridges scheduled jobs to actual execution. It runs a daemon thread that polls get_due_jobs() and fires your callback.
from praisonaiagents.scheduler import ScheduleLoop
def handle_job(job):
print(f"🔔 Firing: {job.name} — {job.message}")
loop = ScheduleLoop(
on_trigger=handle_job,
tick_seconds=30, # check every 30 seconds
)
loop.start() # daemon thread — won't block
# loop.stop() # clean shutdown when needed
Combined Example: Scheduler + Background
from praisonaiagents import Agent
from praisonaiagents.tools import schedule_add, schedule_list, schedule_remove
from praisonaiagents.scheduler import ScheduleLoop
from praisonaiagents.background.runner import BackgroundRunner
agent = Agent(
name="assistant",
instructions="You can set reminders and schedules.",
tools=[schedule_add, schedule_list, schedule_remove],
)
runner = BackgroundRunner()
def on_schedule_fire(job):
task = runner.submit_agent_sync(agent, job.message, name=f"schedule-{job.name}")
print(f"→ Background task {task.id} started for '{job.name}'")
loop = ScheduleLoop(on_trigger=on_schedule_fire, tick_seconds=30)
loop.start()
# Agent creates schedules that now actually fire!
agent.start("Remind me to check email every morning at 7am")
| Parameter | Type | Default | Description |
|---|
on_trigger | Callable | required | Called with each due ScheduleJob |
store | FileScheduleStore | Auto-created | Schedule store to poll |
tick_seconds | float | 30.0 | Poll interval in seconds |
| Method | Description |
|---|
loop.start() | Start polling (no-op if already running) |
loop.stop(timeout=5.0) | Signal stop and wait for thread exit |
loop.is_running | Whether the daemon thread is alive |
Error handling: If on_trigger() raises an exception, it’s logged but not propagated — the loop continues with remaining jobs and future ticks.
The background module uses lazy loading — no overhead when not used:
# Only loads when accessed
from praisonaiagents.background import BackgroundRunner
# Sync loop thread only created on first submit_sync() call
import praisonaiagents.background.runner as br
assert br._bg_loop is None # Not created until needed
See Also