> ## Documentation Index
> Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Task Orchestration Best Practices

> Best practices for designing and implementing complex task workflows

# Task Orchestration Best Practices

This guide provides best practices for orchestrating complex task workflows in PraisonAI Agents, helping you choose the right execution patterns and optimize performance.

## Choosing the Right Execution Mode

### When to Use Sequential Process

Sequential execution is ideal for linear workflows where each step depends on the previous one.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Good use case: Data pipeline
from praisonaiagents import Agent, Task, Process

# Sequential data processing pipeline
extractor = Agent(role="Data Extractor", goal="Extract data from sources")
transformer = Agent(role="Data Transformer", goal="Clean and transform data")
loader = Agent(role="Data Loader", goal="Load data into destination")

tasks = {
    "extract": Task(
        description="Extract data from API",
        agent=extractor,
        expected_output="Raw JSON data"
    ),
    "transform": Task(
        description="Clean and normalize data",
        agent=transformer,
        context=["extract"],
        expected_output="Cleaned dataset"
    ),
    "load": Task(
        description="Load into database",
        agent=loader,
        context=["transform"],
        expected_output="Load confirmation"
    )
}

process = Process(tasks=tasks, agents=[extractor, transformer, loader])
process.sequential()  # Each task waits for previous to complete
```

**Best for:**

* ETL pipelines
* Document processing workflows
* Step-by-step procedures
* When order is critical

### When to Use Workflow Process

Workflow execution supports complex patterns with conditions, loops, and parallel paths.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Complex customer service workflow
support_agent = Agent(role="Support", goal="Handle inquiries")
tech_agent = Agent(role="Technical", goal="Solve technical issues")
billing_agent = Agent(role="Billing", goal="Handle payments")
escalation_agent = Agent(role="Escalation", goal="Handle complex cases")

tasks = {
    "categorize": Task(
        description="Categorize customer inquiry",
        agent=support_agent,
        task_type="decision",
        condition={
            "technical": ["tech_support"],
            "billing": ["billing_support"],
            "complex": ["escalate"],
            "simple": ["quick_response"]
        }
    ),
    "tech_support": Task(
        description="Resolve technical issue",
        agent=tech_agent,
        task_type="decision",
        condition={
            "resolved": ["send_confirmation"],
            "needs_escalation": ["escalate"]
        }
    ),
    "billing_support": Task(
        description="Handle billing inquiry",
        agent=billing_agent,
        next_tasks=["send_confirmation"]
    ),
    "escalate": Task(
        description="Handle complex case",
        agent=escalation_agent,
        next_tasks=["send_confirmation"]
    ),
    "quick_response": Task(
        description="Send automated response",
        agent=support_agent,
        next_tasks=["send_confirmation"]
    ),
    "send_confirmation": Task(
        description="Send resolution confirmation",
        agent=support_agent
    )
}

process = Process(tasks=tasks, agents=[support_agent, tech_agent, billing_agent, escalation_agent])
process.workflow()  # Handles complex routing logic
```

**Best for:**

* Decision trees
* Conditional workflows
* Parallel processing
* Dynamic routing

### When to Use Hierarchical Process

Hierarchical execution uses a manager agent for dynamic orchestration.

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Research project with dynamic task allocation
manager = Agent(
    role="Project Manager",
    goal="Coordinate research project efficiently"
)

researchers = [
    Agent(role="Literature Reviewer", goal="Review academic papers"),
    Agent(role="Data Analyst", goal="Analyze datasets"),
    Agent(role="Report Writer", goal="Write findings")
]

tasks = {
    "define_scope": Task(
        description="Define research scope and objectives",
        expected_output="Research plan"
    ),
    "literature_review": Task(
        description="Review relevant literature",
        expected_output="Literature summary"
    ),
    "data_collection": Task(
        description="Collect and prepare data",
        expected_output="Prepared dataset"
    ),
    "analysis": Task(
        description="Analyze data and draw insights",
        expected_output="Analysis results"
    ),
    "report": Task(
        description="Write comprehensive report",
        expected_output="Final report"
    )
}

