Skip to main content

Background Tasks

Execute agent tasks and recipes asynchronously without blocking the main thread. Monitor progress, cancel running tasks, and manage concurrent execution.

Quick Start

Agent-Centric Usage

import asyncio
from praisonaiagents import Agent
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create background runner
    runner = BackgroundRunner(config=BackgroundConfig(max_concurrent_tasks=3))
    
    # Agent with background task support
    agent = Agent(
        name="AsyncAssistant",
        instructions="You are a research assistant.",
        background=runner
    )
    
    # Submit agent task to run in background
    task = await agent.background.submit_agent(
        agent=agent,
        prompt="Research AI trends in 2025",
        name="research_task"
    )
    
    # Continue with other work while task runs...
    await task.wait(timeout=60.0)
    print(task.result)

asyncio.run(main())

Using Recipe Operations

from praisonai import recipe

# Submit recipe as background task
task = recipe.run_background(
    "my-recipe",
    input={"query": "What is AI?"},
    config={"max_tokens": 1000},
    session_id="session_123",
    timeout_sec=300,
)

print(f"Task ID: {task.task_id}")

# Check status
status = await task.status()

# Wait for completion
result = await task.wait(timeout=600)
print(f"Result: {result}")

# Cancel if needed
await task.cancel()

Features

  • Async Execution: Run tasks without blocking
  • Concurrency Control: Limit concurrent tasks
  • Progress Tracking: Monitor task status
  • Timeout Support: Set execution time limits
  • Cancellation: Cancel running tasks

Configuration

from praisonaiagents.background import BackgroundConfig

config = BackgroundConfig(
    max_concurrent_tasks=5,    # Max parallel tasks
    default_timeout=300.0,     # 5 minute default timeout
    auto_cleanup=True          # Auto-remove completed tasks
)

Task Status

from praisonaiagents.background import TaskStatus

# Check status
if task.status == TaskStatus.COMPLETED:
    print(f"Result: {task.result}")
elif task.status == TaskStatus.FAILED:
    print(f"Error: {task.error}")
elif task.status == TaskStatus.RUNNING:
    print("Still running...")

CLI Usage

# Submit a recipe as background task
praisonai background submit --recipe my-recipe

# List tasks
praisonai background list

# Check status
praisonai background status <task_id>

# Cancel task
praisonai background cancel <task_id>

# Clear completed
praisonai background clear

Safe Defaults

SettingDefaultDescription
timeout_sec300Maximum execution time (5 minutes)
max_concurrent5Maximum concurrent tasks
cleanup_delay_sec3600Time before completed tasks are cleaned up

Low-level API Reference

BackgroundRunner Direct Usage

import asyncio
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create runner with config
    config = BackgroundConfig(max_concurrent_tasks=3)
    runner = BackgroundRunner(config=config)
    
    # Define a task
    async def my_task(name: str) -> str:
        await asyncio.sleep(2)
        return f"Task {name} completed"
    
    # Submit task
    task = await runner.submit(my_task, args=("example",), name="my_task")
    print(f"Submitted: {task.id[:8]}")
    
    # Wait for completion
    await task.wait(timeout=10.0)
    print(f"Result: {task.result}")

asyncio.run(main())

Submitting Tasks

# Submit async function
task = await runner.submit(
    func=my_async_function,
    args=(arg1, arg2),
    kwargs={"key": "value"},
    name="descriptive_name",
    timeout=60.0
)

# Submit sync function (runs in thread pool)
task = await runner.submit(
    func=my_sync_function,
    args=(arg1,),
    name="sync_task"
)

Task Management

# List all tasks
for task in runner.tasks:
    print(f"{task.name}: {task.status.value}")

# Get running tasks
running = runner.running_tasks

# Get pending tasks
pending = runner.pending_tasks

# Clear completed tasks
runner.clear_completed()

Synchronous Job Manager

For simpler use cases, use BackgroundJobManager for synchronous job management:
from praisonaiagents.background.job_manager import BackgroundJobManager, JobStatus

# Create manager with auto-background threshold
manager = BackgroundJobManager(auto_background_threshold=5.0)

# Start a job
job_id = manager.start_job(lambda: expensive_computation())

