Workflow Error Handling
Workflow error handling provides structured exception handling for step failures, enabling robust parallel execution and graceful error recovery.

Quick Start

1. Basic Error Handling

from praisonaiagents import Agent, Workflow, WorkflowStepError

agent = Agent(
    name="Research Agent", 
    instructions="Research topics that might fail"
)

workflow = Workflow(steps=[agent])

try:
    result = workflow.start("Research invalid topic")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")

2. Handling Multiple Errors

from praisonaiagents import Workflow, parallel, WorkflowStepError

# agent_a, agent_b, agent_c are Agent instances defined as in step 1

workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
])

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for err in e.errors:
        print(f"  Branch {err['step']}: {err['error']}")

How It Works

Component           Role
WorkflowStepError   Main exception class for workflow failures
cause               Original exception that triggered the failure
errors              List of multiple errors (for parallel failures)
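
As a mental model, the exception surface is small. A minimal sketch of its shape (illustrative only, not the library's actual source):

class WorkflowStepError(Exception):
    """Raised when one or more workflow steps fail."""
    def __init__(self, message, cause=None, errors=None):
        super().__init__(message)
        self.cause = cause          # original exception (first error in fail_all mode)
        self.errors = errors or []  # [{"step": int, "error": Exception}, ...]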

Configuration Options

Attribute   Type               Default   Description
cause       Exception | None   None      The underlying exception that triggered the failure (first error in fail_all mode)
errors      List[dict]         []        List of {"step": int, "error": Exception} entries in fail_all mode; empty for fail_fast
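
The two failure strategies populate these attributes differently, so handling code can branch on them. A sketch (fail_fast and fail_all are the strategies covered in Workflow Parallel):

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    if e.errors:
        # fail_all: every branch ran; inspect all failures
        for err in e.errors:
            print(f"Branch {err['step']} raised {type(err['error']).__name__}")
    else:
        # fail_fast: execution stopped at the first failure
        print(f"First failure: {e.cause}")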

Common Patterns

Pattern 1: Single Step Recovery

from praisonaiagents import Agent, Workflow, WorkflowStepError

def with_retry():
    for attempt in range(3):
        try:
            workflow = Workflow(steps=[unreliable_agent])
            return workflow.start("Task")
        except WorkflowStepError as e:
            if attempt == 2:  # Last attempt
                raise
            print(f"Attempt {attempt + 1} failed: {e}")

Pattern 2: Parallel Error Analysis

def analyze_parallel_failures(workflow_errors):
    """Analyze which parallel branches failed and why."""
    failed_branches = []
    for error_info in workflow_errors.errors:
        step_idx = error_info['step']
        error = error_info['error']
        failed_branches.append({
            'branch': step_idx,
            'error_type': type(error).__name__,
            'message': str(error)
        })
    return failed_branches
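
Wired into an except block, the helper turns the raw errors list into a loggable summary:

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    for branch in analyze_parallel_failures(e):
        print(f"Branch {branch['branch']}: {branch['error_type']}: {branch['message']}")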

Pattern 3: Graceful Degradation

def robust_workflow(input_data):
    """Run workflow with fallback strategies."""
    try:
        # Try optimal path
        return run_full_workflow(input_data)
    except WorkflowStepError as e:
        if "timeout" in str(e).lower():
            # Fallback to simpler workflow
            return run_simple_workflow(input_data)
        else:
            # Log and re-raise for other errors
            logger.error(f"Workflow failed: {e}")
            raise
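
run_full_workflow and run_simple_workflow above are placeholders. For concreteness, the two paths might differ like this (an illustrative sketch built from the Quick Start API; the agents are assumed):

def run_full_workflow(input_data):
    # Optimal path: fan out across all branches, require every one to succeed
    workflow = Workflow(steps=[
        parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
    ])
    return workflow.start(input_data)

def run_simple_workflow(input_data):
    # Fallback path: a single agent, no parallel fan-out
    workflow = Workflow(steps=[agent_a])
    return workflow.start(input_data)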

Best Practices

Catch WorkflowStepError specifically rather than a generic Exception, so workflow failures are handled appropriately while other errors bubble up.

try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    # Handle workflow-specific failures
    handle_workflow_error(e)
except Exception as e:
    # Handle unexpected errors
    logger.exception("Unexpected error")
    raise

Use the cause and errors attributes to understand what specifically went wrong and implement targeted recovery strategies.

try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    if isinstance(e.cause, TimeoutError):
        # Retry with longer timeout
        retry_with_timeout()
    elif isinstance(e.cause, ConnectionError):
        # Switch to backup service
        use_backup_service()
    else:
        # Unknown cause: re-raise rather than swallow it
        raise

Include workflow context in error logs to help with debugging and monitoring.

try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    logger.error(
        "Workflow failed",
        extra={
            "workflow_id": workflow.id,
            "step_count": len(workflow.steps),
            "error_count": len(e.errors),
            "root_cause": str(e.cause)
        }
    )

When using parallel execution, design your aggregation logic to handle partial results gracefully.

def smart_aggregator(ctx):
    """Aggregate results even with some failures."""
    outputs = ctx.variables.get("parallel_outputs", [])
    valid_results = [o for o in outputs if not o.startswith("Error:")]
    
    if len(valid_results) >= 2:  # Minimum threshold
        return aggregate_partial_results(valid_results)
    else:
        raise WorkflowStepError("Insufficient successful results")
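
How the aggregator slots into a workflow depends on the parallel API (see Workflow Parallel below). Assuming plain functions are accepted as steps and failed branches surface as "Error: ..." strings in parallel_outputs (both are assumptions here, not confirmed API), the wiring might look like:

workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c]),  # assumption: failures land in parallel_outputs as "Error: ..." strings
    smart_aggregator,                       # assumption: plain functions are valid steps
])
result = workflow.start("Gather and aggregate")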

Related

Workflow Parallel: Parallel execution with failure strategies

Workflow Patterns: Common workflow implementation patterns