Control how many instances of the same agent can run concurrently:
from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

registry = ConcurrencyRegistry()
registry.set_limit("researcher", 2)  # at most 2 concurrent runs

agent = Agent(name="researcher", instructions="Research topics")

# Sync context
registry.acquire_sync("researcher")
try:
    agent.start("Research Mars exploration")
finally:
    registry.release("researcher")
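The acquire/release pattern above behaves much like a counting semaphore keyed by agent name. As a rough illustration only, the sketch below uses a standard-library `threading.BoundedSemaphore` as a stand-in for the registry. The names `LIMIT`, `slot`, and `worker` are hypothetical, and this is not necessarily how `ConcurrencyRegistry` is implemented. It shows that a limit of 2 keeps additional callers waiting until a slot frees up.

```python
import threading
import time

LIMIT = 2  # mirrors registry.set_limit("researcher", 2)
slot = threading.BoundedSemaphore(LIMIT)  # stand-in for the per-agent limit

running = 0  # runs currently holding a slot
peak = 0     # highest number of simultaneous runs observed
counter_lock = threading.Lock()


def worker() -> None:
    """Simulate one agent run guarded by acquire/release."""
    global running, peak
    slot.acquire()  # plays the role of registry.acquire_sync("researcher")
    try:
        with counter_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # placeholder for agent.start(...)
        with counter_lock:
            running -= 1
    finally:
        slot.release()  # plays the role of registry.release("researcher")


# Start more workers than there are slots.
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrent runs: {peak}")  # never exceeds LIMIT
```

Releasing in `finally` matters here for the same reason as in the snippet above: a run that raises would otherwise hold its slot forever.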
2. Same, async
In async code, await acquire() instead, so a task waiting for a free slot yields to the event loop rather than blocking it:
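import asyncio

# Reuses registry and agent from the previous step
async def main():
    await registry.acquire("researcher")
    try:
        await agent.astart("Research Mars exploration")
    finally:
        registry.release("researcher")

asyncio.run(main())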
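If the acquire/try/finally pairing is repeated in many places, it can be wrapped once in an async context manager. The sketch below is one possible shape, not part of the library: `slot` is a hypothetical helper, and `FakeRegistry` is a hypothetical stand-in that only mimics the `await acquire(name)` / `release(name)` calls shown above so the example runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeRegistry:
    """Hypothetical stand-in with the same acquire/release shape as the snippet above."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def acquire(self, name: str) -> None:
        self.events.append(f"acquire:{name}")

    def release(self, name: str) -> None:
        self.events.append(f"release:{name}")


@asynccontextmanager
async def slot(registry, name: str):
    """Hold a slot for `name` for the duration of the `async with` body."""
    await registry.acquire(name)
    try:
        yield
    finally:
        registry.release(name)  # runs even if the body raises


async def main() -> list[str]:
    registry = FakeRegistry()
    try:
        async with slot(registry, "researcher"):
            raise ValueError("simulated agent failure")
    except ValueError:
        pass  # the slot has already been released at this point
    return registry.events


events = asyncio.run(main())
print(events)  # ['acquire:researcher', 'release:researcher']
```

With a real registry, the body of the `async with` block would be the `await agent.astart(...)` call.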
3. Bound tool time with tool_timeout
Prevent slow tools from blocking agent execution:
from praisonaiagents import Agent

agent = Agent(
    name="Assistant",
    instructions="Use tools to help users",
    tools=["get_weather"],
    tool_timeout=30,  # seconds; slow tools return a timeout dict
)

agent.start("What's the weather in Tokyo?")
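To make the idea of a bounded tool call concrete, here is a minimal standard-library sketch of running a function in a worker thread and giving up after a deadline. It is an illustration under assumptions, not the library's implementation: `call_with_timeout` is a hypothetical helper, and the shape of the returned timeout dict is invented for the example, since the exact dict is not specified above.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def call_with_timeout(tool, timeout_s: float, *args, **kwargs):
    """Run tool(*args, **kwargs) in a worker thread; stop waiting after timeout_s seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(tool, *args, **kwargs)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        # Hypothetical result shape; the dict the library returns may differ.
        return {"error": "timeout", "timeout_s": timeout_s}
    finally:
        # Do not block on a straggler; its thread finishes in the background.
        pool.shutdown(wait=False)


def slow_weather(city: str) -> str:
    time.sleep(0.5)  # simulates a slow network call
    return f"Sunny in {city}"


def fast_weather(city: str) -> str:
    return f"Sunny in {city}"


slow_result = call_with_timeout(slow_weather, 0.1, "Tokyo")
fast_result = call_with_timeout(fast_weather, 0.1, "Tokyo")
print(slow_result)  # {'error': 'timeout', 'timeout_s': 0.1}
print(fast_result)  # Sunny in Tokyo
```

Note that a thread-based timeout like this only bounds how long the caller waits. Python cannot forcibly stop the worker thread, so the slow function still runs to completion in the background.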
The concurrency registry enforces strict separation between sync and async contexts:
| Context | Method | What happens if you mix |
| --- | --- | --- |
| Sync | registry.acquire_sync(name) | ✅ Works correctly |
| Async | await registry.acquire(name) | ✅ Works correctly |
| Mixed | acquire_sync() in async | ❌ Raises RuntimeError |
Calling acquire_sync() from an async context raises RuntimeError("acquire_sync('<agent_name>') cannot be called with a running event loop; use async acquire() in async contexts."). Use await acquire() instead.
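A guard like this can be built on `asyncio.get_running_loop()`, which raises `RuntimeError` when no event loop is running in the current thread. The sketch below shows that detection technique in isolation. `acquire_sync_guard` is a hypothetical function written for this example, and whether the registry detects the loop the same way is an assumption.

```python
import asyncio


def acquire_sync_guard(name: str) -> str:
    """Refuse to proceed when called from a thread with a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: a blocking acquire would be safe here.
        return f"acquired {name}"
    raise RuntimeError(
        f"acquire_sync({name!r}) cannot be called with a running event loop; "
        "use async acquire() in async contexts."
    )


async def misuse() -> str:
    """Call the sync guard from a coroutine and report what happens."""
    try:
        acquire_sync_guard("researcher")
    except RuntimeError as exc:
        return str(exc)
    return "no error"


sync_outcome = acquire_sync_guard("researcher")  # plain sync call: allowed
async_outcome = asyncio.run(misuse())            # inside a loop: rejected
print(sync_outcome)
print(async_outcome)
```

Failing fast here is preferable to blocking: a blocking wait inside a coroutine would stall every other task on the same event loop.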
Any tool that performs network I/O should have a timeout:
from praisonaiagents import Agent

# Tools that need timeouts
network_tools = ["web_search", "api_call", "download_file"]
# Tools that run locally and return quickly
local_tools = ["calculate", "format_text", "parse_json"]

agent = Agent(
    name="Assistant",
    tools=network_tools + local_tools,
    tool_timeout=30,  # Protects against slow network
)
Use thread names for debugging
Filter logs by agent name using the thread prefix:
# Filter tool execution logs by agent
grep "tool-researcher" app.log

# Or in Python logging
import logging
logging.basicConfig(format='%(threadName)s: %(message)s')
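To see how a thread prefix ends up in log lines, the following self-contained sketch runs a placeholder tool in a `ThreadPoolExecutor` created with `thread_name_prefix` and logs with `%(threadName)s` in the format string. The prefix `tool-researcher` is taken from the grep example above. The `run_tool` function and the `tool_demo` logger name are hypothetical, and how the library actually names its threads is an assumption.

```python
import io
import logging
from concurrent.futures import ThreadPoolExecutor

# Capture log output in memory so it can be inspected afterwards.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(threadName)s: %(message)s"))

logger = logging.getLogger("tool_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger


def run_tool(name: str) -> None:
    """Placeholder for a tool call; logs from the worker thread."""
    logger.info("running %s", name)


# Worker threads are named "<prefix>_<index>" by ThreadPoolExecutor.
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="tool-researcher") as pool:
    pool.submit(run_tool, "web_search").result()

log_lines = buffer.getvalue().splitlines()
print(log_lines)  # ['tool-researcher_0: running web_search']
```

Because every line starts with the thread name, a plain `grep "tool-researcher"` is enough to isolate one agent's tool activity.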