Process

The Process class provides sophisticated task orchestration capabilities for multi-agent systems, supporting sequential, workflow, and hierarchical execution patterns with advanced features like loops, conditions, and validation feedback.

Overview

The Process module orchestrates how tasks are executed across multiple agents, managing dependencies, context sharing, retries, and complex workflows. It provides three main execution modes, each suited for different use cases.

Basic Usage

from praisonaiagents import Agent, Task, Process

# Create agents
researcher = Agent(role="Researcher", goal="Find information")
writer = Agent(role="Writer", goal="Create content")

# Create tasks
task1 = Task(description="Research topic", agent=researcher)
task2 = Task(description="Write article", agent=writer, context=[task1])

# Create and run process
process = Process(
    tasks={"research": task1, "write": task2},
    agents=[researcher, writer]
)

# Execute sequentially
process.sequential()

Execution Modes

Sequential Process

Executes tasks one after another in a linear fashion.
# Simple sequential execution
# tasks and agents are assumed to be defined as in the "Basic Usage" example
process = Process(tasks=tasks, agents=agents)
process.sequential()

# Each task completes before the next begins
# Perfect for: pipelines, step-by-step workflows

Workflow Process

Supports complex task relationships with conditions, loops, and dynamic routing.
# Create decision task
decision_task = Task(
    description="Evaluate quality",
    agent=evaluator,
    task_type="decision",
    condition={
        "good": "publish_task",
        "poor": "revise_task"
    }
)

# Create loop task for batch processing
loop_task = Task(
    description="Process customer data",
    agent=processor,
    task_type="loop",
    input_file="customers.csv"  # Process each row
)

# tasks and agents are assumed to be defined as above
process = Process(tasks=tasks, agents=agents)
process.workflow()

Hierarchical Process

Uses a manager agent to dynamically coordinate task execution.
# Manager agent decides task order and assignments
# tasks and agents are assumed to be defined as in the "Basic Usage" example
process = Process(
    tasks=tasks,
    agents=agents,
    manager_llm="gpt-4o"  # Manager uses this model
)
process.hierarchical()

# Manager can:
# - Reassign tasks to different agents
# - Determine optimal execution order
# - Handle unexpected scenarios

Advanced Features

Loop Task Processing

Process batches of data from CSV or text files:
# Process each customer in CSV file
customer_task = Task(
    description="Send personalized email to {customer_name}",
    agent=email_agent,
    task_type="loop",
    loop_data="customers.csv"  # CSV with customer data
)

# Process list from text file
item_task = Task(
    description="Analyze item: {item}",
    agent=analyst,
    task_type="loop",
    input_file="items.txt"  # One item per line
)

# File-based loop data
# Note: In-memory loop data is not directly supported.
# Create a temporary file or use workflow logic for in-memory processing

Decision Tasks and Routing

Implement conditional workflows based on task outputs:
# Quality check with routing
quality_check = Task(
    description="Check if content meets standards",
    agent=reviewer,
    task_type="decision",
    condition={
        "approved": "publish_task",
        "needs_revision": "edit_task",
        "rejected": "discard_task"
    }
)

# Multi-condition routing
router_task = Task(
    description="Categorize support ticket",
    agent=classifier,
    task_type="decision",
    condition={
        "billing": "billing_team_task",
        "technical": "tech_team_task",
        "general": "support_team_task",
        "urgent": "escalation_task"
    }
)

Validation Feedback and Retry

Handle task failures with intelligent retry mechanisms:
# Task with validation
validated_task = Task(
    description="Generate valid JSON response",
    agent=generator,
    output_json={"type": "object", "properties": {"result": {"type": "string"}}},
    max_retries=3  # Max retry attempts
)

# Validation feedback flow
process = Process(
    tasks={
        "generate": validated_task,
        "validate": validation_task
    },
    agents=agents
    # Retry behavior is handled at the task level, not process level
)

