Skip to main content
Inbound Journal tracks messages before agents process them, providing deduplication of webhook redeliveries and crash-safe replay of in-flight messages.

Quick Start

1

Create journal that survives restarts

from praisonaiagents import Agent
from praisonai.bots import InboundJournal, BotSessionManager

# 1. Create a journal that survives restarts
journal = InboundJournal(path="~/.praisonai/state/ingress.sqlite")
2

Enable on your bot

# 2. Plug it into the session manager
session = BotSessionManager(platform="telegram", ingress_journal=journal)

# 3. Use your agent normally — duplicates and crashes are now handled for you
agent = Agent(name="Support Bot", instructions="Help users with their questions")

How It Works

StagePurposeWhat happens
receiveDeduplicationReturns key for new messages, None for duplicates
claimCrash protectionMarks entry as being processed
completeCleanupMarks successful processing

When to use which option

FeatureInboundJournalInboundDLQ
When triggeredEvery inbound messageOnly on agent failure
SolvesWebhook redeliveries, crash recoveryFailed LLM calls
PerformanceFast dedup checkNo overhead until failure
Use together✅ Journal → agent → DLQ on exception✅ Complete durability stack

Configuration Options

OptionTypeDefaultDescription
pathstr | PathrequiredSQLite file path. Parent dirs auto-created. ~ is expanded.
max_sizeint50_000Max entries kept. Excess evicted: completed first, then oldest pending.
ttl_secondsint2_592_000 (30 days)Completed entries older than this are evicted.
claim_timeoutint300 (5 min)Claimed entries older than this are considered stale and replayed.

Per-platform examples

from praisonaiagents import Agent
from praisonai.bots import InboundJournal, BotSessionManager

journal = InboundJournal(path="~/.praisonai/telegram-journal.sqlite")
session = BotSessionManager(platform="telegram", ingress_journal=journal)

agent = Agent(
    name="Telegram Bot",
    instructions="Respond helpfully to Telegram users"
)

# In your webhook handler:
response = await session.chat(
    agent=agent,
    user_id=update.effective_user.id,
    prompt=update.message.text,
    message_id=str(update.message.message_id),
    account="my_telegram_bot"
)

Common Patterns

Pattern 1: Dedup-only (webhook redelivery protection)

# Minimal setup for webhook deduplication
journal = InboundJournal(
    path="~/.praisonai/dedup.sqlite",
    claim_timeout=60  # Fast timeout for quick processing
)

session = BotSessionManager(platform="telegram", ingress_journal=journal)
# Duplicate webhooks now return empty string and skip agent execution

Pattern 2: Crash recovery + replay loop on startup

# Setup with crash recovery
journal = InboundJournal(
    path="~/.praisonai/durable.sqlite",
    claim_timeout=600  # 10 minutes for complex agent tasks
)

# On startup, replay any stale claimed entries
replayed_count = journal.replay()
print(f"Replayed {replayed_count} stale entries from previous crash")

session = BotSessionManager(platform="discord", ingress_journal=journal)

Pattern 3: Combining with InboundDLQ for full durability stack

from praisonai.bots import InboundJournal, InboundDLQ, BotSessionManager

# Both journal and DLQ for complete durability
journal = InboundJournal(path="~/.praisonai/journal.sqlite")
dlq = InboundDLQ(path="~/.praisonai/dlq.sqlite")

session = BotSessionManager(
    platform="slack",
    ingress_journal=journal,  # Handles dedup + crash recovery
    dlq=dlq                   # Handles agent failures
)

# Flow: webhook → journal.receive() → agent.chat() → dlq on exception

Best Practices

The journal’s SQLite file must survive restarts for crash recovery to work. Use an absolute path or a location that persists across deployments.
# ✅ Good: persists across restarts
journal = InboundJournal(path="/var/lib/myapp/journal.sqlite")

# ❌ Bad: temporary path, lost on restart
journal = InboundJournal(path="/tmp/journal.sqlite")
Set claim_timeout to be longer than your p99 agent.chat() latency to avoid false stale entry detection.
# If your agent takes up to 30 seconds, use a longer timeout
journal = InboundJournal(
    path="~/.praisonai/journal.sqlite",
    claim_timeout=900  # 15 minutes safety margin
)
Always replay stale entries when your bot starts to recover from crashes.
def startup_hook():
    # Replay any messages that were claimed but not completed
    replayed = journal.replay()
    logger.info(f"Startup replay: {replayed} messages recovered")

# Call this before starting your bot's event loop
startup_hook()
The account parameter is part of the deduplication key. Keep it consistent for each bot instance.
# ✅ Good: stable account identifier
ACCOUNT = "production_telegram_bot_v1"

await session.chat(..., account=ACCOUNT)

# ❌ Bad: changing account breaks deduplication
await session.chat(..., account=f"bot_{random.randint(1,100)}")

Inbound DLQ

Failure-side durability when agent execution fails

Bot Routing

Multi-channel session routing for complex bot setups