Stream AI responses token-by-token as they’re generated, instead of waiting for the complete response.

Quick Start

1. Install

pip install praisonaiagents
2. Auto-detect (Default)

from praisonaiagents import Agent

agent = Agent(instructions="You are a helpful assistant")
# No stream= argument — the SDK auto-detects what the provider supports
agent.start("Write a short story")
By default, the SDK tries streaming first and silently falls back to non-streaming if your provider’s sync client doesn’t support it, so multi-agent workflows on providers such as Deepseek now work without extra configuration.

3. Force Streaming

from praisonaiagents import Agent

agent = Agent(instructions="You are a helpful assistant")

for chunk in agent.start("Write a short story", stream=True):
    print(chunk, end="", flush=True)

Choosing the Right Method

| Method | Streams | Display | Best For |
| --- | --- | --- | --- |
| start() (auto-detect) | 🎯 Auto | ✅ Auto | Recommended; works everywhere |
| start(stream=True) | ✅ Yes | ✅ Auto | Force streaming, interactive chat |
| iter_stream() | ✅ Always | ❌ No | App integration, custom UIs |
| run() | ❌ No | ❌ No | Production, batch processing |
| chat(stream=True) | Configurable | Configurable | Low-level control |

Common Patterns

Terminal Streaming

from praisonaiagents import Agent

agent = Agent(instructions="You are a helpful assistant")

# Tokens appear as they arrive
for chunk in agent.start("Explain quantum computing", stream=True):
    print(chunk, end="", flush=True)

App Integration with iter_stream()

Best for integrating into your own application — yields raw chunks with no display overhead.
from praisonaiagents import Agent

agent = Agent(instructions="You are a helpful assistant")

full_response = ""
for chunk in agent.iter_stream("Write a haiku"):
    full_response += chunk
    # Send to your UI, WebSocket, or processing pipeline

print(full_response)

Streaming with Callbacks

Hook into every streaming event for fine-grained control.
from praisonaiagents import Agent
from praisonaiagents.streaming import StreamEvent, StreamEventType

def on_event(event: StreamEvent):
    if event.type == StreamEventType.DELTA_TEXT:
        print(event.content, end="", flush=True)
    elif event.type == StreamEventType.FIRST_TOKEN:
        print("⚡ First token received!")
    elif event.type == StreamEventType.STREAM_END:
        print("\n✅ Done!")

agent = Agent(instructions="You are a helpful assistant")
agent.stream_emitter.add_callback(on_event)
agent.start("Tell me a joke", stream=True)

FastAPI SSE Integration

Pipe streaming tokens directly to a web client using Server-Sent Events.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from praisonaiagents import Agent

app = FastAPI()

@app.get("/stream")
async def stream_response(prompt: str):
    agent = Agent(instructions="You are a helpful assistant")
    
    def generate():
        for chunk in agent.iter_stream(prompt):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")
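One caveat with the `data: {chunk}` framing in the generator above: in the SSE wire format a blank line terminates an event, so a chunk that itself contains a newline would be cut short on the client. The following is a minimal sketch of a framing helper, assuming chunks are plain strings. `sse_frame` is a hypothetical name introduced here for illustration and is not part of praisonaiagents.

```python
def sse_frame(chunk: str) -> str:
    """Format one text chunk as a single Server-Sent Events message.

    Hypothetical helper, not part of praisonaiagents.
    """
    # SSE treats CR, LF, and CRLF as line endings, so normalize first.
    normalized = chunk.replace("\r\n", "\n").replace("\r", "\n")
    # Each line becomes its own "data:" field; the client joins the
    # fields of one event back together with a newline between them.
    fields = "".join(f"data: {line}\n" for line in normalized.split("\n"))
    # The trailing blank line marks the end of the event.
    return fields + "\n"
```

With this helper, the loop body in `generate()` could become `yield sse_frame(chunk)`, and the client would receive multi-line chunks intact.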

Async Streaming

import asyncio
from praisonaiagents import Agent

async def main():
    agent = Agent(instructions="You are a helpful assistant")
    result = await agent.astart("Write a poem", stream=True)
    print(result)

asyncio.run(main())

Streaming with Tools

When your agent uses tools, streaming happens in two phases: the initial response that decides to call tools, and a follow-up response that synthesizes the tool results.
from praisonaiagents import Agent, tool

@tool
def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"Weather in {city}: 72°F, sunny"

agent = Agent(
    instructions="You are a weather assistant",
    tools=[get_weather]
)

for chunk in agent.start("What's the weather in Paris?", stream=True):
    print(chunk, end="", flush=True)
Both phases go through the same retry-wrapped LLM path, so transient rate-limit or network errors are retried automatically without any caller intervention.

Error Handling in the Stream

If the LLM call fails after retries, the stream ends with a visible error sentence instead of silently dropping the response. The sentinel looks like this (the ref value varies per request):
[Error: Failed to generate final response after tool execution (ref: followup-1713957912345). Please retry. If it continues, try reducing prompt size.]
| Part | Meaning |
| --- | --- |
| ref: followup-<timestamp> | Correlation ID logged server-side; share this when reporting issues |
| Please retry | Retries already ran internally; another attempt may succeed if the root cause was transient |
| reducing prompt size | Common root cause is context-length or provider capacity errors |
Detect the error sentinel in your stream consumer:
from praisonaiagents import Agent

agent = Agent(instructions="You are a helpful assistant", tools=[...])

full = ""
for chunk in agent.iter_stream("Research and summarize quantum computing"):
    full += chunk
    print(chunk, end="", flush=True)

if "[Error:" in full and "ref:" in full:
    # Surface ref to your logs / retry externally
    print(f"\n⚠️ Error detected, check logs for correlation ID")
The initial LLM call and the follow-up LLM call (after tool execution) now share the same retry and rate-limiting behavior — users no longer need to add their own retry wrapper around streaming + tools.
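To log the correlation ID rather than only detecting that an error occurred, the sentinel can be parsed with a regular expression. This is a sketch that assumes the sentinel keeps the shape shown above (`[Error: ... (ref: <id>) ...]`); the pattern and the `extract_error_ref` name are illustrative assumptions, not a documented API, and the format may change between versions.

```python
import re
from typing import Optional

# Assumed shape: "[Error: <message> (ref: <id>) <advice>]".
# Derived from the sample sentinel above, not from a format guarantee.
_ERROR_REF = re.compile(r"\[Error:[^\]]*\(ref:\s*([^)\s]+)\)[^\]]*\]")


def extract_error_ref(text: str) -> Optional[str]:
    """Return the correlation ID from an error sentinel, or None if absent."""
    match = _ERROR_REF.search(text)
    return match.group(1) if match else None
```

Calling `extract_error_ref(full)` after the stream finishes gives a value you can attach to your own log line or to an external retry decision.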

StreamEvent Protocol

Each stage of a streamed request, not only each text chunk, emits a StreamEvent with full context.

| Event | When |
| --- | --- |
| REQUEST_START | Before API call |
| HEADERS_RECEIVED | HTTP 200 arrives |
| FIRST_TOKEN | First content delta (TTFT marker) |
| DELTA_TEXT | Each text chunk |
| DELTA_TOOL_CALL | Tool call streaming |
| LAST_TOKEN | Final content delta |
| STREAM_END | Stream completed |

Metrics

Track Time To First Token (TTFT) and throughput.
from praisonaiagents import Agent
from praisonaiagents.streaming import StreamEvent, StreamEventType, StreamMetrics

metrics = StreamMetrics()

def on_event(event: StreamEvent):
    metrics.update_from_event(event)
    if event.type == StreamEventType.DELTA_TEXT:
        print(event.content, end="", flush=True)

agent = Agent(instructions="You are a helpful assistant")
agent.stream_emitter.add_callback(on_event)
agent.start("Explain AI briefly", stream=True)

print(metrics.format_summary())
# Output: TTFT: 245ms | Stream: 1200ms | Total: 1445ms | Tokens: 150 (125.0/s)
| Metric | Description |
| --- | --- |
| TTFT | Time from request to first token (provider latency) |
| Stream Duration | From first to last token |
| Total Time | End-to-end request time |
| Tokens/s | Token generation rate |
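The relationship between these four metrics can be worked through with plain arithmetic. The sketch below is not how StreamMetrics is implemented; it assumes three millisecond timestamps and a token count, and it assumes the rate is computed over the stream duration, which is consistent with the sample output above (150 tokens over 1200 ms gives 125.0/s). `StreamTimings` is a hypothetical class used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class StreamTimings:
    """Hypothetical container for the timestamps behind the metrics table."""
    request_start_ms: int
    first_token_ms: int
    last_token_ms: int
    token_count: int

    @property
    def ttft_ms(self) -> int:
        # Time from sending the request to the first token.
        return self.first_token_ms - self.request_start_ms

    @property
    def stream_ms(self) -> int:
        # Time from the first token to the last token.
        return self.last_token_ms - self.first_token_ms

    @property
    def total_ms(self) -> int:
        # Assumed here to run from request start to last token.
        return self.last_token_ms - self.request_start_ms

    @property
    def tokens_per_second(self) -> float:
        # Guard against a single-token or zero-length stream.
        if self.stream_ms <= 0:
            return 0.0
        return self.token_count * 1000 / self.stream_ms


t = StreamTimings(request_start_ms=0, first_token_ms=245,
                  last_token_ms=1445, token_count=150)
summary = (f"TTFT: {t.ttft_ms}ms | Stream: {t.stream_ms}ms | "
           f"Total: {t.total_ms}ms | Tokens: {t.token_count} "
           f"({t.tokens_per_second:.1f}/s)")
print(summary)
```

With these inputs, TTFT plus stream duration equals the total (245 + 1200 = 1445), matching the figures in the sample summary line.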

Key Concepts

Time To First Token (TTFT)

Request → [TTFT] → First Token → [Streaming] → Last Token → Done
TTFT is the time before the first token arrives. This is provider latency: the model must process your prompt before it can start generating. Streaming does NOT reduce TTFT, but it displays output as soon as the first token arrives instead of after the full response is complete.

Streaming vs Non-Streaming

| Mode | Behavior | Use Case |
| --- | --- | --- |
| stream=None (default) | Try streaming, fall back to non-streaming if unsupported | Recommended; works across all providers |
| stream=True | Force streaming (errors on sync adapters that don’t support it) | When you definitely want tokens |
| stream=False | Force non-streaming | Batch jobs, structured output, sync providers |
Sync vs Async Adapters: Async methods (achat, astart, _execute_unified_achat_completion) still default to stream=True because async adapters universally support streaming. Sync methods (chat, start, run) use the new smart-fallback default. Some adapters (e.g., sync OpenAI/Deepseek adapter) currently do NOT support sync streaming and will trigger the fallback.
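The three modes can be pictured as a small dispatch routine. The sketch below illustrates the try-then-fall-back pattern in general terms and is not the SDK's actual code: `StreamingNotSupported`, `respond`, `stream_fn`, and `complete_fn` are all hypothetical names. It falls back only when streaming fails before any chunk has been produced, so a caller never sees duplicated output.

```python
from typing import Callable, Iterable, Iterator, Optional


class StreamingNotSupported(Exception):
    """Hypothetical error raised by a client that cannot stream."""


def respond(
    prompt: str,
    stream_fn: Callable[[str], Iterable[str]],
    complete_fn: Callable[[str], str],
    stream: Optional[bool] = None,
) -> Iterator[str]:
    """Yield response text according to the stream mode.

    stream=None  -> try streaming, fall back if unsupported
    stream=True  -> force streaming, propagate the error if unsupported
    stream=False -> single non-streaming call
    """
    if stream is False:
        yield complete_fn(prompt)
        return

    try:
        chunks = iter(stream_fn(prompt))
        # Pull the first chunk inside the try block: a generator only
        # raises once it is advanced, not when it is created.
        first = next(chunks)
    except StreamingNotSupported:
        if stream is True:
            raise
        yield complete_fn(prompt)
        return
    except StopIteration:
        # The stream was supported but produced nothing.
        return

    yield first
    yield from chunks
```

A caller consumes `respond(...)` the same way in every mode, which mirrors why omitting the stream argument is the recommended default.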

CLI Usage

# Stream responses in terminal
praisonai chat --stream "Tell me a joke"

# With verbose output
praisonai chat --stream --verbose "Explain quantum computing"

Best Practices

  • Omit the stream argument (or pass stream=None) and the SDK will choose streaming where supported and silently fall back where it isn’t. Only override when you have a specific reason.
  • iter_stream() yields raw chunks with zero display overhead, which makes it ideal for piping into FastAPI, WebSocket, or custom UIs.
  • start() handles display automatically. Pass stream=True for real-time token output in interactive sessions.
  • High TTFT usually points to model or network latency. Use StreamMetrics to track it.
  • There are two layers of error handling. Callback exceptions are caught by the emitter so they cannot break the stream; log them inside your callback. LLM call failures are retried automatically and, on persistent failure, surface as a visible [Error: ... (ref: ...)] sentence at the end of the stream. Check for this sentinel when consuming iter_stream().
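Because the emitter catches callback exceptions, a bug in a callback can go unnoticed unless the callback logs it itself. One way to do that without repeating try/except in every callback is a small decorator. This is a sketch under the assumption that callbacks take a single event argument, as in the examples above; `logged_callback` is a hypothetical helper and not part of praisonaiagents.

```python
import functools
import logging

logger = logging.getLogger("stream_callbacks")


def logged_callback(callback):
    """Wrap a stream callback so any exception is logged before it propagates.

    Hypothetical helper, not part of praisonaiagents.
    """
    @functools.wraps(callback)
    def wrapper(event):
        try:
            return callback(event)
        except Exception:
            # logger.exception records the message plus the full traceback.
            logger.exception("Stream callback %s failed", callback.__name__)
            raise
    return wrapper
```

A callback decorated with `@logged_callback` is registered as usual with `agent.stream_emitter.add_callback(...)`; the re-raise leaves the decision about what to do with the exception to the emitter.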

Troubleshooting

“Streaming seems to buffer before showing anything”

This is TTFT, not buffering. The model is still processing the prompt before it emits the first token. Check:
  • Model complexity (larger models have higher TTFT)
  • Prompt length (longer prompts take longer to process)
  • Network latency to the API

“Tokens appear in chunks, not one at a time”

Normal. Providers may batch tokens for efficiency.

“Stream ends with [Error: Failed to generate final response after tool execution (ref: followup-...)]”

The follow-up LLM call (the one that synthesizes tool results into a final answer) failed after the built-in retries. Common causes:
  • Persistent rate limit — pair streaming with a Rate Limiter at higher RPM, or back off the caller.
  • Context-length overflow — reduce conversation history or tool-result size.
  • Provider outage — include the ref: ID when reporting. The internal log line (ref=..., model=..., error=...) makes it searchable.

“Streaming is not supported in sync OpenAIAdapter” / Deepseek multi-agent crash

Fixed in PraisonAI 4.6.47+ (PR #1734). Earlier versions defaulted sync chat to stream=True, which crashed on sync-only providers like Deepseek. Upgrade, or pass stream=False explicitly if you can’t.

Output & Display

Output formatting options

Async

Async agent execution

Rate Limiter

Control request rates across initial and follow-up LLM calls