Queue System
The PraisonAI Queue System enables managing multiple agent runs with priority ordering, concurrency limits, and persistence.
Overview
The queue system provides:
- Priority-based FIFO - URGENT > HIGH > NORMAL > LOW ordering
- Concurrency limits - Global, per-agent, and per-workspace limits
- Cancel/retry - Full lifecycle management
- Persistence - SQLite-backed crash recovery
- Streaming - Token-by-token output with backpressure
Python Usage
Basic Queue Operations
Copy
import asyncio
from praisonai.cli.features.queue import (
QueueManager,
QueueConfig,
RunPriority,
)
async def main():
# Configure the queue
config = QueueConfig(
max_concurrent_global=4,
max_concurrent_per_agent=2,
max_queue_size=100,
enable_persistence=True,
)
# Create manager with callbacks
async def on_output(run_id: str, chunk: str):
print(chunk, end="", flush=True)
async def on_complete(run_id: str, run):
print(f"\nRun {run_id} completed: {run.state.value}")
manager = QueueManager(
config=config,
on_output=on_output,
on_complete=on_complete,
)
# Start the manager
await manager.start()
# Submit runs with different priorities
run1 = await manager.submit(
input_content="Analyze the codebase",
agent_name="Analyst",
priority=RunPriority.HIGH,
)
run2 = await manager.submit(
input_content="Write documentation",
agent_name="Writer",
priority=RunPriority.NORMAL,
)
# Wait for completion
await asyncio.sleep(30)
# Stop the manager
await manager.stop()
asyncio.run(main())
Queue Configuration
Copy
from praisonai.cli.features.queue import QueueConfig, RunPriority
config = QueueConfig(
# Concurrency limits
max_concurrent_global=4, # Max runs across all agents
max_concurrent_per_agent=2, # Max runs per agent type
max_concurrent_per_workspace=4, # Max runs per workspace
# Queue limits
max_queue_size=100, # Max queued runs
default_priority=RunPriority.NORMAL,
# Persistence
enable_persistence=True,
db_path=".praison/queue.db",
# Autosave
autosave_interval=30.0, # Seconds between autosaves
)
Run States
Copy
from praisonai.cli.features.queue import RunState
# Available states
RunState.QUEUED # Waiting in queue
RunState.RUNNING # Currently executing
RunState.PAUSED # Paused (can resume)
RunState.SUCCEEDED # Completed successfully
RunState.FAILED # Failed with error
RunState.CANCELLED # Cancelled by user
# Check state properties
state = RunState.RUNNING
state.is_terminal() # False - still active
state.is_active() # True - not finished
Priority Levels
Copy
from praisonai.cli.features.queue import RunPriority
# Priority ordering (highest to lowest)
RunPriority.URGENT # 3 - Process immediately
RunPriority.HIGH # 2 - High priority
RunPriority.NORMAL # 1 - Default priority
RunPriority.LOW # 0 - Background tasks
# Parse from string
priority = RunPriority.from_string("high") # RunPriority.HIGH
Cancel and Retry
Copy
# Cancel a running task
success = await manager.cancel(run_id)
# Retry a failed task
new_run_id = await manager.retry(run_id)
# Check if run can be retried
run = manager.get_run(run_id)
if run.can_retry():
await manager.retry(run_id)
Queue Statistics
Copy
# Get statistics
stats = manager.get_stats()
print(f"Queued: {stats.queued_count}")
print(f"Running: {stats.running_count}")
print(f"Succeeded: {stats.succeeded_count}")
print(f"Failed: {stats.failed_count}")
print(f"Total: {stats.total_runs}")
print(f"Avg wait: {stats.avg_wait_seconds:.1f}s")
print(f"Avg duration: {stats.avg_duration_seconds:.1f}s")
Data Model
QueuedRun
Copy
from praisonai.cli.features.queue import QueuedRun
run = QueuedRun(
run_id="abc123", # Unique identifier
agent_name="Assistant", # Agent to execute
input_content="Hello", # Input prompt
state=RunState.QUEUED, # Current state
priority=RunPriority.NORMAL,
# Attribution
session_id="session123",
trace_id="trace456",
workspace="/path/to/workspace",
# Retry configuration
retry_count=0,
max_retries=3,
# Custom configuration
config={"model": "gpt-4"},
)
# Properties
run.duration_seconds # Time from start to end
run.wait_seconds # Time from created to started
run.can_retry() # Check if retryable
# Serialization
data = run.to_dict()
run2 = QueuedRun.from_dict(data)
Persistence
The queue system uses SQLite for persistence:
Copy
.praison/
├── queue.db # Queue state, runs, messages
└── sessions/
└── {session_id}/
└── state.json # Session state
Crash Recovery
Copy
# Start with recovery enabled
await manager.start(recover=True)
# This will:
# 1. Load pending runs from database
# 2. Mark interrupted runs as failed
# 3. Re-queue recoverable runs
Manual Persistence Operations
Copy
from praisonai.cli.features.queue import QueuePersistence
persistence = QueuePersistence(db_path=".praison/queue.db")
persistence.initialize()
# Save/load runs
persistence.save_run(run)
loaded = persistence.load_run(run_id)
# List runs
runs = persistence.list_runs(state=RunState.QUEUED, limit=10)
# Cleanup old runs
deleted = persistence.cleanup_old_runs(days=30)
persistence.close()
CLI Usage
See TUI Commands for the complete CLI reference.
Copy
# List queue
praisonai queue ls
# Cancel a run
praisonai queue cancel abc123
# Retry a failed run
praisonai queue retry abc123
# Show statistics
praisonai queue stats
# Clear queue
praisonai queue clear --force

