Skip to main content

Workflow Patterns

PraisonAI provides four powerful workflow patterns that can be combined to create complex, production-ready workflows.

Quick Comparison

PatternPurposeUse When
route()Decision-based branchingOutput determines next steps
parallel()Concurrent executionIndependent tasks can run together
loop()Iterate over dataProcessing lists, CSV files
repeat()Repeat until conditionIterative improvement

Import

from praisonaiagents import Workflow, WorkflowContext, StepResult
# Or use Pipeline (alias for Workflow)
from praisonaiagents import Pipeline
from praisonaiagents.workflows import route, parallel, loop, repeat
Pipeline and Workflow are the same class. Use whichever term you prefer!

Pattern Overview

1. Routing (Decision Branching)

Route to different steps based on previous output:
workflow = Workflow(steps=[
    classifier,
    route({
        "approve": [approve_handler],
        "reject": [reject_handler],
        "default": [fallback]
    })
])
📖 Full Documentation →

2. Parallel (Concurrent Execution)

Execute multiple steps at the same time:
workflow = Workflow(steps=[
    parallel([research_a, research_b, research_c]),
    aggregator  # Combines results
])
📖 Full Documentation →

3. Loop (Iterate Over Data)

Process each item in a list or file:
# From list
workflow = Workflow(
    steps=[loop(processor, over="items")],
    variables={"items": ["a", "b", "c"]}
)

# From CSV
workflow = Workflow(steps=[
    loop(processor, from_csv="data.csv")
])
📖 Full Documentation →

4. Repeat (Evaluator-Optimizer)

Repeat until a condition is met:
workflow = Workflow(steps=[
    repeat(
        generator,
        until=lambda ctx: "done" in ctx.previous_result,
        max_iterations=5
    )
])
📖 Full Documentation →

Combining Patterns

Patterns can be combined for complex workflows:
workflow = Workflow(steps=[
    # Step 1: Parallel research from multiple sources
    parallel([
        research_market,
        research_competitors,
        research_customers
    ]),
    
    # Step 2: Route based on findings
    analyze_findings,
    route({
        "positive": [expand_analysis],
        "negative": [summarize_concerns],
        "default": [standard_report]
    }),
    
    # Step 3: Process each recommendation
    loop(process_recommendation, over="recommendations"),
    
    # Step 4: Refine until quality threshold
    repeat(
        refine_output,
        until=meets_quality_threshold,
        max_iterations=3
    ),
    
    # Step 5: Final output
    format_final_report
])

Common Workflow Architectures

Orchestrator-Worker

workflow = Workflow(steps=[
    orchestrator,  # Decides which workers to use
    route({
        "type_a": [worker_a],
        "type_b": [worker_b],
        "type_c": [worker_c]
    }),
    synthesizer  # Combines worker outputs
])

Fan-Out/Fan-In

workflow = Workflow(steps=[
    splitter,  # Splits work into parts
    parallel([processor_1, processor_2, processor_3]),
    aggregator  # Combines results
])

Batch Processing Pipeline

workflow = Workflow(steps=[
    loop(validate_item, from_csv="input.csv"),
    loop(transform_item, over="validated_items"),
    loop(load_item, over="transformed_items"),
    generate_report
])

Self-Improving Agent

workflow = Workflow(steps=[
    initial_generator,
    repeat(
        improve_output,
        until=quality_check,
        max_iterations=5
    ),
    final_polish
])

Pattern Selection Guide

Best Practices

1. Start Simple

# Start with sequential
workflow = Workflow(steps=[step1, step2, step3])

# Add patterns as needed
workflow = Workflow(steps=[
    step1,
    parallel([step2a, step2b]),  # Optimize with parallel
    step3
])

2. Handle Errors

def safe_step(ctx: WorkflowContext) -> StepResult:
    try:
        # Your logic
        return StepResult(output="Success")
    except Exception as e:
        return StepResult(output=f"Error: {e}")

3. Use Verbose Mode

result = workflow.start("input", verbose=True)
# Shows step-by-step progress

4. Track State with Variables

def my_step(ctx: WorkflowContext) -> StepResult:
    count = ctx.variables.get("count", 0) + 1
    return StepResult(
        output=f"Count: {count}",
        variables={"count": count}
    )

API Reference

FunctionSignature
route()route(routes: Dict[str, List], default: Optional[List] = None)
parallel()parallel(steps: List)
loop()loop(step, over=None, from_csv=None, from_file=None, var_name="item")
repeat()repeat(step, until=None, max_iterations=10)

See Also