Event Bus Module

The Event Bus provides a typed, publish-subscribe event system for PraisonAI Agents. It enables decoupled communication between components with support for both synchronous and asynchronous subscribers.

Features

  • Typed Events - Predefined event types for common operations
  • Sync/Async Subscribers - Support for both synchronous and async handlers
  • Event Filtering - Subscribe to specific event types
  • Event History - Optional event history tracking
  • Global Default Bus - Shared bus instance for application-wide events

Installation

The Event Bus is included in the core praisonaiagents package:
pip install praisonaiagents

Quick Start

from praisonaiagents.bus import EventBus, Event, EventType

# Create an event bus
bus = EventBus()

# Subscribe to events
def on_message(event):
    print(f"Received: {event.data}")

bus.subscribe(on_message, event_type=EventType.MESSAGE_CREATED)

# Publish an event
bus.publish(Event(
    type=EventType.MESSAGE_CREATED,
    data={"text": "Hello, World!"}
))

Event Types

The following event types are available:
Event Type              Description
SESSION_CREATED         New session created
SESSION_UPDATED         Session modified
SESSION_DELETED         Session deleted
SESSION_FORKED          Session forked
MESSAGE_CREATED         New message added
TOOL_STARTED            Tool execution started
TOOL_COMPLETED          Tool execution completed
AGENT_STARTED           Agent execution started
AGENT_COMPLETED         Agent execution completed
SNAPSHOT_CREATED        File snapshot created
COMPACTION_COMPLETED    Context compaction done
CUSTOM                  Custom event type

API Reference

EventBus

class EventBus:
    def subscribe(
        self,
        callback: Callable[[Event], None],
        event_type: Optional[EventType] = None
    ) -> str:
        """Subscribe to events. Returns subscription ID."""
    
    def unsubscribe(self, subscription_id: str) -> bool:
        """Unsubscribe from events."""
    
    def publish(self, event: Event) -> None:
        """Publish an event to all subscribers."""
    
    async def publish_async(self, event: Event) -> None:
        """Publish an event asynchronously."""
    
    def get_history(self, limit: int = 100) -> List[Event]:
        """Get recent event history."""
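The subscribe/unsubscribe contract above (an ID returned on subscribe, a boolean returned on unsubscribe, an optional type filter) can be illustrated with a small stand-in. This is a sketch only, not the praisonaiagents implementation: MiniBus, DemoEvent, and DemoEventType are hypothetical names, and the behavior that event_type=None receives every event is an assumption drawn from the Optional default in the signature.

```python
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple


class DemoEventType(Enum):  # hypothetical stand-in for EventType
    MESSAGE_CREATED = "message_created"
    TOOL_COMPLETED = "tool_completed"


@dataclass
class DemoEvent:  # hypothetical stand-in for Event
    type: DemoEventType
    data: Dict[str, Any]


class MiniBus:
    """Illustrative stand-in mirroring the documented method signatures."""

    def __init__(self) -> None:
        # subscription ID -> (callback, event type filter or None)
        self._subs: Dict[str, Tuple[Callable[[DemoEvent], None], Optional[DemoEventType]]] = {}

    def subscribe(self, callback, event_type=None) -> str:
        sub_id = uuid.uuid4().hex
        self._subs[sub_id] = (callback, event_type)
        return sub_id

    def unsubscribe(self, subscription_id: str) -> bool:
        # True only if the ID was still registered
        return self._subs.pop(subscription_id, None) is not None

    def publish(self, event: DemoEvent) -> None:
        # Assumption: a filter of None matches every event type
        for callback, wanted in list(self._subs.values()):
            if wanted is None or wanted is event.type:
                callback(event)


bus = MiniBus()
seen = []
all_id = bus.subscribe(lambda e: seen.append(("all", e.type.name)))
msg_id = bus.subscribe(
    lambda e: seen.append(("msg", e.type.name)),
    event_type=DemoEventType.MESSAGE_CREATED,
)

bus.publish(DemoEvent(DemoEventType.MESSAGE_CREATED, {"text": "hi"}))
bus.publish(DemoEvent(DemoEventType.TOOL_COMPLETED, {"tool": "bash"}))

removed = bus.unsubscribe(msg_id)        # ID was registered
removed_again = bus.unsubscribe(msg_id)  # ID is already gone
bus.publish(DemoEvent(DemoEventType.MESSAGE_CREATED, {"text": "again"}))
```

Keeping the returned subscription ID is what makes later cleanup possible, so handlers registered with a lambda should still store the ID if they ever need to be removed.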

Event

@dataclass
class Event:
    type: EventType  # Event type
    data: Dict[str, Any]  # Event payload
    source: Optional[str]  # Source identifier
    timestamp: float  # Unix timestamp
    id: str  # Unique event ID
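The examples on this page construct Event with only type and data, which suggests that source, timestamp, and id are filled in automatically. One way such defaults could be declared is sketched below; DemoEvent and DemoEventType are hypothetical stand-ins, and the specific defaults (None, time.time, a UUID hex string) are assumptions rather than the library's confirmed behavior.

```python
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class DemoEventType(Enum):  # hypothetical stand-in for EventType
    CUSTOM = "custom"


@dataclass
class DemoEvent:  # hypothetical stand-in for Event
    type: DemoEventType
    data: Dict[str, Any] = field(default_factory=dict)
    source: Optional[str] = None
    # default_factory runs once per instance, so each event gets
    # its own creation time and its own identifier
    timestamp: float = field(default_factory=time.time)
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


first = DemoEvent(type=DemoEventType.CUSTOM, data={"action": "startup"})
second = DemoEvent(type=DemoEventType.CUSTOM)
```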

Examples

Async Subscriber

import asyncio
from praisonaiagents.bus import EventBus, Event, EventType

bus = EventBus()

async def async_handler(event):
    await asyncio.sleep(0.1)
    print(f"Async received: {event.data}")

bus.subscribe(async_handler, event_type=EventType.TOOL_COMPLETED)

# Publish asynchronously; await is only valid inside a coroutine
async def main():
    await bus.publish_async(Event(
        type=EventType.TOOL_COMPLETED,
        data={"tool": "bash", "result": "success"}
    ))

asyncio.run(main())
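A bus that accepts both synchronous and asynchronous subscribers has to tell the two apart at dispatch time. The sketch below shows one common approach using only the standard library: call each handler, then await whatever came back awaitable. The dispatch function is hypothetical and a plain dict stands in for the event; how publish_async actually schedules handlers is not specified on this page.

```python
import asyncio
import inspect


async def dispatch(handlers, event):
    """Run sync handlers immediately and await async ones together."""
    pending = []
    for handler in handlers:
        result = handler(event)
        # Calling an async function returns a coroutine that has not run yet
        if inspect.isawaitable(result):
            pending.append(result)
    if pending:
        await asyncio.gather(*pending)


calls = []


def sync_handler(event):
    calls.append(("sync", event["tool"]))


async def async_handler(event):
    await asyncio.sleep(0)
    calls.append(("async", event["tool"]))


asyncio.run(dispatch([async_handler, sync_handler], {"tool": "bash"}))
```

In this sketch the synchronous handler finishes first even though it is listed second, because the coroutine only starts running once it is awaited.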

Global Event Bus

from praisonaiagents.bus import get_default_bus, Event, EventType

# Get the global bus instance
bus = get_default_bus()

# All components can share this bus
bus.publish(Event(type=EventType.CUSTOM, data={"action": "startup"}))

Event History

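from praisonaiagents.bus import EventBus, Event, EventType

# Create a bus with history tracking enabled
bus = EventBus(history_size=1000)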

# Publish some events
for i in range(10):
    bus.publish(Event(type=EventType.CUSTOM, data={"index": i}))

# Get recent history
history = bus.get_history(limit=5)
print(f"Retrieved {len(history)} events")
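A bounded history of this kind is often backed by a fixed-size queue that discards the oldest entry when full. The sketch below uses collections.deque with maxlen to show the idea; it is an assumption about how a history_size limit might behave, not a description of the library's internals, and get_recent is a hypothetical helper.

```python
from collections import deque

# Keep at most 3 entries; appending a 4th silently drops the oldest
history = deque(maxlen=3)

for i in range(10):
    history.append({"index": i})


def get_recent(limit):
    """Return up to `limit` of the newest entries, oldest first."""
    if limit <= 0:
        return []
    return list(history)[-limit:]


recent = get_recent(2)
```

Asking for more entries than the buffer holds simply returns everything that is still retained.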

Integration with Agents

The Event Bus integrates with PraisonAI Agents to provide real-time notifications:
from praisonaiagents import Agent
from praisonaiagents.bus import get_default_bus, EventType

bus = get_default_bus()

# Monitor agent activity
bus.subscribe(
    lambda e: print(f"Agent started: {e.data}"),
    event_type=EventType.AGENT_STARTED
)

bus.subscribe(
    lambda e: print(f"Tool used: {e.data}"),
    event_type=EventType.TOOL_COMPLETED
)

# Agent will emit events during execution
agent = Agent(name="Assistant")
agent.chat("Hello!")
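Printing from a lambda works for a demo, but monitoring usually needs state. Any callable can serve as a subscriber, so a small object that tallies events by type is one option. This sketch assumes only that handlers receive an object with a type attribute, as in the Event definition above; EventCounter is a hypothetical name, and SimpleNamespace objects with string types stand in for real events.

```python
from collections import Counter
from types import SimpleNamespace


class EventCounter:
    """Callable subscriber that counts events per type."""

    def __init__(self):
        self.counts = Counter()

    def __call__(self, event):
        self.counts[event.type] += 1


counter = EventCounter()

# Stand-in events; with the real bus these would arrive via subscribe()
for event_type in ["AGENT_STARTED", "TOOL_COMPLETED", "TOOL_COMPLETED"]:
    counter(SimpleNamespace(type=event_type, data={}))
```

With the real bus, an instance like counter could be passed to subscribe in place of the lambdas shown above.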