
praisonaiagents.bus

Core SDK Event Bus Module for PraisonAI Agents.

Provides a typed publish/subscribe event system for real-time communication between components. Extends the existing hooks system with a more general event-driven architecture.

Features:
  • Typed event definitions with dataclass payloads
  • Sync and async subscribers
  • Event filtering by type
  • Global and scoped event buses
  • SSE-compatible event streaming
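To make the sync/async and type-filtering features concrete, here is a minimal, self-contained sketch of how such a bus might dispatch events. It is not the praisonaiagents implementation: `SketchBus`, `SketchEvent`, and `demo` are hypothetical names, and `publish` is made a coroutine here purely so that async handlers can be awaited in a short example.

```python
import asyncio
import inspect
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SketchEvent:
    """Hypothetical event record: a type string plus a payload."""
    type: str
    data: Dict[str, Any] = field(default_factory=dict)


class SketchBus:
    """Hypothetical minimal bus; not the praisonaiagents implementation."""

    def __init__(self) -> None:
        # Map each event type to the handlers registered for it.
        self._handlers: Dict[str, List[Callable[[SketchEvent], Any]]] = defaultdict(list)

    def on(self, event_type: str):
        """Decorator that registers a sync or async handler for one event type."""
        def register(handler):
            self._handlers[event_type].append(handler)
            return handler
        return register

    async def publish(self, event_type: str, data: Dict[str, Any]) -> int:
        """Deliver an event to matching handlers; return how many were called."""
        event = SketchEvent(event_type, data)
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            result = handler(event)
            # Async handlers return an awaitable that must be awaited.
            if inspect.isawaitable(result):
                await result
        return len(handlers)


async def demo() -> List[str]:
    bus = SketchBus()
    seen: List[str] = []

    @bus.on("session.created")
    def sync_handler(event: SketchEvent) -> None:
        seen.append(f"sync:{event.data['session_id']}")

    @bus.on("session.created")
    async def async_handler(event: SketchEvent) -> None:
        await asyncio.sleep(0)
        seen.append(f"async:{event.data['session_id']}")

    await bus.publish("session.created", {"session_id": "abc123"})
    # No handler is registered for this type, so nothing is called.
    await bus.publish("session.deleted", {"session_id": "abc123"})
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Handlers are keyed by event type, so publishing an event with no registered handlers only costs a dictionary lookup.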
Zero Performance Impact:
  • All imports are lazy loaded
  • No overhead when not subscribed
  • Optional dependency for server features
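One common way to get lazy imports in Python is a module-level `__getattr__` hook (PEP 562). The sketch below illustrates that technique only; whether praisonaiagents.bus does it this way is an assumption. `make_lazy_package` and `_LAZY_EXPORTS` are hypothetical names, and the standard-library `json` and `decimal` modules stand in for heavier optional dependencies.

```python
import importlib
import types

# Hypothetical mapping from exported name to the module that defines it.
# "json" and "decimal" stand in for heavier optional dependencies.
_LAZY_EXPORTS = {"JSONDecoder": "json", "Decimal": "decimal"}


def make_lazy_package(name: str) -> types.ModuleType:
    """Build a module whose exports are imported on first attribute access."""
    package = types.ModuleType(name)

    def __getattr__(attr: str):
        # Called only when normal attribute lookup on the module fails (PEP 562).
        if attr not in _LAZY_EXPORTS:
            raise AttributeError(f"module {name!r} has no attribute {attr!r}")
        value = getattr(importlib.import_module(_LAZY_EXPORTS[attr]), attr)
        # Cache on the module so later lookups skip this hook.
        setattr(package, attr, value)
        return value

    package.__getattr__ = __getattr__
    return package


lazy_pkg = make_lazy_package("lazy_pkg_sketch")
resolved_before = "Decimal" in vars(lazy_pkg)  # nothing resolved yet
decimal_cls = lazy_pkg.Decimal                 # first access triggers the import
resolved_after = "Decimal" in vars(lazy_pkg)   # now cached on the module
```

In a real package the hook would normally be a plain `def __getattr__(name)` at the top level of `__init__.py`; it is built dynamically here only so the example runs as a single file.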
Usage:

    from praisonaiagents.bus import EventBus, Event

    # Create event bus
    bus = EventBus()

    # Subscribe to events
    @bus.on("session.created")
    def handle_session(event):
        print(f"Session created: {event.data}")

    # Publish events
    bus.publish("session.created", {"session_id": "abc123"})
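The feature list mentions SSE-compatible streaming. As a rough illustration, an event like the `session.created` one published in the usage example could be serialized as a Server-Sent Events frame along these lines. `to_sse_frame` is a hypothetical helper, not part of the praisonaiagents API, and the module's own wire format may differ.

```python
import json
from typing import Any, Dict


def to_sse_frame(event_type: str, data: Dict[str, Any]) -> str:
    """Serialize one event as a Server-Sent Events frame."""
    payload = json.dumps(data)
    # An "event:" line names the event, a "data:" line carries the payload,
    # and a blank line terminates the frame.
    return f"event: {event_type}\ndata: {payload}\n\n"


frame = to_sse_frame("session.created", {"session_id": "abc123"})
```

By default `json.dumps` emits no newlines, so the payload fits on a single `data:` line. A payload that contains newlines would need one `data:` line per line of text.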

Overview

This module provides the event bus components for PraisonAI Agents, including EventBus and Event.