Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

Background Tasks

Execute agent tasks and recipes asynchronously without blocking the main thread. Monitor progress, cancel running tasks, and manage concurrent execution.

Quick Start

Agent-Centric Usage

import asyncio
from praisonaiagents import Agent
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create background runner
    runner = BackgroundRunner(config=BackgroundConfig(max_concurrent_tasks=3))
    
    # Agent with background task support
    agent = Agent(
        name="AsyncAssistant",
        instructions="You are a research assistant.",
        background=runner
    )
    
    # Submit agent task to run in background
    task = await agent.background.submit_agent(
        agent=agent,
        prompt="Research AI trends in 2025",
        name="research_task"
    )
    
    # Continue with other work while task runs...
    await task.wait(timeout=60.0)
    print(task.result)

asyncio.run(main())

Using Recipe Operations

from praisonai import recipe

# Submit recipe as background task
task = recipe.run_background(
    "my-recipe",
    input={"query": "What is AI?"},
    config={"max_tokens": 1000},
    session_id="session_123",
    timeout_sec=300,
)

print(f"Task ID: {task.task_id}")

# Check status
status = await task.status()

# Wait for completion
result = await task.wait(timeout=600)
print(f"Result: {result}")

# Cancel if needed
await task.cancel()

Features

  • Async Execution: Run tasks without blocking
  • Concurrency Control: Limit concurrent tasks
  • Progress Tracking: Monitor task status
  • Timeout Support: Set execution time limits
  • Cancellation: Cancel running tasks

Configuration

from praisonaiagents.background import BackgroundConfig

config = BackgroundConfig(
    max_concurrent_tasks=5,    # Max parallel tasks
    default_timeout=300.0,     # 5 minute default timeout
    auto_cleanup=True          # Auto-remove completed tasks
)

Task Status

from praisonaiagents.background import TaskStatus

# Check status
if task.status == TaskStatus.COMPLETED:
    print(f"Result: {task.result}")
elif task.status == TaskStatus.FAILED:
    print(f"Error: {task.error}")
elif task.status == TaskStatus.RUNNING:
    print("Still running...")

CLI Usage

# Submit a recipe as background task
praisonai background submit --recipe my-recipe

# List tasks
praisonai background list

# Check status
praisonai background status <task_id>

# Cancel task
praisonai background cancel <task_id>

# Clear completed
praisonai background clear

Safe Defaults

| Setting | Default | Description |
|---|---|---|
| timeout_sec | 300 | Maximum execution time (5 minutes) |
| max_concurrent | 5 | Maximum concurrent tasks |
| cleanup_delay_sec | 3600 | Time before completed tasks are cleaned up |

Low-level API Reference

BackgroundRunner Direct Usage

import asyncio
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    # Create runner with config
    config = BackgroundConfig(max_concurrent_tasks=3)
    runner = BackgroundRunner(config=config)
    
    # Define a task
    async def my_task(name: str) -> str:
        await asyncio.sleep(2)
        return f"Task {name} completed"
    
    # Submit task
    task = await runner.submit(my_task, args=("example",), name="my_task")
    print(f"Submitted: {task.id[:8]}")
    
    # Wait for completion
    await task.wait(timeout=10.0)
    print(f"Result: {task.result}")

asyncio.run(main())

Submitting Tasks

# Submit async function
task = await runner.submit(
    func=my_async_function,
    args=(arg1, arg2),
    kwargs={"key": "value"},
    name="descriptive_name",
    timeout=60.0
)

# Submit sync function (runs in thread pool)
task = await runner.submit(
    func=my_sync_function,
    args=(arg1,),
    name="sync_task"
)

Task Management

# List all tasks
for task in runner.tasks:
    print(f"{task.name}: {task.status.value}")

# Get running tasks
running = runner.running_tasks

# Get pending tasks
pending = runner.pending_tasks

# Clear completed tasks
runner.clear_completed()

Synchronous Job Manager

For simpler use cases, use BackgroundJobManager for synchronous job management:
from praisonaiagents.background.job_manager import BackgroundJobManager, JobStatus

# Create manager with auto-background threshold
manager = BackgroundJobManager(auto_background_threshold=5.0)

# Start a job
job_id = manager.start_job(lambda: expensive_computation())

# Check status
status = manager.get_status(job_id)
if status == JobStatus.COMPLETED:
    result = manager.get_result(job_id)
