Workflow Parallel Execution

Execute multiple steps concurrently and combine their results. This pattern is ideal for independent tasks that can run simultaneously.

Quick Start

from praisonaiagents import AgentFlow, WorkflowContext, StepResult, parallel
import time

# Parallel workers
def research_market(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="📊 Market: Growth 15% YoY")

def research_competitors(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="🏢 Competitors: 3 major players")

def research_customers(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="👥 Customers: 85% satisfaction")

# Aggregator
def summarize(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables.get("parallel_outputs", [])
    summary = "📋 SUMMARY:\n" + "\n".join(f"  • {o}" for o in outputs)
    return StepResult(output=summary)

# Create workflow
workflow = AgentFlow(steps=[
    parallel([research_market, research_competitors, research_customers]),
    summarize
])

result = workflow.start("Analyze business")
print(result["output"])

Output:
⚡ Running 3 steps in parallel...
✅ Parallel complete: 3 results
✅ summarize: 📋 SUMMARY:...

📋 SUMMARY:
  • 📊 Market: Growth 15% YoY
  • 🏢 Competitors: 3 major players
  • 👥 Customers: 85% satisfaction

API Reference

parallel()

parallel(
    steps: List, 
    max_workers: Optional[int] = None,
    on_failure: str = "partial_ok"
) -> Parallel

Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| steps | List | required | Steps to execute concurrently |
| max_workers | Optional[int] | None | Cap on ThreadPoolExecutor workers; defaults to min(3, len(steps)) |
| on_failure | "partial_ok" \| "fail_fast" \| "fail_all" | "partial_ok" | Failure-handling strategy |

Accessing Results

After parallel execution, results are available in ctx.variables:
| Variable | Type | Description |
|----------|------|-------------|
| parallel_outputs | List[str] | All branch outputs, in the order the steps were listed |

def aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables["parallel_outputs"]
    # outputs = ["Result A", "Result B", "Result C"]
    return StepResult(output=f"Combined: {len(outputs)} results")

Examples

With Agents

from praisonaiagents import Agent, AgentFlow, WorkflowContext, StepResult, parallel

researcher = Agent(name="Researcher", role="Research topics")
analyst = Agent(name="Analyst", role="Analyze data")
writer = Agent(name="Writer", role="Write content")

# Combine the three agents' outputs into one report
def final_aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables.get("parallel_outputs", [])
    return StepResult(output="\n".join(outputs))

workflow = AgentFlow(steps=[
    parallel([researcher, analyst, writer]),
    final_aggregator
])

Mixed Steps

from praisonaiagents import Task

workflow = AgentFlow(steps=[
    parallel([
        my_function,           # Function
        Agent(name="Bot"),     # Agent
        Task(...)              # Task
    ]),
    aggregator
])

Nested Parallel

workflow = AgentFlow(steps=[
    parallel([
        parallel([task_a1, task_a2]),  # Group A
        parallel([task_b1, task_b2])   # Group B
    ]),
    final_aggregator
])

Performance

Parallel execution uses Python’s ThreadPoolExecutor:
  • Concurrent I/O: Ideal for API calls, file operations
  • Thread-safe: Each step gets its own copy of variables
  • Automatic joining: All results collected before next step

# Performance comparison
# Sequential: 3 steps × 1s each = 3s total
# Parallel:   3 steps × 1s each = ~1s total (concurrent)
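
The timing claim is easy to check. A minimal sketch, assuming the Quick Start workers (each sleeping 0.1 s) and imports are in scope:

import time

start = time.perf_counter()
workflow = AgentFlow(steps=[
    parallel([research_market, research_competitors, research_customers]),
    summarize
])
result = workflow.start("Analyze business")
elapsed = time.perf_counter() - start

# With three 0.1 s workers, wall time should be close to 0.1 s, not 0.3 s
print(f"Elapsed: {elapsed:.2f}s")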

Use Cases

| Use Case | Description |
|----------|-------------|
| Multi-source Research | Query multiple APIs simultaneously |
| Data Processing | Process independent data chunks |
| Report Generation | Generate sections in parallel |
| Validation | Run multiple validators concurrently |
| A/B Comparison | Run different approaches and compare |
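
For instance, the validation case maps directly onto the Quick Start pattern. A minimal sketch, with hypothetical check functions standing in for real validators:

from praisonaiagents import AgentFlow, WorkflowContext, StepResult, parallel

def check_schema(ctx: WorkflowContext) -> StepResult:
    return StepResult(output="schema: ok")

def check_links(ctx: WorkflowContext) -> StepResult:
    return StepResult(output="links: ok")

def check_spelling(ctx: WorkflowContext) -> StepResult:
    return StepResult(output="spelling: ok")

# Require every validator to report "ok"
def all_pass(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables.get("parallel_outputs", [])
    failed = [o for o in outputs if not o.endswith("ok")]
    return StepResult(output="PASS" if not failed else f"FAIL: {failed}")

workflow = AgentFlow(steps=[
    parallel([check_schema, check_links, check_spelling]),
    all_pass
])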

Best Practices

  1. Independent tasks only - Parallel steps shouldn’t depend on each other
  2. Handle errors gracefully - One failure shouldn’t break all tasks
  3. Aggregate results - Always follow with a step that combines outputs
  4. Consider rate limits - Don’t overwhelm external APIs

Failure Handling

Use the on_failure parameter to choose how a parallel block reacts when a branch fails.

Failure Strategies

| Strategy | Behavior | Use Case |
|----------|----------|----------|
| partial_ok | Continue with partial results; failed branches return "Error: <msg>" | Data aggregation where some sources may be unavailable |
| fail_fast | Cancel remaining branches and raise WorkflowStepError on the first failure | Critical workflows where any failure invalidates results |
| fail_all | Wait for all branches, then raise WorkflowStepError if any failed | Comprehensive error reporting and debugging |

Examples

from praisonaiagents import Agent, AgentFlow, parallel, WorkflowStepError

# partial_ok (default) — continue even if one branch fails
workflow = AgentFlow(steps=[
    parallel([agent_a, agent_b, agent_c]),
    aggregator,
])

# fail_fast — abort on first failure
workflow = AgentFlow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_fast"),
    aggregator,
])

# fail_all — gather all errors then fail
workflow = AgentFlow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
    aggregator,
])

Error Handling with partial_ok

def aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables["parallel_outputs"]
    
    # Filter successful results
    successful = [o for o in outputs if not o.startswith("Error:")]
    errors = [o for o in outputs if o.startswith("Error:")]
    
    if len(successful) >= 2:  # Minimum threshold
        return StepResult(output=f"Processed {len(successful)} sources")
    else:
        return StepResult(output=f"Insufficient data: {len(errors)} failures")

Exception Handling with fail_fast/fail_all

try:
    result = workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for error in e.errors:
        print(f"  Branch {error['step']}: {error['error']}")
