Background Tasks
Execute agent tasks and recipes asynchronously without blocking the main thread. Monitor progress, cancel running tasks, and manage concurrent execution.
Quick Start
Agent-Centric Usage
import asyncio
from praisonaiagents import Agent
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create background runner
    runner = BackgroundRunner(config=BackgroundConfig(max_concurrent_tasks=3))

    # Agent with background task support
    agent = Agent(
        name="AsyncAssistant",
        instructions="You are a research assistant.",
        background=runner
    )

    # Submit agent task to run in background
    task = await agent.background.submit_agent(
        agent=agent,
        prompt="Research AI trends in 2025",
        name="research_task"
    )

    # Continue with other work while task runs...
    await task.wait(timeout=60.0)
    print(task.result)

asyncio.run(main())
Using Recipe Operations
from praisonai import recipe

# Submit recipe as background task
task = recipe.run_background(
    "my-recipe",
    input={"query": "What is AI?"},
    config={"max_tokens": 1000},
    session_id="session_123",
    timeout_sec=300,
)
print(f"Task ID: {task.task_id}")

# The awaits below must run inside an async function (or an async REPL)
# Check status
status = await task.status()

# Wait for completion
result = await task.wait(timeout=600)
print(f"Result: {result}")

# Cancel if needed
await task.cancel()
Features
- Async Execution: Run tasks without blocking
- Concurrency Control: Limit concurrent tasks
- Progress Tracking: Monitor task status
- Timeout Support: Set execution time limits
- Cancellation: Cancel running tasks
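The features listed above correspond to standard asyncio primitives. The sketch below is not PraisonAI's implementation; it uses only the standard library, and the names `work`, `run_limited`, and `demo` are hypothetical. It shows a semaphore capping concurrency, `asyncio.wait_for` enforcing a timeout, and `Task.cancel()` stopping a task.

```python
import asyncio

async def work(name: str, delay: float) -> str:
    # Stand-in for an agent call or recipe run (hypothetical workload).
    await asyncio.sleep(delay)
    return f"{name} done"

async def run_limited(sem: asyncio.Semaphore, name: str, delay: float, timeout: float) -> str:
    # The semaphore caps how many workloads run at the same time.
    async with sem:
        # wait_for raises TimeoutError if the workload exceeds its limit.
        return await asyncio.wait_for(work(name, delay), timeout=timeout)

async def demo() -> dict:
    sem = asyncio.Semaphore(2)  # at most two workloads at once
    fast = asyncio.create_task(run_limited(sem, "fast", 0.01, timeout=1.0))
    slow = asyncio.create_task(run_limited(sem, "slow", 0.5, timeout=0.05))
    doomed = asyncio.create_task(run_limited(sem, "doomed", 5.0, timeout=10.0))

    await asyncio.sleep(0)  # give the tasks a chance to start
    doomed.cancel()         # request cancellation

    # return_exceptions=True collects failures instead of raising the first one.
    results = await asyncio.gather(fast, slow, doomed, return_exceptions=True)
    return {
        "fast": results[0],
        "slow_timed_out": isinstance(results[1], asyncio.TimeoutError),
        "doomed_cancelled": isinstance(results[2], asyncio.CancelledError),
    }

outcome = asyncio.run(demo())
print(outcome)
```

In this sketch a timed-out or cancelled task releases its semaphore slot on the way out, because `async with` runs its exit logic even when an exception propagates.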
Configuration
from praisonaiagents.background import BackgroundConfig

config = BackgroundConfig(
    max_concurrent_tasks=5,  # Max parallel tasks
    default_timeout=300.0,   # 5 minute default timeout
    auto_cleanup=True        # Auto-remove completed tasks
)
Task Status
from praisonaiagents.background import TaskStatus

# Check status
if task.status == TaskStatus.COMPLETED:
    print(f"Result: {task.result}")
elif task.status == TaskStatus.FAILED:
    print(f"Error: {task.error}")
elif task.status == TaskStatus.RUNNING:
    print("Still running...")
CLI Usage
# Submit a recipe as background task
praisonai background submit --recipe my-recipe
# List tasks
praisonai background list
# Check status
praisonai background status <task_id>
# Cancel task
praisonai background cancel <task_id>
# Clear completed
praisonai background clear
Safe Defaults
| Setting | Default | Description |
|---|---|---|
| timeout_sec | 300 | Maximum execution time (5 minutes) |
| max_concurrent | 5 | Maximum concurrent tasks |
| cleanup_delay_sec | 3600 | Time before completed tasks are cleaned up |
Low-level API Reference
BackgroundRunner Direct Usage
import asyncio
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create runner with config
    config = BackgroundConfig(max_concurrent_tasks=3)
    runner = BackgroundRunner(config=config)

    # Define a task
    async def my_task(name: str) -> str:
        await asyncio.sleep(2)
        return f"Task {name} completed"

    # Submit task
    task = await runner.submit(my_task, args=("example",), name="my_task")
    print(f"Submitted: {task.id[:8]}")

    # Wait for completion
    await task.wait(timeout=10.0)
    print(f"Result: {task.result}")

asyncio.run(main())
Submitting Tasks
# Submit async function
task = await runner.submit(
    func=my_async_function,
    args=(arg1, arg2),
    kwargs={"key": "value"},
    name="descriptive_name",
    timeout=60.0
)

# Submit sync function (runs in thread pool)
task = await runner.submit(
    func=my_sync_function,
    args=(arg1,),
    name="sync_task"
)
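One common way to accept both kinds of callable is to check for a coroutine function and hand everything else to a worker thread. The sketch below assumes nothing about how `BackgroundRunner.submit` is implemented; `run_any` is a hypothetical helper built on `asyncio.to_thread` (Python 3.9+).

```python
import asyncio
import inspect
import threading
import time

async def run_any(func, *args, **kwargs):
    # Coroutine functions are awaited directly on the event loop.
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Plain functions run in a worker thread so they cannot block the loop.
    return await asyncio.to_thread(func, *args, **kwargs)

async def async_add(a, b):
    await asyncio.sleep(0.01)
    return a + b

def sync_thread_name():
    time.sleep(0.01)  # blocking call, safe because it runs off the loop thread
    return threading.current_thread().name

async def demo():
    total = await run_any(async_add, 2, 3)
    worker = await run_any(sync_thread_name)
    return total, worker

total, worker_name = asyncio.run(demo())
caller_name = threading.current_thread().name
print(total, worker_name, caller_name)
```

The sync function reports a different thread name than the caller, which is the observable sign that it ran in the thread pool.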
Task Management
# List all tasks
for task in runner.tasks:
    print(f"{task.name}: {task.status.value}")

# Get running tasks
running = runner.running_tasks

# Get pending tasks
pending = runner.pending_tasks

# Clear completed tasks
runner.clear_completed()
Synchronous Job Manager
For simpler use cases, use BackgroundJobManager for synchronous job management:
from praisonaiagents.background.job_manager import BackgroundJobManager, JobStatus

# Create manager with auto-background threshold
manager = BackgroundJobManager(auto_background_threshold=5.0)

# Start a job
job_id = manager.start_job(lambda: expensive_computation())

# Check status
status = manager.get_status(job_id)
if status == JobStatus.COMPLETED:
    result = manager.get_result(job_id)
elif status == JobStatus.FAILED:
    error = manager.get_error(job_id)

# List all jobs
for job_id, info in manager.list_jobs().items():
    print(f"{job_id}: {info.status}")

# Cancel a running job
manager.cancel_job(job_id)
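The start/status/result pattern can be reproduced with the standard library alone. The sketch below is not `BackgroundJobManager`; `TinyJobManager` and its string statuses are hypothetical, and it leaves out the auto-background threshold entirely. It only shows how a job ID can map to a `concurrent.futures.Future`.

```python
import concurrent.futures
import uuid

class TinyJobManager:
    """Hypothetical stand-in built on ThreadPoolExecutor."""

    def __init__(self, max_workers=2):
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self._jobs = {}  # job_id -> Future

    def start_job(self, func):
        job_id = uuid.uuid4().hex
        self._jobs[job_id] = self._pool.submit(func)
        return job_id

    def get_status(self, job_id):
        future = self._jobs[job_id]
        if future.cancelled():
            return "cancelled"
        if not future.done():
            return "running"  # also covers jobs still waiting in the queue
        return "failed" if future.exception() is not None else "completed"

    def get_result(self, job_id):
        return self._jobs[job_id].result()

    def get_error(self, job_id):
        return self._jobs[job_id].exception()

    def wait(self, job_id, timeout=None):
        concurrent.futures.wait([self._jobs[job_id]], timeout=timeout)

manager = TinyJobManager()
ok_id = manager.start_job(lambda: 21 * 2)
bad_id = manager.start_job(lambda: 1 / 0)
manager.wait(ok_id, timeout=5)
manager.wait(bad_id, timeout=5)
print(manager.get_status(ok_id), manager.get_result(ok_id))
print(manager.get_status(bad_id), repr(manager.get_error(bad_id)))
```

One limitation of this approach: `Future.cancel()` only succeeds for jobs that have not started yet, so a thread-based manager needs a cooperative stop signal to interrupt work that is already running.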
Architecture
The sync wrappers and ScheduleLoop bridge the gap between disconnected modules:
- submit_sync() / submit_agent_sync(): let sync code submit background tasks without asyncio boilerplate
- ScheduleLoop: polls for due jobs on a daemon thread and fires your callback
Sync Wrappers
For sync code (scripts, bot handlers, Agent.start() callbacks), use the sync-friendly methods that handle asyncio automatically:
submit_sync()
Submit any callable from synchronous code. A daemon event loop thread is created lazily on first call.
from praisonaiagents.background.runner import BackgroundRunner
import time

runner = BackgroundRunner()

def heavy_computation(data):
    time.sleep(10)
    return {"result": sum(data)}

# Non-blocking: returns immediately
task = runner.submit_sync(
    func=heavy_computation,
    args=([1, 2, 3, 4, 5],),
    name="compute"
)
print(task.status)  # "running"

# Check later
while not task.is_completed:
    time.sleep(1)
print(f"Result: {task.result}")  # {'result': 15}
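The "daemon event loop thread created lazily on first call" idea can be illustrated with the standard library. This is a sketch of the general technique, not the code behind `submit_sync()`; `_get_loop` and `submit_from_sync` are hypothetical names, and the returned object is a plain `concurrent.futures.Future` rather than a PraisonAI task.

```python
import asyncio
import threading

_loop = None                   # created on first use only
_loop_lock = threading.Lock()  # guards against two threads creating two loops

def _get_loop():
    global _loop
    with _loop_lock:
        if _loop is None:
            _loop = asyncio.new_event_loop()
            # Daemon thread: it will not keep the interpreter alive at exit.
            threading.Thread(target=_loop.run_forever, daemon=True, name="bg-loop").start()
        return _loop

def submit_from_sync(func, *args):
    """Schedule a blocking callable from sync code and return a Future."""
    async def runner():
        # Run the blocking function in a worker thread, off the loop thread.
        return await asyncio.to_thread(func, *args)
    return asyncio.run_coroutine_threadsafe(runner(), _get_loop())

def heavy(data):
    return {"result": sum(data)}

created_before = _loop is not None  # nothing has been submitted yet
future = submit_from_sync(heavy, [1, 2, 3, 4, 5])
result = future.result(timeout=5)   # block only when the value is needed
created_after = _loop is not None
print(created_before, result, created_after)
```

The caller never touches `async` or `await`; the only blocking point is `future.result()`, which can be deferred until the value is actually needed.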
submit_agent_sync()
Submit an Agent task from synchronous code. Resolves the agent's callable (start → chat → run) automatically.
from praisonaiagents import Agent
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(name="researcher", instructions="Research AI trends")
runner = BackgroundRunner()

task = runner.submit_agent_sync(
    agent=agent,
    prompt="What are the top AI trends in 2026?",
    name="research-task"
)
# task.result will contain the agent's response when done
| Parameter | Type | Required | Description |
|---|---|---|---|
| func / agent | Callable / Agent | Yes | Function or Agent to execute |
| args | tuple | No | Positional arguments (submit_sync only) |
| prompt | str | Yes | Agent prompt (submit_agent_sync only) |
| name | str | No | Human-readable task name |
| timeout | float | No | Timeout in seconds |
| on_complete | Callable | No | Callback when task completes |
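The callable resolution described for submit_agent_sync() can be read as a first-match lookup over method names. The sketch below works under that assumption and is not taken from the library; `resolve_callable` and the two demo classes are hypothetical.

```python
def resolve_callable(agent, names=("start", "chat", "run")):
    # Return the first method that exists on the agent, in priority order.
    for name in names:
        method = getattr(agent, name, None)
        if callable(method):
            return method
    raise AttributeError(f"agent has none of: {', '.join(names)}")

class ChatOnlyAgent:
    def chat(self, prompt):
        return f"chat: {prompt}"

class FullAgent:
    def start(self, prompt):
        return f"start: {prompt}"

    def chat(self, prompt):
        return f"chat: {prompt}"

chat_reply = resolve_callable(ChatOnlyAgent())("hello")
full_reply = resolve_callable(FullAgent())("hello")
print(chat_reply, full_reply)
```

An object with none of the three methods raises `AttributeError` in this sketch, so a misconfigured agent fails at submission time rather than inside the background task.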
ScheduleLoop
ScheduleLoop bridges scheduled jobs to actual execution. It runs a daemon thread that polls get_due_jobs() and fires your callback.
from praisonaiagents.scheduler import ScheduleLoop

def handle_job(job):
    print(f"Firing: {job.name} - {job.message}")

loop = ScheduleLoop(
    on_trigger=handle_job,
    tick_seconds=30,  # check every 30 seconds
)
loop.start()  # daemon thread, won't block
# loop.stop()  # clean shutdown when needed
Combined Example: Scheduler + Background
from praisonaiagents import Agent
from praisonaiagents.tools import schedule_add, schedule_list, schedule_remove
from praisonaiagents.scheduler import ScheduleLoop
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(
    name="assistant",
    instructions="You can set reminders and schedules.",
    tools=[schedule_add, schedule_list, schedule_remove],
)
runner = BackgroundRunner()

def on_schedule_fire(job):
    task = runner.submit_agent_sync(agent, job.message, name=f"schedule-{job.name}")
    print(f"Background task {task.id} started for '{job.name}'")

loop = ScheduleLoop(on_trigger=on_schedule_fire, tick_seconds=30)
loop.start()

# Agent creates schedules that now actually fire!
agent.start("Remind me to check email every morning at 7am")
| Parameter | Type | Default | Description |
|---|---|---|---|
| on_trigger | Callable | required | Called with each due ScheduleJob |
| store | FileScheduleStore | Auto-created | Schedule store to poll |
| tick_seconds | float | 30.0 | Poll interval in seconds |
| Method | Description |
|---|---|
| loop.start() | Start polling (no-op if already running) |
| loop.stop(timeout=5.0) | Signal stop and wait for thread exit |
| loop.is_running | Whether the daemon thread is alive |
Error handling: If on_trigger() raises an exception, it's logged but not propagated; the loop continues with the remaining jobs and future ticks.
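A polling loop with this start/stop/is_running surface and this error-handling behaviour can be sketched with `threading` alone. The code below is an illustration, not `ScheduleLoop` itself; `TinyPollLoop`, `get_due`, and the in-memory job list are hypothetical stand-ins for the real schedule store.

```python
import logging
import threading

class TinyPollLoop:
    """Hypothetical poller: fetches due jobs on a daemon thread and fires a callback."""

    def __init__(self, get_due, on_trigger, tick_seconds=0.01):
        self._get_due = get_due
        self._on_trigger = on_trigger
        self._tick = tick_seconds
        self._stop = threading.Event()
        self._thread = None

    @property
    def is_running(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        if self.is_running:
            return  # no-op if already running
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self, timeout=5.0):
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout)

    def _run(self):
        while not self._stop.is_set():
            for job in self._get_due():
                try:
                    self._on_trigger(job)
                except Exception:
                    # Log and move on so one bad callback cannot kill the loop.
                    logging.exception("on_trigger failed for %r", job)
            self._stop.wait(self._tick)  # sleep, but wake early on stop()

pending = ["a", "boom", "b"]
fired = []
done = threading.Event()

def get_due():
    due = list(pending)
    pending.clear()
    return due

def on_trigger(job):
    if job == "boom":
        raise RuntimeError("callback failed")
    fired.append(job)
    if job == "b":
        done.set()

loop = TinyPollLoop(get_due, on_trigger)
loop.start()
done.wait(timeout=2.0)
loop.stop()
print(fired, loop.is_running)
```

Waiting on an `Event` instead of calling `time.sleep` for the tick means `stop()` takes effect immediately rather than after the current interval expires.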
The background module uses lazy loading, so it adds no overhead when not used:
# Only loads when accessed
from praisonaiagents.background import BackgroundRunner
# Sync loop thread only created on first submit_sync() call
import praisonaiagents.background.runner as br
assert br._bg_loop is None # Not created until needed
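As a general illustration of lazy loading, the sketch below defers an import until the first attribute access. It is one possible technique, not a description of how `praisonaiagents` implements it; `LazyModule` is a hypothetical helper, and the standard-library `json` module is used only as a convenient target.

```python
import importlib

class LazyModule:
    """Proxy that imports the real module on first attribute access."""

    def __init__(self, name):
        self._name = name
        self._module = None

    @property
    def loaded(self):
        return self._module is not None

    def __getattr__(self, attr):
        # Only reached for attributes not found on the proxy itself.
        if self._module is None:
            self._module = importlib.import_module(self._name)
        return getattr(self._module, attr)

lazy_json = LazyModule("json")
loaded_before = lazy_json.loaded       # nothing imported by the proxy yet
encoded = lazy_json.dumps({"a": 1})    # first access triggers the import
loaded_after = lazy_json.loaded
print(loaded_before, encoded, loaded_after)
```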