# Manager dynamically assigns tasks based on agent availability and expertise
process = Process(
    tasks=tasks,
    agents=researchers,
    manager_llm="gpt-4o"
)
process.hierarchical()
```

**Best for:**

* Dynamic workloads
* Resource optimization
* Adaptive workflows
* Complex coordination

## Task Design Patterns

### The Pipeline Pattern

Chain tasks for data transformation:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Data enrichment pipeline
pipeline_tasks = {
    "fetch": Task(
        description="Fetch raw data from source",
        agent=fetcher,
        expected_output="Raw data",
        output_json=RawDataSchema
    ),
    "enrich": Task(
        description="Enrich data with external sources",
        agent=enricher,
        context=["fetch"],
        expected_output="Enriched data",
        output_json=EnrichedDataSchema
    ),
    "validate": Task(
        description="Validate enriched data",
        agent=validator,
        context=["enrich"],
        guardrails=[data_quality_check],
        expected_output="Validated data"
    ),
    "store": Task(
        description="Store in database",
        agent=storer,
        context=["validate"],
        expected_output="Storage confirmation"
    )
}
```

### The Fan-Out/Fan-In Pattern

Process items in parallel then aggregate:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Parallel analysis with aggregation
analysis_tasks = {
    "split": Task(
        description="Split dataset into chunks",
        agent=splitter,
        expected_output="List of data chunks"
    ),
    "analyze_chunk": Task(
        description="Analyze data chunk {chunk_id}",
        agent=analyzer,
        task_type="loop",
        loop_data="chunks.csv",
        context=["split"],
        expected_output="Chunk analysis"
    ),
    "aggregate": Task(
        description="Combine all analyses",
        agent=aggregator,
        context=["analyze_chunk"],
        expected_output="Combined analysis report"
    )
}
```

### The Retry Pattern

Implement robust retry logic:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Reliable API integration with retries
def api_validation(output):
    """Validate API response"""
    if "error" in output.raw:
        return GuardrailResult(
            success=False,
            error=f"API error: {output.raw}"
        )
    return GuardrailResult(success=True)

reliable_task = Task(
    description="Call external API",
    agent=api_agent,
    guardrails=[api_validation],
    validation_steps=3,  # Retry up to 3 times
    retry_delay=5,  # Wait 5 seconds between retries
    fallback_agent=backup_agent  # Use if all retries fail
)
```

### The Circuit Breaker Pattern

<Note>
  PraisonAI now ships a **built-in tool circuit breaker** that wraps every tool call automatically. See [Tool Circuit Breaker](/features/tool-circuit-breaker). The examples below show how to extend or customise that pattern.
</Note>

Prevent cascade failures:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
class CircuitBreakerTask(Task):
    """Task with circuit breaker pattern"""
    
    def __init__(self, failure_threshold=3, timeout=60, **kwargs):
        super().__init__(**kwargs)
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.last_failure_time = None
        self.timeout = timeout
    
    def execute(self):
        # Check if circuit is open
        if self.failure_count >= self.failure_threshold:
            if time.time() - self.last_failure_time < self.timeout:
                return TaskOutput(
                    raw="Circuit breaker open - service temporarily unavailable",
                    metadata={"circuit_status": "open"}
                )
        
        try:
            result = super().execute()
            self.failure_count = 0  # Reset on success
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            raise e
```

## Context Management Strategies

### Selective Context Passing

Only pass necessary context to avoid token limits:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Bad: Passing entire context
summary_task = Task(
    description="Summarize findings",
    agent=summarizer,
    context=[task1, task2, task3, task4, task5]  # Too much context
)

# Good: Selective context
summary_task = Task(
    description="Summarize findings",
    agent=summarizer,
    context=[task3, task5],  # Only relevant tasks
    context_fields=["key_findings", "recommendations"]  # Specific fields
)
```

### Context Compression

Compress context for efficiency:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
class ContextCompressor:
    """Compress context before passing to next task"""
    
    def compress(self, context_data, max_tokens=1000):
        # Implement compression logic
        # Could use summarization, key extraction, etc.
        compressed = self.extract_key_points(context_data, max_tokens)
        return compressed

# Use in task
# Note: context_processor is not a built-in parameter.
# You can implement context compression in your agent or workflow logic
compression_task = Task(
    description="Process with compressed context",
    agent=processor,  # Agent should handle context compression
    context=[previous_task],
    expected_output="Processed result"
)
```

### Context Windowing

Implement sliding window for long sequences:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Process long document with context window
window_size = 3

for i in range(len(document_chunks)):
    # Get context window
    start_idx = max(0, i - window_size)
    context_chunks = document_chunks[start_idx:i]
    
    task = Task(
        description=f"Process chunk {i}",
        agent=processor,
        context=context_chunks,
        expected_output="Processed chunk"
    )
```

