GatewayClient is a reconnecting WebSocket client that handles version negotiation, exponential backoff with jitter, and event-sequence gap detection — so your integration stays connected without you writing the socket loop.

Quick Start

1. Simplest Usage

import asyncio
from praisonai.gateway import GatewayClient

async def main():
    client = GatewayClient(
        url="ws://localhost:8765",
        agent_id="my-agent",
        reconnect=True
    )

    await client.connect()

    async for event in client.events():
        print(f"Event: {event.type}, Data: {event.data}")

asyncio.run(main())

2. With Backoff Tuning

import asyncio
from praisonai.gateway import GatewayClient, BackoffConfig

async def main():
    client = GatewayClient(
        url="ws://localhost:8765",
        agent_id="my-agent",
        reconnect=True,
        backoff=BackoffConfig(
            initial=1.0,
            max=30.0,
            multiplier=2.0,
            jitter=0.2
        )
    )

    await client.connect()

    async for event in client.events():
        print(f"Event: {event.type}")

asyncio.run(main())

3. With Gap and State Callbacks

import asyncio
from praisonai.gateway import GatewayClient

async def main():
    client = GatewayClient(
        url="ws://localhost:8765",
        agent_id="my-agent",
        reconnect=True,
    )

    def on_gap(expected: int, received: int):
        print(f"Gap detected: expected {expected}, got {received}")
        asyncio.create_task(client.resync())

    def on_state_change(state: str):
        print(f"Connection state changed: {state}")

    client.on_gap = on_gap
    client.on_state_change = on_state_change

    await client.connect()

    async for event in client.events():
        print(f"Event: {event.type}")

asyncio.run(main())

How It Works

| Stage | What happens |
| --- | --- |
| Connect | TCP/WS opened; join includes min_version, max_version, optional agent_id, token |
| Negotiate | Server picks min(client_max, MAX_PROTOCOL_VERSION) or rejects with version_unsupported |
| Stream | Each event carries a monotonic sequence; client tracks _expected_sequence |
| Drop | on_state_change("reconnecting") fires; backoff with jitter (initial * multiplier^attempts, clamped to max) |
| Resume | Re-join with session_id + since=cursor; server replays from cursor; presence + health returned |
| Gap | If received sequence ≠ expected, on_gap(expected, received) fires; caller can schedule client.resync() as a task |
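The Drop stage's delay formula can be sketched as a standalone function. This is an illustration, not the library's code: the function name `backoff_delay`, the `rng` parameter, and the way jitter is applied (a uniform spread of plus or minus `jitter` times the clamped delay) are assumptions, since the formula above only states `initial * multiplier^attempts, clamped to max`.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    initial: float = 1.0,
    max_delay: float = 30.0,
    multiplier: float = 2.0,
    jitter: float = 0.2,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before reconnect attempt number `attempt` (0-based)."""
    rng = rng or random.Random()
    # Exponential growth, clamped to the configured ceiling.
    base = min(initial * (multiplier ** attempt), max_delay)
    # Assumption: jitter spreads the delay uniformly within +/- jitter * base.
    spread = base * jitter
    return max(0.0, base + rng.uniform(-spread, spread))


# With jitter disabled the schedule is deterministic.
schedule = [backoff_delay(n, jitter=0.0) for n in range(7)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With the default settings the delay reaches the 30-second ceiling on the sixth attempt, and jitter keeps many clients from retrying at the same instant after a shared outage.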

Configuration Options

GatewayClient constructor:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | str | | WebSocket URL (e.g. ws://localhost:8765) |
| agent_id | str | | Agent ID to join as |
| token | Optional[str] | None | Auth token, appended as ?token= query param |
| reconnect | bool | True | Auto-reconnect on disconnect |
| backoff | Optional[BackoffConfig] | BackoffConfig() | Backoff configuration |
| max_reconnect_attempts | Optional[int] | None (infinite) | Max reconnect attempts before giving up |

BackoffConfig fields:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| initial | float | 1.0 | Initial delay in seconds |
| max | float | 30.0 | Maximum delay in seconds |
| multiplier | float | 2.0 | Backoff multiplier per attempt |
| jitter | float | 0.2 | Random jitter factor (0–1) |

Connection states (from ConnectionState):

| State | Meaning |
| --- | --- |
| disconnected | No active connection |
| connecting | Attempting initial connection |
| connected | Joined and streaming events |
| reconnecting | Waiting before next retry |

Callbacks (set as attributes on the client instance):

| Attribute | Signature | When it fires |
| --- | --- | --- |
| on_gap | Callable[[int, int], None] | A monotonic sequence gap is detected (expected, received) |
| on_state_change | Callable[[str], None] | Connection state changes |

Protocol Version Negotiation

The client and server negotiate a protocol version during the join handshake.
  • Constants in protocols.py: PROTOCOL_VERSION = 1, MIN_PROTOCOL_VERSION = 1, MAX_PROTOCOL_VERSION = 1.
  • Client sends min_version and max_version in the join message.
  • Server replies in joined with protocol_version, server_min_version, server_max_version.
version_unsupported is a permanent error. When the server returns {"type": "error", "code": "version_unsupported"}, GatewayClient.connect() raises ValueError and does not retry. Wrap your connect() call in a try/except ValueError and do not loop on this error.
Invalid min_version/max_version fields (non-integer, or min > max) produce code: "invalid_protocol_hello" and raise ConnectionError.
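The negotiation rules above can be sketched as a pure function. This is a hypothetical reconstruction for illustration only: the function name `negotiate`, its return shape, and the exact rejection condition (the picked version falling outside either side's range) are assumptions; only the constants, the `min(client_max, MAX_PROTOCOL_VERSION)` pick, and the two error codes come from the text.

```python
from typing import Any, Tuple

# Values as listed for protocols.py above.
MIN_PROTOCOL_VERSION = 1
MAX_PROTOCOL_VERSION = 1


def negotiate(min_version: Any, max_version: Any) -> Tuple[str, Any]:
    """Return ("ok", version) or ("error", code) for a join hello."""
    for value in (min_version, max_version):
        # bool is a subclass of int in Python, so exclude it explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            return ("error", "invalid_protocol_hello")
    if min_version > max_version:
        return ("error", "invalid_protocol_hello")

    # The server picks the highest version both sides could speak.
    chosen = min(max_version, MAX_PROTOCOL_VERSION)
    # Assumption: reject when the pick is below what either side accepts.
    if chosen < min_version or chosen < MIN_PROTOCOL_VERSION:
        return ("error", "version_unsupported")
    return ("ok", chosen)


print(negotiate(1, 3))    # ('ok', 1)
print(negotiate(2, 3))    # ('error', 'version_unsupported')
print(negotiate(2, 1))    # ('error', 'invalid_protocol_hello')
```

The two error codes separate a malformed hello, which the caller can fix, from a genuine version mismatch, which only an upgrade can fix.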

Gap Detection

Every event carries a monotonic sequence field; the client tracks _expected_sequence and fires on_gap when there’s a mismatch.
from praisonaiagents.gateway import GatewayEvent

# Events from the server carry sequence numbers:
# event.sequence = 1, 2, 3, ...
# If the client receives sequence=5 when expecting 4, on_gap(4, 5) fires.
on_gap is a synchronous Callable[[int, int], None]. You cannot await inside it. To call client.resync() from within the callback, schedule it as a task:
def on_gap(expected: int, received: int):
    asyncio.create_task(client.resync())

client.on_gap = on_gap
client.resync() resets the cursor to 0 and reconnects, triggering a full state reload from the server.
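To make the gap check concrete, here is a minimal standalone tracker that mirrors the behavior described in this section. The class name `SequenceTracker` and its `observe` method are hypothetical, and continuing from the received sequence after a gap is an assumption; the real client keeps this state in `_expected_sequence` and may recover differently.

```python
from typing import Callable, List, Optional, Tuple


class SequenceTracker:
    """Tracks the next expected sequence number and reports mismatches."""

    def __init__(
        self,
        on_gap: Optional[Callable[[int, int], None]] = None,
        start: int = 1,
    ) -> None:
        self.expected = start
        self.on_gap = on_gap

    def observe(self, sequence: int) -> bool:
        """Record one event's sequence; return True if it was in order."""
        in_order = sequence == self.expected
        if not in_order and self.on_gap is not None:
            self.on_gap(self.expected, sequence)
        # Assumption: after a gap, keep counting from the received sequence.
        self.expected = sequence + 1
        return in_order


gaps: List[Tuple[int, int]] = []
tracker = SequenceTracker(on_gap=lambda exp, got: gaps.append((exp, got)))
for seq in [1, 2, 3, 5, 6]:
    tracker.observe(seq)
print(gaps)  # [(4, 5)]
```

The callback fires once for the missing event 4 and not again for event 6, because the tracker realigns after reporting the gap.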

Resume Snapshot

When reconnecting with a stored session_id and since=cursor, the server replies with a single joined payload that restores full client state in one round trip. The joined payload includes:
| Field | Description |
| --- | --- |
| session_id | Session identifier |
| cursor | Current event cursor position |
| resumed | True if this is a reconnection |
| sequence | Current sequence number (aligned with replay) |
| protocol_version | Negotiated protocol version |
| server_min_version | Server’s minimum supported version |
| server_max_version | Server’s maximum supported version |
| presence | List of presence dicts for all connected clients |
| health | Gateway health dict |
A reconnecting client therefore learns current presence and health without sending any extra requests.
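If you handle the joined payload yourself, a small typed container keeps the fields in one place. The sketch below is illustrative: the `JoinedSnapshot` class and `from_payload` helper are hypothetical names, the Python types are inferred from the field descriptions, and the sample values (including the `"type": "joined"` key and the contents of `presence` and `health`) are made up for the example.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List


@dataclass
class JoinedSnapshot:
    """Typed view of the fields listed for the joined payload."""

    session_id: str
    cursor: int
    resumed: bool
    sequence: int
    protocol_version: int
    server_min_version: int
    server_max_version: int
    presence: List[Dict[str, Any]] = field(default_factory=list)
    health: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_payload(cls, payload: Dict[str, Any]) -> "JoinedSnapshot":
        # Ignore keys this sketch does not model (for example "type").
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in payload.items() if k in known})


# Illustrative payload; all values here are invented for the example.
payload = {
    "type": "joined",
    "session_id": "abc123",
    "cursor": 42,
    "resumed": True,
    "sequence": 42,
    "protocol_version": 1,
    "server_min_version": 1,
    "server_max_version": 1,
    "presence": [{"agent_id": "my-agent"}],
    "health": {"status": "ok"},
}
snapshot = JoinedSnapshot.from_payload(payload)
print(snapshot.resumed, snapshot.cursor, len(snapshot.presence))
```

A payload missing one of the required fields raises `TypeError` at construction, which surfaces a malformed snapshot early.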

Common Patterns

import asyncio
from praisonai.gateway import GatewayClient

async def main():
    client = GatewayClient(
        url="ws://localhost:8765",
        agent_id="consumer",
        reconnect=True,
    )

    await client.connect()

    async for event in client.events():
        print(f"{event.type}: {event.data}")

asyncio.run(main())

Network Blip: What Users See

When a network blip occurs, the client reconnects silently and events resume from the cursor automatically. The bot keeps responding because:
  1. Events are buffered until the cursor is confirmed
  2. The since=cursor on reconnect replays any events emitted while the client was offline
  3. Users see continuous responses with no error messages
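The buffering and replay steps above can be pictured with a small in-memory model. This is a conceptual sketch, not the gateway's implementation: the `ReplayBuffer` class, its capacity limit, and the assumption that the cursor equals the sequence of the last event the client processed are all hypothetical.

```python
from collections import deque
from typing import Any, Deque, List, Tuple


class ReplayBuffer:
    """Keeps recent events so a reconnecting client can catch up."""

    def __init__(self, capacity: int = 1000) -> None:
        # Oldest events are dropped once capacity is reached.
        self._events: Deque[Tuple[int, Any]] = deque(maxlen=capacity)
        self._next_sequence = 1

    def append(self, data: Any) -> int:
        """Store one event and return the sequence assigned to it."""
        sequence = self._next_sequence
        self._events.append((sequence, data))
        self._next_sequence += 1
        return sequence

    def replay(self, since: int) -> List[Tuple[int, Any]]:
        # Assumption: `since` is the last sequence the client processed.
        return [(seq, data) for seq, data in self._events if seq > since]


buffer = ReplayBuffer()
for item in ["a", "b", "c", "d", "e"]:
    buffer.append(item)

# The client saw events up to sequence 3 before the blip.
print(buffer.replay(since=3))  # [(4, 'd'), (5, 'e')]
```

Under this model a client that reconnects with a cursor older than the buffer's oldest event would still see a sequence gap, which is the situation on_gap exists to report.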

Best Practices

The defaults (initial=1.0, max=30.0, multiplier=2.0) suit residential and cellular networks. For LAN-only deployments with fast recovery, tighten initial to 0.1 and max to 5.0.
from praisonai.gateway import BackoffConfig

lan_backoff = BackoffConfig(initial=0.1, max=5.0, multiplier=2.0, jitter=0.1)
connect() raises ValueError on version_unsupported and stops retrying. Wrap it and alert your ops team — this means the server and client are incompatible and need a coordinated upgrade.
try:
    await client.connect()
except ValueError as e:
    # Permanent — do not loop, alert immediately
    alert_ops(f"Protocol mismatch: {e}")
    raise
If your app already has its own snapshot mechanism, prefer it over resync(). A targeted snapshot is faster than a full cursor reset.
def on_gap(expected: int, received: int):
    if received - expected > 100:
        # Large gap — use app-level snapshot
        asyncio.create_task(app.load_snapshot())
    else:
        # Small gap — let cursor replay handle it
        pass
Long-running batch jobs should not retry forever on a dead gateway. Set max_reconnect_attempts so the job fails fast and can be requeued.
client = GatewayClient(
    url="ws://localhost:8765",
    agent_id="batch-processor",
    max_reconnect_attempts=3,
)

Gateway

WebSocket control plane for multi-agent coordination

Gateway Overview

Architecture and deployment patterns

Session Persistence

How sessions survive across processes

Push Notifications

Channel-based pub/sub and delivery guarantees