Task Orchestration Best Practices
This guide provides best practices for orchestrating complex task workflows in PraisonAI Agents, helping you choose the right execution patterns and optimize performance.
Choosing the Right Execution Mode
When to Use Sequential Process
Sequential execution is ideal for linear workflows where each step depends on the previous one.
# Good use case: Data pipeline
from praisonaiagents import Agent, Task, Process
# Sequential data processing pipeline
extractor = Agent(role="Data Extractor", goal="Extract data from sources")
transformer = Agent(role="Data Transformer", goal="Clean and transform data")
loader = Agent(role="Data Loader", goal="Load data into destination")
tasks = {
"extract": Task(
description="Extract data from API",
agent=extractor,
expected_output="Raw JSON data"
),
"transform": Task(
description="Clean and normalize data",
agent=transformer,
context=["extract"],
expected_output="Cleaned dataset"
),
"load": Task(
description="Load into database",
agent=loader,
context=["transform"],
expected_output="Load confirmation"
)
}
process = Process(tasks=tasks, agents=[extractor, transformer, loader])
process.sequential() # Each task waits for previous to complete
Sequential process works best for:
- ETL pipelines
- Document processing workflows
- Step-by-step procedures
- When order is critical
When to Use Workflow Process
Workflow execution supports complex patterns with conditions, loops, and parallel paths.
# Complex customer service workflow
support_agent = Agent(role="Support", goal="Handle inquiries")
tech_agent = Agent(role="Technical", goal="Solve technical issues")
billing_agent = Agent(role="Billing", goal="Handle payments")
escalation_agent = Agent(role="Escalation", goal="Handle complex cases")
tasks = {
"categorize": Task(
description="Categorize customer inquiry",
agent=support_agent,
task_type="decision",
condition={
"technical": ["tech_support"],
"billing": ["billing_support"],
"complex": ["escalate"],
"simple": ["quick_response"]
}
),
"tech_support": Task(
description="Resolve technical issue",
agent=tech_agent,
task_type="decision",
condition={
"resolved": ["send_confirmation"],
"needs_escalation": ["escalate"]
}
),
"billing_support": Task(
description="Handle billing inquiry",
agent=billing_agent,
next_tasks=["send_confirmation"]
),
"escalate": Task(
description="Handle complex case",
agent=escalation_agent,
next_tasks=["send_confirmation"]
),
"quick_response": Task(
description="Send automated response",
agent=support_agent,
next_tasks=["send_confirmation"]
),
"send_confirmation": Task(
description="Send resolution confirmation",
agent=support_agent
)
}
process = Process(tasks=tasks, agents=[support_agent, tech_agent, billing_agent, escalation_agent])
process.workflow() # Handles complex routing logic
Workflow process works best for:
- Decision trees
- Conditional workflows
- Parallel processing
- Dynamic routing
When to Use Hierarchical Process
Hierarchical execution uses a manager agent for dynamic orchestration.
# Research project with dynamic task allocation
manager = Agent(
role="Project Manager",
goal="Coordinate research project efficiently"
)
researchers = [
Agent(role="Literature Reviewer", goal="Review academic papers"),
Agent(role="Data Analyst", goal="Analyze datasets"),
Agent(role="Report Writer", goal="Write findings")
]
tasks = {
"define_scope": Task(
description="Define research scope and objectives",
expected_output="Research plan"
),
"literature_review": Task(
description="Review relevant literature",
expected_output="Literature summary"
),
"data_collection": Task(
description="Collect and prepare data",
expected_output="Prepared dataset"
),
"analysis": Task(
description="Analyze data and draw insights",
expected_output="Analysis results"
),
"report": Task(
description="Write comprehensive report",
expected_output="Final report"
)
}
# Manager dynamically assigns tasks based on agent availability and expertise
process = Process(
tasks=tasks,
agents=researchers,
manager_llm="gpt-4o"
)
process.hierarchical()
Hierarchical process works best for:
- Dynamic workloads
- Resource optimization
- Adaptive workflows
- Complex coordination
Task Design Patterns
The Pipeline Pattern
Chain tasks for data transformation:
# Data enrichment pipeline
pipeline_tasks = {
"fetch": Task(
description="Fetch raw data from source",
agent=fetcher,
expected_output="Raw data",
output_json=RawDataSchema
),
"enrich": Task(
description="Enrich data with external sources",
agent=enricher,
context=["fetch"],
expected_output="Enriched data",
output_json=EnrichedDataSchema
),
"validate": Task(
description="Validate enriched data",
agent=validator,
context=["enrich"],
guardrails=[data_quality_check],
expected_output="Validated data"
),
"store": Task(
description="Store in database",
agent=storer,
context=["validate"],
expected_output="Storage confirmation"
)
}
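The schemas and guardrail referenced above are not defined in this snippet. A minimal sketch of what they might look like, assuming Pydantic models are accepted for output_json and that guardrails follow the GuardrailResult convention used in the Retry Pattern below; the names and fields are illustrative:

from pydantic import BaseModel

class RawDataSchema(BaseModel):
    """Illustrative shape of the fetch task's structured output."""
    records: list[dict]

class EnrichedDataSchema(BaseModel):
    """Illustrative shape of the enriched records."""
    records: list[dict]
    enrichment_source: str

def data_quality_check(output):
    """Hypothetical guardrail: reject empty results."""
    if not output.raw:
        return GuardrailResult(success=False, error="Enriched dataset is empty")
    return GuardrailResult(success=True)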
The Fan-Out/Fan-In Pattern
Process items in parallel, then aggregate the results:
# Parallel analysis with aggregation
analysis_tasks = {
"split": Task(
description="Split dataset into chunks",
agent=splitter,
expected_output="List of data chunks"
),
"analyze_chunk": Task(
description="Analyze data chunk {chunk_id}",
agent=analyzer,
task_type="loop",
loop_data="chunks.csv",
context=["split"],
expected_output="Chunk analysis"
),
"aggregate": Task(
description="Combine all analyses",
agent=aggregator,
context=["analyze_chunk"],
expected_output="Combined analysis report"
)
}
The Retry Pattern
Implement robust retry logic:
# Reliable API integration with retries
from praisonaiagents import GuardrailResult  # import path may vary by package version

def api_validation(output):
"""Validate API response"""
if "error" in output.raw:
return GuardrailResult(
success=False,
error=f"API error: {output.raw}"
)
return GuardrailResult(success=True)
reliable_task = Task(
description="Call external API",
agent=api_agent,
guardrails=[api_validation],
validation_steps=3, # Retry up to 3 times
retry_delay=5, # Wait 5 seconds between retries
fallback_agent=backup_agent # Use if all retries fail
)
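If parameters such as validation_steps, retry_delay, or fallback_agent are not available in your version, the same pattern can be scripted explicitly. A minimal sketch, assuming only the api_validation guardrail above and a Task.execute() method as used elsewhere in this guide; the helper name is illustrative:

import time

def execute_with_retries(task, fallback_agent=None, max_attempts=3, delay=5):
    """Re-run a task while the guardrail reports failure (illustrative helper)."""
    output = None
    for attempt in range(max_attempts):
        output = task.execute()
        check = api_validation(output)
        if check.success:
            return output
        time.sleep(delay)  # wait before the next attempt
    if fallback_agent is not None:
        task.agent = fallback_agent  # hand the task over to the backup agent
        return task.execute()
    return output

result = execute_with_retries(reliable_task, fallback_agent=backup_agent)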
The Circuit Breaker Pattern
Prevent cascading failures:
import time

from praisonaiagents import Task, TaskOutput  # adjust import paths to your package version if needed

class CircuitBreakerTask(Task):
"""Task with circuit breaker pattern"""
def __init__(self, failure_threshold=3, timeout=60, **kwargs):
super().__init__(**kwargs)
self.failure_count = 0
self.failure_threshold = failure_threshold
self.last_failure_time = None
self.timeout = timeout
def execute(self):
# Check if circuit is open
if self.failure_count >= self.failure_threshold:
if time.time() - self.last_failure_time < self.timeout:
return TaskOutput(
raw="Circuit breaker open - service temporarily unavailable",
metadata={"circuit_status": "open"}
)
try:
result = super().execute()
self.failure_count = 0 # Reset on success
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
raise e
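A usage sketch, reusing the api_agent assumed in the Retry Pattern example:

protected_task = CircuitBreakerTask(
    description="Call flaky external service",
    agent=api_agent,          # assumed to exist, as in the retry example
    expected_output="Service response",
    failure_threshold=3,      # open the circuit after three consecutive failures
    timeout=60                # keep it open for 60 seconds before allowing another attempt
)
result = protected_task.execute()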
Context Management Strategies
Selective Context Passing
Only pass the necessary context to avoid exceeding token limits:
# Bad: Passing entire context
summary_task = Task(
description="Summarize findings",
agent=summarizer,
context=[task1, task2, task3, task4, task5] # Too much context
)
# Good: Selective context
summary_task = Task(
description="Summarize findings",
agent=summarizer,
context=[task3, task5], # Only relevant tasks
context_fields=["key_findings", "recommendations"] # Specific fields
)
Context Compression
Compress context for efficiency:
class ContextCompressor:
"""Compress context before passing to next task"""
def compress(self, context_data, max_tokens=1000):
# Implement compression logic
# Could use summarization, key extraction, etc.
compressed = self.extract_key_points(context_data, max_tokens)
return compressed
# Use in task
# Note: context_processor is not a built-in parameter.
# You can implement context compression in your agent or workflow logic
compression_task = Task(
description="Process with compressed context",
agent=processor, # Agent should handle context compression
context=[previous_task],
expected_output="Processed result"
)
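extract_key_points is left unimplemented above. A minimal sketch of one possible implementation, using naive sentence truncation with a rough word-per-token budget; a real implementation might call a summarizer agent instead, and previous_task.result.raw is an assumed way to reach the prior task's output:

class SimpleContextCompressor(ContextCompressor):
    """Naive compressor: keep leading sentences until the rough token budget is spent."""

    def extract_key_points(self, context_data, max_tokens):
        sentences = str(context_data).split(". ")
        kept, budget = [], max_tokens
        for sentence in sentences:
            cost = len(sentence.split())  # rough proxy: one word per token
            if cost > budget:
                break
            kept.append(sentence)
            budget -= cost
        return ". ".join(kept)

compressed = SimpleContextCompressor().compress(previous_task.result.raw)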
Context Windowing
Implement a sliding window for long sequences:
# Process a long document with a sliding context window
window_size = 3
window_tasks = []
for i in range(len(document_chunks)):
    # Context window: the few chunks immediately preceding the current one
    start_idx = max(0, i - window_size)
    context_chunks = document_chunks[start_idx:i]
    window_tasks.append(Task(
        description=f"Process chunk {i}",
        agent=processor,
        context=context_chunks,
        expected_output="Processed chunk"
    ))
Performance Optimization
Parallel Execution
Maximize parallelism where possible:
# Parallel independent tasks
independent_tasks = {
"analyze_sales": Task(
description="Analyze sales data",
agent=sales_analyst,
async_execution=True
),
"analyze_marketing": Task(
description="Analyze marketing data",
agent=marketing_analyst,
async_execution=True
),
"analyze_support": Task(
description="Analyze support tickets",
agent=support_analyst,
async_execution=True
),
"combine_results": Task(
description="Combine all analyses",
agent=reporter,
context=["analyze_sales", "analyze_marketing", "analyze_support"]
)
}
# Execute parallel tasks concurrently
import asyncio

async def run_parallel():
    process = Process(
        tasks=independent_tasks,
        agents=[sales_analyst, marketing_analyst, support_analyst, reporter]
    )
    await process.aworkflow()

asyncio.run(run_parallel())
Resource Pooling
Manage agent resources efficiently:
from concurrent.futures import ThreadPoolExecutor
class AgentPool:
"""Pool of agents for concurrent execution"""
def __init__(self, agent_template, pool_size=5):
self.agents = [
Agent(**agent_template) for _ in range(pool_size)
]
self.executor = ThreadPoolExecutor(max_workers=pool_size)
    def get_available_agent(self):
        # Simple round-robin hand-out; a production pool would track busy agents
        agent = self.agents.pop(0)
        self.agents.append(agent)
        return agent

    def execute_task(self, task):
        # Get an available agent from the pool and run the task in a worker thread
        agent = self.get_available_agent()
        future = self.executor.submit(agent.execute, task)
        return future
# Use agent pool
agent_pool = AgentPool(
agent_template={
"role": "Data Processor",
"goal": "Process data efficiently"
},
pool_size=10
)
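Submitting work to the pool might look like this; a sketch assuming the pipeline_tasks dict from the Pipeline Pattern above and that each agent exposes the execute method used inside the pool:

# Submit every pipeline task to the pool and wait for the results
futures = [agent_pool.execute_task(task) for task in pipeline_tasks.values()]
results = [future.result() for future in futures]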
Caching Strategies
Implement intelligent caching:
import hashlib
import time
class CachedTask(Task):
"""Task with result caching"""
def __init__(self, cache_ttl=3600, **kwargs):
super().__init__(**kwargs)
self.cache_ttl = cache_ttl
self.cache = {}
def get_cache_key(self, inputs):
"""Generate cache key from inputs"""
key_str = f"{self.description}:{inputs}"
return hashlib.md5(key_str.encode()).hexdigest()
def execute(self, inputs=None):
cache_key = self.get_cache_key(inputs)
# Check cache
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_ttl:
return cached_result
# Execute and cache
result = super().execute()
self.cache[cache_key] = (result, time.time())
return result
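A usage sketch, assuming a hypothetical data_agent; the second call is served from the cache as long as the TTL has not expired:

cached_task = CachedTask(
    description="Fetch current exchange rates",
    agent=data_agent,         # hypothetical agent
    expected_output="Exchange rate table",
    cache_ttl=3600            # keep results for one hour
)
first = cached_task.execute(inputs="EUR/USD")   # executes and stores the result
second = cached_task.execute(inputs="EUR/USD")  # returned from cache while the TTL holds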
Error Handling and Recovery
Graceful Degradation
Design workflows that degrade gracefully:
# Workflow with graceful degradation
tasks = {
"primary_analysis": Task(
description="Perform detailed analysis",
agent=primary_analyst,
max_retries=2
),
"fallback_analysis": Task(
description="Perform basic analysis",
agent=basic_analyst,
condition_on_previous_failure=True # Only runs if primary fails
),
"report": Task(
description="Generate report with available data",
agent=reporter,
context=["primary_analysis", "fallback_analysis"],
handle_missing_context=True # Continues even if some context missing
)
}
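If parameters such as condition_on_previous_failure or handle_missing_context are not available in your version, the same behaviour can be scripted explicitly. A minimal sketch, assuming Task.execute() raises on failure and that a task output can be passed as context:

# Explicit fallback: try the detailed analysis first, then the basic one
try:
    analysis = tasks["primary_analysis"].execute()
except Exception:
    analysis = tasks["fallback_analysis"].execute()

report_task = Task(
    description="Generate report with available data",
    agent=reporter,
    context=[analysis],       # pass whichever analysis succeeded
    expected_output="Final report"
)
report_task.execute()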
Checkpoint and Resume
Implement checkpointing for long workflows:
import json
import time
from pathlib import Path

class CheckpointedProcess(Process):
"""Process with checkpoint/resume capability"""
def __init__(self, checkpoint_dir="checkpoints", **kwargs):
super().__init__(**kwargs)
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def save_checkpoint(self, task_id, result):
"""Save task result to checkpoint"""
checkpoint_file = self.checkpoint_dir / f"{task_id}.json"
with open(checkpoint_file, "w") as f:
json.dump({
"task_id": task_id,
"result": result.dict(),
"timestamp": time.time()
}, f)
def load_checkpoint(self, task_id):
"""Load task result from checkpoint"""
checkpoint_file = self.checkpoint_dir / f"{task_id}.json"
if checkpoint_file.exists():
with open(checkpoint_file, "r") as f:
return json.load(f)
return None
def workflow(self):
"""Execute workflow with checkpointing"""
for task_id, task in self.tasks.items():
# Check for existing checkpoint
checkpoint = self.load_checkpoint(task_id)
if checkpoint:
print(f"Resuming from checkpoint: {task_id}")
task.result = TaskOutput(**checkpoint["result"])
continue
# Execute task
result = task.execute()
# Save checkpoint
self.save_checkpoint(task_id, result)
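Using it looks the same as a normal Process; a sketch reusing the agents and tasks from the sequential example at the top of this guide:

process = CheckpointedProcess(
    tasks=tasks,
    agents=[extractor, transformer, loader],
    checkpoint_dir="checkpoints"
)
process.workflow()  # re-running after a crash resumes from the saved checkpoints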
Monitoring and Observability
Task Metrics
Track key metrics for optimization:
class MetricsCollector:
"""Collect and analyze task metrics"""
def __init__(self):
self.metrics = {
"execution_times": {},
"success_rates": {},
"retry_counts": {},
"token_usage": {}
}
def record_task(self, task_id, output):
"""Record task metrics"""
self.metrics["execution_times"][task_id] = output.metadata.get("execution_time", 0)
self.metrics["retry_counts"][task_id] = output.metadata.get("retry_count", 0)
self.metrics["token_usage"][task_id] = output.metadata.get("tokens_used", 0)
def get_bottlenecks(self):
"""Identify performance bottlenecks"""
sorted_times = sorted(
self.metrics["execution_times"].items(),
key=lambda x: x[1],
reverse=True
)
return sorted_times[:5] # Top 5 slowest tasks
# Use metrics collector
collector = MetricsCollector()
for task_id, task in tasks.items():
result = task.execute()
collector.record_task(task_id, result)
print(f"Bottlenecks: {collector.get_bottlenecks()}")
Workflow Visualization
Visualize complex workflows:
import networkx as nx
import matplotlib.pyplot as plt
def visualize_workflow(tasks):
"""Create visual representation of workflow"""
G = nx.DiGraph()
    # Add nodes and edges
    for task_id, task in tasks.items():
        G.add_node(task_id, task_type=getattr(task, "task_type", "task"))
        # Add edges based on dependencies; context entries may be task IDs or Task objects
        if task.context:
            for dep in task.context:
                dep_id = dep if isinstance(dep, str) else getattr(dep, "name", str(dep))
                G.add_edge(dep_id, task_id)
        if getattr(task, "next_tasks", None):
            for next_task in task.next_tasks:
                G.add_edge(task_id, next_task)
# Draw graph
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True, node_color='lightblue',
node_size=1500, font_size=10, arrows=True)
plt.savefig("workflow_visualization.png")
plt.close()
# Visualize your workflow
visualize_workflow(tasks)
Testing Task Workflows
Unit Testing Tasks
import unittest
from unittest.mock import Mock

from praisonaiagents import Task
class TestTaskWorkflow(unittest.TestCase):
"""Test individual tasks and workflows"""
def test_task_execution(self):
"""Test single task execution"""
mock_agent = Mock()
mock_agent.chat.return_value = "Test result"
task = Task(
description="Test task",
agent=mock_agent,
expected_output="Test output"
)
result = task.execute()
self.assertEqual(result.raw, "Test result")
mock_agent.chat.assert_called_once()
def test_task_retry(self):
"""Test task retry logic"""
mock_agent = Mock()
mock_agent.chat.side_effect = [
Exception("First attempt failed"),
Exception("Second attempt failed"),
"Success on third attempt"
]
task = Task(
description="Retry test",
agent=mock_agent,
max_retries=3
)
result = task.execute()
self.assertEqual(result.raw, "Success on third attempt")
self.assertEqual(mock_agent.chat.call_count, 3)
Integration Testing
def test_workflow_integration():
"""Test complete workflow execution"""
# Create test agents with predictable behavior
test_agents = [
Agent(role="Test Agent 1", goal="Test"),
Agent(role="Test Agent 2", goal="Test")
]
# Create test workflow
test_tasks = {
"task1": Task(description="First task", agent=test_agents[0]),
"task2": Task(description="Second task", agent=test_agents[1], context=["task1"])
}
# Execute and verify
process = Process(tasks=test_tasks, agents=test_agents)
process.sequential()
# Verify results
assert test_tasks["task1"].status == "completed"
assert test_tasks["task2"].status == "completed"
See Also
- Process Documentation - Process API reference
- Task Configuration - Task setup options
- Workflow Examples - Real-world examples
- Performance Tuning - Optimization guide

