Thread-Safe Agent State

PraisonAI Agents v0.5.0+ includes thread-safe management of chat history and caches; PR #1567 makes the underlying lock re-entrant and adds a per-event-loop async lock.
Behaviour change in PR #1548: run_sync() now raises RuntimeError when called from inside a running event loop. Previously it would auto-fallback to a background loop.
# Before PR #1548 (worked, but unsafe)
async def handler():
    result = run_sync(some_coro())  # silently used background loop

# After PR #1548 (raises RuntimeError)
async def handler():
    result = await some_coro()  # use this from async context
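
The guard described above can be illustrated with a minimal sketch. This is not the SDK's implementation: `run_sync_sketch` and `double` are hypothetical names, and the sketch only assumes that the check is based on whether an event loop is already running in the current thread.

```python
import asyncio

def run_sync_sketch(coro):
    """Hypothetical stand-in for run_sync(): refuse to run inside a live loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to drive the coroutine here.
        return asyncio.run(coro)
    # A loop is already running: close the coroutine (avoids a "never awaited"
    # warning) and fail loudly instead of silently switching to another loop.
    coro.close()
    raise RuntimeError("called from a running event loop; use 'await' instead")

async def double(x):
    await asyncio.sleep(0)
    return x * 2

async def handler():
    try:
        return run_sync_sketch(double(1))   # raises inside a running loop
    except RuntimeError:
        return await double(1)              # the supported path in async code
```

Called from plain synchronous code, `run_sync_sketch(double(21))` drives the coroutine to completion; called from `handler()`, it raises and the handler falls back to `await`.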

Thread-Safe Components

Chat History

The chat_history property is now fully thread-safe with automatic locking. The SDK protects chat history mutations through internal helper methods and a locked setter:
from praisonaiagents import Agent
import threading

agent = Agent(
    name="ThreadSafeAgent",
    instructions="You are helpful."
)

def worker(prompt):
    # Safe to call from multiple threads
    response = agent.chat(prompt)
    print(f"Response: {response[:50]}...")

# Create multiple threads
threads = [
    threading.Thread(target=worker, args=(f"Question {i}",))
    for i in range(5)
]

# Start all threads
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()

What changed in PR #1488

Prior to PR #1488, chat_history mutations bypassed thread-safety locks at 31+ call sites. The SDK now uses internal helper methods that properly acquire locks:
  • _append_to_chat_history(message) - Thread-safe message appending
  • _truncate_chat_history(length) - Thread-safe history truncation
  • _replace_chat_history(new_history) - Thread-safe full replacement
  • chat_history setter now acquires the AsyncSafeState lock for assignments

What changed in PR #1514

PR #1514 enhanced thread-safety in three key areas:
  1. Locked Memory Initialization: Task.initialize_memory() now uses threading.Lock with double-checked locking pattern. A new async variant initialize_memory_async() uses asyncio.Lock and offloads construction with asyncio.to_thread() to prevent event loop blocking.
  2. Async-Locked Workflow State: New _set_workflow_finished(value) method uses async locks to safely update workflow completion status across concurrent tasks.
  3. Non-Mutating Task Context: Task execution no longer mutates task.description during runs. Per-execution context is stored in _execution_context field, keeping the user-facing task.description stable across multiple executions.
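
The double-checked locking pattern named in the first item can be sketched as follows. `TaskSketch` and `memory_factory` are hypothetical names used only for illustration; the real Task class may structure this differently.

```python
import threading

class TaskSketch:
    """Hypothetical stand-in showing double-checked locking for lazy init."""

    def __init__(self, memory_factory):
        self._memory_factory = memory_factory
        self._memory = None
        self._memory_lock = threading.Lock()

    def initialize_memory(self):
        # First check without the lock: the common, already-initialised path
        # pays no locking cost.
        if self._memory is None:
            with self._memory_lock:
                # Second check under the lock: another thread may have finished
                # initialisation while this one was waiting.
                if self._memory is None:
                    self._memory = self._memory_factory()
        return self._memory
```

However many threads call `initialize_memory()` at once, the factory runs a single time and every caller receives the same object.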

Safe operations

# These operations are now thread-safe out of the box:
agent.chat_history = []  # Full replacement - uses locked setter
agent.chat("Hello")      # Appends safely via internal methods

# Reading is always safe:
history = agent.chat_history
print(f"History has {len(history)} messages")

Caches

Internal caches use threading.RLock for reentrant locking:
  • _system_prompt_cache - Cached system prompts
  • _formatted_tools_cache - Cached tool definitions

Rate Limiter

RateLimiter can be shared across threads and agents. Both the sync and async method families are fully locked — see Rate Limiter → Thread Safety & Multi-Agent Use for patterns.

LiteAgent Thread Safety

The lite package also provides thread-safe operations:
from praisonaiagents.lite import LiteAgent, create_openai_llm_fn
from concurrent.futures import ThreadPoolExecutor

llm_fn = create_openai_llm_fn(model="gpt-4o-mini")
agent = LiteAgent(name="LiteThreadSafe", llm_fn=llm_fn)

def concurrent_chat(message):
    return agent.chat(message)

# Safe concurrent access (ThreadPoolExecutor lives in concurrent.futures, not threading)
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(concurrent_chat, f"Q{i}") for i in range(10)]
    results = [f.result() for f in futures]

Implementation Details

Lock Types

| Component | Lock Type | Reason |
|---|---|---|
| chat_history | AsyncSafeState (DualLock: RLock + per-loop asyncio.Lock) | Re-entrant on same thread; non-blocking in async contexts |
| _cache_lock | threading.RLock | Allows reentrant access from cached helpers |
| PersistenceOrchestrator._cache_lock | threading.RLock | Protects _session_cache reads/writes; reads return deepcopy() to prevent shared mutable state |
| RateLimiter (sync) | threading.Lock | Protects _tokens, _api_tokens, and refill state from races in multi-threaded acquire calls |
| RateLimiter (async) | asyncio.Lock | Same protection for coroutine contexts |

Lock Usage Pattern

# Internal implementation pattern
class Agent:
    def __init__(self):
        self.__chat_history_state = AsyncSafeState([])
        self._cache_lock = threading.RLock()
    
    @property
    def _history_lock(self):
        return self.__chat_history_state
    
    def _append_to_chat_history(self, message):
        with self._history_lock.lock():
            self._history_lock.value.append(message)

Why a re-entrant lock?

Nested calls (e.g. a helper that holds the lock and then assigns chat_history, which itself acquires the lock) used to deadlock. RLock permits the same thread to re-enter. See PR #1567 for details.
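
The difference is easy to observe with the standard library alone. In the sketch below, `can_reenter` is a hypothetical helper that holds a lock and then tries to take it again on the same thread; a non-blocking acquire is used so the plain-Lock case reports failure instead of hanging.

```python
import threading

def can_reenter(lock):
    """Return True if the thread that holds `lock` can acquire it again."""
    with lock:
        # Non-blocking so that a plain Lock returns False rather than deadlocking.
        acquired_again = lock.acquire(blocking=False)
        if acquired_again:
            lock.release()
        return acquired_again

reentrant_ok = can_reenter(threading.RLock())  # same thread may re-enter
plain_ok = can_reenter(threading.Lock())       # same thread is refused
```

With a blocking acquire, the plain `threading.Lock` case would wait forever, which is the deadlock the paragraph above describes.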

Persistence Orchestrator session cache

PersistenceOrchestrator now guards its in-memory session cache with threading.RLock and returns deep copies on read, so concurrent agents can share an orchestrator without corrupting cached ConversationSession objects.
from praisonaiagents import Agent
from praisonai.persistence import PersistenceOrchestrator
import threading

# Create shared orchestrator
orchestrator = PersistenceOrchestrator()

def agent_worker(agent_name, session_id):
    agent = Agent(name=agent_name)
    
    # Safe: orchestrator returns deep copy of cached session
    history = orchestrator.on_agent_start(agent, session_id=session_id)
    
    # Multiple agents can safely read/update sessions concurrently
    orchestrator.on_message(session_id, "user", "Hello from " + agent_name)
    orchestrator.on_agent_end(agent, session_id)

# Multiple threads can safely share the orchestrator
threads = [
    threading.Thread(target=agent_worker, args=(f"Agent{i}", "session-123"))
    for i in range(3)
]

for t in threads:
    t.start()
for t in threads:
    t.join()
Reference: PraisonAI PR #1609. The session cache uses defensive copying to prevent shared mutable state between concurrent operations.
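
The defensive-copy idea can be sketched independently of the orchestrator. `SessionCacheSketch` is a hypothetical class, not the PersistenceOrchestrator API; it assumes only that sessions are plain, deep-copyable objects.

```python
import copy
import threading

class SessionCacheSketch:
    """Hypothetical cache: every read and write crosses a deepcopy boundary."""

    def __init__(self):
        self._lock = threading.RLock()
        self._sessions = {}

    def put(self, session_id, session):
        with self._lock:
            # Store a private copy so later caller-side mutation cannot leak in.
            self._sessions[session_id] = copy.deepcopy(session)

    def get(self, session_id):
        with self._lock:
            # Hand out a copy so two readers never share mutable state.
            return copy.deepcopy(self._sessions.get(session_id))
```

A caller that mutates the object returned by `get()` changes only its own copy; the cached session is unaffected. The trade-off is the cost of a deep copy on every access.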

Best Practices

Do: Use Agent Methods

# Good - thread-safe
response = agent.chat("Hello")

Don’t: Bypass the Property Interface

# Bad - bypasses locks (direct list mutation)
agent.chat_history.append({"role": "user", "content": "Hello"})

# Good - uses locked setter
agent.chat_history = agent.chat_history + [{"role": "user", "content": "Hello"}]

# Better - use agent methods
agent.chat("Hello")
Reads and full replacements via agent.chat_history = [...] are now safe out-of-the-box. An external lock is only needed for custom compound operations that require atomic read-modify-write sequences, such as the read-then-assign in the "Good" example above when several threads run it at once.
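
A compound read-modify-write is atomic only if the read and the write happen under one lock. The sketch below uses a hypothetical `AgentStandIn` class in place of a real Agent so that it runs without an LLM; `history_guard` and `append_atomically` are illustrative names, not SDK APIs.

```python
import threading

class AgentStandIn:
    """Hypothetical stand-in that exposes a chat_history attribute."""
    def __init__(self):
        self.chat_history = []

history_guard = threading.Lock()

def append_atomically(agent, message):
    # Read, extend and write back as one unit. Without the guard, two threads
    # could read the same list and one of the two appends would be lost.
    with history_guard:
        agent.chat_history = agent.chat_history + [message]

def run_demo(n_threads=8, per_thread=200):
    agent = AgentStandIn()

    def worker(tid):
        for i in range(per_thread):
            append_atomically(agent, {"role": "user", "content": f"{tid}-{i}"})

    threads = [threading.Thread(target=worker, args=(t,)) for t in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(agent.chat_history)
```

Because every update goes through `history_guard`, the final history length equals the number of appends issued.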

Do: Clear History Safely

# Good - use provided method
agent.clear_history()  # Thread-safe

Async Considerations

agent.chat_history is async-aware out of the box — no external asyncio.Lock is required when all calls are inside the same event loop.
import asyncio
from praisonaiagents import Agent

agent = Agent(name="AsyncAgent")

async def async_chat(prompt):
    # No external lock needed - AsyncSafeState handles this
    return agent.chat(prompt)

async def main():
    tasks = [async_chat(f"Question {i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
Since PR #1567, DualLock.sync() and DualLock.async_lock() use independent locks. A sync caller holding the lock will not block an async caller from acquiring it, and vice versa. Within a single context (all-sync or all-async) the lock works as expected; across contexts it does not coordinate. If you mutate agent.chat_history from both sync and async code paths, serialise the boundary yourself.
An external lock is still useful for serialising chat-history mutations from a thread pool that mixes sync and async callers:
import asyncio
import threading
from praisonaiagents import Agent

agent = Agent(name="MixedAgent")
external_lock = threading.Lock()

def sync_worker(prompt):
    with external_lock:
        return agent.chat(prompt)

async def async_worker(prompt):
    # Take the lock inside the worker thread, not on the event loop:
    # holding a threading.Lock across an await can stall or deadlock the loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_worker, prompt)
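
To make the "independent locks" behaviour concrete, here is a rough sketch of a dual lock built from a `threading.RLock` plus one `asyncio.Lock` per event loop. `DualLockSketch` is hypothetical and simplified; the SDK's DualLock may differ in structure and in how it cleans up per-loop state.

```python
import asyncio
import contextlib
import threading

class DualLockSketch:
    """Hypothetical dual lock: RLock for threads, one asyncio.Lock per loop."""

    def __init__(self):
        self._thread_lock = threading.RLock()
        self._guard = threading.Lock()   # protects the per-loop table
        self._async_locks = {}           # loop -> asyncio.Lock (never pruned here)

    @contextlib.contextmanager
    def sync(self):
        with self._thread_lock:
            yield

    @contextlib.asynccontextmanager
    async def async_lock(self):
        loop = asyncio.get_running_loop()
        with self._guard:
            lock = self._async_locks.get(loop)
            if lock is None:
                # Created inside the running loop, so it belongs to that loop.
                lock = self._async_locks[loop] = asyncio.Lock()
        async with lock:
            yield

async def demo():
    dual = DualLockSketch()
    with dual.sync():
        with dual.sync():                    # re-entrant on the same thread
            async with dual.async_lock():    # not blocked by the sync holder
                return True
```

In `demo()`, the async lock is acquired while the sync lock is still held, which shows that the two sides do not coordinate with each other.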

Verifying Thread Safety

Test thread safety with concurrent access:
import threading
from praisonaiagents.lite import LiteAgent

def test_thread_safety():
    agent = LiteAgent(
        name="Test",
        llm_fn=lambda m: "Response"
    )
    
    errors = []
    
    def worker():
        try:
            for _ in range(100):
                agent.chat("Test")
        except Exception as e:
            errors.append(e)
    
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    assert len(errors) == 0, f"Thread safety errors: {errors}"
    print("Thread safety test passed!")

test_thread_safety()

Multi-team HTTP launch

PraisonAI makes HTTP server launches safe to call from multiple threads:
  • Multiple Agent / Agents instances may call .launch(port=N) concurrently from different threads — registration is atomic.
  • If two launch calls use the same path on the same port, the second gets an auto-suffixed path (/path_abc123) and a warning is logged.
  • Server readiness is signalled deterministically (no fixed sleep); .launch() returns only after the port is accepting connections. The wait defaults to 5 seconds and is configurable via the PRAISONAI_SERVER_READY_TIMEOUT environment variable. If the server doesn’t become ready in time, .launch() still returns and a warning is logged — check server logs for startup errors.
  • aworkflow() state lock is created inside the running async context, so workflows remain stable when invoked under pytest-asyncio or when nested inside another loop.
import threading
from praisonaiagents import AgentTeam

def launch_team(team_name, port, path):
    team = AgentTeam(name=team_name)
    team.launch(port=port, path=path)

# Safe concurrent launches
thread1 = threading.Thread(target=launch_team, args=("TeamA", 8000, "/team_a"))
thread2 = threading.Thread(target=launch_team, args=("TeamB", 8000, "/team_b"))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

# Both teams available at:
# - http://localhost:8000/team_a
# - http://localhost:8000/team_b
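
The atomic registration and auto-suffix behaviour listed above can be sketched with a single lock around a per-port path table. `RouteRegistrySketch` is a hypothetical class; the suffix format used here (six hex characters from `uuid4`) is an assumption for illustration.

```python
import threading
import uuid

class RouteRegistrySketch:
    """Hypothetical registry: one lock makes check-and-insert atomic."""

    def __init__(self):
        self._lock = threading.Lock()
        self._routes = {}  # port -> set of registered paths

    def register(self, port, path):
        with self._lock:
            paths = self._routes.setdefault(port, set())
            final_path = path
            # On a collision, keep generating suffixed candidates until one is free.
            while final_path in paths:
                final_path = f"{path}_{uuid.uuid4().hex[:6]}"
            paths.add(final_path)
            return final_path
```

The first caller keeps the requested path; a second caller asking for the same path on the same port receives a suffixed one, while the same path on a different port is unaffected.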

Wrapper-layer thread safety (praisonai package)

The praisonai wrapper layer (distinct from the praisonaiagents content above) provides thread-safe OpenAI client management and CLI command discovery.

Per-instance OpenAI client lifecycle

Each BaseAutoGenerator owns its own OpenAI client — no cross-instance sharing, no LRU eviction surprises.
from praisonai import PraisonAI

# Each PraisonAI instance constructs its own auto-generator, which owns
# a single OpenAI client. Two concurrent instances cannot evict each
# other's client — even with identical API keys.
team_a = PraisonAI(auto="Draft a marketing plan", api_key="sk-team-a")
team_b = PraisonAI(auto="Draft a technical doc", api_key="sk-team-b")

team_a.run()  # safe to run in parallel threads
team_b.run()
The client is created lazily on first structured-completion call. The instance’s __del__ makes a best-effort client.close(), but the canonical path is explicit close() on the generator if you build one directly. This matters in long-lived server processes that spawn many generators.
For power users building generators directly:
from praisonai.auto import BaseAutoGenerator

gen = BaseAutoGenerator(config_list=[{
    "model": "gpt-4o-mini",
    "api_key": "sk-...",
    "base_url": None,
}])
try:
    # _get_openai_client() is double-checked-locked; calling it from
    # multiple threads on the same instance returns the same client.
    result = gen._structured_completion(MyModel, messages=[...])
finally:
    gen.close()  # explicit cleanup — recommended over relying on __del__
Behaviour change in PR #1681: the module-level functions praisonai.auto._get_openai_client(api_key, base_url) and the _openai_clients / _openai_clients_lock globals have been removed. If you imported them, switch to constructing an OpenAI client yourself or call BaseAutoGenerator(...)._get_openai_client(). Each generator now owns exactly one client, so the previous bug (an in-use client being evicted from a process-wide LRU and closed while other threads still held a reference) is no longer possible.

Thread-safe Typer command discovery

Embedding python -m praisonai from multiple threads is now safe. The CLI command discovery uses a double-check lock pattern and doesn’t poison the cache on failure:
import threading
import subprocess

def run_cli_command(command):
    # Safe to call from multiple threads
    result = subprocess.run(
        ["python", "-m", "praisonai"] + command,
        capture_output=True, text=True
    )
    return result.stdout

# Multiple threads can safely use the CLI
threads = [
    threading.Thread(target=run_cli_command, args=(["--version"],))
    for _ in range(5)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Failure-safe cache

A transient discovery error does not lock the CLI into a broken state — the next call retries instead of permanently breaking dispatch. This ensures reliable operation in multi-threaded server environments where temporary import failures might occur.
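
A cache is failure-safe in this sense when it stores a result only after discovery succeeds. The sketch below shows the idea with a hypothetical `DiscoveryCacheSketch`; the `discover` callable stands in for whatever the CLI actually does to find its commands.

```python
import threading

class DiscoveryCacheSketch:
    """Hypothetical cache that never stores a failed discovery."""

    def __init__(self, discover):
        self._discover = discover
        self._lock = threading.Lock()
        self._commands = None

    def get(self):
        if self._commands is not None:       # fast path, no lock
            return self._commands
        with self._lock:
            if self._commands is None:       # re-check under the lock
                # If discover() raises, the assignment never happens, so
                # _commands stays None and the next call tries again.
                self._commands = self._discover()
            return self._commands
```

A first call that hits a transient error propagates the exception, and the following call retries discovery instead of returning a poisoned result.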

New Thread-Safe Components in PR #1548

  • AsyncAgentScheduler is now loop-aware. The start() method binds its async primitives (asyncio.Event, asyncio.Lock) to the running loop, and stop() raises RuntimeError if called from a different loop than start().
  • Lazy loaders in praisonai/auto.py are now thread-safe. A single _load_optional(key, loader) helper with a module-level lock replaces the previous unguarded module-level globals.
  • inbuilt_tools lazy import (PR #1681) now routes through praisonai.auto._load_optional("inbuilt_autogen_tools", ...) instead of a hand-rolled re-entry guard. Negative results are cached, so a missing crewai or autogen install no longer pays the find_spec cost on every attribute access. PRAISONAI_TOOLS_AVAILABLE is now resolved lazily via __getattr__.
  • Integration registry (praisonai/integrations/registry.py) now has a per-instance threading.Lock guarding register/unregister/create/list_registered operations.
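
The lazy-loader behaviour described above (one lock, with negative results cached) can be sketched as follows. `load_optional_sketch` is a hypothetical function modelled on the description, not the body of praisonai.auto._load_optional; the sentinel-based cache is an assumption.

```python
import importlib
import threading

_MISSING = object()          # sentinel: "we looked, and it is not installed"
_cache = {}
_cache_lock = threading.Lock()

def load_optional_sketch(key, loader):
    """Return loader()'s result, or None if it raised ImportError (cached)."""
    with _cache_lock:
        if key not in _cache:
            try:
                _cache[key] = loader()
            except ImportError:
                # Cache the negative result so the import is not retried
                # on every subsequent lookup.
                _cache[key] = _MISSING
        value = _cache[key]
    return None if value is _MISSING else value

json_module = load_optional_sketch("json", lambda: importlib.import_module("json"))
```

Holding the lock while the loader runs is the simplest correct choice here; it serialises first-time imports, which is usually acceptable because each key is loaded at most once.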

New Thread-Safe Components in PR #1673

InMemoryJobStore: locked reads and async get_stats()

All read methods (get, get_by_idempotency_key, list_jobs, count, get_stats) now hold an asyncio.Lock while reading internal dicts, so concurrent saves cannot tear a read.
Breaking change: get_stats() is now a coroutine. Update your code:
# Before PR #1673
stats = store.get_stats()

# After PR #1673
stats = await store.get_stats()
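
Why `get_stats()` has to be a coroutine follows from the lock type: an `asyncio.Lock` can only be acquired with `async with`. A minimal sketch, using a hypothetical `JobStoreSketch` rather than the real InMemoryJobStore:

```python
import asyncio

class JobStoreSketch:
    """Hypothetical store: reads and writes share one asyncio.Lock."""

    def __init__(self):
        # Construct the store inside a running loop (as demo() does) so the
        # lock is associated with that loop on older Python versions too.
        self._lock = asyncio.Lock()
        self._jobs = {}

    async def save(self, job_id, status):
        async with self._lock:
            self._jobs[job_id] = status

    async def get_stats(self):
        # Acquiring an asyncio.Lock requires 'async with', hence a coroutine.
        async with self._lock:
            stats = {}
            for status in self._jobs.values():
                stats[status] = stats.get(status, 0) + 1
            return stats

async def demo():
    store = JobStoreSketch()
    await asyncio.gather(
        *(store.save(f"job-{i}", "queued" if i % 2 else "done") for i in range(10))
    )
    return await store.get_stats()
```

Callers that previously used the return value directly now need `await`; forgetting it yields a coroutine object instead of a dict.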

AgentScheduler: interruptible retry backoff (sync scheduler)

stop() now returns within milliseconds, even during retry backoff. The sync scheduler also adopts the shared backoff_delay() curve so sync and async retries are identical.
from praisonai.scheduler import AgentScheduler

scheduler = AgentScheduler(agent, "Hourly news check")
scheduler.start("hourly", max_retries=3)
# ... later, from another thread or signal handler:
scheduler.stop()  # returns within ms, even if a backoff sleep was in progress
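
One common way to make a backoff sleep interruptible is to wait on a `threading.Event` with a timeout instead of calling `time.sleep()`. The sketch below assumes that approach; `RetryLoopSketch` is hypothetical and does not reproduce AgentScheduler or the backoff_delay() curve.

```python
import threading
import time

class RetryLoopSketch:
    """Hypothetical retry loop whose backoff can be interrupted by stop()."""

    def __init__(self, backoff_seconds):
        self._backoff = backoff_seconds
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while not self._stop_event.is_set():
            # ... one attempt of the scheduled work would go here ...
            # Event.wait() returns True as soon as stop() sets the event,
            # so the loop does not sit out the full backoff.
            if self._stop_event.wait(self._backoff):
                break

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        self._thread.join()
```

With `time.sleep(backoff)` in place of `Event.wait(backoff)`, `stop()` would have to wait for the sleep to finish before the thread could exit.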

ToolRegistry: thread-safe registry operations

ToolRegistry now holds a threading.Lock around all reads and mutations, matching PluginRegistry / integration registry. This eliminates RuntimeError: dictionary changed size during iteration when registering tools concurrently with iteration. Reference: PR #1673.
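
A registry avoids the "dictionary changed size during iteration" error when iteration happens under the same lock as mutation and callers receive a snapshot. A minimal sketch with a hypothetical `ToolRegistrySketch` (method names are illustrative, not the real ToolRegistry API):

```python
import threading

class ToolRegistrySketch:
    """Hypothetical registry: reads and writes share one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tools = {}

    def register(self, name, fn):
        with self._lock:
            self._tools[name] = fn

    def list_tools(self):
        with self._lock:
            # Build the snapshot while holding the lock; callers then iterate
            # over their own list, not over the live dict.
            return sorted(self._tools)
```

Returning a snapshot keeps the lock hold time short and lets callers iterate for as long as they like without blocking registrations.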