Task Orchestration Best Practices

This guide provides best practices for orchestrating complex task workflows in PraisonAI Agents, helping you choose the right execution patterns and optimize performance.

Choosing the Right Execution Mode

When to Use Sequential Process

Sequential execution is ideal for linear workflows where each step depends on the previous one.
# Good use case: Data pipeline
from praisonaiagents import Agent, Task, Process

# Sequential data processing pipeline
extractor = Agent(role="Data Extractor", goal="Extract data from sources")
transformer = Agent(role="Data Transformer", goal="Clean and transform data")
loader = Agent(role="Data Loader", goal="Load data into destination")

tasks = {
    "extract": Task(
        description="Extract data from API",
        agent=extractor,
        expected_output="Raw JSON data"
    ),
    "transform": Task(
        description="Clean and normalize data",
        agent=transformer,
        context=["extract"],
        expected_output="Cleaned dataset"
    ),
    "load": Task(
        description="Load into database",
        agent=loader,
        context=["transform"],
        expected_output="Load confirmation"
    )
}

process = Process(tasks=tasks, agents=[extractor, transformer, loader])
process.sequential()  # Each task waits for previous to complete
Best for:
  • ETL pipelines
  • Document processing workflows
  • Step-by-step procedures
  • When order is critical

When to Use Workflow Process

Workflow execution supports complex patterns with conditions, loops, and parallel paths.
# Complex customer service workflow
support_agent = Agent(role="Support", goal="Handle inquiries")
tech_agent = Agent(role="Technical", goal="Solve technical issues")
billing_agent = Agent(role="Billing", goal="Handle payments")
escalation_agent = Agent(role="Escalation", goal="Handle complex cases")

tasks = {
    "categorize": Task(
        description="Categorize customer inquiry",
        agent=support_agent,
        task_type="decision",
        condition={
            "technical": ["tech_support"],
            "billing": ["billing_support"],
            "complex": ["escalate"],
            "simple": ["quick_response"]
        }
    ),
    "tech_support": Task(
        description="Resolve technical issue",
        agent=tech_agent,
        task_type="decision",
        condition={
            "resolved": ["send_confirmation"],
            "needs_escalation": ["escalate"]
        }
    ),
    "billing_support": Task(
        description="Handle billing inquiry",
        agent=billing_agent,
        next_tasks=["send_confirmation"]
    ),
    "escalate": Task(
        description="Handle complex case",
        agent=escalation_agent,
        next_tasks=["send_confirmation"]
    ),
    "quick_response": Task(
        description="Send automated response",
        agent=support_agent,
        next_tasks=["send_confirmation"]
    ),
    "send_confirmation": Task(
        description="Send resolution confirmation",
        agent=support_agent
    )
}

process = Process(tasks=tasks, agents=[support_agent, tech_agent, billing_agent, escalation_agent])
process.workflow()  # Handles complex routing logic
Best for:
  • Decision trees
  • Conditional workflows
  • Parallel processing
  • Dynamic routing

When to Use Hierarchical Process

Hierarchical execution uses a manager agent for dynamic orchestration.
# Research project with dynamic task allocation
# Note: in hierarchical mode the coordinating manager is created internally
# from manager_llm (set below); this explicit Agent just documents the role
manager = Agent(
    role="Project Manager",
    goal="Coordinate research project efficiently"
)

researchers = [
    Agent(role="Literature Reviewer", goal="Review academic papers"),
    Agent(role="Data Analyst", goal="Analyze datasets"),
    Agent(role="Report Writer", goal="Write findings")
]

tasks = {
    "define_scope": Task(
        description="Define research scope and objectives",
        expected_output="Research plan"
    ),
    "literature_review": Task(
        description="Review relevant literature",
        expected_output="Literature summary"
    ),
    "data_collection": Task(
        description="Collect and prepare data",
        expected_output="Prepared dataset"
    ),
    "analysis": Task(
        description="Analyze data and draw insights",
        expected_output="Analysis results"
    ),
    "report": Task(
        description="Write comprehensive report",
        expected_output="Final report"
    )
}

# Manager dynamically assigns tasks based on agent availability and expertise
process = Process(
    tasks=tasks,
    agents=researchers,
    manager_llm="gpt-4o"
)
process.hierarchical()
Best for:
  • Dynamic workloads
  • Resource optimization
  • Adaptive workflows
  • Complex coordination

Task Design Patterns

The Pipeline Pattern

Chain tasks for data transformation:
# Data enrichment pipeline
pipeline_tasks = {
    "fetch": Task(
        description="Fetch raw data from source",
        agent=fetcher,
        expected_output="Raw data",
        output_json=RawDataSchema
    ),
    "enrich": Task(
        description="Enrich data with external sources",
        agent=enricher,
        context=["fetch"],
        expected_output="Enriched data",
        output_json=EnrichedDataSchema
    ),
    "validate": Task(
        description="Validate enriched data",
        agent=validator,
        context=["enrich"],
        guardrails=[data_quality_check],
        expected_output="Validated data"
    ),
    "store": Task(
        description="Store in database",
        agent=storer,
        context=["validate"],
        expected_output="Storage confirmation"
    )
}
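
The RawDataSchema and EnrichedDataSchema passed to output_json above are assumed to be Pydantic models; a minimal sketch, with illustrative field names that are not part of the library:
from pydantic import BaseModel

class RawDataSchema(BaseModel):
    """Illustrative shape of the fetched records."""
    source: str
    records: list[dict]

class EnrichedDataSchema(BaseModel):
    """Raw records plus fields added during enrichment."""
    records: list[dict]
    enriched_fields: list[str]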

The Fan-Out/Fan-In Pattern

Process items in parallel, then aggregate the results:
# Parallel analysis with aggregation
analysis_tasks = {
    "split": Task(
        description="Split dataset into chunks",
        agent=splitter,
        expected_output="List of data chunks"
    ),
    "analyze_chunk": Task(
        description="Analyze data chunk {chunk_id}",
        agent=analyzer,
        task_type="loop",
        loop_data="chunks.csv",
        context=["split"],
        expected_output="Chunk analysis"
    ),
    "aggregate": Task(
        description="Combine all analyses",
        agent=aggregator,
        context=["analyze_chunk"],
        expected_output="Combined analysis report"
    )
}
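
The loop task above is expected to run once per row of its loop_data file. A sketch of generating an illustrative chunks.csv; the column names here are assumptions, not a fixed format:
import csv

# Illustrative only: one loop iteration per data row
with open("chunks.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["chunk_id", "path"])        # header row
    writer.writerow(["1", "data/chunk_1.json"])
    writer.writerow(["2", "data/chunk_2.json"])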

The Retry Pattern

Implement robust retry logic:
# Reliable API integration with retries
# Note: GuardrailResult is assumed importable (e.g. from praisonaiagents);
# some versions use a (bool, result) tuple return convention instead
def api_validation(output):
    """Validate API response"""
    if "error" in output.raw:
        return GuardrailResult(
            success=False,
            error=f"API error: {output.raw}"
        )
    return GuardrailResult(success=True)

reliable_task = Task(
    description="Call external API",
    agent=api_agent,
    guardrails=[api_validation],
    validation_steps=3,  # Retry up to 3 times
    retry_delay=5,  # Wait 5 seconds between retries
    fallback_agent=backup_agent  # Use if all retries fail
    # Note: validation_steps, retry_delay, and fallback_agent are not
    # guaranteed built-ins in every release; see the manual wrapper below
)
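
If your installed version lacks those parameters, a minimal manual wrapper (a sketch; it assumes task.execute() raises on failure) provides the same behavior:
import time

def execute_with_retries(task, retries=3, delay=5, fallback_agent=None):
    """Run a task, retrying on failure and optionally falling back."""
    for attempt in range(retries):
        try:
            return task.execute()
        except Exception:
            if attempt < retries - 1:
                time.sleep(delay)  # Wait before the next attempt
    if fallback_agent is not None:
        task.agent = fallback_agent  # Hand the task to the backup agent
        return task.execute()
    raise RuntimeError(f"Task failed after {retries} attempts")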

The Circuit Breaker Pattern

Prevent cascading failures:
import time

from praisonaiagents import Task, TaskOutput

class CircuitBreakerTask(Task):
    """Task with circuit breaker pattern"""
    
    def __init__(self, failure_threshold=3, timeout=60, **kwargs):
        super().__init__(**kwargs)
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.last_failure_time = None
        self.timeout = timeout
    
    def execute(self):
        # Check if circuit is open
        if self.failure_count >= self.failure_threshold:
            if time.time() - self.last_failure_time < self.timeout:
                return TaskOutput(
                    raw="Circuit breaker open - service temporarily unavailable",
                    metadata={"circuit_status": "open"}
                )
        
        try:
            result = super().execute()
            self.failure_count = 0  # Reset on success
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            raise
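
Wiring it in looks like any other Task, with the breaker thresholds as extra arguments (a sketch reusing api_agent from the retry example):
flaky_task = CircuitBreakerTask(
    description="Call an unreliable downstream service",
    agent=api_agent,
    failure_threshold=3,  # Open the circuit after 3 consecutive failures
    timeout=60            # Keep it open for 60 seconds before trying again
)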

Context Management Strategies

Selective Context Passing

Pass only the context a task actually needs, to stay within token limits:
# Bad: Passing entire context
summary_task = Task(
    description="Summarize findings",
    agent=summarizer,
    context=[task1, task2, task3, task4, task5]  # Too much context
)

# Good: Selective context
summary_task = Task(
    description="Summarize findings",
    agent=summarizer,
    context=[task3, task5],  # Only relevant tasks
    context_fields=["key_findings", "recommendations"],  # Specific fields
    # Note: context_fields is illustrative; not every release exposes it,
    # and the same filtering can be done in the agent's instructions
)

Context Compression

Compress context for efficiency:
class ContextCompressor:
    """Compress context before passing to next task"""
    
    def compress(self, context_data, max_tokens=1000):
        # Placeholder compression: keep the first max_tokens whitespace-
        # delimited words; swap in summarization or key-point extraction
        # for production use
        tokens = str(context_data).split()
        return " ".join(tokens[:max_tokens])

# Use in task
# Note: context_processor is not a built-in parameter.
# You can implement context compression in your agent or workflow logic
compression_task = Task(
    description="Process with compressed context",
    agent=processor,  # Agent should handle context compression
    context=[previous_task],
    expected_output="Processed result"
)

Context Windowing

Implement sliding window for long sequences:
# Process a long document with a sliding context window
window_size = 3
window_tasks = {}

for i in range(len(document_chunks)):
    # Context window: up to window_size chunks preceding the current one
    start_idx = max(0, i - window_size)
    context_chunks = document_chunks[start_idx:i]

    window_tasks[f"chunk_{i}"] = Task(
        description=f"Process chunk {i}",
        agent=processor,
        context=context_chunks,
        expected_output="Processed chunk"
    )

# Run the windowed tasks in order
process = Process(tasks=window_tasks, agents=[processor])
process.sequential()

Performance Optimization

Parallel Execution

Maximize parallelism where possible:
# Parallel independent tasks
independent_tasks = {
    "analyze_sales": Task(
        description="Analyze sales data",
        agent=sales_analyst,
        async_execution=True
    ),
    "analyze_marketing": Task(
        description="Analyze marketing data",
        agent=marketing_analyst,
        async_execution=True
    ),
    "analyze_support": Task(
        description="Analyze support tickets",
        agent=support_analyst,
        async_execution=True
    ),
    "combine_results": Task(
        description="Combine all analyses",
        agent=reporter,
        context=["analyze_sales", "analyze_marketing", "analyze_support"]
    )
}

# Execute parallel tasks concurrently
async def run_parallel():
    agents = [sales_analyst, marketing_analyst, support_analyst, reporter]
    process = Process(tasks=independent_tasks, agents=agents)
    await process.aworkflow()
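
Drive the coroutine from synchronous code with asyncio:
import asyncio

asyncio.run(run_parallel())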

Resource Pooling

Manage agent resources efficiently:
import queue
from concurrent.futures import ThreadPoolExecutor

class AgentPool:
    """Pool of agents for concurrent execution"""

    def __init__(self, agent_template, pool_size=5):
        self.agents = queue.Queue()
        for _ in range(pool_size):
            self.agents.put(Agent(**agent_template))
        self.executor = ThreadPoolExecutor(max_workers=pool_size)

    def execute_task(self, task):
        # Block until an agent is free, run the task, then return the
        # agent to the pool
        agent = self.agents.get()

        def run():
            try:
                return agent.execute(task)
            finally:
                self.agents.put(agent)

        return self.executor.submit(run)

# Use agent pool
agent_pool = AgentPool(
    agent_template={
        "role": "Data Processor",
        "goal": "Process data efficiently"
    },
    pool_size=10
)

Caching Strategies

Implement intelligent caching:
import hashlib
import time

class CachedTask(Task):
    """Task with result caching"""
    
    def __init__(self, cache_ttl=3600, **kwargs):
        super().__init__(**kwargs)
        self.cache_ttl = cache_ttl
        self.cache = {}
    
    def get_cache_key(self, inputs):
        """Generate cache key from inputs"""
        key_str = f"{self.description}:{inputs}"
        return hashlib.md5(key_str.encode()).hexdigest()
    
    def execute(self, inputs=None):
        cache_key = self.get_cache_key(inputs)
        
        # Check cache
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached_result
        
        # Execute and cache
        result = super().execute()
        self.cache[cache_key] = (result, time.time())
        return result
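
A usage sketch; cache_ttl and the Task arguments follow the patterns above:
expensive_task = CachedTask(
    description="Fetch and summarize exchange rates",
    agent=api_agent,  # assumed agent from the retry example
    cache_ttl=1800    # Reuse cached results for 30 minutes
)

result = expensive_task.execute()        # Executes and populates the cache
result_again = expensive_task.execute()  # Served from the cache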

Error Handling and Recovery

Graceful Degradation

Design workflows that degrade gracefully:
# Workflow with graceful degradation
# Note: condition_on_previous_failure and handle_missing_context are
# illustrative parameters; verify availability in your installed version
tasks = {
    "primary_analysis": Task(
        description="Perform detailed analysis",
        agent=primary_analyst,
        max_retries=2
    ),
    "fallback_analysis": Task(
        description="Perform basic analysis",
        agent=basic_analyst,
        condition_on_previous_failure=True  # Only runs if primary fails
    ),
    "report": Task(
        description="Generate report with available data",
        agent=reporter,
        context=["primary_analysis", "fallback_analysis"],
        handle_missing_context=True  # Continues even if some context missing
    )
}
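
If those parameters are unavailable in your version, a minimal manual fallback (a sketch) achieves the same effect:
def run_with_fallback(primary, fallback):
    """Try the detailed analysis first; fall back to the basic one on failure."""
    try:
        return primary.execute()
    except Exception:
        return fallback.execute()

analysis_result = run_with_fallback(
    tasks["primary_analysis"], tasks["fallback_analysis"]
)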

Checkpoint and Resume

Implement checkpointing for long-running workflows:
import json
import time
from pathlib import Path

from praisonaiagents import Process, TaskOutput

class CheckpointedProcess(Process):
    """Process with checkpoint/resume capability"""
    
    def __init__(self, checkpoint_dir="checkpoints", **kwargs):
        super().__init__(**kwargs)
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
    
    def save_checkpoint(self, task_id, result):
        """Save task result to checkpoint"""
        checkpoint_file = self.checkpoint_dir / f"{task_id}.json"
        with open(checkpoint_file, "w") as f:
            json.dump({
                "task_id": task_id,
                "result": result.dict(),
                "timestamp": time.time()
            }, f)
    
    def load_checkpoint(self, task_id):
        """Load task result from checkpoint"""
        checkpoint_file = self.checkpoint_dir / f"{task_id}.json"
        if checkpoint_file.exists():
            with open(checkpoint_file, "r") as f:
                return json.load(f)
        return None
    
    def workflow(self):
        """Execute workflow with checkpointing"""
        for task_id, task in self.tasks.items():
            # Check for existing checkpoint
            checkpoint = self.load_checkpoint(task_id)
            if checkpoint:
                print(f"Resuming from checkpoint: {task_id}")
                task.result = TaskOutput(**checkpoint["result"])
                continue
            
            # Execute task
            result = task.execute()
            
            # Save checkpoint
            self.save_checkpoint(task_id, result)

Monitoring and Observability

Task Metrics

Track key metrics for optimization:
class MetricsCollector:
    """Collect and analyze task metrics"""
    
    def __init__(self):
        self.metrics = {
            "execution_times": {},
            "success_rates": {},
            "retry_counts": {},
            "token_usage": {}
        }
    
    def record_task(self, task_id, output):
        """Record task metrics"""
        self.metrics["execution_times"][task_id] = output.metadata.get("execution_time", 0)
        self.metrics["success_rates"][task_id] = output.metadata.get("success", True)  # assumed metadata key
        self.metrics["retry_counts"][task_id] = output.metadata.get("retry_count", 0)
        self.metrics["token_usage"][task_id] = output.metadata.get("tokens_used", 0)
    
    def get_bottlenecks(self):
        """Identify performance bottlenecks"""
        sorted_times = sorted(
            self.metrics["execution_times"].items(),
            key=lambda x: x[1],
            reverse=True
        )
        return sorted_times[:5]  # Top 5 slowest tasks

# Use metrics collector
collector = MetricsCollector()

for task_id, task in tasks.items():
    result = task.execute()
    collector.record_task(task_id, result)

print(f"Bottlenecks: {collector.get_bottlenecks()}")

Workflow Visualization

Visualize complex workflows:
import networkx as nx
import matplotlib.pyplot as plt

def visualize_workflow(tasks):
    """Create visual representation of workflow"""
    G = nx.DiGraph()
    
    # Add nodes and edges
    for task_id, task in tasks.items():
        G.add_node(task_id, task_type=getattr(task, "task_type", None))

        # Context dependencies may be task-name strings or Task objects
        for dep in getattr(task, "context", None) or []:
            dep_id = dep if isinstance(dep, str) else getattr(dep, "id", str(dep))
            G.add_edge(dep_id, task_id)

        for next_task in getattr(task, "next_tasks", None) or []:
            G.add_edge(task_id, next_task)
    
    # Draw graph
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_color='lightblue', 
            node_size=1500, font_size=10, arrows=True)
    plt.savefig("workflow_visualization.png")
    plt.close()

# Visualize your workflow
visualize_workflow(tasks)

Testing Task Workflows

Unit Testing Tasks

import unittest
from unittest.mock import Mock, patch

class TestTaskWorkflow(unittest.TestCase):
    """Test individual tasks and workflows"""
    
    def test_task_execution(self):
        """Test single task execution"""
        mock_agent = Mock()
        mock_agent.chat.return_value = "Test result"
        
        task = Task(
            description="Test task",
            agent=mock_agent,
            expected_output="Test output"
        )
        
        result = task.execute()
        self.assertEqual(result.raw, "Test result")
        mock_agent.chat.assert_called_once()
    
    def test_task_retry(self):
        """Test task retry logic"""
        mock_agent = Mock()
        mock_agent.chat.side_effect = [
            Exception("First attempt failed"),
            Exception("Second attempt failed"),
            "Success on third attempt"
        ]
        
        task = Task(
            description="Retry test",
            agent=mock_agent,
            max_retries=3
        )
        
        result = task.execute()
        self.assertEqual(result.raw, "Success on third attempt")
        self.assertEqual(mock_agent.chat.call_count, 3)
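
Run the suite with the standard unittest entry point:
if __name__ == "__main__":
    unittest.main()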

Integration Testing

def test_workflow_integration():
    """Test complete workflow execution"""
    # Create test agents with predictable behavior
    test_agents = [
        Agent(role="Test Agent 1", goal="Test"),
        Agent(role="Test Agent 2", goal="Test")
    ]
    
    # Create test workflow
    test_tasks = {
        "task1": Task(description="First task", agent=test_agents[0]),
        "task2": Task(description="Second task", agent=test_agents[1], context=["task1"])
    }
    
    # Execute and verify
    process = Process(tasks=test_tasks, agents=test_agents)
    process.sequential()
    
    # Verify results
    assert test_tasks["task1"].status == "completed"
    assert test_tasks["task2"].status == "completed"

See Also