Skip to main content
Concurrency controls let you limit parallel agent execution and set timeouts for tool calls to prevent resource exhaustion.

Quick Start

1

Limit parallel runs of an agent

Control how many instances of the same agent can run concurrently:
from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

registry = ConcurrencyRegistry()
registry.set_limit("researcher", 2)  # at most 2 concurrent runs

agent = Agent(name="researcher", instructions="Research topics")

# Sync context
registry.acquire_sync("researcher")
try:
    agent.start("Research Mars exploration")
finally:
    registry.release("researcher")
2

Same, async

Use async context for better resource utilization:
await registry.acquire("researcher")
try:
    await agent.astart("Research Mars exploration")
finally:
    registry.release("researcher")
3

Bound tool time with ToolConfig

Prevent slow tools from blocking agent execution:
from praisonaiagents import Agent, ToolConfig

agent = Agent(
    name="Assistant",
    instructions="Use tools to help users",
    tools=["get_weather"],
    tool_config=ToolConfig(timeout=30),  # seconds; slow tools return a timeout dict
)
agent.start("What's the weather in Tokyo?")

How It Works

ComponentPurposeThread Safety
ConcurrencyRegistryLimits parallel agent runs✅ Thread-safe
Tool ExecutorRuns tools with timeout✅ Per-agent pool
Plugin APIsEnable/disable plugins✅ Lock-protected

Sync vs Async Rule

The concurrency registry enforces strict separation between sync and async contexts:
ContextMethodWhat happens if you mix
Syncregistry.acquire_sync(name)✅ Works correctly
Asyncawait registry.acquire(name)✅ Works correctly
Mixedacquire_sync() in async❌ Raises RuntimeError
Calling acquire_sync() from an async context raises RuntimeError("acquire_sync('<agent_name>') cannot be called with a running event loop; use async acquire() in async contexts."). Use await acquire() instead.
Example error:
async def bad_example():
    registry.acquire_sync("agent")  # RuntimeError!

async def good_example():
    await registry.acquire("agent")  # ✅ Correct

Tool Timeout Behavior

When tool_config=ToolConfig(timeout=...) is set, tools run in a dedicated executor with these characteristics: In YAML the field name is still tool_timeout:; in Python use tool_config=ToolConfig(timeout=…).

Timeout Return Shape

On timeout, tools return different shapes depending on which layer enforces the timeout:
LayerTriggerReturn shape
SDK Agent executorAgent(tool_config=ToolConfig(timeout=30)) directly in Python{"error": "Tool timed out after 30s", "timeout": True}
Wrapper boundaryYAML tool_timeout: 30 or CLI --tool-timeout 30 (framework: praisonai){"error": "tool_timeout", "tool": <fn name>, "timeout_seconds": 30} (returned as a JSON string from the wrapper)

Executor Details

  • One executor per Agent instance (lazy creation)
  • max_workers=2 threads per agent
  • Thread name prefix: tool-<agent_name> — useful for log filtering
  • Reused across calls — no resource leak
Which timeout to choose:

Common Patterns

Limit FastAPI Route Concurrency

from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

registry = ConcurrencyRegistry()
registry.set_limit("chat_agent", 5)

@app.post("/chat")
async def chat_endpoint(message: str):
    await registry.acquire("chat_agent")
    try:
        agent = Agent(name="chat_agent", instructions="Help users")
        response = await agent.astart(message)
        return {"response": response}
    finally:
        registry.release("chat_agent")

Async Context Manager Helper

from contextlib import asynccontextmanager

@asynccontextmanager
async def throttled_agent(name: str, max_concurrent: int = 3):
    registry = ConcurrencyRegistry()
    registry.set_limit(name, max_concurrent)
    await registry.acquire(name)
    try:
        yield
    finally:
        registry.release(name)

# Usage
async with throttled_agent("researcher", 2):
    agent = Agent(name="researcher", instructions="Research topics")
    result = await agent.astart("Study quantum computing")

Timeout Selection by Tool Type

from praisonaiagents import Agent, ToolConfig

def get_agent_with_timeouts():
    return Agent(
        name="MultiTool Assistant",
        instructions="Help with various tasks",
        tools=[
            "web_search",      # Network IO
            "file_processor",  # Local computation  
            "simple_math"      # Fast operation
        ],
        tool_config=ToolConfig(timeout=45)  # Good balance for mixed workload
    )

Best Practices

Prevents deadlocks when exceptions occur:
registry.acquire_sync("agent")
try:
    # Agent work here
    agent.start("task")
finally:
    registry.release("agent")  # Always runs
Keep acquisition method consistent with execution context:
# ✅ Good - sync context, sync acquire
def sync_handler():
    registry.acquire_sync("agent")
    try:
        agent.start("task")
    finally:
        registry.release("agent")

# ✅ Good - async context, async acquire  
async def async_handler():
    await registry.acquire("agent")
    try:
        await agent.astart("task")
    finally:
        registry.release("agent")
Any tool that does network IO should have a timeout:
from praisonaiagents import Agent, ToolConfig

# Tools that need timeouts
network_tools = ["web_search", "api_call", "download_file"]
local_tools = ["calculate", "format_text", "parse_json"]

agent = Agent(
    name="Assistant",
    tools=network_tools + local_tools,
    tool_config=ToolConfig(timeout=30)  # Protects against slow network
)
Filter logs by agent name using the thread prefix:
# Filter tool execution logs by agent
grep "tool-researcher" app.log

# Or in Python logging
import logging
logging.basicConfig(format='%(threadName)s: %(message)s')

Retries

Tool failures can be automatically retried using the retry policy feature. This works alongside timeouts to handle transient errors:
from praisonaiagents import Agent, ToolConfig  
from praisonaiagents.tools.retry import RetryPolicy

# Enable retry with defaults
agent = Agent(
    name="resilient_agent",
    tools=[web_search, api_tool],
    tool_retry_policy=RetryPolicy()
)

# Using consolidated tool config (preferred)
agent = Agent(
    name="modern_agent",
    tools=[flaky_tool],
    tool_config=ToolConfig(
        timeout=30,
        parallel=True,
        retry_policy=RetryPolicy(
            max_attempts=3,
            retry_on={"timeout", "rate_limit", "connection_error"},
            backoff_factor=2.0,
            jitter=True
        )
    )
)
For complete retry configuration and error handling strategies, see Tool Retry Policy.

Tool Retry Policy

Automatically retry failed tool calls with exponential backoff

Tool Configuration

Tool timeout settings and performance tuning

Async Bridge

Safe sync↔async boundary crossing utilities

Thread Safety

Chat history and state protection mechanisms