## Performance Optimization

### Parallel Execution

Maximize parallelism where possible:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Parallel independent tasks
independent_tasks = {
    "analyze_sales": Task(
        description="Analyze sales data",
        agent=sales_analyst,
        async_execution=True
    ),
    "analyze_marketing": Task(
        description="Analyze marketing data",
        agent=marketing_analyst,
        async_execution=True
    ),
    "analyze_support": Task(
        description="Analyze support tickets",
        agent=support_analyst,
        async_execution=True
    ),
    "combine_results": Task(
        description="Combine all analyses",
        agent=reporter,
        context=["analyze_sales", "analyze_marketing", "analyze_support"]
    )
}

# Execute parallel tasks concurrently
async def run_parallel():
    process = Process(tasks=independent_tasks, agents=agents)
    await process.aworkflow()
```

### Resource Pooling

Manage agent resources efficiently:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from concurrent.futures import ThreadPoolExecutor

class AgentPool:
    """Pool of agents for concurrent execution"""
    
    def __init__(self, agent_template, pool_size=5):
        self.agents = [
            Agent(**agent_template) for _ in range(pool_size)
        ]
        self.executor = ThreadPoolExecutor(max_workers=pool_size)
    
    def execute_task(self, task):
        # Get available agent from pool
        agent = self.get_available_agent()
        future = self.executor.submit(agent.execute, task)
        return future

# Use agent pool
agent_pool = AgentPool(
    agent_template={
        "role": "Data Processor",
        "goal": "Process data efficiently"
    },
    pool_size=10
)
```

### Caching Strategies

Implement intelligent caching:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from functools import lru_cache
import hashlib

class CachedTask(Task):
    """Task with result caching"""
    
    def __init__(self, cache_ttl=3600, **kwargs):
        super().__init__(**kwargs)
        self.cache_ttl = cache_ttl
        self.cache = {}
    
    def get_cache_key(self, inputs):
        """Generate cache key from inputs"""
        key_str = f"{self.description}:{inputs}"
        return hashlib.md5(key_str.encode()).hexdigest()
    
    def execute(self, inputs=None):
        cache_key = self.get_cache_key(inputs)
        
        # Check cache
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached_result
        
        # Execute and cache
        result = super().execute()
        self.cache[cache_key] = (result, time.time())
        return result
```

## Error Handling and Recovery

### Graceful Degradation

Design workflows that degrade gracefully:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
# Workflow with graceful degradation
tasks = {
    "primary_analysis": Task(
        description="Perform detailed analysis",
        agent=primary_analyst,
        max_retries=2
    ),
    "fallback_analysis": Task(
        description="Perform basic analysis",
        agent=basic_analyst,
        condition_on_previous_failure=True  # Only runs if primary fails
    ),
    "report": Task(
        description="Generate report with available data",
        agent=reporter,
        context=["primary_analysis", "fallback_analysis"],
        handle_missing_context=True  # Continues even if some context missing
    )
}
```

### Checkpoint and Resume

Implement checkpointing for long workflows:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
class CheckpointedProcess(Process):
    """Process with checkpoint/resume capability"""
    
    def __init__(self, checkpoint_dir="checkpoints", **kwargs):
        super().__init__(**kwargs)
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
    
    def save_checkpoint(self, task_id, result):
        """Save task result to checkpoint"""
        checkpoint_file = self.checkpoint_dir / f"{task_id}.json"
        with open(checkpoint_file, "w") as f:
            json.dump({
                "task_id": task_id,
                "result": result.dict(),
                "timestamp": time.time()
            }, f)
    
    def load_checkpoint(self, task_id):
        """Load task result from checkpoint"""
        checkpoint_file = self.checkpoint_dir / f"{task_id}.json"
        if checkpoint_file.exists():
            with open(checkpoint_file, "r") as f:
                return json.load(f)
        return None
    
    def workflow(self):
        """Execute workflow with checkpointing"""
        for task_id, task in self.tasks.items():
            # Check for existing checkpoint
            checkpoint = self.load_checkpoint(task_id)
            if checkpoint:
                print(f"Resuming from checkpoint: {task_id}")
                task.result = TaskOutput(**checkpoint["result"])
                continue
            
            # Execute task
            result = task.execute()
            
            # Save checkpoint
            self.save_checkpoint(task_id, result)