# Check status
status = manager.get_status(job_id)
if status == JobStatus.COMPLETED:
    result = manager.get_result(job_id)
elif status == JobStatus.FAILED:
    error = manager.get_error(job_id)

# List all jobs
for job_id, info in manager.list_jobs().items():
    print(f"{job_id}: {info.status}")

# Cancel a running job
manager.cancel_job(job_id)

Architecture

The sync wrappers and ScheduleLoop bridge the gap between disconnected modules:
  • submit_sync() / submit_agent_sync() — let sync code submit background tasks without asyncio boilerplate
  • ScheduleLoop — polls for due jobs on a daemon thread and fires your callback

Sync Wrappers

For sync code (scripts, bot handlers, Agent.start() callbacks), use the sync-friendly methods that handle asyncio automatically:

submit_sync()

Submit any callable from synchronous code. A daemon event loop thread is created lazily on first call.
from praisonaiagents.background.runner import BackgroundRunner
import time

runner = BackgroundRunner()

def heavy_computation(data):
    time.sleep(10)
    return {"result": sum(data)}

# Non-blocking — returns immediately
task = runner.submit_sync(
    func=heavy_computation,
    args=([1, 2, 3, 4, 5],),
    name="compute"
)

print(task.status)  # "running"

# Check later
while not task.is_completed:
    time.sleep(1)
print(f"Result: {task.result}")  # {'result': 15}

submit_agent_sync()

Submit an Agent task from synchronous code. Resolves the agent’s callable (startchatrun) automatically.
from praisonaiagents import Agent
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(name="researcher", instructions="Research AI trends")
runner = BackgroundRunner()

task = runner.submit_agent_sync(
    agent=agent,
    prompt="What are the top AI trends in 2026?",
    name="research-task"
)
# task.result will contain the agent's response when done
ParameterTypeRequiredDescription
func / agentCallable / AgentYesFunction or Agent to execute
argstupleNoPositional arguments (submit_sync only)
promptstrYesAgent prompt (submit_agent_sync only)
namestrNoHuman-readable task name
timeoutfloatNoTimeout in seconds
on_completeCallableNoCallback when task completes

ScheduleLoop

ScheduleLoop bridges scheduled jobs to actual execution. It runs a daemon thread that polls get_due_jobs() and fires your callback.
from praisonaiagents.scheduler import ScheduleLoop

def handle_job(job):
    print(f"🔔 Firing: {job.name}{job.message}")

loop = ScheduleLoop(
    on_trigger=handle_job,
    tick_seconds=30,  # check every 30 seconds
)
loop.start()   # daemon thread — won't block
# loop.stop()  # clean shutdown when needed

Combined Example: Scheduler + Background

from praisonaiagents import Agent
from praisonaiagents.tools import schedule_add, schedule_list, schedule_remove
from praisonaiagents.scheduler import ScheduleLoop
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(
    name="assistant",
    instructions="You can set reminders and schedules.",
    tools=[schedule_add, schedule_list, schedule_remove],
)

runner = BackgroundRunner()

def on_schedule_fire(job):
    task = runner.submit_agent_sync(agent, job.message, name=f"schedule-{job.name}")
    print(f"→ Background task {task.id} started for '{job.name}'")

loop = ScheduleLoop(on_trigger=on_schedule_fire, tick_seconds=30)
loop.start()

# Agent creates schedules that now actually fire!
agent.start("Remind me to check email every morning at 7am")
ParameterTypeDefaultDescription
on_triggerCallablerequiredCalled with each due ScheduleJob
storeFileScheduleStoreAuto-createdSchedule store to poll
tick_secondsfloat30.0Poll interval in seconds
MethodDescription
loop.start()Start polling (no-op if already running)
loop.stop(timeout=5.0)Signal stop and wait for thread exit
loop.is_runningWhether the daemon thread is alive
Error handling: If on_trigger() raises an exception, it’s logged but not propagated — the loop continues with remaining jobs and future ticks.

Zero Performance Impact

The background module uses lazy loading — no overhead when not used:
# Only loads when accessed
from praisonaiagents.background import BackgroundRunner

# Sync loop thread only created on first submit_sync() call
import praisonaiagents.background.runner as br
assert br._bg_loop is None  # Not created until needed

See Also