
Thread-Safe Agent State

PraisonAI Agents v0.5.0+ protects chat history and internal caches with locks, so a single agent instance can be accessed safely from multiple threads.

Thread-Safe Components

Chat History

The chat_history list is protected by a threading.Lock:
from praisonaiagents import Agent
import threading

agent = Agent(
    name="ThreadSafeAgent",
    instructions="You are helpful."
)

def worker(prompt):
    # Safe to call from multiple threads
    response = agent.chat(prompt)
    print(f"Response: {response[:50]}...")

# Create multiple threads
threads = [
    threading.Thread(target=worker, args=(f"Question {i}",))
    for i in range(5)
]

# Start all threads
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()

Caches

Internal caches use threading.RLock for reentrant locking:
  • _system_prompt_cache - Cached system prompts
  • _formatted_tools_cache - Cached tool definitions
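The reentrant lock matters because one cached lookup may call another locked method on the same thread. A minimal stdlib sketch (the `CacheDemo` class and its methods are illustrative, not the library's actual internals) shows why a plain `Lock` would deadlock here while an `RLock` does not:

```python
import threading

class CacheDemo:
    """Illustrative only: get() calls set() while already holding the lock."""
    def __init__(self):
        # RLock lets the same thread acquire the lock more than once
        self._cache_lock = threading.RLock()
        self._cache = {}

    def get(self, key, builder):
        with self._cache_lock:
            if key not in self._cache:
                # set() re-acquires the same lock on this thread;
                # with a plain threading.Lock this line would deadlock
                self.set(key, builder())
            return self._cache[key]

    def set(self, key, value):
        with self._cache_lock:
            self._cache[key] = value

demo = CacheDemo()
print(demo.get("system_prompt", lambda: "You are helpful."))
```

Swapping the `RLock` for a `Lock` and re-running `get()` hangs the thread, which is the failure mode the table below guards against.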

LiteAgent Thread Safety

The lite package also provides thread-safe operations:
from praisonaiagents.lite import LiteAgent, create_openai_llm_fn
from concurrent.futures import ThreadPoolExecutor

llm_fn = create_openai_llm_fn(model="gpt-4o-mini")
agent = LiteAgent(name="LiteThreadSafe", llm_fn=llm_fn)

def concurrent_chat(message):
    return agent.chat(message)

# Safe concurrent access (ThreadPoolExecutor lives in concurrent.futures,
# not in the threading module)
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(concurrent_chat, f"Q{i}") for i in range(10)]
    results = [f.result() for f in futures]

Implementation Details

Lock Types

Component       Lock Type   Reason
chat_history    Lock        Simple mutual exclusion
caches          RLock       Allows reentrant access

Lock Usage Pattern

# Internal implementation pattern (simplified)
import threading

class Agent:
    def __init__(self):
        self._history_lock = threading.Lock()
        self._cache_lock = threading.RLock()
        self._system_prompt_cache = {}
        self.chat_history = []

    def _add_to_history(self, message):
        with self._history_lock:
            self.chat_history.append(message)

    def _get_cached_prompt(self, key):
        with self._cache_lock:
            # Safe reentrant access
            return self._system_prompt_cache.get(key)

Best Practices

Do: Use Agent Methods

# Good - thread-safe
response = agent.chat("Hello")

Don’t: Directly Modify State

# Bad - bypasses locks
agent.chat_history.append({"role": "user", "content": "Hello"})

Do: Clear History Safely

# Good - use provided method
agent.clear_history()  # Thread-safe

Async Considerations

For async code, use asyncio locks instead:
import asyncio
from praisonaiagents import Agent

agent = Agent(name="AsyncAgent")
lock = asyncio.Lock()

async def async_chat(prompt):
    async with lock:
        # Serialize access to the blocking chat call from the event loop
        return agent.chat(prompt)

async def main():
    tasks = [async_chat(f"Question {i}") for i in range(5)]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
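Note that a synchronous chat call blocks the event loop for its full duration, even inside an asyncio.Lock. One common alternative is to offload the blocking call with asyncio.to_thread (Python 3.9+), relying on the agent's own internal locks for safety. The sketch below uses a stub blocking_chat function in place of a real agent call:

```python
import asyncio

def blocking_chat(prompt: str) -> str:
    # Stand-in for a synchronous agent.chat() call; a real call would
    # block this worker thread, not the event loop
    return f"Answer to {prompt}"

async def async_chat(prompt: str) -> str:
    # Run the blocking call in a worker thread so other coroutines keep
    # running; the agent's threading locks still protect shared state
    return await asyncio.to_thread(blocking_chat, prompt)

async def main():
    return await asyncio.gather(*(async_chat(f"Question {i}") for i in range(5)))

results = asyncio.run(main())
print(results)
```

This trades the strict sequential ordering of the lock-based example for real concurrency, so use it only when per-call ordering of the history does not matter.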

Verifying Thread Safety

Test thread safety with concurrent access:
import threading
from praisonaiagents.lite import LiteAgent

def test_thread_safety():
    agent = LiteAgent(
        name="Test",
        llm_fn=lambda m: "Response"
    )
    
    errors = []
    
    def worker():
        try:
            for _ in range(100):
                agent.chat("Test")
        except Exception as e:
            errors.append(e)
    
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    assert len(errors) == 0, f"Thread safety errors: {errors}"
    print("Thread safety test passed!")

test_thread_safety()