Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

Queue System

The PraisonAI Queue System enables managing multiple agent runs with priority ordering, concurrency limits, and persistence.

Overview

The queue system provides:
  • Priority-based FIFO - Runs are ordered URGENT > HIGH > NORMAL > LOW, first-in-first-out within each priority
  • Concurrency limits - Global, per-agent, and per-workspace limits
  • Cancel/retry - Full lifecycle management
  • Persistence - SQLite-backed crash recovery
  • Streaming - Token-by-token output with backpressure

Python Usage

Basic Queue Operations

import asyncio
from praisonai.cli.features.queue import (
    QueueManager,
    QueueConfig,
    RunPriority,
)

async def main():
    """Start a queue manager, submit two prioritized runs, then shut down.

    Output chunks and completion notices are printed by the callbacks
    registered on the manager. ``manager.stop()`` runs in a ``finally``
    clause so the manager is stopped even if submitting a run (or the
    wait) raises.
    """
    # Configure the queue
    config = QueueConfig(
        max_concurrent_global=4,
        max_concurrent_per_agent=2,
        max_queue_size=100,
        enable_persistence=True,
    )

    # Create manager with callbacks
    async def on_output(run_id: str, chunk: str):
        # Print each chunk as it arrives, without adding a newline
        print(chunk, end="", flush=True)

    async def on_complete(run_id: str, run):
        print(f"\nRun {run_id} completed: {run.state.value}")

    manager = QueueManager(
        config=config,
        on_output=on_output,
        on_complete=on_complete,
    )

    # Start the manager
    await manager.start()

    try:
        # Submit runs with different priorities
        run1 = await manager.submit(
            input_content="Analyze the codebase",
            agent_name="Analyst",
            priority=RunPriority.HIGH,
        )

        run2 = await manager.submit(
            input_content="Write documentation",
            agent_name="Writer",
            priority=RunPriority.NORMAL,
        )

        # Give the runs time to finish. A fixed sleep keeps the example
        # short; it does not guarantee that both runs have completed.
        await asyncio.sleep(30)
    finally:
        # Stop the manager, even if a submit call above failed
        await manager.stop()

asyncio.run(main())

Queue Configuration

from praisonai.cli.features.queue import QueueConfig, RunPriority

config = QueueConfig(
    # Concurrency limits
    max_concurrent_global=4,      # Max runs across all agents
    max_concurrent_per_agent=2,   # Max runs per agent type
    max_concurrent_per_workspace=4,  # Max runs per workspace
    
    # Queue limits
    max_queue_size=100,           # Max queued runs
    default_priority=RunPriority.NORMAL,
    
    # Persistence
    enable_persistence=True,
    db_path=".praison/queue.db",
    
    # Autosave
    autosave_interval=30.0,       # Seconds between autosaves
)

Run States

from praisonai.cli.features.queue import RunState

# Available states
RunState.QUEUED      # Waiting in queue
RunState.RUNNING     # Currently executing
RunState.PAUSED      # Paused (can resume)
RunState.SUCCEEDED   # Completed successfully
RunState.FAILED      # Failed with error
RunState.CANCELLED   # Cancelled by user

# Check state properties
state = RunState.RUNNING
state.is_terminal()  # False - still active
state.is_active()    # True - not finished

Priority Levels

from praisonai.cli.features.queue import RunPriority

# Priority ordering (highest to lowest)
RunPriority.URGENT   # 3 - Process immediately
RunPriority.HIGH     # 2 - High priority
RunPriority.NORMAL   # 1 - Default priority
RunPriority.LOW      # 0 - Background tasks

# Parse from string
priority = RunPriority.from_string("high")  # RunPriority.HIGH

Cancel and Retry

# Cancel a running task
success = await manager.cancel(run_id)

# Retry a failed task
new_run_id = await manager.retry(run_id)

# Check if run can be retried
run = manager.get_run(run_id)
if run.can_retry():
    await manager.retry(run_id)

Queue Statistics

# Get statistics
stats = manager.get_stats()

print(f"Queued: {stats.queued_count}")
print(f"Running: {stats.running_count}")
print(f"Succeeded: {stats.succeeded_count}")
print(f"Failed: {stats.failed_count}")
print(f"Total: {stats.total_runs}")
print(f"Avg wait: {stats.avg_wait_seconds:.1f}s")
print(f"Avg duration: {stats.avg_duration_seconds:.1f}s")

Data Model

QueuedRun

# RunState and RunPriority are used below, so import them alongside QueuedRun
from praisonai.cli.features.queue import QueuedRun, RunPriority, RunState

run = QueuedRun(
    run_id="abc123",           # Unique identifier
    agent_name="Assistant",    # Agent to execute
    input_content="Hello",     # Input prompt
    state=RunState.QUEUED,     # Current state
    priority=RunPriority.NORMAL,

    # Attribution
    session_id="session123",
    trace_id="trace456",
    workspace="/path/to/workspace",

    # Retry configuration
    retry_count=0,
    max_retries=3,

    # Custom configuration
    config={"model": "gpt-4"},
)

# Properties
run.duration_seconds  # Time from start to end
run.wait_seconds      # Time from created to started
run.can_retry()       # Check if retryable

# Serialization
data = run.to_dict()
run2 = QueuedRun.from_dict(data)

Persistence

The queue system uses SQLite for persistence. Queue data and per-session state are stored under the .praison/ directory:
.praison/
├── queue.db           # Queue state, runs, messages
└── sessions/
    └── {session_id}/
        └── state.json # Session state

Crash Recovery

# Start with recovery enabled
await manager.start(recover=True)

# This will:
# 1. Load pending runs from database
# 2. Mark interrupted runs as failed
# 3. Re-queue recoverable runs

Manual Persistence Operations

# RunState is used by list_runs() below, so import it alongside QueuePersistence
from praisonai.cli.features.queue import QueuePersistence, RunState

persistence = QueuePersistence(db_path=".praison/queue.db")
persistence.initialize()

# NOTE: `run` and `run_id` are assumed to be defined by earlier code.
try:
    # Save/load runs
    persistence.save_run(run)
    loaded = persistence.load_run(run_id)

    # List runs
    runs = persistence.list_runs(state=RunState.QUEUED, limit=10)

    # Cleanup old runs
    deleted = persistence.cleanup_old_runs(days=30)
finally:
    # Always close the persistence object, even if an operation above raises
    persistence.close()

CLI Usage

See TUI Commands for the complete CLI reference.
# List queue
praisonai queue ls

# Cancel a run
praisonai queue cancel abc123

# Retry a failed run
praisonai queue retry abc123

# Show statistics
praisonai queue stats

# Clear queue
praisonai queue clear --force