Skip to main content

Queue System

The PraisonAI Queue System lets you manage multiple agent runs with priority ordering, concurrency limits, and persistence.

Overview

The queue system provides:
  • Priority-based FIFO - URGENT > HIGH > NORMAL > LOW ordering
  • Concurrency limits - Global, per-agent, and per-workspace limits
  • Cancel/retry - Full lifecycle management
  • Persistence - SQLite-backed crash recovery
  • Streaming - Token-by-token output with backpressure

Python Usage

Basic Queue Operations

import asyncio
from praisonai.cli.features.queue import (
    QueueManager,
    QueueConfig,
    RunPriority,
)

async def main():
    """Submit two prioritized runs through a QueueManager.

    Builds a QueueConfig, starts a QueueManager with output and completion
    callbacks, submits one HIGH and one NORMAL priority run, waits a fixed
    30 seconds, and then stops the manager. The manager is stopped even if
    submitting or waiting raises.
    """
    # Configure the queue
    config = QueueConfig(
        max_concurrent_global=4,
        max_concurrent_per_agent=2,
        max_queue_size=100,
        enable_persistence=True,
    )

    # Callbacks handed to the manager below
    async def on_output(run_id: str, chunk: str):
        # Print each chunk as-is (no newline) and flush so output shows immediately
        print(chunk, end="", flush=True)

    async def on_complete(run_id: str, run):
        # Report the final state of the run
        print(f"\nRun {run_id} completed: {run.state.value}")

    manager = QueueManager(
        config=config,
        on_output=on_output,
        on_complete=on_complete,
    )

    # Start the manager
    await manager.start()
    try:
        # Submit runs with different priorities
        run1 = await manager.submit(
            input_content="Analyze the codebase",
            agent_name="Analyst",
            priority=RunPriority.HIGH,
        )

        run2 = await manager.submit(
            input_content="Write documentation",
            agent_name="Writer",
            priority=RunPriority.NORMAL,
        )

        # Give the runs time to finish (fixed 30-second wait for this example)
        await asyncio.sleep(30)
    finally:
        # Always stop the manager once it has been started, even on error
        await manager.stop()

asyncio.run(main())

Queue Configuration

from praisonai.cli.features.queue import QueueConfig, RunPriority

# Example configuration covering concurrency, queue size, persistence and autosave.
config = QueueConfig(
    # Concurrency limits
    max_concurrent_global=4,      # Max runs across all agents
    max_concurrent_per_agent=2,   # Max runs per agent type
    max_concurrent_per_workspace=4,  # Max runs per workspace
    
    # Queue limits
    max_queue_size=100,           # Max queued runs
    default_priority=RunPriority.NORMAL,  # presumably used when a run is submitted without a priority — confirm
    
    # Persistence
    enable_persistence=True,      # SQLite-backed persistence for crash recovery
    db_path=".praison/queue.db",  # SQLite database file (queue state, runs, messages)
    
    # Autosave
    autosave_interval=30.0,       # Seconds between autosaves
)

Run States

from praisonai.cli.features.queue import RunState

# Available states
RunState.QUEUED      # Waiting in queue
RunState.RUNNING     # Currently executing
RunState.PAUSED      # Paused (can resume)
RunState.SUCCEEDED   # Completed successfully
RunState.FAILED      # Failed with error
RunState.CANCELLED   # Cancelled by user

# Check state properties
# NOTE(review): presumably SUCCEEDED, FAILED and CANCELLED are the terminal
# states — confirm against the RunState implementation.
state = RunState.RUNNING
state.is_terminal()  # False - a RUNNING run has not finished
state.is_active()    # True - a RUNNING run is still in progress

Priority Levels

from praisonai.cli.features.queue import RunPriority

# Priority ordering (highest to lowest): URGENT > HIGH > NORMAL > LOW.
# The number in each comment is the level's rank; a higher number is served first.
RunPriority.URGENT   # 3 - Process immediately
RunPriority.HIGH     # 2 - High priority
RunPriority.NORMAL   # 1 - Default priority
RunPriority.LOW      # 0 - Background tasks

# Parse a priority from its string name
priority = RunPriority.from_string("high")  # RunPriority.HIGH

Cancel and Retry

# These snippets assume a started `manager` and the `run_id` of an existing run.

# Cancel a run; the outcome is returned (stored here as `success`)
success = await manager.cancel(run_id)

# Retry a failed run; the result is presumably the id of the new run — confirm
new_run_id = await manager.retry(run_id)

# Alternatively, check that the run can be retried before retrying it
run = manager.get_run(run_id)
if run.can_retry():
    await manager.retry(run_id)

Queue Statistics

# Get a statistics snapshot (assumes a started `manager`)
stats = manager.get_stats()

# Run counts by state
print(f"Queued: {stats.queued_count}")
print(f"Running: {stats.running_count}")
print(f"Succeeded: {stats.succeeded_count}")
print(f"Failed: {stats.failed_count}")
print(f"Total: {stats.total_runs}")
# Average timings in seconds, shown with one decimal place
print(f"Avg wait: {stats.avg_wait_seconds:.1f}s")
print(f"Avg duration: {stats.avg_duration_seconds:.1f}s")

Data Model

QueuedRun

# RunState and RunPriority are used below, so import them with QueuedRun
from praisonai.cli.features.queue import QueuedRun, RunPriority, RunState

# Build a run record by hand, setting every field shown on this page
run = QueuedRun(
    run_id="abc123",           # Unique identifier
    agent_name="Assistant",    # Agent to execute
    input_content="Hello",     # Input prompt
    state=RunState.QUEUED,     # Current state
    priority=RunPriority.NORMAL,

    # Attribution
    session_id="session123",
    trace_id="trace456",
    workspace="/path/to/workspace",

    # Retry configuration
    retry_count=0,
    max_retries=3,

    # Custom configuration
    config={"model": "gpt-4"},
)

# Properties
run.duration_seconds  # Time from start to end
run.wait_seconds      # Time from created to started
run.can_retry()       # Check if retryable

# Serialization: round-trip the run through a plain dict
data = run.to_dict()
run2 = QueuedRun.from_dict(data)

Persistence

The queue system uses SQLite for persistence. Its files are stored under the .praison/ directory:
.praison/
├── queue.db           # Queue state, runs, messages
└── sessions/
    └── {session_id}/
        └── state.json # Session state

Crash Recovery

# Start with recovery enabled
await manager.start(recover=True)

# With recover=True, start() will:
# 1. Load pending runs from database
# 2. Mark interrupted runs as failed
# 3. Re-queue recoverable runs

Manual Persistence Operations

# RunState is used by list_runs() below, so import it with QueuePersistence
from praisonai.cli.features.queue import QueuePersistence, RunState

# `run` and `run_id` are assumed to exist already (a QueuedRun and a run's id).
persistence = QueuePersistence(db_path=".praison/queue.db")
persistence.initialize()
try:
    # Save/load runs
    persistence.save_run(run)
    loaded = persistence.load_run(run_id)

    # List runs
    runs = persistence.list_runs(state=RunState.QUEUED, limit=10)

    # Cleanup old runs
    deleted = persistence.cleanup_old_runs(days=30)
finally:
    # Always close the persistence layer, even if an operation above raises
    persistence.close()

CLI Usage

See TUI Commands for the complete CLI reference.

# List queue
praisonai queue ls

# Cancel a run
praisonai queue cancel abc123

# Retry a failed run
praisonai queue retry abc123

# Show statistics
praisonai queue stats

# Clear queue
praisonai queue clear --force