# Process handles validation feedback automatically:
# 1. Task executes
# 2. If validation fails, feedback is provided
# 3. Task retries with feedback context
# 4. Process continues until success or max retries

Context Management

Share information between tasks efficiently:
# Explicit context dependency
research_task = Task(description="Research topic", agent=researcher)
write_task = Task(
    description="Write based on research",
    agent=writer,
    context=[research_task]  # Access research results
)

# Context retention control
analysis_task = Task(
    description="Analyze data",
    agent=analyst,
    retain_full_context=True  # Keep in context for future tasks
)

# Selective context
summary_task = Task(
    description="Summarize findings",
    agent=summarizer,
    context=[research_task, analysis_task]
    # Context management is automatic
)

Async Execution

All process types support asynchronous execution:
import asyncio

async def run_async_workflow():
    process = Process(tasks=tasks, agents=agents)
    
    # Async sequential
    await process.asequential()
    
    # Async workflow with streaming
    async for task_id in process.aworkflow():
        print(f"Completed task: {task_id}")
    
    # Async hierarchical
    await process.ahierarchical()

# Run async process
asyncio.run(run_async_workflow())

Configuration Options

Process Parameters

  • tasks (Dict[str, Task]): Dictionary mapping task IDs to Task objects
  • agents (List[Agent]): Available agents for task execution
  • manager_llm (str, optional): LLM for hierarchical manager agent
  • verbose (bool): Enable detailed logging
  • max_iter (int): Maximum iterations for workflow execution (default: 10)

Task States

Tasks progress through these states:
  • pending: Not yet started
  • in_progress: Currently executing
  • completed: Successfully finished
  • failed: Execution failed
  • skipped: Skipped due to conditions
  • retrying: Failed but retrying

Best Practices

  1. Choose the Right Mode:
    • Sequential: Simple pipelines, predictable workflows
    • Workflow: Complex logic, conditions, loops
    • Hierarchical: Dynamic orchestration, adaptive workflows
  2. Handle Failures Gracefully:
    task = Task(
        description="Critical operation",
        agent=agent,
        max_retries=3  # Retry on failure
        # Fallback logic should be implemented in workflow
    )
    
  3. Optimize Context Sharing:
    # Only share necessary context
    task = Task(
        description="Summarize",
        agent=agent,
        context=[relevant_task],  # Not all previous tasks
        retain_full_context=False  # Only use previous task output
    )
    
  4. Monitor Progress:
    process = Process(tasks=tasks, agents=agents, verbose=True)
    
    # Track task states
    for task_id, task in tasks.items():
        print(f"{task_id}: {task.status}")
    

Integration Example

from praisonaiagents import Agent, Task, Process

# Create specialized agents
data_agent = Agent(role="Data Processor", goal="Process customer data")
email_agent = Agent(role="Email Sender", goal="Send personalized emails")
validator = Agent(role="Validator", goal="Ensure quality")
manager = Agent(role="Manager", goal="Coordinate workflow")

# Create workflow tasks
tasks = {
    "load": Task(
        description="Load customer data",
        agent=data_agent,
        task_type="loop",
        loop_data="customers.csv"
    ),
    "personalize": Task(
        description="Create personalized content for {customer_name}",
        agent=email_agent,
        context=["load"]
    ),
    "validate": Task(
        description="Check email quality",
        agent=validator,
        task_type="decision",
        condition={
            "approved": "send",
            "rejected": "revise"
        },
        context=["personalize"]
    ),
    "revise": Task(
        description="Improve email content",
        agent=email_agent,
        next_tasks=["validate"]  # Loop back to validation
    ),
    "send": Task(
        description="Send the email",
        agent=email_agent,
        context=["personalize"]
    )
}

# Execute with hierarchical process for dynamic coordination
process = Process(
    tasks=tasks,
    agents=[data_agent, email_agent, validator, manager],
    manager_llm="gpt-4o"
)

# Run the process
process.hierarchical()

See Also