```

## Monitoring and Observability

### Task Metrics

Track key metrics for optimization:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
class MetricsCollector:
    """Collect and analyze task metrics"""
    
    def __init__(self):
        self.metrics = {
            "execution_times": {},
            "success_rates": {},
            "retry_counts": {},
            "token_usage": {}
        }
    
    def record_task(self, task_id, output):
        """Record task metrics"""
        self.metrics["execution_times"][task_id] = output.metadata.get("execution_time", 0)
        self.metrics["retry_counts"][task_id] = output.metadata.get("retry_count", 0)
        self.metrics["token_usage"][task_id] = output.metadata.get("tokens_used", 0)
    
    def get_bottlenecks(self):
        """Identify performance bottlenecks"""
        sorted_times = sorted(
            self.metrics["execution_times"].items(),
            key=lambda x: x[1],
            reverse=True
        )
        return sorted_times[:5]  # Top 5 slowest tasks

# Use metrics collector
collector = MetricsCollector()

for task_id, task in tasks.items():
    result = task.execute()
    collector.record_task(task_id, result)

print(f"Bottlenecks: {collector.get_bottlenecks()}")
```

### Workflow Visualization

Visualize complex workflows:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import networkx as nx
import matplotlib.pyplot as plt

def visualize_workflow(tasks):
    """Create visual representation of workflow"""
    G = nx.DiGraph()
    
    # Add nodes and edges
    for task_id, task in tasks.items():
        G.add_node(task_id, task_type=task.task_type)
        
        # Add edges based on dependencies
        if task.context:
            for dep in task.context:
                G.add_edge(dep.id, task_id)
        
        if hasattr(task, 'next_tasks'):
            for next_task in task.next_tasks:
                G.add_edge(task_id, next_task)
    
    # Draw graph
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_color='lightblue', 
            node_size=1500, font_size=10, arrows=True)
    plt.savefig("workflow_visualization.png")
    plt.close()

# Visualize your workflow
visualize_workflow(tasks)
```

## Testing Task Workflows

### Unit Testing Tasks

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import unittest
from unittest.mock import Mock, patch

class TestTaskWorkflow(unittest.TestCase):
    """Test individual tasks and workflows"""
    
    def test_task_execution(self):
        """Test single task execution"""
        mock_agent = Mock()
        mock_agent.chat.return_value = "Test result"
        
        task = Task(
            description="Test task",
            agent=mock_agent,
            expected_output="Test output"
        )
        
        result = task.execute()
        self.assertEqual(result.raw, "Test result")
        mock_agent.chat.assert_called_once()
    
    def test_task_retry(self):
        """Test task retry logic"""
        mock_agent = Mock()
        mock_agent.chat.side_effect = [
            Exception("First attempt failed"),
            Exception("Second attempt failed"),
            "Success on third attempt"
        ]
        
        task = Task(
            description="Retry test",
            agent=mock_agent,
            max_retries=3
        )
        
        result = task.execute()
        self.assertEqual(result.raw, "Success on third attempt")
        self.assertEqual(mock_agent.chat.call_count, 3)
```

### Integration Testing

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
def test_workflow_integration():
    """Test complete workflow execution"""
    # Create test agents with predictable behavior
    test_agents = [
        Agent(role="Test Agent 1", goal="Test"),
        Agent(role="Test Agent 2", goal="Test")
    ]
    
    # Create test workflow
    test_tasks = {
        "task1": Task(description="First task", agent=test_agents[0]),
        "task2": Task(description="Second task", agent=test_agents[1], context=["task1"])
    }
    
    # Execute and verify
    process = Process(tasks=test_tasks, agents=test_agents)
    process.sequential()
    
    # Verify results
    assert test_tasks["task1"].status == "completed"
    assert test_tasks["task2"].status == "completed"
```

## See Also

* [Process Documentation](/api/praisonaiagents/process/process) - Process API reference
* [Task Configuration](/api/praisonaiagents/task/task) - Task setup options
* [Workflow Examples](/examples/adaptive-learning) - Real-world examples
* [Performance Tuning](/best-practices/performance-tuning) - Optimization guide
