Push notifications deliver real-time messages from a PraisonAI gateway to subscribed clients over WebSocket (with HTTP polling fallback).

Quick Start

1. Enable Push on Gateway

from praisonaiagents import GatewayConfig, PushConfig

config = GatewayConfig(
    push=PushConfig(enabled=True)
)

2. Connect PushClient

from praisonaiagents.push import PushClient

client = PushClient("ws://localhost:8765/ws", auth_token="my-token")
await client.connect()

3. Subscribe and Receive Messages

@client.on("channel_message")
async def on_message(msg):
    print(f"Received on {msg.channel}: {msg.data}")

await client.subscribe("alerts")
await client.wait_closed()

Agent-Centric Example

from praisonaiagents import Agent
from praisonaiagents.push import PushClient

agent = Agent(
    name="alerts-agent",
    instructions="Watch the 'alerts' channel and summarise incoming events"
)

client = PushClient("ws://localhost:8765/ws", auth_token="my-token")
await client.connect()

@client.on("channel_message")
async def on_msg(msg):
    summary = agent.start(f"Summarise this alert: {msg.data}")
    print(summary)

await client.subscribe("alerts")
await client.wait_closed()

How It Works

| Component | Purpose |
| --- | --- |
| PushClient | Client SDK with auto-reconnect and transport fallback |
| WebSocket | Primary real-time transport |
| HTTP Polling | Fallback transport for restricted networks |
| Channels | Named message streams for pub/sub |
| Presence | Track online/offline status of clients |
| Delivery Guarantees | At-least-once message delivery with ACKs |
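
At-least-once delivery means a message can arrive more than once, for example when an ACK is lost and the gateway retries. A handler can guard against that by remembering which messages it has already processed. The sketch below is a minimal illustration, assuming each message carries a unique identifier; the `SeenMessages` class and the idea of a message ID field are hypothetical and not part of the documented PushClient API.

```python
from collections import OrderedDict


class SeenMessages:
    """Bounded record of message IDs that have already been handled.

    Hypothetical helper: assumes every message exposes a unique ID.
    """

    def __init__(self, max_size: int = 1000) -> None:
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_size = max_size

    def first_time(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False for redeliveries."""
        if message_id in self._seen:
            return False
        self._seen[message_id] = None
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


seen = SeenMessages()
handled = [mid for mid in ["m1", "m2", "m1"] if seen.first_time(mid)]
```

Inside a `channel_message` handler, the same check would wrap the real work so that a redelivered message is skipped. The bounded size keeps memory flat at the cost of forgetting very old IDs.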

Configuration Options

PushConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | False | Feature toggle (push is opt-in; zero overhead when disabled) |
| redis | Optional[RedisConfig] | None | Redis config for cross-server scaling |
| presence | PresenceConfig | PresenceConfig() | Presence tracking settings |
| delivery | DeliveryConfig | DeliveryConfig() | Delivery guarantee settings |
| polling | PollingConfig | PollingConfig() | Polling fallback settings |

RedisConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | Optional[str] | None | Full Redis URL (takes precedence over host/port) |
| host | str | "localhost" | Redis host |
| port | int | 6379 | Redis port |
| db | int | 0 | Redis database number |
| password | Optional[str] | None | Redis password |
| prefix | str | "praison:push:" | Key prefix namespace |
| max_connections | int | 20 | Connection pool size |
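
The precedence rule for `url` can be pictured as follows. This is a sketch only: `resolve_redis_url` is a hypothetical helper written for illustration, and how the gateway actually assembles its connection string is not shown in this page. It assumes the conventional `redis://` URL layout.

```python
from typing import Optional
from urllib.parse import quote


def resolve_redis_url(
    url: Optional[str] = None,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
) -> str:
    """Return the URL to connect to, preferring an explicit `url`.

    Hypothetical helper mirroring the RedisConfig defaults listed above.
    """
    if url:
        return url  # a full URL wins over host/port/db/password
    # Percent-encode the password so characters like "@" stay unambiguous.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


default_url = resolve_redis_url()
explicit_url = resolve_redis_url(url="redis://cache:6380/2", host="ignored")
```

With no arguments the helper falls back to the documented defaults; passing `url` makes every other field irrelevant.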

PresenceConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | True | Toggle presence tracking |
| heartbeat_interval | int | 15 | Expected heartbeat frequency (seconds) |
| offline_timeout | int | 45 | Mark offline after this many seconds without heartbeat |
| broadcast_changes | bool | True | Broadcast presence changes to subscribed channels |
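
To make the relationship between heartbeats and `offline_timeout` concrete, here is a small sketch of the check a presence tracker might run. It is an assumption about the general technique, not the gateway's actual implementation; `offline_clients` and the timestamp dictionary are hypothetical.

```python
import time
from typing import Dict, List, Optional


def offline_clients(
    last_heartbeat: Dict[str, float],
    offline_timeout: float = 45.0,
    now: Optional[float] = None,
) -> List[str]:
    """Return client IDs whose last heartbeat is older than offline_timeout.

    Hypothetical helper: timestamps are seconds on a monotonic clock.
    """
    current = time.monotonic() if now is None else now
    return sorted(
        client_id
        for client_id, seen_at in last_heartbeat.items()
        if current - seen_at > offline_timeout
    )


# client-a last pinged 50 s ago, client-b 10 s ago (clock fixed at 150.0).
heartbeats = {"client-a": 100.0, "client-b": 140.0}
stale = offline_clients(heartbeats, offline_timeout=45.0, now=150.0)
```

With the default 15-second interval and 45-second timeout, a client has to miss roughly three consecutive heartbeats before it is treated as offline.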

DeliveryConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | True | Toggle delivery guarantees |
| ack_timeout | int | 30 | Seconds to wait for ACK before retrying |
| max_retries | int | 3 | Maximum retry attempts |
| retry_backoff | float | 2.0 | Exponential backoff multiplier |
| message_ttl | int | 86400 | How long to retain unacknowledged messages (seconds) |
| store_backend | str | "memory" | "memory" or "redis" |
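
The interaction of `ack_timeout`, `max_retries`, and `retry_backoff` is easier to reason about with numbers. The sketch below assumes the wait before each retry is `ack_timeout` multiplied by `retry_backoff` raised to the retry index; that formula is an assumption about how the three settings combine, and `retry_schedule` is a hypothetical helper rather than part of the library.

```python
from typing import List


def retry_schedule(
    ack_timeout: float = 30.0,
    max_retries: int = 3,
    retry_backoff: float = 2.0,
) -> List[float]:
    """Seconds waited for an ACK before each retry, growing geometrically.

    Assumed formula: wait_n = ack_timeout * retry_backoff ** n.
    """
    return [ack_timeout * retry_backoff**attempt for attempt in range(max_retries)]


waits = retry_schedule()  # [30.0, 60.0, 120.0] with the defaults
total_wait = sum(waits)   # 210.0 seconds before retries are exhausted
```

Under that assumption, the default settings give up on a message after a few minutes, well inside the default `message_ttl` of 86400 seconds (24 hours).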

PollingConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | True | Toggle polling fallback |
| long_poll_timeout | int | 30 | Long-poll hang duration (seconds) |
| max_batch_size | int | 100 | Max messages per poll response |
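
Long polling holds a request open until a message arrives or the timeout elapses, then returns up to `max_batch_size` messages at once. The following is a minimal sketch of that pattern using only `asyncio`; the `long_poll` function and the in-memory queue are hypothetical stand-ins, not the gateway's polling endpoint.

```python
import asyncio
from typing import Any, List


async def long_poll(
    queue: "asyncio.Queue[Any]",
    long_poll_timeout: float = 30.0,
    max_batch_size: int = 100,
) -> List[Any]:
    """Wait for the first message, then drain whatever else is ready."""
    try:
        first = await asyncio.wait_for(queue.get(), timeout=long_poll_timeout)
    except asyncio.TimeoutError:
        return []  # nothing arrived while the request was held open
    batch = [first]
    while len(batch) < max_batch_size and not queue.empty():
        batch.append(queue.get_nowait())
    return batch


async def demo_long_poll() -> List[List[int]]:
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for n in range(5):
        queue.put_nowait(n)
    # Short timeout and small batch size so the demo finishes quickly.
    return [
        await long_poll(queue, long_poll_timeout=0.1, max_batch_size=3),
        await long_poll(queue, long_poll_timeout=0.1, max_batch_size=3),
        await long_poll(queue, long_poll_timeout=0.1, max_batch_size=3),
    ]


batches = asyncio.run(demo_long_poll())
```

The first two polls return immediately with queued messages; the third waits out the timeout and returns an empty batch, which is the signal for the client to poll again.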

Common Patterns

Publish/Subscribe

from praisonaiagents.push import PushClient

# Publisher
publisher = PushClient("ws://localhost:8765/ws")
await publisher.connect()
await publisher.publish("events", {"type": "user_login", "user_id": 123})

# Subscriber
subscriber = PushClient("ws://localhost:8765/ws")
await subscriber.connect()

@subscriber.on("channel_message")
async def handle_event(msg):
    print(f"Event: {msg.data}")

await subscriber.subscribe("events")

One-shot Wait

import asyncio

from praisonaiagents.push import PushClient

client = PushClient("ws://localhost:8765/ws")
await client.connect()

# Wait for next message on channel with timeout
try:
    msg = await client.wait_for("notifications", timeout=60.0)
    print(f"Got notification: {msg.data}")
except asyncio.TimeoutError:
    print("No notification received within 60 seconds")
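
For readers curious how a one-shot wait with a timeout can be built, the sketch below shows the general `asyncio` technique: park a future per waiter and resolve it when the next message for that channel is dispatched. `OneShotWaiters` is a hypothetical class for illustration and makes no claim about how `PushClient.wait_for` is implemented.

```python
import asyncio
from typing import Any, Dict, List


class OneShotWaiters:
    """Resolve a pending future with the next message on a channel."""

    def __init__(self) -> None:
        self._waiters: Dict[str, List["asyncio.Future[Any]"]] = {}

    async def wait_for(self, channel: str, timeout: float) -> Any:
        future = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(channel, []).append(future)
        try:
            # Raises asyncio.TimeoutError if no message arrives in time.
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self._waiters[channel].remove(future)

    def dispatch(self, channel: str, message: Any) -> None:
        """Hand an incoming message to every waiter on the channel."""
        for future in list(self._waiters.get(channel, [])):
            if not future.done():
                future.set_result(message)


async def demo_one_shot() -> List[Any]:
    waiters = OneShotWaiters()
    loop = asyncio.get_running_loop()
    # Simulate a message arriving shortly after we start waiting.
    loop.call_later(0.01, waiters.dispatch, "notifications", {"text": "hi"})
    got = await waiters.wait_for("notifications", timeout=1.0)
    try:
        await waiters.wait_for("notifications", timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return [got, timed_out]


outcome = asyncio.run(demo_one_shot())
```

The first wait returns the dispatched message; the second has nothing to receive and ends in `asyncio.TimeoutError`, matching the `try`/`except` shape used above.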

Presence Tracking

from praisonaiagents.push import PushClient

client = PushClient("ws://localhost:8765/ws")
await client.connect()

# Set your status
await client.set_status("available", {"role": "agent"})

# Query who's online
@client.on("presence_list")
async def on_presence(data):
    for user in data.get("clients", []):
        print(f"{user['client_id']}: {user['status']}")

await client.get_presence()

Best Practices

Enable Redis when running multiple gateway servers that need to share push channels:
from praisonaiagents import GatewayConfig, PushConfig, RedisConfig

config = GatewayConfig(
    push=PushConfig(
        enabled=True,
        redis=RedisConfig(
            host="redis.example.com",
            password="your-redis-password"
        )
    )
)
Without Redis, channels only exist on a single gateway instance.

Use the memory store backend for low-latency scenarios where message loss on restart is acceptable:
delivery=DeliveryConfig(store_backend="memory")
Use the redis store backend for high-availability scenarios where messages must survive server restarts:
delivery=DeliveryConfig(store_backend="redis")
Adjust the presence heartbeat interval and offline timeout based on network conditions and desired responsiveness:
# Fast networks, quick presence updates
presence=PresenceConfig(
    heartbeat_interval=10,
    offline_timeout=30
)

# Slow networks, conservative timeouts
presence=PresenceConfig(
    heartbeat_interval=30,
    offline_timeout=90
)
Keep the polling fallback enabled for clients behind corporate firewalls or on mobile networks that block WebSocket connections:
# Ensure fallback is enabled (default)
client = PushClient(
    "ws://localhost:8765/ws",
    fallback_to_polling=True
)
Clients automatically switch to HTTP polling if the WebSocket connection fails.
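
The fallback behaviour can be sketched as a simple try-then-fall-back routine. This is an illustration of the pattern only: `connect_with_fallback` and the two connector callables are hypothetical, and the conditions under which the real client switches transports may differ.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_fallback(
    connect_ws: Callable[[], Awaitable[None]],
    connect_polling: Callable[[], Awaitable[None]],
    fallback_to_polling: bool = True,
) -> str:
    """Try WebSocket first; use polling only if that fails and fallback is on."""
    try:
        await connect_ws()
        return "websocket"
    except OSError:
        if not fallback_to_polling:
            raise  # caller opted out of the fallback, so surface the error
        await connect_polling()
        return "polling"


async def blocked_ws() -> None:
    # Stand-in for a network that refuses WebSocket connections.
    raise ConnectionRefusedError("WebSocket blocked by firewall")


async def working_polling() -> None:
    return None


transport = asyncio.run(connect_with_fallback(blocked_ws, working_polling))
```

With `fallback_to_polling=False` the same call re-raises the connection error instead of switching transports, which can be useful when polling latency is unacceptable.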

Choosing a Transport


Gateway

The gateway that hosts push notification channels

A2A Push Notifications

Webhook-based task updates (different from real-time channels)