# Thread-Safe Agent State

> Thread-safe chat history and cache management

PraisonAI Agents v0.5.0+ includes thread-safe management of chat history and caches; PR #1567 makes the underlying lock re-entrant and adds a per-event-loop async lock.

<Warning>
  **Behaviour change in PR #1548**: `run_sync()` now raises `RuntimeError` when called from inside a running event loop. Previously it would auto-fallback to a background loop.

  ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
  # Before PR #1548 (worked, but unsafe)
  async def handler():
      result = run_sync(some_coro())  # silently used background loop

  # After PR #1548 (raises RuntimeError)
  async def handler():
      result = await some_coro()  # use this from async context
  ```
</Warning>
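
The check behind this behaviour can be sketched with the standard library alone. The helper names `in_running_loop` and `run_sync_strict` below are hypothetical stand-ins, not the SDK's API; they only illustrate how a sync entry point might refuse to run inside an active event loop.

```python
import asyncio


def in_running_loop() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def run_sync_strict(coro):
    """Hypothetical strict runner: refuses to nest inside a running loop."""
    if in_running_loop():
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise RuntimeError(
            "run_sync() called from a running event loop; use 'await' instead"
        )
    return asyncio.run(coro)


async def some_coro():
    return 42


# Sync context: no loop is running, so the coroutine is executed.
sync_result = run_sync_strict(some_coro())


async def handler():
    # Async context: the strict runner raises, so await the coroutine directly.
    try:
        return run_sync_strict(some_coro())
    except RuntimeError:
        return await some_coro()


async_result = asyncio.run(handler())
```

Code that must work in both contexts can branch on a check like `in_running_loop()` and pick `await` or the sync runner accordingly.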

## Thread-Safe Components

### Chat History

The `chat_history` property is now fully thread-safe with automatic locking. The SDK protects chat history mutations through internal helper methods and a locked setter:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents import Agent
import threading

agent = Agent(
    name="ThreadSafeAgent",
    instructions="You are helpful."
)

def worker(prompt):
    # Safe to call from multiple threads
    response = agent.chat(prompt)
    print(f"Response: {response[:50]}...")

# Create multiple threads
threads = [
    threading.Thread(target=worker, args=(f"Question {i}",))
    for i in range(5)
]

# Start all threads
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()
```

#### What changed in PR #1488

<Note>
  Prior to PR #1488, `chat_history` mutations bypassed thread-safety locks at 31+ call sites. The SDK now uses internal helper methods that properly acquire locks:

  * `_append_to_chat_history(message)` - Thread-safe message appending
  * `_truncate_chat_history(length)` - Thread-safe history truncation
  * `_replace_chat_history(new_history)` - Thread-safe full replacement
  * `chat_history` setter now acquires the `AsyncSafeState` lock for assignments
</Note>

#### What changed in PR #1514

<Note>
  PR #1514 enhanced thread-safety in three key areas:

  **1. Locked Memory Initialization**: `Task.initialize_memory()` now uses `threading.Lock` with a double-checked locking pattern. A new async variant, `initialize_memory_async()`, uses `asyncio.Lock` and offloads construction with `asyncio.to_thread()` to avoid blocking the event loop.

  **2. Async-Locked Workflow State**: New `_set_workflow_finished(value)` method uses async locks to safely update workflow completion status across concurrent tasks.

  **3. Non-Mutating Task Context**: Task execution no longer mutates `task.description` during runs. Per-execution context is stored in the `_execution_context` field, keeping the user-facing `task.description` stable across multiple executions.
</Note>
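
The double-checked locking described in point 1 can be sketched as follows. `LazyMemory` and `AsyncLazyMemory` are hypothetical classes written for illustration, not the SDK's `Task`; the async variant assumes Python 3.9+ for `asyncio.to_thread()` and creates its lock lazily inside the running loop.

```python
import asyncio
import threading


class LazyMemory:
    """Hypothetical holder illustrating sync double-checked locking."""

    def __init__(self, factory):
        self._factory = factory
        self._memory = None
        self._lock = threading.Lock()
        self.build_count = 0

    def initialize_memory(self):
        if self._memory is None:            # first check, no lock taken
            with self._lock:
                if self._memory is None:    # second check, under the lock
                    self.build_count += 1
                    self._memory = self._factory()
        return self._memory


class AsyncLazyMemory:
    """Hypothetical async variant: asyncio.Lock plus a worker thread."""

    def __init__(self, factory):
        self._factory = factory
        self._memory = None
        self._lock = None
        self.build_count = 0

    async def initialize_memory_async(self):
        if self._memory is None:
            if self._lock is None:
                self._lock = asyncio.Lock()  # created inside the running loop
            async with self._lock:
                if self._memory is None:
                    self.build_count += 1
                    # Build off the event loop so other coroutines keep running.
                    self._memory = await asyncio.to_thread(self._factory)
        return self._memory


holder = LazyMemory(factory=dict)
threads = [threading.Thread(target=holder.initialize_memory) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()


async def _demo():
    async_holder = AsyncLazyMemory(factory=dict)
    await asyncio.gather(
        *(async_holder.initialize_memory_async() for _ in range(10))
    )
    return async_holder.build_count


async_builds = asyncio.run(_demo())
```

In both variants the factory runs once, no matter how many callers race to initialise.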

#### Safe operations

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# These operations are now thread-safe out of the box:
agent.chat_history = []  # Full replacement - uses locked setter
agent.chat("Hello")      # Appends safely via internal methods

# Reading is always safe:
history = agent.chat_history
print(f"History has {len(history)} messages")
```

### Caches

Internal caches use `threading.RLock` for reentrant locking:

* `_system_prompt_cache` - Cached system prompts
* `_formatted_tools_cache` - Cached tool definitions

### Rate Limiter

`RateLimiter` can be shared across threads and agents. Both the sync and async method families are fully locked — see [Rate Limiter → Thread Safety & Multi-Agent Use](/docs/features/rate-limiter#thread-safety--multi-agent-use) for patterns.

## LiteAgent Thread Safety

The lite package also provides thread-safe operations:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from concurrent.futures import ThreadPoolExecutor
from praisonaiagents.lite import LiteAgent, create_openai_llm_fn

llm_fn = create_openai_llm_fn(model="gpt-4o-mini")
agent = LiteAgent(name="LiteThreadSafe", llm_fn=llm_fn)

def concurrent_chat(message):
    return agent.chat(message)

# Safe concurrent access (ThreadPoolExecutor lives in concurrent.futures)
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(concurrent_chat, f"Q{i}") for i in range(10)]
    results = [f.result() for f in futures]
```

## Implementation Details

### Lock Types

| Component             | Lock Type                                                    | Reason                                                                                         |
| --------------------- | ------------------------------------------------------------ | ---------------------------------------------------------------------------------------------- |
| `chat_history`        | `AsyncSafeState` (DualLock: RLock + per-loop `asyncio.Lock`) | Re-entrant on same thread; non-blocking in async contexts                                      |
| `_cache_lock`         | `threading.RLock`                                            | Allows reentrant access from cached helpers                                                    |
| `RateLimiter` (sync)  | `threading.Lock`                                             | Protects `_tokens`, `_api_tokens`, and refill state from races in multi-threaded acquire calls |
| `RateLimiter` (async) | `asyncio.Lock`                                               | Same protection for coroutine contexts                                                         |

### Lock Usage Pattern

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Internal implementation pattern
class Agent:
    def __init__(self):
        self.__chat_history_state = AsyncSafeState([])
        self._cache_lock = threading.RLock()
    
    @property
    def _history_lock(self):
        return self.__chat_history_state
    
    def _append_to_chat_history(self, message):
        with self._history_lock.lock():
            self._history_lock.value.append(message)
```

### Why a re-entrant lock?

Nested calls (e.g. a helper that holds the lock and then assigns `chat_history`, which itself acquires the lock) used to deadlock. `RLock` permits the same thread to re-enter. See [PR #1567](https://github.com/MervinPraison/PraisonAI/pull/1567) for details.
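
A minimal sketch of the nested-call situation, using only the standard library. The `History` class is a hypothetical container, not the SDK's `AsyncSafeState`; it shows a helper that holds the lock and then calls a second method that takes the same lock.

```python
import threading


class History:
    """Hypothetical container guarded by a single lock."""

    def __init__(self, lock):
        self._lock = lock
        self._items = []

    def replace(self, items):
        with self._lock:
            self._items = list(items)

    def append_and_trim(self, item, max_len):
        with self._lock:
            trimmed = (self._items + [item])[-max_len:]
            self.replace(trimmed)  # re-acquires the lock on the same thread

    def snapshot(self):
        with self._lock:
            return list(self._items)


history = History(threading.RLock())
for i in range(5):
    history.append_and_trim(i, max_len=3)
result = history.snapshot()

# A plain Lock cannot be re-acquired by its owner. A non-blocking attempt
# demonstrates this without hanging the example.
plain = threading.Lock()
with plain:
    nested_ok = plain.acquire(blocking=False)
```

With `threading.Lock()` passed to `History`, the nested `replace()` call would block forever; `nested_ok` being `False` is the non-blocking equivalent of that deadlock.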

## Best Practices

### Do: Use Agent Methods

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Good - thread-safe
response = agent.chat("Hello")
```

### Don't: Bypass the Property Interface

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Bad - bypasses locks (direct list mutation)
agent.chat_history.append({"role": "user", "content": "Hello"})

# Good - uses locked setter
agent.chat_history = agent.chat_history + [{"role": "user", "content": "Hello"}]

# Better - use agent methods
agent.chat("Hello")
```

<Note>
  Reads and full replacements via `agent.chat_history = [...]` are now safe out of the box. An external lock is only needed for custom compound operations that require an atomic read-modify-write sequence.
</Note>
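
A compound operation of that kind might look like the sketch below. `StubAgent` is a stand-in with a plain `chat_history` attribute so the example runs without the SDK, and `history_guard` is a hypothetical application-level lock, not something the SDK provides.

```python
import threading


class StubAgent:
    """Stand-in for an agent; only carries a chat_history attribute."""

    def __init__(self):
        self.chat_history = []


agent = StubAgent()
history_guard = threading.Lock()  # hypothetical application-level lock


def append_if_absent(message):
    """Read, decide, and replace as one atomic step."""
    with history_guard:
        current = agent.chat_history
        if message not in current:
            agent.chat_history = current + [message]


msg = {"role": "user", "content": "Hello"}
threads = [
    threading.Thread(target=append_if_absent, args=(msg,)) for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the guard, two threads could both observe the message as absent and both append it, even though each individual read and assignment is locked.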

### Do: Clear History Safely

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Good - use provided method
agent.clear_history()  # Thread-safe
```

## Async Considerations

`agent.chat_history` is async-aware out of the box — no external `asyncio.Lock` is required when all calls are inside the same event loop.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
from praisonaiagents import Agent

agent = Agent(name="AsyncAgent")

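async def async_chat(prompt):
    # No external lock needed - AsyncSafeState handles this.
    # achat() is the async counterpart of chat() and does not block the loop.
    return await agent.achat(prompt)

async def main():
    tasks = [async_chat(f"Question {i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Received {len(results)} responses")

asyncio.run(main())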
```

<Warning>
  Since PR #1567, `DualLock.sync()` and `DualLock.async_lock()` use **independent** locks. A sync caller holding the lock will **not** block an async caller from acquiring it, and vice versa. Within a single context (all-sync or all-async) the lock works as expected; across contexts it does not coordinate. If you mutate `agent.chat_history` from both sync and async code paths, serialise the boundary yourself.
</Warning>

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
graph TB
    subgraph "Sync Context"
        SyncCaller[👨‍💻 Sync Caller] --> SyncLock[🔒 threading.RLock]
        SyncLock --> ChatHistory[💬 chat_history]
    end
    
    subgraph "Async Context"
        AsyncCaller[🚀 Async Caller] --> AsyncLock[🔄 asyncio.Lock<br/>per event loop]
        AsyncLock --> ChatHistory
    end
    
    SyncLock -.->|no cross-context<br/>coordination| AsyncLock
    
    classDef sync fill:#189AB4,stroke:#7C90A0,color:#fff
    classDef async fill:#F59E0B,stroke:#7C90A0,color:#fff
    classDef shared fill:#10B981,stroke:#7C90A0,color:#fff
    
    class SyncCaller,SyncLock sync
    class AsyncCaller,AsyncLock async
    class ChatHistory shared
```
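
The independence shown in the diagram can be reproduced with a small stand-in. `DualLockSketch` below is a hypothetical illustration of an `RLock` paired with one `asyncio.Lock` per event loop; it is not the SDK's `DualLock`, and the weak-keyed registry is an assumption about how per-loop locks could be tracked.

```python
import asyncio
import threading
import weakref


class DualLockSketch:
    """Hypothetical pairing of a thread lock with per-loop async locks."""

    def __init__(self):
        self._thread_lock = threading.RLock()
        # One asyncio.Lock per event loop; entries vanish with their loop.
        self._async_locks = weakref.WeakKeyDictionary()
        self._registry_guard = threading.Lock()

    def sync(self):
        return self._thread_lock

    def async_lock(self):
        loop = asyncio.get_running_loop()
        with self._registry_guard:
            lock = self._async_locks.get(loop)
            if lock is None:
                lock = asyncio.Lock()
                self._async_locks[loop] = lock
            return lock


dual = DualLockSketch()
holding = threading.Event()
release = threading.Event()


def sync_holder():
    # Hold the sync lock until the async caller has finished.
    with dual.sync():
        holding.set()
        release.wait(timeout=5)


async def async_caller():
    # Succeeds even though another thread holds the sync lock.
    async with dual.async_lock():
        return "acquired"


holder_thread = threading.Thread(target=sync_holder)
holder_thread.start()
holding.wait(timeout=5)
outcome = asyncio.run(async_caller())
release.set()
holder_thread.join()
```

The async caller acquires its lock while the sync lock is still held, which is why mixed sync and async mutation needs coordination at the boundary.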

An external lock **is** still useful for serialising chat-history mutations from a thread pool that mixes sync and async callers:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import asyncio
import threading
from praisonaiagents import Agent

agent = Agent(name="MixedAgent")
external_lock = threading.Lock()

def sync_worker(prompt):
    with external_lock:
        return agent.chat(prompt)

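async def async_worker(prompt):
    # Run the locked sync path in a worker thread. Acquiring a threading.Lock
    # directly inside a coroutine blocks the event loop and can deadlock when
    # two coroutines contend for it.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_worker, prompt)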
```

## Verifying Thread Safety

Test thread safety with concurrent access:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import threading
from praisonaiagents.lite import LiteAgent

def test_thread_safety():
    agent = LiteAgent(
        name="Test",
        llm_fn=lambda m: "Response"
    )
    
    errors = []
    
    def worker():
        try:
            for _ in range(100):
                agent.chat("Test")
        except Exception as e:
            errors.append(e)
    
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    assert len(errors) == 0, f"Thread safety errors: {errors}"
    print("Thread safety test passed!")

test_thread_safety()
```

## Multi-team HTTP launch

Launching HTTP servers from multiple threads is thread-safe:

* Multiple `Agent` / `Agents` instances may call `.launch(port=N)` concurrently from different threads — registration is atomic.
* If two launch calls use the same path on the same port, the second gets an auto-suffixed path (`/path_abc123`) and a warning is logged.
* Server readiness is signalled deterministically (no fixed sleep); `.launch()` returns only after the port is accepting connections (5s timeout).
* `aworkflow()` state lock is created inside the running async context, so workflows remain stable when invoked under pytest-asyncio or when nested inside another loop.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import threading
from praisonaiagents import AgentTeam

def launch_team(team_name, port, path):
    team = AgentTeam(name=team_name)
    team.launch(port=port, path=path)

# Safe concurrent launches
thread1 = threading.Thread(target=launch_team, args=("TeamA", 8000, "/team_a"))
thread2 = threading.Thread(target=launch_team, args=("TeamB", 8000, "/team_b"))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

# Both teams available at:
# - http://localhost:8000/team_a
# - http://localhost:8000/team_b
```
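
Atomic registration with an auto-suffixed path can be sketched as below. `RouteRegistry` is a hypothetical class, and the six-character hex suffix is an assumption modelled on the `/path_abc123` example above; the SDK's internals may differ.

```python
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor


class RouteRegistry:
    """Hypothetical registry mapping each port to its registered paths."""

    def __init__(self):
        self._lock = threading.Lock()
        self._routes = {}  # port -> {path: owner}

    def register(self, port, path, owner):
        # Check and insert happen under one lock, so two callers can never
        # both claim the same path.
        with self._lock:
            routes = self._routes.setdefault(port, {})
            final_path = path
            while final_path in routes:
                final_path = f"{path}_{uuid.uuid4().hex[:6]}"
            routes[final_path] = owner
            return final_path

    def paths(self, port):
        with self._lock:
            return sorted(self._routes.get(port, {}))


registry = RouteRegistry()
with ThreadPoolExecutor(max_workers=4) as executor:
    assigned = list(
        executor.map(
            lambda i: registry.register(8000, "/team", f"Team{i}"),
            range(4),
        )
    )
```

Exactly one caller keeps `/team`; the others receive distinct suffixed paths.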

## Wrapper-layer thread safety (`praisonai` package)

The `praisonai` wrapper layer (distinct from the `praisonaiagents` content above) provides thread-safe OpenAI client management and CLI command discovery.

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
graph LR
    subgraph "Thread-Safe OpenAI Client"
        T1[🧵 Thread 1] --> K1[🔑 API Key A]
        T2[🧵 Thread 2] --> K2[🔑 API Key B]
        K1 --> C1[📡 Client A]
        K2 --> C2[📡 Client B]
        L[🔒 Lock] -.-> C1
        L -.-> C2
    end
    
    classDef thread fill:#8B0000,stroke:#7C90A0,color:#fff
    classDef key fill:#F59E0B,stroke:#7C90A0,color:#fff
    classDef client fill:#189AB4,stroke:#7C90A0,color:#fff
    classDef lock fill:#6366F1,stroke:#7C90A0,color:#fff
    
    class T1,T2 thread
    class K1,K2 key
    class C1,C2 client
    class L lock
```

### Key-aware OpenAI client

The OpenAI client is now cached per `(api_key, base_url)` tuple, allowing multiple keys in the same process without cross-talk:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonai import PraisonAI
import threading

def worker_with_different_key(api_key, task_name):
    # Each thread gets its own client based on the key
    praisonai = PraisonAI(
        auto=f"Create a {task_name}",
        api_key=api_key  # Different key per thread
    )
    result = praisonai.run()
    print(f"{task_name} completed")

# Two threads with different OpenAI keys
thread1 = threading.Thread(
    target=worker_with_different_key,
    args=("sk-key-team-a", "marketing plan")
)
thread2 = threading.Thread(
    target=worker_with_different_key, 
    args=("sk-key-team-b", "technical doc")
)

thread1.start()
thread2.start()
thread1.join()
thread2.join()
```
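
The caching idea can be sketched with a dictionary keyed by `(api_key, base_url)` and guarded by a lock. `FakeClient` and `get_client` are hypothetical names used for illustration; the wrapper's real client factory is not shown here.

```python
import threading


class FakeClient:
    """Stand-in for an API client; it only stores its configuration."""

    def __init__(self, api_key, base_url):
        self.api_key = api_key
        self.base_url = base_url


_clients = {}
_clients_lock = threading.Lock()


def get_client(api_key, base_url=None):
    """Return the cached client for this (api_key, base_url) pair."""
    key = (api_key, base_url)
    with _clients_lock:
        client = _clients.get(key)
        if client is None:
            client = FakeClient(api_key, base_url)
            _clients[key] = client
        return client


client_a = get_client("sk-key-team-a")
client_b = get_client("sk-key-team-b")
client_a_again = get_client("sk-key-team-a")
client_a_proxy = get_client("sk-key-team-a", base_url="https://proxy.example/v1")
```

Keying on the pair rather than on a single global means a second key or a different base URL never reuses another caller's client.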

### Thread-safe Typer command discovery

Embedding `python -m praisonai` from multiple threads is now safe. The CLI command discovery uses a double-checked locking pattern and doesn't poison the cache on failure:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import threading
import subprocess

def run_cli_command(command):
    # Safe to call from multiple threads
    result = subprocess.run(
        ["python", "-m", "praisonai"] + command,
        capture_output=True, text=True
    )
    return result.stdout

# Multiple threads can safely use the CLI
threads = [
    threading.Thread(target=run_cli_command, args=(["--version"],))
    for _ in range(5)
]

for t in threads:
    t.start()
for t in threads:
    t.join()
```

### Failure-safe cache

A transient discovery error does not lock the CLI into a broken state — the next call retries instead of permanently breaking dispatch. This ensures reliable operation in multi-threaded server environments where temporary import failures might occur.
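
One way to get this behaviour is to store the result only after the loader succeeds. The sketch below is an assumption about the general technique, not the CLI's actual code; `discover_commands` and `flaky_loader` are hypothetical names.

```python
import threading

_commands = None
_commands_lock = threading.Lock()


def discover_commands(loader):
    """Double-checked discovery that caches only successful results."""
    global _commands
    if _commands is not None:      # fast path, no lock taken
        return _commands
    with _commands_lock:
        if _commands is None:      # re-check under the lock
            result = loader()      # may raise; nothing is cached if it does
            _commands = result
        return _commands


attempts = {"count": 0}


def flaky_loader():
    """Fails on the first call, succeeds afterwards."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ImportError("transient failure")
    return {"chat": "chat command", "run": "run command"}


try:
    discover_commands(flaky_loader)
    first_call_failed = False
except ImportError:
    first_call_failed = True

commands = discover_commands(flaky_loader)  # retried after the failure
cached = discover_commands(flaky_loader)    # served from the cache
```

Because the assignment happens only after `loader()` returns, a raised exception leaves the cache empty and the next caller tries again.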

### New Thread-Safe Components in PR #1548

**AsyncAgentScheduler** is now loop-aware. The `start()` method binds its async primitives (`asyncio.Event`, `asyncio.Lock`) to the running loop, and `stop()` raises `RuntimeError` if called from a different loop than `start()`.
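
Loop binding of this kind can be sketched as follows. `LoopBoundScheduler` is a hypothetical class, not the real `AsyncAgentScheduler`, and the "stop before start" check is an added assumption.

```python
import asyncio


class LoopBoundScheduler:
    """Hypothetical scheduler whose primitives belong to one event loop."""

    def __init__(self):
        self._loop = None
        self._stop_event = None
        self._lock = None

    async def start(self):
        # Create the primitives inside the running loop that will use them.
        self._loop = asyncio.get_running_loop()
        self._stop_event = asyncio.Event()
        self._lock = asyncio.Lock()

    async def stop(self):
        if self._loop is None:
            raise RuntimeError("stop() called before start()")
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError(
                "stop() called from a different event loop than start()"
            )
        async with self._lock:
            self._stop_event.set()

    def is_stopped(self):
        return self._stop_event is not None and self._stop_event.is_set()


async def same_loop():
    scheduler = LoopBoundScheduler()
    await scheduler.start()
    await scheduler.stop()
    return scheduler.is_stopped()


same_loop_ok = asyncio.run(same_loop())

# Each asyncio.run() call uses a fresh loop, so stop() sees a mismatch.
scheduler = LoopBoundScheduler()
asyncio.run(scheduler.start())
try:
    asyncio.run(scheduler.stop())
    cross_loop_error = None
except RuntimeError as exc:
    cross_loop_error = str(exc)
```

Failing fast on a loop mismatch surfaces the mistake at the call site instead of as a hang or an obscure error from a lock bound to another loop.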

**Lazy loaders in `praisonai/auto.py`** are now thread-safe. A single `_load_optional(key, loader)` helper with a module-level lock replaces the previous unguarded module-level globals.

**Integration registry** (`praisonai/integrations/registry.py`) now has a per-instance `threading.Lock` guarding `register`/`unregister`/`create`/`list_registered` operations.

## Related

* [Thread Safety CLI](/docs/cli/thread-safety)
* [Lite Package](/docs/features/lite-package)
* [Agent Module](/docs/sdk/praisonaiagents/agent/agent)