elif status == JobStatus.FAILED:
    error = manager.get_error(job_id)

# List all jobs
for job_id, info in manager.list_jobs().items():
    print(f"{job_id}: {info.status}")

# Cancel a running job
manager.cancel_job(job_id)

Architecture

The sync wrappers and ScheduleLoop bridge the gap between disconnected modules:
  • submit_sync() / submit_agent_sync() — let sync code submit background tasks without asyncio boilerplate
  • ScheduleLoop — polls for due jobs on a daemon thread and fires your callback

Sync Wrappers

For sync code (scripts, bot handlers, Agent.start() callbacks), use the sync-friendly methods that handle asyncio automatically:

submit_sync()

Submit any callable from synchronous code. A daemon event loop thread is created lazily on first call.
from praisonaiagents.background.runner import BackgroundRunner
import time

runner = BackgroundRunner()

def heavy_computation(data):
    time.sleep(10)
    return {"result": sum(data)}

# Non-blocking — returns immediately
task = runner.submit_sync(
    func=heavy_computation,
    args=([1, 2, 3, 4, 5],),
    name="compute"
)

print(task.status)  # "running"

# Check later
while not task.is_completed:
    time.sleep(1)
print(f"Result: {task.result}")  # {'result': 15}

submit_agent_sync()

Submit an Agent task from synchronous code. Resolves the agent’s callable (start → chat → run) automatically.
from praisonaiagents import Agent
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(name="researcher", instructions="Research AI trends")
runner = BackgroundRunner()

task = runner.submit_agent_sync(
    agent=agent,
    prompt="What are the top AI trends in 2026?",
    name="research-task"
)
# task.result will contain the agent's response when done
| Parameter | Type | Required | Description |
|---|---|---|---|
| func / agent | Callable / Agent | Yes | Function or Agent to execute |
| args | tuple | No | Positional arguments (submit_sync only) |
| prompt | str | Yes | Agent prompt (submit_agent_sync only) |
| name | str | No | Human-readable task name |
| timeout | float | No | Timeout in seconds |
| on_complete | Callable | No | Callback when task completes |

ScheduleLoop

ScheduleLoop bridges scheduled jobs to actual execution. It runs a daemon thread that polls get_due_jobs() and fires your callback.
from praisonaiagents.scheduler import ScheduleLoop

def handle_job(job):
    print(f"🔔 Firing: {job.name} — {job.message}")

loop = ScheduleLoop(
    on_trigger=handle_job,
    tick_seconds=30,  # check every 30 seconds
)
loop.start()   # daemon thread β€” won't block
# loop.stop()  # clean shutdown when needed

Combined Example: Scheduler + Background

from praisonaiagents import Agent
from praisonaiagents.tools import schedule_add, schedule_list, schedule_remove
from praisonaiagents.scheduler import ScheduleLoop
from praisonaiagents.background.runner import BackgroundRunner

agent = Agent(
    name="assistant",
    instructions="You can set reminders and schedules.",
    tools=[schedule_add, schedule_list, schedule_remove],
)

runner = BackgroundRunner()

def on_schedule_fire(job):
    task = runner.submit_agent_sync(agent, job.message, name=f"schedule-{job.name}")
    print(f"→ Background task {task.id} started for '{job.name}'")

loop = ScheduleLoop(on_trigger=on_schedule_fire, tick_seconds=30)
loop.start()

# Agent creates schedules that now actually fire!
agent.start("Remind me to check email every morning at 7am")
| Parameter | Type | Default | Description |
|---|---|---|---|
| on_trigger | Callable | required | Called with each due ScheduleJob |
| store | FileScheduleStore | Auto-created | Schedule store to poll |
| tick_seconds | float | 30.0 | Poll interval in seconds |
| Method | Description |
|---|---|
| loop.start() | Start polling (no-op if already running) |
| loop.stop(timeout=5.0) | Signal stop and wait for thread exit |
| loop.is_running | Whether the daemon thread is alive |
Error handling: If on_trigger() raises an exception, it’s logged but not propagated — the loop continues with remaining jobs and future ticks.

Zero Performance Impact

The background module uses lazy loading — there is no overhead when it is not used:
# Only loads when accessed
from praisonaiagents.background import BackgroundRunner

# Sync loop thread only created on first submit_sync() call
import praisonaiagents.background.runner as br
assert br._bg_loop is None  # Not created until needed

See Also