Send agent events to any destination - HTTP endpoints, databases, cloud services, or your own systems.
PraisonAI uses a protocol-driven design. Implement 3 methods and your sink works with any agent.
Quick Start
Create Your Sink
Implement 3 methods: emit(), flush(), close()

    class MySink:
        def emit(self, event):
            print(f"Event: {event.event_type.value}")

        def flush(self):
            pass

        def close(self):
            pass

Connect to Agent
Use trace_context() to capture all agent events

    from praisonaiagents import Agent, ContextTraceEmitter, trace_context

    sink = MySink()
    emitter = ContextTraceEmitter(sink=sink, session_id="my-session", enabled=True)

    with trace_context(emitter):
        agent = Agent(instructions="You are helpful")
        agent.chat("Hello!")  # Events go to MySink

How It Works
The Protocol
Your sink must implement 3 methods. That’s it!

    class ContextTraceSinkProtocol:
        def emit(self, event): ...   # Receive event
        def flush(self): ...         # Flush buffer
        def close(self): ...         # Release resources

ContextTraceSink is a backward-compatible alias for ContextTraceSinkProtocol.

| Method | Purpose |
| --- | --- |
| emit(event) | Receive a trace event |
| flush() | Flush buffered events |
| close() | Release resources |

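Because the contract is only three methods, a sink does not need to inherit from any base class. The sketch below illustrates that idea with Python's typing.Protocol. SinkLike and BrokenSink are hypothetical names introduced here for illustration; the library's actual ContextTraceSinkProtocol definition may differ.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SinkLike(Protocol):
    """Hypothetical stand-in for the three-method sink contract."""

    def emit(self, event: Any) -> None: ...
    def flush(self) -> None: ...
    def close(self) -> None: ...


class MySink:
    """Satisfies the contract without inheriting from anything."""

    def emit(self, event: Any) -> None:
        print(f"Event: {event}")

    def flush(self) -> None:
        pass

    def close(self) -> None:
        pass


class BrokenSink:
    """Missing flush() and close(), so it does not satisfy the contract."""

    def emit(self, event: Any) -> None:
        pass


# A runtime_checkable protocol only checks that the methods exist,
# not that their signatures match.
print(isinstance(MySink(), SinkLike))      # True
print(isinstance(BrokenSink(), SinkLike))  # False
```

A check like this can be useful in tests to catch a sink that is missing a method before it is handed to an emitter.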
Example Sinks
Send events to a remote HTTP endpoint with batching.

    class HTTPSink:
        def __init__(self, url, batch_size=10):
            self.url = url
            self.buffer = []
            self.batch_size = batch_size

        def emit(self, event):
            # Buffer the event and send once the batch is full
            self.buffer.append(event.to_dict())
            if len(self.buffer) >= self.batch_size:
                self.flush()

        def flush(self):
            # Send whatever is buffered in a single POST request
            if self.buffer:
                import requests
                requests.post(self.url, json=self.buffer)
                self.buffer.clear()

        def close(self):
            # Send any remaining events before shutdown
            self.flush()

Store events in a SQLite database for later analysis.

    class SQLiteSink:
        def __init__(self, db_path="traces.db"):
            import sqlite3
            self.conn = sqlite3.connect(db_path)
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY,
                    session_id TEXT,
                    event_type TEXT,
                    agent_name TEXT,
                    timestamp REAL,
                    data TEXT
                )
            """)

        def emit(self, event):
            import json
            # NULL lets SQLite assign the id; data is stored as a JSON string
            self.conn.execute(
                "INSERT INTO events VALUES (NULL, ?, ?, ?, ?, ?)",
                (event.session_id, event.event_type.value,
                 event.agent_name, event.timestamp, json.dumps(event.data))
            )

        def flush(self):
            # Inserts are only persisted once committed
            self.conn.commit()

        def close(self):
            self.flush()
            self.conn.close()

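Once events are in SQLite, later analysis is ordinary SQL. The sketch below assumes the same events schema as SQLiteSink above and uses an in-memory database with made-up sample rows; count_by_type and the "tool" key in the sample data are illustrative, not part of the library.

```python
import json
import sqlite3

# In-memory database with the same schema as SQLiteSink above.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY,
        session_id TEXT,
        event_type TEXT,
        agent_name TEXT,
        timestamp REAL,
        data TEXT
    )
""")

# Made-up sample rows standing in for real trace events.
sample_rows = [
    ("my-session", "agent_start", "helper", 1.0, json.dumps({})),
    ("my-session", "tool_call_start", "helper", 2.0, json.dumps({"tool": "search"})),
    ("my-session", "tool_call_end", "helper", 3.0, json.dumps({"tool": "search"})),
    ("my-session", "agent_end", "helper", 4.0, json.dumps({})),
]
conn.executemany("INSERT INTO events VALUES (NULL, ?, ?, ?, ?, ?)", sample_rows)
conn.commit()


def count_by_type(connection, session_id):
    """Return {event_type: count} for one session."""
    rows = connection.execute(
        "SELECT event_type, COUNT(*) FROM events "
        "WHERE session_id = ? GROUP BY event_type",
        (session_id,),
    ).fetchall()
    return dict(rows)


counts = count_by_type(conn, "my-session")
print(counts)
```

Pointing sqlite3.connect at the sink's db_path instead of ":memory:" would run the same query against recorded traces.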
Print events to the console for debugging.

    class ConsoleSink:
        def emit(self, event):
            print(f"[{event.event_type.value}] {event.agent_name or 'session'}")

        def flush(self):
            pass

        def close(self):
            pass

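Sinks like the ones above can also be composed. The following is a minimal sketch of a fan-out sink that forwards each call to several sinks and isolates failures, so one broken destination does not stop the others from receiving events. TeeSink, CollectingSink, and FailingSink are hypothetical names used for illustration and are not part of praisonaiagents.

```python
class TeeSink:
    """Forward every call to several sinks; a failing sink does not block the rest."""

    def __init__(self, *sinks):
        self.sinks = list(sinks)

    def _each(self, method, *args):
        for sink in self.sinks:
            try:
                getattr(sink, method)(*args)
            except Exception as exc:
                # Report and keep going so the remaining sinks still run.
                print(f"{type(sink).__name__}.{method} failed: {exc}")

    def emit(self, event):
        self._each("emit", event)

    def flush(self):
        self._each("flush")

    def close(self):
        self._each("close")


class CollectingSink:
    """Keeps events in a list (illustrative only)."""

    def __init__(self):
        self.events = []

    def emit(self, event):
        self.events.append(event)

    def flush(self):
        pass

    def close(self):
        pass


class FailingSink:
    """Always raises on emit, to demonstrate error isolation."""

    def emit(self, event):
        raise RuntimeError("destination unavailable")

    def flush(self):
        pass

    def close(self):
        pass


collected = CollectingSink()
tee = TeeSink(FailingSink(), collected)
tee.emit("event-1")
tee.emit("event-2")
tee.close()
print(collected.events)  # ['event-1', 'event-2']
```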
Event Types

| Event | Description |
| --- | --- |
| session_start | Session begins |
| session_end | Session ends |
| agent_start | Agent starts |
| agent_end | Agent finishes |
| agent_handoff | Agent hands off |
| message_added | Message added |
| tool_call_start | Tool starts |
| tool_call_end | Tool finishes |
| llm_request | LLM request sent |
| llm_response | LLM response received |
| memory_store | Memory stored |
| memory_search | Memory searched |
| knowledge_add | Knowledge added |
| knowledge_search | Knowledge searched |

Event Data

    event.event_type    # ContextEventType enum
    event.timestamp     # Unix timestamp
    event.session_id    # Session identifier
    event.agent_name    # Agent name (if applicable)
    event.sequence_num  # Event sequence number
    event.data          # Event-specific data dict
    event.to_dict()     # Convert to dictionary
    event.to_json()     # Convert to JSON string

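The fields listed above make it straightforward to filter inside a sink. The sketch below wraps another sink and forwards only selected event types, based on event.event_type.value. FilterSink, CollectingSink, and fake_event are hypothetical helpers; fake_event builds a simplified stand-in object with only the attributes used here, not a real trace event.

```python
from types import SimpleNamespace


class FilterSink:
    """Forward only events whose type name is in allowed_types."""

    def __init__(self, inner, allowed_types):
        self.inner = inner
        self.allowed = set(allowed_types)

    def emit(self, event):
        if event.event_type.value in self.allowed:
            self.inner.emit(event)

    def flush(self):
        self.inner.flush()

    def close(self):
        self.inner.close()


class CollectingSink:
    """Keeps events in a list (illustrative only)."""

    def __init__(self):
        self.events = []

    def emit(self, event):
        self.events.append(event)

    def flush(self):
        pass

    def close(self):
        pass


def fake_event(type_name, agent_name=None):
    """Hypothetical stand-in exposing event_type.value and agent_name."""
    return SimpleNamespace(
        event_type=SimpleNamespace(value=type_name),
        agent_name=agent_name,
    )


collected = CollectingSink()
sink = FilterSink(collected, {"tool_call_start", "tool_call_end"})

for name in ["agent_start", "tool_call_start", "llm_request", "tool_call_end"]:
    sink.emit(fake_event(name, "helper"))

kept = [e.event_type.value for e in collected.events]
print(kept)  # ['tool_call_start', 'tool_call_end']
```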
Architecture
Best Practices
Buffer Events: Batch events before sending to reduce network calls.
Handle Errors: Sink errors are caught silently, so tracing never crashes agents.
Use trace_context: Cleanup is automatic, even if exceptions occur.
Thread Safety: Use locks if your sink is accessed from multiple threads.
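The buffering and thread-safety advice above can be combined in one sink. This is a minimal sketch, assuming the destination is represented by a callable that accepts a list of events; LockedBatchSink, send_batch, and record are hypothetical names chosen for illustration.

```python
import threading


class LockedBatchSink:
    """Buffer events under a lock and hand full batches to send_batch."""

    def __init__(self, send_batch, batch_size=10):
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._buffer = []
        self._lock = threading.Lock()

    def emit(self, event):
        batch = None
        with self._lock:
            self._buffer.append(event)
            if len(self._buffer) >= self._batch_size:
                # Swap the buffer out while holding the lock.
                batch, self._buffer = self._buffer, []
        if batch:
            # Send outside the lock so slow I/O does not block other threads.
            self._send_batch(batch)

    def flush(self):
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self._send_batch(batch)

    def close(self):
        self.flush()


sent = []
sent_lock = threading.Lock()


def record(batch):
    """Stand-in for a network call: remember each batch that was sent."""
    with sent_lock:
        sent.append(list(batch))


sink = LockedBatchSink(record, batch_size=10)


def worker(worker_id):
    for i in range(25):
        sink.emit((worker_id, i))


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sink.close()

total = sum(len(batch) for batch in sent)
print(total)  # 100
```

Swapping the buffer under the lock and sending afterwards keeps the critical section short, which matters when the destination is a slow network endpoint.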
Zero Overhead
When tracing is disabled, there’s zero performance impact:

    # Disabled by default - zero overhead
    emitter = ContextTraceEmitter(sink=sink, enabled=False)

    # Or use NoOpSink
    from praisonaiagents import ContextNoOpSink
    emitter = ContextTraceEmitter(sink=ContextNoOpSink())

API Reference
ContextTraceEmitter

    from praisonaiagents import ContextTraceEmitter

    emitter = ContextTraceEmitter(
        sink=my_sink,             # Your sink implementation
        session_id="my-session",  # Session identifier
        enabled=True,             # Enable/disable tracing
        redact=True,              # Redact sensitive data
    )

trace_context

    from praisonaiagents import trace_context

    with trace_context(emitter) as ctx:
        # All agent events go to your sink
        agent.chat("Hello")
    # Automatically resets when done

Built-in Sinks

| Sink | Purpose |
| --- | --- |
| ContextNoOpSink | Does nothing (zero overhead) |
| ContextListSink | Collects events in memory (testing) |
| ContextTraceWriter | Writes to JSONL files (in praisonai) |

CLI Commands
Use the CLI to manage traces:
List Traces
Replay Session
Show Events
Cleanup Old Traces
Context Replay: Debug agent execution step-by-step
Observability: 20+ provider integrations