> ## Documentation Index
> Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Workflow Parallel Execution

> Execute multiple steps concurrently using the parallel() helper

# Workflow Parallel Execution

Execute multiple steps concurrently and combine their results. This pattern is ideal for independent tasks that can run simultaneously.

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
graph TD
    A[Input] --> B[parallel]
    B --> C[Task A]
    B --> D[Task B]
    B --> E[Task C]
    C --> F[Aggregator]
    D --> F
    E --> F
    F --> G[Output]
```

## Quick Start

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents import AgentFlow, WorkflowContext, StepResult
from praisonaiagents import parallel
import time

# Parallel workers
def research_market(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="📊 Market: Growth 15% YoY")

def research_competitors(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="🏢 Competitors: 3 major players")

def research_customers(ctx: WorkflowContext) -> StepResult:
    time.sleep(0.1)  # Simulate work
    return StepResult(output="👥 Customers: 85% satisfaction")

# Aggregator
def summarize(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables.get("parallel_outputs", [])
    summary = "📋 SUMMARY:\n" + "\n".join(f"  • {o}" for o in outputs)
    return StepResult(output=summary)

# Create workflow
workflow = AgentFlow(steps=[
    parallel([research_market, research_competitors, research_customers]),
    summarize
])

result = workflow.start("Analyze business", )
print(result["output"])
```

**Output:**

```
⚡ Running 3 steps in parallel...
✅ Parallel complete: 3 results
✅ summarize: 📋 SUMMARY:...

📋 SUMMARY:
  • 📊 Market: Growth 15% YoY
  • 🏢 Competitors: 3 major players
  • 👥 Customers: 85% satisfaction
```

## API Reference

### parallel()

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
parallel(
    steps: List, 
    max_workers: Optional[int] = None,
    on_failure: str = "partial_ok"
) -> Parallel
```

### Parameters

| Parameter     | Type                                        | Default        | Description                                                           |
| ------------- | ------------------------------------------- | -------------- | --------------------------------------------------------------------- |
| `steps`       | `List`                                      | —              | List of steps to execute concurrently                                 |
| `max_workers` | `Optional[int]`                             | `None`         | Cap on `ThreadPoolExecutor` workers. Defaults to `min(3, len(steps))` |
| `on_failure`  | `"partial_ok" \| "fail_fast" \| "fail_all"` | `"partial_ok"` | Failure-handling strategy                                             |

### Accessing Results

After parallel execution, results are available in `ctx.variables`:

| Variable           | Type        | Description                  |
| ------------------ | ----------- | ---------------------------- |
| `parallel_outputs` | `List[str]` | List of all outputs in order |

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
def aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables["parallel_outputs"]
    # outputs = ["Result A", "Result B", "Result C"]
    return StepResult(output=f"Combined: {len(outputs)} results")
```

## Examples

### With Agents

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents import Agent

researcher = Agent(name="Researcher", role="Research topics")
analyst = Agent(name="Analyst", role="Analyze data")
writer = Agent(name="Writer", role="Write content")

workflow = AgentFlow(steps=[
    parallel([researcher, analyst, writer]),
    final_aggregator
])
```

### Mixed Steps

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
workflow = AgentFlow(steps=[
    parallel([
        my_function,           # Function
        Agent(name="Bot"),     # Agent
        Task(...)      # Task
    ]),
    aggregator
])
```

### Nested Parallel

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
workflow = AgentFlow(steps=[
    parallel([
        parallel([task_a1, task_a2]),  # Group A
        parallel([task_b1, task_b2])   # Group B
    ]),
    final_aggregator
])
```

## Performance

Parallel execution uses Python's `ThreadPoolExecutor`:

* **Concurrent I/O**: Ideal for API calls, file operations
* **Thread-safe**: Each step gets its own copy of variables
* **Automatic joining**: All results collected before next step

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Performance comparison
# Sequential: 3 steps × 1s each = 3s total
# Parallel:   3 steps × 1s each = ~1s total (concurrent)
```

## Use Cases

| Use Case                  | Description                          |
| ------------------------- | ------------------------------------ |
| **Multi-source Research** | Query multiple APIs simultaneously   |
| **Data Processing**       | Process independent data chunks      |
| **Report Generation**     | Generate sections in parallel        |
| **Validation**            | Run multiple validators concurrently |
| **A/B Comparison**        | Run different approaches and compare |

## Best Practices

1. **Independent tasks only** - Parallel steps shouldn't depend on each other
2. **Handle errors gracefully** - One failure shouldn't break all tasks
3. **Aggregate results** - Always follow with a step that combines outputs
4. **Consider rate limits** - Don't overwhelm external APIs

## Failure Handling

Choose how a parallel block reacts when a branch fails using the `on_failure` parameter.

```mermaid theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
graph TB
    A[Need every branch to succeed?] --> B[fail_all]
    A --> C[Stop early on first failure?]
    C --> D[fail_fast]
    A --> E[Tolerate missing results?]
    E --> F[partial_ok - Default]
    
    classDef decision fill:#6366F1,stroke:#7C90A0,color:#fff
    classDef strategy fill:#10B981,stroke:#7C90A0,color:#fff
    
    class A,C,E decision
    class B,D,F strategy
```

### Failure Strategies

| Strategy     | Behavior                                                                 | Use Case                                                 |
| ------------ | ------------------------------------------------------------------------ | -------------------------------------------------------- |
| `partial_ok` | Continue with partial results. Failed branches return `"Error: <msg>"`   | Data aggregation where some sources may be unavailable   |
| `fail_fast`  | Cancel remaining branches and raise `WorkflowStepError` on first failure | Critical workflows where any failure invalidates results |
| `fail_all`   | Wait for all branches, then raise `WorkflowStepError` if any failed      | Comprehensive error reporting and debugging              |

### Examples

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from praisonaiagents import Agent, Workflow, parallel, WorkflowStepError

# partial_ok (default) — continue even if one branch fails
workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c]),
    aggregator,
])

# fail_fast — abort on first failure
workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_fast"),
    aggregator,
])

# fail_all — gather all errors then fail
workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
    aggregator,
])
```

### Error Handling with partial\_ok

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
def aggregator(ctx: WorkflowContext) -> StepResult:
    outputs = ctx.variables["parallel_outputs"]
    
    # Filter successful results
    successful = [o for o in outputs if not o.startswith("Error:")]
    errors = [o for o in outputs if o.startswith("Error:")]
    
    if len(successful) >= 2:  # Minimum threshold
        return StepResult(output=f"Processed {len(successful)} sources")
    else:
        return StepResult(output=f"Insufficient data: {len(errors)} failures")
```

### Exception Handling with fail\_fast/fail\_all

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
try:
    result = workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for error in e.errors:
        print(f"  Branch {error['step']}: {error['error']}")
```

## See Also

* [Workflow Error Handling](/features/workflow-errors)
* [Workflow Patterns Overview](/features/workflow-patterns)
* [Workflow Routing](/features/workflow-routing)
* [Loop Processing](/features/workflow-loop)
* [Evaluator-Optimizer](/features/workflow-repeat)
