Workflow Error Handling

Workflow error handling provides structured exception handling for step failures, enabling robust parallel execution and graceful error recovery.

Quick Start

1. Basic Error Handling

from praisonaiagents import Agent, Workflow, WorkflowStepError

agent = Agent(
    name="Research Agent", 
    instructions="Research topics that might fail"
)

workflow = Workflow(steps=[agent])

try:
    result = workflow.start("Research invalid topic")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
2. Handling Multiple Errors

from praisonaiagents import Agent, Workflow, parallel, WorkflowStepError

# Three independent agents (defined like the agent in step 1)
agent_a = Agent(name="Agent A", instructions="Process branch A")
agent_b = Agent(name="Agent B", instructions="Process branch B")
agent_c = Agent(name="Agent C", instructions="Process branch C")

workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
])

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for err in e.errors:
        print(f"  Branch {err['step']}: {err['error']}")

How It Works

| Component | Role |
| --- | --- |
| WorkflowStepError | Main exception class for workflow failures |
| cause | Original exception that triggered the failure |
| errors | List of multiple errors (for parallel failures) |

Configuration Options

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| cause | Exception \| None | None | The underlying exception that triggered the failure (the first error in fail_all mode) |
| errors | List[dict] | [] | List of {"step": int, "error": Exception} entries in fail_all mode; empty in fail_fast mode |
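
The two failure strategies surface through these attributes differently. Here is a minimal sketch of handling both cases, assuming the parallel workflow from the quick start above:

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    if e.errors:
        # fail_all mode: every branch error was collected
        for err in e.errors:
            print(f"Branch {err['step']} raised {type(err['error']).__name__}")
    else:
        # fail_fast mode: only the first failure is kept, available via e.cause
        print(f"First failure: {e.cause}")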

Failed Task Propagation

When a task fails after exhausting its retries, its dependent tasks are automatically skipped instead of running with a None context:

Example

from praisonaiagents import Agent, Task, PraisonAIAgents

agent = Agent(name="Worker", instructions="Process data")

# Primary task that might fail
fetch_data = Task(
    description="Fetch data from unreliable API",
    agent=agent,
    max_retries=3
)

# Dependent task - will be skipped if fetch_data fails
process_data = Task(
    description="Process the fetched data",
    agent=agent,
    context=[fetch_data]  # Depends on fetch_data
)

workflow = PraisonAIAgents(
    agents=[agent], 
    tasks=[fetch_data, process_data]
)

result = workflow.start()

# Check task statuses
print(f"Fetch data status: {fetch_data.status}")
print(f"Process data status: {process_data.status}")

if fetch_data.status == "failed":
    # process_data.status will also be "failed" (skipped)
    print("Primary task failed, dependent task was skipped")

Failure Propagation Rules

  1. Failed Task: When a task fails after max_retries, its status is set to "failed"
  2. Dependent Detection: Tasks with context=[failed_task] are identified as dependents
  3. Skip Execution: Dependent tasks are marked as "failed" without execution
  4. No None Propagation: Dependent tasks don’t receive None values from failed dependencies
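
A minimal sketch of verifying these rules after a run, reusing the fetch_data and process_data tasks from the example above (this assumes Task exposes the context list it was constructed with):

for task in [fetch_data, process_data]:
    print(f"{task.description}: {task.status}")
    # A dependent marked "failed" was skipped, not executed
    if task.status == "failed" and task.context:
        print("  -> skipped because a dependency failed")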

Process Integration

This behavior works consistently across all process types:
# Sequential process - stops at first failure
workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[task_a, task_b, task_c],
    process="sequential"  # Stops if task_a fails
)

# Workflow process - skips dependents of failed tasks  
workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[fetch, process, save],
    process="workflow"  # Skips process+save if fetch fails
)

Common Patterns

Pattern 1: Single Step Recovery

from praisonaiagents import Workflow, WorkflowStepError

def with_retry():
    # unreliable_agent: any agent whose steps may fail (defined elsewhere)
    for attempt in range(3):
        try:
            workflow = Workflow(steps=[unreliable_agent])
            return workflow.start("Task")
        except WorkflowStepError as e:
            if attempt == 2:  # Last attempt, give up
                raise
            print(f"Attempt {attempt + 1} failed: {e}")

Pattern 2: Parallel Error Analysis

def analyze_parallel_failures(error):
    """Analyze which parallel branches of a WorkflowStepError failed and why."""
    failed_branches = []
    for error_info in error.errors:
        step_idx = error_info['step']
        err = error_info['error']
        failed_branches.append({
            'branch': step_idx,
            'error_type': type(err).__name__,
            'message': str(err)
        })
    return failed_branches
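
For example, this helper can be wired into the except block from the quick start (workflow construction elided):

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    # Report each failed branch with its error type and message
    for branch in analyze_parallel_failures(e):
        print(f"Branch {branch['branch']} failed with "
              f"{branch['error_type']}: {branch['message']}")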

Pattern 3: Graceful Degradation

import logging

from praisonaiagents import WorkflowStepError

logger = logging.getLogger(__name__)

def robust_workflow(input_data):
    """Run the workflow with fallback strategies."""
    try:
        # Try the optimal path first
        return run_full_workflow(input_data)
    except WorkflowStepError as e:
        if "timeout" in str(e).lower():
            # Fall back to a simpler workflow
            return run_simple_workflow(input_data)
        else:
            # Log and re-raise other errors
            logger.error(f"Workflow failed: {e}")
            raise

Best Practices

Catch WorkflowStepError specifically rather than generic Exception to handle workflow failures appropriately while allowing other errors to bubble up.
try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    # Handle workflow-specific failures
    handle_workflow_error(e)
except Exception as e:
    # Handle unexpected errors
    logger.exception("Unexpected error")
    raise
Use the cause and errors attributes to understand what specifically went wrong and implement targeted recovery strategies.
try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    if isinstance(e.cause, TimeoutError):
        # Retry with a longer timeout
        retry_with_timeout()
    elif isinstance(e.cause, ConnectionError):
        # Switch to a backup service
        use_backup_service()
Include workflow context in error logs to help with debugging and monitoring.
try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    logger.error(
        "Workflow failed",
        extra={
            "workflow_id": workflow.id,
            "step_count": len(workflow.steps),
            "error_count": len(e.errors),
            "root_cause": str(e.cause)
        }
    )
When using parallel execution, design your aggregation logic to handle partial results gracefully.
def smart_aggregator(ctx):
    """Aggregate results even with some failures."""
    outputs = ctx.variables.get("parallel_outputs", [])
    valid_results = [o for o in outputs if not o.startswith("Error:")]
    
    if len(valid_results) >= 2:  # Minimum threshold
        return aggregate_partial_results(valid_results)
    else:
        raise WorkflowStepError("Insufficient successful results")

Related Pages

- Workflow Parallel: parallel execution with failure strategies
- Workflow Patterns: common workflow implementation patterns