Concurrency controls let you limit parallel agent execution and set timeouts for tool calls to prevent resource exhaustion.

Quick Start

1. Limit parallel runs of an agent

Control how many instances of the same agent can run concurrently:
from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

registry = ConcurrencyRegistry()
registry.set_limit("researcher", 2)  # at most 2 concurrent runs

agent = Agent(name="researcher", instructions="Research topics")

# Sync context
registry.acquire_sync("researcher")
try:
    agent.start("Research Mars exploration")
finally:
    registry.release("researcher")

2. Apply the same limit in async code

In async code, await acquire() so that waiting for a free slot does not block the event loop:
import asyncio

async def main():  # reuses registry and agent from step 1
    await registry.acquire("researcher")
    try:
        await agent.astart("Research Mars exploration")
    finally:
        registry.release("researcher")

asyncio.run(main())

3. Bound tool time with tool_timeout

Prevent slow tools from blocking agent execution:
from praisonaiagents import Agent

agent = Agent(
    name="Assistant",
    instructions="Use tools to help users",
    tools=["get_weather"],
    tool_timeout=30,  # seconds; slow tools return a timeout dict
)
agent.start("What's the weather in Tokyo?")

How It Works

| Component | Purpose | Thread Safety |
| --- | --- | --- |
| ConcurrencyRegistry | Limits parallel agent runs | ✅ Thread-safe |
| Tool Executor | Runs tools with timeout | ✅ Per-agent pool |
| Plugin APIs | Enable/disable plugins | ✅ Lock-protected |
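
To make the idea of a per-name limit concrete, here is a minimal sketch of how such a registry could be built on asyncio.Semaphore. TinyRegistry and run_demo are hypothetical names used only for illustration; this is not the ConcurrencyRegistry implementation, and the real class may work differently.

```python
import asyncio


class TinyRegistry:
    """Illustrative stand-in, NOT the PraisonAI ConcurrencyRegistry."""

    def __init__(self):
        self._semaphores = {}

    def set_limit(self, name, limit):
        # One semaphore per agent name; its counter is the number of free slots
        self._semaphores[name] = asyncio.Semaphore(limit)

    async def acquire(self, name):
        sem = self._semaphores.get(name)
        if sem is not None:  # in this sketch, names without a limit are unrestricted
            await sem.acquire()

    def release(self, name):
        sem = self._semaphores.get(name)
        if sem is not None:
            sem.release()


async def run_demo(limit=2, jobs=6):
    """Start `jobs` fake agent runs and return the peak number running at once."""
    registry = TinyRegistry()
    registry.set_limit("researcher", limit)
    active = 0
    peak = 0

    async def fake_run():
        nonlocal active, peak
        await registry.acquire("researcher")
        try:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for agent.astart(...)
        finally:
            active -= 1
            registry.release("researcher")

    await asyncio.gather(*(fake_run() for _ in range(jobs)))
    return peak
```

Calling asyncio.run(run_demo(limit=2, jobs=6)) starts six fake runs, but the returned peak never exceeds the configured limit.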

Sync vs Async Rule

The concurrency registry enforces strict separation between sync and async contexts:
| Context | Method | Result |
| --- | --- | --- |
| Sync | registry.acquire_sync(name) | ✅ Works correctly |
| Async | await registry.acquire(name) | ✅ Works correctly |
| Mixed | acquire_sync() inside async code | ❌ Raises RuntimeError |

Calling acquire_sync() while an event loop is running raises RuntimeError("acquire_sync('<agent_name>') cannot be called with a running event loop; use async acquire() in async contexts."). Use await acquire() instead.
Example:
async def bad_example():
    registry.acquire_sync("agent")  # RuntimeError!

async def good_example():
    await registry.acquire("agent")  # ✅ Correct
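
One plausible way to detect the mixed case is to ask asyncio whether an event loop is running in the current thread. The sketch below uses the standard asyncio.get_running_loop() call for that; in_async_context and guarded_sync_acquire are hypothetical helpers that mirror the rule, not the library's code.

```python
import asyncio


def in_async_context() -> bool:
    """Return True when called from code running inside an event loop."""
    try:
        asyncio.get_running_loop()  # raises RuntimeError when no loop is running
    except RuntimeError:
        return False
    return True


def guarded_sync_acquire(name: str) -> str:
    """Hypothetical guard: refuse to do a blocking acquire inside a running loop."""
    if in_async_context():
        raise RuntimeError(
            f"acquire_sync({name!r}) cannot be called with a running event loop; "
            "use async acquire() in async contexts."
        )
    return "acquired"  # a real registry would block on a lock or semaphore here


async def call_from_async() -> str:
    """Show what an async caller sees when it uses the sync method by mistake."""
    try:
        guarded_sync_acquire("agent")
    except RuntimeError as exc:
        return str(exc)
    return "no error"
```

The same check can be used in application code that is shared between sync and async callers to pick the right acquisition method.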

Tool Timeout Behavior

When tool_timeout is set, tools run in a dedicated executor with these characteristics:

Timeout Return Shape

On timeout, the tool call returns a dict instead of raising an exception:
{
    "error": "Tool timed out after 30s",
    "timeout": True
}
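
Because a timeout arrives as a return value, code that inspects tool results has to check for it explicitly. A small sketch, assuming the dict shape shown above; is_tool_timeout and describe are hypothetical helper names.

```python
def is_tool_timeout(result) -> bool:
    """True when a tool result has the timeout shape: a dict with timeout=True."""
    return isinstance(result, dict) and result.get("timeout") is True


def describe(result) -> str:
    """Turn a raw tool result into a short, log-friendly message."""
    if is_tool_timeout(result):
        return f"tool timed out: {result.get('error', 'no detail')}"
    return f"tool ok: {result!r}"
```

Checking the "timeout" key rather than the error text keeps the check stable if the message wording changes.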

Executor Details

  • One executor per Agent instance (lazy creation)
  • max_workers=2 threads per agent
  • Thread name prefix: tool-<agent_name> — useful for log filtering
  • Reused across calls — no resource leak
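
The executor properties listed above can be sketched with the standard library's ThreadPoolExecutor. ToolRunner is a hypothetical class written for illustration; it only approximates the documented behavior and is not the library's tool executor.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


class ToolRunner:
    """Illustrative per-agent tool runner, NOT the PraisonAI implementation."""

    def __init__(self, agent_name, tool_timeout):
        self._agent_name = agent_name
        self._tool_timeout = tool_timeout
        self._executor = None  # created lazily on first use

    def _get_executor(self):
        if self._executor is None:
            self._executor = ThreadPoolExecutor(
                max_workers=2,
                thread_name_prefix=f"tool-{self._agent_name}",
            )
        return self._executor  # reused across calls

    def run(self, func, *args, **kwargs):
        future = self._get_executor().submit(func, *args, **kwargs)
        try:
            return future.result(timeout=self._tool_timeout)
        except FutureTimeout:
            # Report the timeout as data instead of raising
            return {
                "error": f"Tool timed out after {self._tool_timeout}s",
                "timeout": True,
            }
```

In this sketch a timed-out call stops being waited on, but its worker thread keeps running until the function returns, because Python threads cannot be forcibly stopped. With only two workers, two hung tools would occupy the whole pool until they finish.
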
Which timeout to choose: see Timeout Selection by Tool Type under Common Patterns.

Common Patterns

Limit FastAPI Route Concurrency

from fastapi import FastAPI
from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

app = FastAPI()

# One registry shared by all requests
registry = ConcurrencyRegistry()
registry.set_limit("chat_agent", 5)  # at most 5 chat runs at once

@app.post("/chat")
async def chat_endpoint(message: str):
    await registry.acquire("chat_agent")  # waits while 5 runs are in flight
    try:
        agent = Agent(name="chat_agent", instructions="Help users")
        response = await agent.astart(message)
        return {"response": response}
    finally:
        registry.release("chat_agent")

Async Context Manager Helper

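import asyncio
from contextlib import asynccontextmanager

from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

# Share one registry so every caller is counted against the same limit
registry = ConcurrencyRegistry()

@asynccontextmanager
async def throttled_agent(name: str, max_concurrent: int = 3):
    registry.set_limit(name, max_concurrent)
    await registry.acquire(name)
    try:
        yield
    finally:
        registry.release(name)

# Usage
async def main():
    async with throttled_agent("researcher", 2):
        agent = Agent(name="researcher", instructions="Research topics")
        result = await agent.astart("Study quantum computing")

asyncio.run(main())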

Timeout Selection by Tool Type

def get_agent_with_timeouts():
    return Agent(
        name="MultiTool Assistant",
        instructions="Help with various tasks",
        tools=[
            "web_search",      # Network IO
            "file_processor",  # Local computation  
            "simple_math"      # Fast operation
        ],
        tool_timeout=45  # Good balance for mixed workload
    )

Best Practices

Always release in a finally block. This prevents deadlocks when exceptions occur:
registry.acquire_sync("agent")
try:
    # Agent work here
    agent.start("task")
finally:
    registry.release("agent")  # Always runs
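
To avoid repeating the try/finally at every call site, the pattern can be wrapped in a sync context manager. This is a sketch: limited is a hypothetical helper, and it assumes only the acquire_sync(name) and release(name) methods shown above. RecordingRegistry is a stub that exists purely to demonstrate the call order.

```python
from contextlib import contextmanager


@contextmanager
def limited(registry, name):
    """Hold one slot for `name` for the duration of the with-block."""
    registry.acquire_sync(name)
    try:
        yield
    finally:
        registry.release(name)  # runs even if the body raises


class RecordingRegistry:
    """Stub used only to demonstrate the helper; not a PraisonAI class."""

    def __init__(self):
        self.calls = []

    def acquire_sync(self, name):
        self.calls.append(("acquire_sync", name))

    def release(self, name):
        self.calls.append(("release", name))


def demo():
    """Simulate a failing agent run and return the recorded registry calls."""
    stub = RecordingRegistry()
    try:
        with limited(stub, "agent"):
            raise ValueError("agent run failed")
    except ValueError:
        pass
    return stub.calls
```

With a real registry the usage would be `with limited(registry, "agent"): agent.start("task")`, and the slot is returned even when the run raises.
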
Keep the acquisition method consistent with the execution context:
# ✅ Good - sync context, sync acquire
def sync_handler():
    registry.acquire_sync("agent")
    try:
        agent.start("task")
    finally:
        registry.release("agent")

# ✅ Good - async context, async acquire  
async def async_handler():
    await registry.acquire("agent")
    try:
        await agent.astart("task")
    finally:
        registry.release("agent")
Any tool that does network IO should have a timeout:
# Tools that need timeouts
network_tools = ["web_search", "api_call", "download_file"]
local_tools = ["calculate", "format_text", "parse_json"]

agent = Agent(
    name="Assistant",
    tools=network_tools + local_tools,
    tool_timeout=30  # Protects against slow network
)
Filter logs by agent name using the thread prefix:
# Filter tool execution logs by agent
grep "tool-researcher" app.log

# Or in Python logging
import logging
logging.basicConfig(format='%(threadName)s: %(message)s')
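
The following sketch shows why the %(threadName)s format makes the prefix visible. It uses a standard ThreadPoolExecutor with a tool-<agent_name> prefix as a stand-in for the agent's tool executor; capture_tool_logs and the "demo." logger name are hypothetical.

```python
import io
import logging
from concurrent.futures import ThreadPoolExecutor


def capture_tool_logs(agent_name: str) -> str:
    """Log one message from a prefixed worker thread and return the formatted text."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(threadName)s: %(message)s"))

    logger = logging.getLogger(f"demo.{agent_name}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    logger.addHandler(handler)
    try:
        with ThreadPoolExecutor(
            max_workers=2, thread_name_prefix=f"tool-{agent_name}"
        ) as pool:
            # The record is created in the worker thread, so threadName carries the prefix
            pool.submit(logger.info, "running get_weather").result()
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()
```

Each captured line begins with the worker thread's name, which is what the grep pattern above matches on.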

Plugins

Plugin thread-safety and concurrent execution

Tool Configuration

Tool timeout settings and performance tuning