Workflow error handling provides structured exception handling for step failures, enabling robust parallel execution and graceful error recovery.

Quick Start

1. Basic Error Handling

from praisonaiagents import Agent, Workflow, WorkflowStepError

agent = Agent(
    name="Research Agent", 
    instructions="Research topics that might fail"
)

workflow = Workflow(steps=[agent])

try:
    result = workflow.start("Research invalid topic")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")

2. Handling Multiple Errors

from praisonaiagents import Workflow, WorkflowStepError, parallel

# agent_a, agent_b, and agent_c are Agent instances defined elsewhere

workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
])

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for err in e.errors:
        print(f"  Branch {err['step']}: {err['error']}")

How It Works

Component          Role
WorkflowStepError  Main exception class for workflow failures
cause              Original exception that triggered the failure
errors             List of multiple errors (for parallel failures)

Configuration Options

Attribute  Type              Default  Description
cause      Exception | None  None     The underlying exception that triggered the failure (first error in fail_all mode)
errors     List[dict]        []       List of {"step": int, "error": Exception} for fail_all mode. Empty for fail_fast
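
To make the two attributes concrete, the sketch below uses a stand-in exception class and a toy branch runner to show how cause and errors could be populated in the two modes described in the table. StepErrorSketch, run_branches, and the sample branch functions are hypothetical names invented for this illustration; they are not the library's implementation, and the real internals may differ (for example, real parallel branches do not run one after another).

```python
from typing import Callable, List, Optional


class StepErrorSketch(Exception):
    """Hypothetical stand-in that mirrors the documented attributes."""

    def __init__(self, message: str, cause: Optional[Exception] = None,
                 errors: Optional[List[dict]] = None):
        super().__init__(message)
        self.cause = cause
        self.errors = errors if errors is not None else []


def run_branches(branches: List[Callable[[], str]], *, on_failure: str) -> List[str]:
    """Run branches one by one (for simplicity) and raise according to the mode."""
    results, errors = [], []
    for index, branch in enumerate(branches):
        try:
            results.append(branch())
        except Exception as exc:
            if on_failure == "fail_fast":
                # Stop at the first failure; errors stays empty.
                raise StepErrorSketch(f"Branch {index} failed", cause=exc) from exc
            errors.append({"step": index, "error": exc})
    if errors:
        # fail_all: report every failure; cause is the first one collected.
        first = errors[0]["error"]
        raise StepErrorSketch(f"{len(errors)} branch(es) failed",
                              cause=first, errors=errors) from first
    return results


def ok() -> str:
    return "ok"


def timeout() -> str:
    raise TimeoutError("branch timed out")


def bad_value() -> str:
    raise ValueError("bad input")


try:
    run_branches([ok, timeout, bad_value], on_failure="fail_all")
except StepErrorSketch as e:
    fail_all_error = e

try:
    run_branches([ok, timeout, bad_value], on_failure="fail_fast")
except StepErrorSketch as e:
    fail_fast_error = e
```

In this sketch, fail_all_error.errors holds two entries (steps 1 and 2) and its cause is the TimeoutError, while fail_fast_error carries the same cause but an empty errors list.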

Failed Task Propagation

When a task fails after exhausting its retries, tasks that depend on it are skipped automatically rather than run with a None context.

Example

from praisonaiagents import Agent, Task, PraisonAIAgents

agent = Agent(name="Worker", instructions="Process data")

# Primary task that might fail
fetch_data = Task(
    description="Fetch data from unreliable API",
    agent=agent,
    max_retries=3
)

# Dependent task - will be skipped if fetch_data fails
process_data = Task(
    description="Process the fetched data",
    agent=agent,
    context=[fetch_data]  # Depends on fetch_data
)

workflow = PraisonAIAgents(
    agents=[agent], 
    tasks=[fetch_data, process_data]
)

result = workflow.start()

# Check task statuses
print(f"Fetch data status: {fetch_data.status}")
print(f"Process data status: {process_data.status}")

if fetch_data.status == "failed":
    # process_data.status will also be "failed" (skipped)
    print("Primary task failed, dependent task was skipped")

Failure Propagation Rules

  1. Failed Task: When a task fails after max_retries, its status is set to "failed"
  2. Dependent Detection: Tasks with context=[failed_task] are identified as dependents
  3. Skip Execution: Dependent tasks are marked as "failed" without execution
  4. No None Propagation: Dependent tasks don’t receive None values from failed dependencies
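
The four rules above can be sketched with a small stand-in scheduler. TaskSketch and run_tasks are hypothetical and exist only for this illustration; they are not the library's Task or PraisonAIAgents classes. The sketch also assumes that max_retries is the total number of attempts and that "completed" is the success status, neither of which the text above confirms.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class TaskSketch:
    """Hypothetical stand-in for a task; not the library's Task class."""
    name: str
    action: Callable[[], str]
    context: List["TaskSketch"] = field(default_factory=list)
    max_retries: int = 3          # assumed to mean total attempts
    status: str = "not started"
    result: str = ""
    executed: bool = False


def run_tasks(tasks: List[TaskSketch]) -> None:
    """Run tasks in order, skipping any task whose dependency failed."""
    for task in tasks:
        # Rules 2 and 3: a dependent of a failed task is marked failed, not run.
        if any(dep.status == "failed" for dep in task.context):
            task.status = "failed"
            continue
        task.executed = True
        for _ in range(task.max_retries):
            try:
                task.result = task.action()
                task.status = "completed"
                break
            except Exception:
                # Rule 1: the status remains "failed" once attempts run out.
                task.status = "failed"


def flaky_fetch() -> str:
    raise ConnectionError("API unavailable")


fetch = TaskSketch("fetch", flaky_fetch)
process = TaskSketch("process", lambda: "processed", context=[fetch])
save = TaskSketch("save", lambda: "saved", context=[process])
report = TaskSketch("report", lambda: "done")  # independent of fetch

run_tasks([fetch, process, save, report])
```

Because save depends on process, which was itself skipped, the failure propagates transitively, while the independent report task still runs.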

Process Integration

This behavior works consistently across all process types:

# Sequential process - stops at first failure
workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[task_a, task_b, task_c],
    process="sequential"  # Stops if task_a fails
)

# Workflow process - skips dependents of failed tasks  
workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[fetch, process, save],
    process="workflow"  # Skips process+save if fetch fails
)

Common Patterns

Pattern 1: Single Step Recovery

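from praisonaiagents import Agent, Workflow, WorkflowStepError

# Placeholder agent for illustration
unreliable_agent = Agent(name="Unreliable Agent", instructions="Call a flaky service")

def with_retry(max_attempts=3):
    """Run the workflow, retrying on WorkflowStepError up to max_attempts times."""
    for attempt in range(max_attempts):
        try:
            workflow = Workflow(steps=[unreliable_agent])
            return workflow.start("Task")
        except WorkflowStepError as e:
            if attempt == max_attempts - 1:  # Last attempt
                raise
            print(f"Attempt {attempt + 1} failed: {e}")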
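
Retrying immediately can hammer a service that is already struggling, so a common refinement is to wait between attempts. The helper below is a generic sketch of retry with exponential backoff. retry_with_backoff and its parameters are hypothetical names introduced here, not part of the library; the sleep parameter exists only so the delay can be replaced in tests.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    run: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call run() up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return run()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error unchanged.
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")


# Demonstration with a function that fails twice, then succeeds.
calls = []
delays = []


def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("temporary failure")
    return "done"


outcome = retry_with_backoff(flaky, (RuntimeError,), sleep=delays.append)
```

With a workflow, the call might look like retry_with_backoff(lambda: workflow.start("Task"), (WorkflowStepError,)), assuming workflow is built as in the pattern above.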

Pattern 2: Parallel Error Analysis

def analyze_parallel_failures(workflow_error):
    """Summarize which parallel branches failed and why.

    workflow_error is a caught WorkflowStepError; its errors list is
    populated in fail_all mode.
    """
    failed_branches = []
    for error_info in workflow_error.errors:
        step_idx = error_info['step']
        error = error_info['error']
        failed_branches.append({
            'branch': step_idx,
            'error_type': type(error).__name__,
            'message': str(error)
        })
    return failed_branches
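
When many branches fail, a per-type count is often more useful than a flat list. The sketch below works directly on the documented {"step": int, "error": Exception} shape, so it needs no library objects. count_error_types and the sample data are made up for illustration.

```python
from collections import Counter
from typing import Dict, List


def count_error_types(errors: List[dict]) -> Dict[str, int]:
    """Count failures per exception type from a list of {"step", "error"} dicts."""
    return dict(Counter(type(item["error"]).__name__ for item in errors))


# Sample data in the documented shape (values are invented).
sample_errors = [
    {"step": 0, "error": TimeoutError("slow upstream")},
    {"step": 2, "error": TimeoutError("slow upstream")},
    {"step": 3, "error": ValueError("bad payload")},
]

summary = count_error_types(sample_errors)
print(summary)  # {'TimeoutError': 2, 'ValueError': 1}
```

A summary like this makes it easy to see whether the failures share one root cause (for example, every branch timing out) or are unrelated.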

Pattern 3: Graceful Degradation

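import logging

from praisonaiagents import WorkflowStepError

logger = logging.getLogger(__name__)

def robust_workflow(input_data):
    """Run workflow with fallback strategies.

    run_full_workflow and run_simple_workflow are placeholders for your own functions.
    """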
    try:
        # Try optimal path
        return run_full_workflow(input_data)
    except WorkflowStepError as e:
        if "timeout" in str(e).lower():
            # Fallback to simpler workflow
            return run_simple_workflow(input_data)
        else:
            # Log and re-raise for other errors
            logger.error(f"Workflow failed: {e}")
            raise

Best Practices

Catch WorkflowStepError specifically rather than generic Exception to handle workflow failures appropriately while allowing other errors to bubble up.

try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    # Handle workflow-specific failures
    handle_workflow_error(e)
except Exception as e:
    # Handle unexpected errors
    logger.exception("Unexpected error")
    raise

Use the cause and errors attributes to understand what specifically went wrong and implement targeted recovery strategies.

try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    if isinstance(e.cause, TimeoutError):
        # Retry with longer timeout
        retry_with_timeout()
    elif isinstance(e.cause, ConnectionError):
        # Switch to backup service
        use_backup_service()
    else:
        # Unknown cause: re-raise rather than swallow the error
        raise
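
As the number of recoverable causes grows, a chain of isinstance checks can be replaced by a lookup table. The following is one possible sketch. pick_recovery and the handler strings are hypothetical and stand in for real recovery functions.

```python
from typing import Callable, Dict, Optional, Type


def pick_recovery(
    cause: Optional[BaseException],
    handlers: Dict[Type[BaseException], Callable[[], str]],
) -> Optional[Callable[[], str]]:
    """Return the first handler whose exception type matches the cause."""
    for exc_type, handler in handlers.items():
        if isinstance(cause, exc_type):
            return handler
    return None


# Dicts keep insertion order, so list more specific exception types first.
handlers = {
    TimeoutError: lambda: "retry with longer timeout",
    ConnectionError: lambda: "switch to backup service",
}

timeout_plan = pick_recovery(TimeoutError("slow"), handlers)
reset_plan = pick_recovery(ConnectionResetError("reset"), handlers)
unknown_plan = pick_recovery(ValueError("bad input"), handlers)
```

Because matching uses isinstance, subclasses such as ConnectionResetError are routed to the ConnectionError handler. A None result signals that no recovery is known and the original error should be re-raised.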

Include workflow context in error logs to help with debugging and monitoring.

try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    logger.error(
        "Workflow failed",
        extra={
            "workflow_id": workflow.id,
            "step_count": len(workflow.steps),
            "error_count": len(e.errors),
            "root_cause": str(e.cause)
        }
    )
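
Fields passed through extra become attributes on the log record, but they appear in the output only if a formatter (or a structured-logging handler) references them. The sketch below uses only the standard library logging module. The logger name, format string, and field values are made up for illustration.

```python
import io
import logging

# Capture output in memory so the formatted line can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(message)s errors=%(error_count)s cause=%(root_cause)s"
    )
)

logger = logging.getLogger("workflow_sketch")  # hypothetical logger name
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.error(
    "Workflow failed",
    extra={"error_count": 2, "root_cause": "TimeoutError: slow upstream"},
)

log_line = stream.getvalue().strip()
```

One caveat of this approach: every record that reaches this formatter must supply error_count and root_cause, so a dedicated logger or handler for workflow errors is usually the safer choice.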

When using parallel execution, design your aggregation logic to handle partial results gracefully.

def smart_aggregator(ctx):
    """Aggregate results even with some failures."""
    outputs = ctx.variables.get("parallel_outputs", [])
    # Treat any output whose text starts with "Error:" as a failed branch
    valid_results = [o for o in outputs if not str(o).startswith("Error:")]

    if len(valid_results) >= 2:  # Minimum threshold
        return aggregate_partial_results(valid_results)
    else:
        raise WorkflowStepError("Insufficient successful results")
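
Keeping the failed outputs alongside the valid ones makes it possible to report what was dropped. The sketch below assumes, as the aggregator above does, that failed branches surface as text beginning with "Error:". split_outputs is a hypothetical helper, and the SimpleNamespace object merely imitates a context with a variables dict.

```python
from types import SimpleNamespace
from typing import List, Tuple


def split_outputs(
    outputs: List[object], error_prefix: str = "Error:"
) -> Tuple[List[str], List[str]]:
    """Separate successful outputs from error strings."""
    valid: List[str] = []
    failed: List[str] = []
    for output in outputs:
        text = str(output)  # tolerate non-string outputs
        if text.startswith(error_prefix):
            failed.append(text)
        else:
            valid.append(text)
    return valid, failed


# Stand-in for the workflow context (assumed shape: a `variables` dict).
ctx = SimpleNamespace(
    variables={"parallel_outputs": ["summary A", "Error: timeout", "summary C"]}
)

valid, failed = split_outputs(ctx.variables.get("parallel_outputs", []))
```

Here valid holds the two summaries and failed holds the single error string, which can then be logged or attached to the aggregated result.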

Related

Workflow Parallel: Parallel execution with failure strategies
Workflow Patterns: Common workflow implementation patterns