Documentation Index
Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
Use this file to discover all available pages before exploring further.
Queue System
The PraisonAI Queue System enables managing multiple agent runs with priority ordering, concurrency limits, and persistence.
Overview
The queue system provides:
- Priority-based FIFO - Runs are ordered URGENT > HIGH > NORMAL > LOW, and first-in-first-out within the same priority
- Concurrency limits - Global, per-agent, and per-workspace limits
- Cancel/retry - Full lifecycle management
- Persistence - SQLite-backed crash recovery
- Streaming - Token-by-token output with backpressure
Python Usage
Basic Queue Operations
import asyncio
from praisonai.cli.features.queue import (
QueueManager,
QueueConfig,
RunPriority,
)
async def main():
    """Start a queue manager, submit two runs, then shut the manager down."""
    # Configure the queue
    config = QueueConfig(
        max_concurrent_global=4,
        max_concurrent_per_agent=2,
        max_queue_size=100,
        enable_persistence=True,
    )

    # Callbacks handed to the manager
    async def on_output(run_id: str, chunk: str):
        # Print each chunk as it arrives, without adding a newline
        print(chunk, end="", flush=True)

    async def on_complete(run_id: str, run):
        print(f"\nRun {run_id} completed: {run.state.value}")

    manager = QueueManager(
        config=config,
        on_output=on_output,
        on_complete=on_complete,
    )

    # Start the manager
    await manager.start()
    try:
        # Submit runs with different priorities
        run1 = await manager.submit(
            input_content="Analyze the codebase",
            agent_name="Analyst",
            priority=RunPriority.HIGH,
        )
        run2 = await manager.submit(
            input_content="Write documentation",
            agent_name="Writer",
            priority=RunPriority.NORMAL,
        )

        # Give the runs time to execute. This is a fixed 30-second delay for
        # demonstration only; it does not detect when the runs have finished.
        await asyncio.sleep(30)
    finally:
        # Always stop the manager, even if a submission above raised
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())
Queue Configuration
from praisonai.cli.features.queue import QueueConfig, RunPriority
# Example configuration; every option is passed as a keyword argument.
config = QueueConfig(
    # Concurrency limits
    max_concurrent_global=4,         # Max runs across all agents
    max_concurrent_per_agent=2,      # Max runs per agent type
    max_concurrent_per_workspace=4,  # Max runs per workspace
    # Queue limits
    max_queue_size=100,              # Max queued runs
    default_priority=RunPriority.NORMAL,
    # Persistence
    enable_persistence=True,
    db_path=".praison/queue.db",
    # Autosave
    autosave_interval=30.0,          # Seconds between autosaves
)
Run States
from praisonai.cli.features.queue import RunState
# Available states
RunState.QUEUED      # Waiting in the queue
RunState.RUNNING     # Currently executing
RunState.PAUSED      # Paused (can be resumed)
RunState.SUCCEEDED   # Completed successfully
RunState.FAILED      # Failed with an error
RunState.CANCELLED   # Cancelled by the user

# State helper methods, shown for a run that is still executing
state = RunState.RUNNING
state.is_terminal()  # False - RUNNING is not a final state
state.is_active()    # True - the run has not finished yet
Priority Levels
from praisonai.cli.features.queue import RunPriority
# Priority levels, listed from highest to lowest.
# The leading number is the level as given in this reference
# (presumably the enum's numeric value - confirm against the API).
RunPriority.URGENT  # 3 - Process immediately
RunPriority.HIGH    # 2 - High priority
RunPriority.NORMAL  # 1 - Default priority
RunPriority.LOW     # 0 - Background tasks

# Parse a priority from its name
priority = RunPriority.from_string("high")  # RunPriority.HIGH
Cancel and Retry
# Cancel a running task
success = await manager.cancel(run_id)
# Retry a failed task
new_run_id = await manager.retry(run_id)
# Check if run can be retried
run = manager.get_run(run_id)
if run.can_retry():
await manager.retry(run_id)
Queue Statistics
# Get the queue statistics object from the manager
stats = manager.get_stats()

# Run counts
print(f"Queued: {stats.queued_count}")
print(f"Running: {stats.running_count}")
print(f"Succeeded: {stats.succeeded_count}")
print(f"Failed: {stats.failed_count}")
print(f"Total: {stats.total_runs}")

# Timing averages, printed in seconds with one decimal place
print(f"Avg wait: {stats.avg_wait_seconds:.1f}s")
print(f"Avg duration: {stats.avg_duration_seconds:.1f}s")
Data Model
QueuedRun
# RunState and RunPriority are used below, so they are imported with QueuedRun
from praisonai.cli.features.queue import QueuedRun, RunPriority, RunState

run = QueuedRun(
    run_id="abc123",          # Unique identifier
    agent_name="Assistant",   # Agent to execute
    input_content="Hello",    # Input prompt
    state=RunState.QUEUED,    # Current state
    priority=RunPriority.NORMAL,
    # Attribution
    session_id="session123",
    trace_id="trace456",
    workspace="/path/to/workspace",
    # Retry configuration
    retry_count=0,
    max_retries=3,
    # Custom configuration
    config={"model": "gpt-4"},
)

# Timing properties
run.duration_seconds  # Time from start to end
run.wait_seconds      # Time from created to started

# Retry check (a method call, not a property)
run.can_retry()       # Check if retryable

# Serialization: convert to a dict and rebuild a run from it
data = run.to_dict()
run2 = QueuedRun.from_dict(data)
Persistence
The queue system uses SQLite for persistence. Its files are stored under the .praison/ directory:
.praison/
├── queue.db # Queue state, runs, messages
└── sessions/
└── {session_id}/
└── state.json # Session state
Crash Recovery
# Start the manager with crash recovery enabled
await manager.start(recover=True)
# With recover=True, startup will:
# 1. Load pending runs from database
# 2. Mark interrupted runs as failed
# 3. Re-queue recoverable runs
Manual Persistence Operations
# RunState is used for filtering below, so it is imported with QueuePersistence
from praisonai.cli.features.queue import QueuePersistence, RunState

persistence = QueuePersistence(db_path=".praison/queue.db")
persistence.initialize()
try:
    # Save/load runs (`run` is an existing QueuedRun and `run_id` is its id)
    persistence.save_run(run)
    loaded = persistence.load_run(run_id)

    # List runs, filtered by state and capped at 10 results
    runs = persistence.list_runs(state=RunState.QUEUED, limit=10)

    # Cleanup old runs
    deleted = persistence.cleanup_old_runs(days=30)
finally:
    # Close the persistence layer even if one of the operations above fails
    persistence.close()
CLI Usage
See TUI Commands for the complete CLI reference.
# List the runs in the queue
praisonai queue ls
# Cancel a run by its run ID (abc123 is an example ID)
praisonai queue cancel abc123
# Retry a failed run by its run ID
praisonai queue retry abc123
# Show queue statistics
praisonai queue stats
# Clear the queue (--force presumably skips a confirmation prompt - verify)
praisonai queue clear --force