Send agent events to any destination: HTTP endpoints, databases, cloud services, or your own systems.
PraisonAI uses a protocol-driven design. Implement 3 methods and your sink works with any agent.

Quick Start

Step 1: Create Your Sink

Implement 3 methods: emit(), flush(), close()
class MySink:
    def emit(self, event):
        print(f"Event: {event.event_type.value}")
    
    def flush(self):
        pass
    
    def close(self):
        pass

Step 2: Connect to Agent

Use trace_context() to capture all agent events
from praisonaiagents import Agent, ContextTraceEmitter, trace_context

sink = MySink()
emitter = ContextTraceEmitter(sink=sink, session_id="my-session", enabled=True)

with trace_context(emitter):
    agent = Agent(instructions="You are helpful")
    agent.chat("Hello!")  # Events go to MySink

How It Works

The Protocol

Your sink must implement 3 methods. That’s it!
class ContextTraceSinkProtocol:
    def emit(self, event): ...   # Receive event
    def flush(self): ...         # Flush buffer
    def close(self): ...         # Release resources
ContextTraceSink is a backward-compatible alias for ContextTraceSinkProtocol.
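
To check that a class provides the three methods before wiring it into an emitter, a structural check with typing.Protocol can help. The sketch below is a self-contained illustration: SinkLike, BrokenSink and conforms() are hypothetical names defined locally, not part of PraisonAI, and the check only confirms that the methods exist, not that their signatures match.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SinkLike(Protocol):
    """Hypothetical local stand-in mirroring the three-method sink protocol."""

    def emit(self, event: Any) -> None: ...
    def flush(self) -> None: ...
    def close(self) -> None: ...


class MySink:
    def emit(self, event):
        print(f"Event: {event}")

    def flush(self):
        pass

    def close(self):
        pass


class BrokenSink:
    # Missing flush() and close(), so it does not satisfy the protocol
    def emit(self, event):
        pass


def conforms(obj: object) -> bool:
    # runtime_checkable protocols only verify that the methods exist,
    # not their argument lists or return types
    return isinstance(obj, SinkLike)
```

Calling conforms(MySink()) returns True, while conforms(BrokenSink()) returns False because two of the three methods are missing.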

Example Sinks

Send events to a remote HTTP endpoint with batching.
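import requests  # third-party: pip install requests

class HTTPSink:
    def __init__(self, url, batch_size=10, timeout=5.0):
        self.url = url
        self.buffer = []
        self.batch_size = batch_size
        self.timeout = timeout  # seconds; avoids hanging on a slow endpoint

    def emit(self, event):
        # Queue the event and send once a full batch has accumulated
        self.buffer.append(event.to_dict())
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # POST all buffered events as one JSON array, then empty the buffer
        if self.buffer:
            requests.post(self.url, json=self.buffer, timeout=self.timeout)
            self.buffer.clear()

    def close(self):
        # Send any remaining events before shutdown
        self.flush()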
Store events in a SQLite database for later analysis.
import json
import sqlite3

class SQLiteSink:
    def __init__(self, db_path="traces.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                session_id TEXT,
                event_type TEXT,
                agent_name TEXT,
                timestamp REAL,
                data TEXT
            )
        """)

    def emit(self, event):
        # NULL lets SQLite assign the id; data is stored as a JSON string
        self.conn.execute(
            "INSERT INTO events VALUES (NULL, ?, ?, ?, ?, ?)",
            (event.session_id, event.event_type.value,
             event.agent_name, event.timestamp, json.dumps(event.data))
        )

    def flush(self):
        # Inserts are not persisted until committed
        self.conn.commit()

    def close(self):
        self.flush()
        self.conn.close()
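
Once events are in SQLite, they can be analysed with ordinary SQL. The following is a minimal sketch that counts events per type; it uses an in-memory database with the same schema as SQLiteSink, and both count_events_by_type() and the sample rows (including the "tool" key in data) are made up for illustration.

```python
import json
import sqlite3


def count_events_by_type(conn: sqlite3.Connection) -> dict:
    """Return {event_type: count} for all rows in the events table."""
    rows = conn.execute(
        "SELECT event_type, COUNT(*) FROM events GROUP BY event_type"
    ).fetchall()
    return dict(rows)


# Demo against an in-memory database with the same schema as SQLiteSink
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY,
        session_id TEXT,
        event_type TEXT,
        agent_name TEXT,
        timestamp REAL,
        data TEXT
    )
""")

# Made-up sample rows, for illustration only
sample = [
    ("s1", "agent_start", "helper", 1.0, json.dumps({})),
    ("s1", "tool_call_start", "helper", 2.0, json.dumps({"tool": "search"})),
    ("s1", "tool_call_end", "helper", 3.0, json.dumps({"tool": "search"})),
    ("s1", "tool_call_start", "helper", 4.0, json.dumps({"tool": "fetch"})),
]
conn.executemany("INSERT INTO events VALUES (NULL, ?, ?, ?, ?, ?)", sample)
conn.commit()

counts = count_events_by_type(conn)
```

To run the same query against a real trace file, open it with sqlite3.connect("traces.db") instead of the in-memory database.
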
Print events to console for debugging.
class ConsoleSink:
    def emit(self, event):
        print(f"[{event.event_type.value}] {event.agent_name or 'session'}")
    
    def flush(self):
        pass
    
    def close(self):
        pass
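
Because every sink exposes the same three methods, sinks can be composed. The sketch below shows one possible fan-out wrapper that forwards each call to several child sinks, for example a console sink plus a database sink. FanOutSink and RecordingSink are hypothetical names for this illustration, not classes shipped with PraisonAI, and plain strings stand in for real events.

```python
class FanOutSink:
    """Hypothetical sink that forwards each call to several child sinks."""

    def __init__(self, *sinks):
        self.sinks = list(sinks)

    def emit(self, event):
        for sink in self.sinks:
            sink.emit(event)

    def flush(self):
        for sink in self.sinks:
            sink.flush()

    def close(self):
        for sink in self.sinks:
            sink.close()


class RecordingSink:
    """Tiny in-memory sink used only to demonstrate the fan-out."""

    def __init__(self):
        self.events = []
        self.closed = False

    def emit(self, event):
        self.events.append(event)

    def flush(self):
        pass

    def close(self):
        self.closed = True


a, b = RecordingSink(), RecordingSink()
fan = FanOutSink(a, b)
fan.emit("event-1")  # a plain string stands in for a real event object
fan.close()
```

After this runs, both child sinks have received the event and both have been closed.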

Event Types

Event               Description
session_start       Session begins
session_end         Session ends
agent_start         Agent starts
agent_end           Agent finishes
agent_handoff       Agent hands off
message_added       Message added
tool_call_start     Tool starts
tool_call_end       Tool finishes
llm_request         LLM request sent
llm_response        LLM response received
memory_store        Memory stored
memory_search       Memory searched
knowledge_add       Knowledge added
knowledge_search    Knowledge searched

Event Data

event.event_type      # ContextEventType enum
event.timestamp       # Unix timestamp
event.session_id      # Session identifier
event.agent_name      # Agent name (if applicable)
event.sequence_num    # Event sequence number
event.data            # Event-specific data dict
event.to_dict()       # Convert to dictionary
event.to_json()       # Convert to JSON string
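
These attributes make it straightforward to filter inside a sink. The sketch below keeps only tool-call events by inspecting event.event_type.value. FakeEventType and FakeEvent are local stand-ins that mimic the attribute names listed above so the example runs without PraisonAI installed; they are assumptions for illustration, not the library's real classes, and the contents of data are invented.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class FakeEventType(Enum):
    # Stand-in for ContextEventType; only two members for the demo
    TOOL_CALL_START = "tool_call_start"
    LLM_REQUEST = "llm_request"


@dataclass
class FakeEvent:
    # Stand-in exposing a subset of the attributes listed above
    event_type: FakeEventType
    session_id: str
    agent_name: Optional[str] = None
    data: dict = field(default_factory=dict)


class ToolCallFilterSink:
    """Keeps only events whose type value starts with 'tool_call_'."""

    def __init__(self):
        self.kept = []

    def emit(self, event):
        if event.event_type.value.startswith("tool_call_"):
            self.kept.append(event)

    def flush(self):
        pass

    def close(self):
        pass


sink = ToolCallFilterSink()
sink.emit(FakeEvent(FakeEventType.TOOL_CALL_START, "s1", "helper", {"tool": "search"}))
sink.emit(FakeEvent(FakeEventType.LLM_REQUEST, "s1", "helper"))
```

Only the first event is kept; the llm_request event is dropped by the prefix check.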

Best Practices

Buffer Events

Batch events before sending to reduce network calls

Handle Errors

Sink errors are caught silently - tracing never crashes agents

Use trace_context

Automatic cleanup even if exceptions occur

Thread Safety

Use locks if your sink is accessed from multiple threads
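
The buffering and thread-safety advice can be combined. The following is a minimal sketch of a buffered sink guarded by a threading.Lock; ThreadSafeBufferSink is a hypothetical name, and the batches list stands in for whatever delivery step (HTTP call, file write) a real sink would perform. The batch is swapped out while holding the lock and delivered after releasing it, so slow I/O does not block other threads calling emit().

```python
import threading


class ThreadSafeBufferSink:
    """Hypothetical buffered sink that is safe to call from several threads."""

    def __init__(self, batch_size=10):
        self._lock = threading.Lock()
        self._buffer = []
        self.batch_size = batch_size
        self.batches = []  # stands in for network sends or file writes

    def emit(self, event):
        batch = None
        with self._lock:
            self._buffer.append(event)
            if len(self._buffer) >= self.batch_size:
                # Swap the full buffer out while still holding the lock
                batch, self._buffer = self._buffer, []
        if batch:
            self._deliver(batch)

    def flush(self):
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self._deliver(batch)

    def close(self):
        self.flush()

    def _deliver(self, batch):
        # Called after the buffer lock is released, so re-acquiring is safe
        with self._lock:
            self.batches.append(batch)


sink = ThreadSafeBufferSink(batch_size=10)


def worker(n):
    for i in range(25):
        sink.emit((n, i))  # tuples stand in for real event objects


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sink.close()

total = sum(len(b) for b in sink.batches)
```

Four threads emit 25 events each, and no event is lost or duplicated across the recorded batches.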

Zero Overhead

When tracing is disabled, there’s zero performance impact:
# Disabled by default - zero overhead
emitter = ContextTraceEmitter(sink=sink, enabled=False)

# Or use NoOpSink
from praisonaiagents import ContextNoOpSink
emitter = ContextTraceEmitter(sink=ContextNoOpSink())

API Reference

ContextTraceEmitter

from praisonaiagents import ContextTraceEmitter

emitter = ContextTraceEmitter(
    sink=my_sink,           # Your sink implementation
    session_id="my-session", # Session identifier
    enabled=True,           # Enable/disable tracing
    redact=True,            # Redact sensitive data
)

trace_context

from praisonaiagents import trace_context

with trace_context(emitter) as ctx:
    # All agent events go to your sink
    agent.chat("Hello")
# Automatically resets when done

Built-in Sinks

Sink                  Purpose
ContextNoOpSink       Does nothing (zero overhead)
ContextListSink       Collects events in memory (testing)
ContextTraceWriter    Writes to JSONL files (in praisonai)

CLI Commands

Use the CLI to manage traces:
praisonai replay list

Context Replay

Debug agent execution step-by-step

Observability

20+ provider integrations