> **Documentation Index**: fetch the complete documentation index at https://docs.praison.ai/llms.txt and use it to discover all available pages before exploring further.
# Workflow Parallel Execution

Execute multiple steps concurrently and combine their results. This pattern is ideal for independent tasks that can run simultaneously.
## Quick Start

```python
from praisonaiagents import AgentFlow, WorkflowContext, StepResult
from praisonaiagents import parallel
import time

# Parallel workers
def research_market(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="📊 Market: Growth 15% YoY")

def research_competitors(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="🏢 Competitors: 3 major players")

def research_customers(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="👥 Customers: 85% satisfaction")

# Aggregator
def summarize(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables.get("parallel_outputs", [])
    summary = "📋 SUMMARY:\n" + "\n".join(f" • {o}" for o in outputs)
    return StepResult(output=summary)

# Create workflow
workflow = AgentFlow(steps=[
    parallel([research_market, research_competitors, research_customers]),
    summarize
])

result = workflow.start("Analyze business")
print(result["output"])
```
Output:

```
⚡ Running 3 steps in parallel...
✅ Parallel complete: 3 results
✅ summarize: 📋 SUMMARY:...
📋 SUMMARY:
 • 📊 Market: Growth 15% YoY
 • 🏢 Competitors: 3 major players
 • 👥 Customers: 85% satisfaction
```
## API Reference

### parallel()

```python
parallel(
    steps: List,
    max_workers: Optional[int] = None,
    on_failure: str = "partial_ok"
) -> Parallel
```
#### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `steps` | `List` | — | List of steps to execute concurrently |
| `max_workers` | `Optional[int]` | `None` | Cap on `ThreadPoolExecutor` workers. Defaults to `min(3, len(steps))` |
| `on_failure` | `str` | `"partial_ok"` | Failure-handling strategy: `"partial_ok"`, `"fail_fast"`, or `"fail_all"` |
## Accessing Results

After parallel execution, results are available in `ctx.variables`:

| Variable | Type | Description |
|---|---|---|
| `parallel_outputs` | `List[str]` | List of all outputs, in step order |
```python
def aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables["parallel_outputs"]
    # outputs = ["Result A", "Result B", "Result C"]
    return StepResult(output=f"Combined: {len(outputs)} results")
```
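The "in step order" guarantee above mirrors what Python's standard library provides: `ThreadPoolExecutor.map` returns results in submission order, not completion order, even when a later task finishes first. A minimal sketch using plain `concurrent.futures` (this illustrates the stdlib behavior, not the library's internals):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def worker(label_delay):
    label, delay = label_delay
    time.sleep(delay)
    return f"Result {label}"

# The first task is deliberately the slowest; map() still yields
# results in submission order, not completion order.
tasks = [("A", 0.3), ("B", 0.1), ("C", 0.2)]
with ThreadPoolExecutor(max_workers=3) as pool:
    outputs = list(pool.map(worker, tasks))

print(outputs)  # ['Result A', 'Result B', 'Result C']
```

Because ordering is stable, an aggregator can safely zip outputs back to the steps that produced them.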
## Examples

### With Agents

```python
from praisonaiagents import Agent

researcher = Agent(name="Researcher", role="Research topics")
analyst = Agent(name="Analyst", role="Analyze data")
writer = Agent(name="Writer", role="Write content")

workflow = AgentFlow(steps=[
    parallel([researcher, analyst, writer]),
    final_aggregator
])
```
### Mixed Steps

```python
workflow = AgentFlow(steps=[
    parallel([
        my_function,        # Function
        Agent(name="Bot"),  # Agent
        Task(...)           # Task
    ]),
    aggregator
])
```
### Nested Parallel

```python
workflow = AgentFlow(steps=[
    parallel([
        parallel([task_a1, task_a2]),  # Group A
        parallel([task_b1, task_b2])   # Group B
    ]),
    final_aggregator
])
```
Parallel execution uses Python's `ThreadPoolExecutor`:

- **Concurrent I/O**: ideal for API calls and file operations
- **Thread-safe**: each step gets its own copy of variables
- **Automatic joining**: all results are collected before the next step runs

```python
# Performance comparison
# Sequential: 3 steps × 1s each = 3s total
# Parallel:   3 steps × 1s each = ~1s total (concurrent)
```
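The comparison above is easy to verify with the same stdlib primitive the workflow engine uses. This sketch times three sleep-based steps run sequentially versus through a `ThreadPoolExecutor` (scaled down to 0.2s per step so it runs quickly; it is a demonstration of the mechanism, not the library's code):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def step():
    time.sleep(0.2)  # simulate 0.2s of I/O-bound work (e.g. an API call)
    return "done"

# Sequential: three steps run back to back (~0.6s)
start = time.perf_counter()
seq = [step() for _ in range(3)]
seq_time = time.perf_counter() - start

# Parallel: the same three steps overlap in a thread pool (~0.2s)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(step) for _ in range(3)]
    par = [f.result() for f in futures]
par_time = time.perf_counter() - start

print(f"sequential: {seq_time:.2f}s, parallel: {par_time:.2f}s")
```

The speedup applies to I/O-bound steps; CPU-bound Python code gains little from threads because of the GIL.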
## Use Cases

| Use Case | Description |
|---|---|
| Multi-source Research | Query multiple APIs simultaneously |
| Data Processing | Process independent data chunks |
| Report Generation | Generate sections in parallel |
| Validation | Run multiple validators concurrently |
| A/B Comparison | Run different approaches and compare |
## Best Practices

- **Independent tasks only**: parallel steps shouldn't depend on each other
- **Handle errors gracefully**: one failure shouldn't break all tasks
- **Aggregate results**: always follow with a step that combines outputs
- **Consider rate limits**: don't overwhelm external APIs
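On the rate-limit point, `max_workers` is the natural throttle: a pool capped at two workers never has more than two requests in flight. A stdlib sketch that demonstrates this with a peak-concurrency counter (plain `concurrent.futures` and `threading`, not the library's API; `call_api` is a hypothetical stand-in for an external call):

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

active = 0
peak = 0
lock = threading.Lock()

def call_api(i):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)  # record the highest concurrency seen
    time.sleep(0.1)  # simulate an external API call
    with lock:
        active -= 1
    return i

# max_workers=2 keeps at most two requests in flight at once,
# even though six tasks are queued.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(call_api, range(6)))

print(f"peak concurrency: {peak}")  # never exceeds 2
```

The same idea presumably applies to the `max_workers` parameter of `parallel()` when the branches hit a rate-limited service.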
## Failure Handling

Choose how a parallel block reacts when a branch fails using the `on_failure` parameter.

### Failure Strategies

| Strategy | Behavior | Use Case |
|---|---|---|
| `partial_ok` | Continue with partial results. Failed branches return `"Error: <msg>"` | Data aggregation where some sources may be unavailable |
| `fail_fast` | Cancel remaining branches and raise `WorkflowStepError` on first failure | Critical workflows where any failure invalidates results |
| `fail_all` | Wait for all branches, then raise `WorkflowStepError` if any failed | Comprehensive error reporting and debugging |
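The `partial_ok` behavior described above (failed branches become `"Error: <msg>"` strings rather than exceptions) can be sketched in a few lines of plain `concurrent.futures`. This is an illustrative reimplementation of the semantics, not the library's code, and `run_partial_ok` is a hypothetical helper:

```python
from concurrent.futures import ThreadPoolExecutor

def run_partial_ok(steps):
    """Run every step; failed branches yield 'Error: <msg>' strings."""
    def guard(step):
        try:
            return step()
        except Exception as exc:
            return f"Error: {exc}"
    with ThreadPoolExecutor() as pool:
        return list(pool.map(guard, steps))

def ok():
    return "fine"

def bad():
    raise ValueError("source unavailable")

outputs = run_partial_ok([ok, bad, ok])
print(outputs)  # ['fine', 'Error: source unavailable', 'fine']
```

`fail_fast` would instead re-raise inside the loop and cancel pending futures, while `fail_all` would collect every error before raising.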
### Examples

```python
from praisonaiagents import Agent, Workflow, parallel, WorkflowStepError

# partial_ok (default) — continue even if one branch fails
workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c]),
    aggregator,
])

# fail_fast — abort on first failure
workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_fast"),
    aggregator,
])

# fail_all — gather all errors, then fail
workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
    aggregator,
])
```
### Error Handling with partial_ok

```python
def aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables["parallel_outputs"]
    # Filter successful results
    successful = [o for o in outputs if not o.startswith("Error:")]
    errors = [o for o in outputs if o.startswith("Error:")]
    if len(successful) >= 2:  # Minimum threshold
        return StepResult(output=f"Processed {len(successful)} sources")
    else:
        return StepResult(output=f"Insufficient data: {len(errors)} failures")
```
### Exception Handling with fail_fast/fail_all

```python
try:
    result = workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for error in e.errors:
        print(f"  Branch {error['step']}: {error['error']}")
```
## See Also