Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
Use this file to discover all available pages before exploring further.
Workflow error handling provides structured exception handling for step failures, enabling robust parallel execution and graceful error recovery.
Quick Start
Basic Error Handling
```python
from praisonaiagents import Agent, Workflow, WorkflowStepError

agent = Agent(
    name="Research Agent",
    instructions="Research topics that might fail"
)

workflow = Workflow(steps=[agent])

try:
    result = workflow.start("Research invalid topic")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
```
Handling Multiple Errors
```python
from praisonaiagents import Workflow, parallel, WorkflowStepError

workflow = Workflow(steps=[
    parallel([agent_a, agent_b, agent_c], on_failure="fail_all"),
])

try:
    workflow.start("Process all branches")
except WorkflowStepError as e:
    print(f"Workflow failed: {e}")
    print(f"Root cause: {e.cause}")
    for err in e.errors:
        print(f"  Branch {err['step']}: {err['error']}")
```
How It Works
| Component | Role |
| --- | --- |
| `WorkflowStepError` | Main exception class for workflow failures |
| `cause` | Original exception that triggered the failure |
| `errors` | List of multiple errors (for parallel failures) |
Configuration Options
| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| `cause` | `Exception \| None` | `None` | The underlying exception that triggered the failure (first error in `fail_all` mode) |
| `errors` | `List[dict]` | `[]` | List of `{"step": int, "error": Exception}` for `fail_all` mode. Empty for `fail_fast` |
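To make the two attributes concrete, here is a minimal sketch of an exception class with the same shape. This is an illustration only, not the library's actual `WorkflowStepError` implementation; the class name `StepError` and the sample failures are hypothetical:

```python
# Hypothetical sketch of an exception carrying `cause` and `errors`,
# mirroring the attributes described in the table above.
class StepError(Exception):
    def __init__(self, message, cause=None, errors=None):
        super().__init__(message)
        self.cause = cause          # underlying / first exception
        self.errors = errors or []  # [{"step": int, "error": Exception}] in fail_all mode

# Aggregating branch failures the way fail_all mode is described:
branch_failures = [
    {"step": 0, "error": TimeoutError("branch 0 timed out")},
    {"step": 2, "error": ConnectionError("branch 2 unreachable")},
]
err = StepError(
    "2 of 3 branches failed",
    cause=branch_failures[0]["error"],  # first error becomes the cause
    errors=branch_failures,
)
print(type(err.cause).__name__)  # TimeoutError
print(len(err.errors))           # 2
```

In `fail_fast` mode the same shape applies, except `errors` stays empty and only `cause` is populated.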
Failed Task Propagation
When tasks fail after exhausting retries, dependent tasks are automatically skipped instead of running with None context:
How It Works
Example
```python
from praisonaiagents import Agent, Task, PraisonAIAgents

agent = Agent(name="Worker", instructions="Process data")

# Primary task that might fail
fetch_data = Task(
    description="Fetch data from unreliable API",
    agent=agent,
    max_retries=3
)

# Dependent task - will be skipped if fetch_data fails
process_data = Task(
    description="Process the fetched data",
    agent=agent,
    context=[fetch_data]  # Depends on fetch_data
)

workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[fetch_data, process_data]
)

result = workflow.start()

# Check task statuses
print(f"Fetch data status: {fetch_data.status}")
print(f"Process data status: {process_data.status}")

if fetch_data.status == "failed":
    # process_data.status will also be "failed" (skipped)
    print("Primary task failed, dependent task was skipped")
```
Failure Propagation Rules
1. **Failed Task**: When a task fails after `max_retries`, its status is set to `"failed"`
2. **Dependent Detection**: Tasks with `context=[failed_task]` are identified as dependents
3. **Skip Execution**: Dependent tasks are marked `"failed"` without being executed
4. **No None Propagation**: Dependent tasks never receive `None` values from failed dependencies
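The rules above can be sketched as a small simulation. This uses plain dicts in place of the library's `Task` objects, so the field names here are illustrative, not the real API:

```python
# Illustrative sketch of the failure propagation rules, using plain dicts
# rather than the library's Task objects (field names are hypothetical).
def propagate_failures(tasks):
    """Mark any pending task as 'failed' if a task in its context failed."""
    for task in tasks:  # tasks listed in dependency order
        if task["status"] == "pending":
            deps = task.get("context", [])
            if any(d["status"] == "failed" for d in deps):
                # Skipped outright - never executed with None context
                task["status"] = "failed"
    return tasks

fetch = {"name": "fetch_data", "status": "failed", "context": []}
process = {"name": "process_data", "status": "pending", "context": [fetch]}
save = {"name": "save_data", "status": "pending", "context": [process]}

propagate_failures([fetch, process, save])
print(process["status"], save["status"])  # failed failed
```

Note that the skip cascades: `save_data` is skipped because its dependency `process_data` was itself skipped, matching the transitive behavior described above.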
Process Integration
This behavior works consistently across all process types:
```python
# Sequential process - stops at first failure
workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[task_a, task_b, task_c],
    process="sequential"  # Stops if task_a fails
)

# Workflow process - skips dependents of failed tasks
workflow = PraisonAIAgents(
    agents=[agent],
    tasks=[fetch, process, save],
    process="workflow"  # Skips process+save if fetch fails
)
```
Common Patterns
Pattern 1: Single Step Recovery
```python
from praisonaiagents import Workflow, WorkflowStepError

def with_retry():
    for attempt in range(3):
        try:
            workflow = Workflow(steps=[unreliable_agent])
            return workflow.start("Task")
        except WorkflowStepError as e:
            if attempt == 2:  # Last attempt
                raise
            print(f"Attempt {attempt + 1} failed: {e}")
```
Pattern 2: Parallel Error Analysis
```python
def analyze_parallel_failures(workflow_errors):
    """Analyze which parallel branches failed and why."""
    failed_branches = []
    for error_info in workflow_errors.errors:
        step_idx = error_info['step']
        error = error_info['error']
        failed_branches.append({
            'branch': step_idx,
            'error_type': type(error).__name__,
            'message': str(error)
        })
    return failed_branches
```
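To see the shape of the output, here is a self-contained usage sketch. It repeats a condensed version of the helper and substitutes a `SimpleNamespace` stand-in for a caught `WorkflowStepError`, so the sample error data is hypothetical:

```python
from types import SimpleNamespace

def analyze_parallel_failures(workflow_errors):
    """Condensed version of the helper above, for a runnable demo."""
    return [
        {
            "branch": info["step"],
            "error_type": type(info["error"]).__name__,
            "message": str(info["error"]),
        }
        for info in workflow_errors.errors
    ]

# Stand-in for a caught WorkflowStepError (hypothetical branch failure):
caught = SimpleNamespace(errors=[
    {"step": 1, "error": TimeoutError("branch 1 timed out")},
])
print(analyze_parallel_failures(caught))
# [{'branch': 1, 'error_type': 'TimeoutError', 'message': 'branch 1 timed out'}]
```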
Pattern 3: Graceful Degradation
```python
import logging

logger = logging.getLogger(__name__)

def robust_workflow(input_data):
    """Run workflow with fallback strategies."""
    try:
        # Try optimal path
        return run_full_workflow(input_data)
    except WorkflowStepError as e:
        if "timeout" in str(e).lower():
            # Fall back to a simpler workflow
            return run_simple_workflow(input_data)
        else:
            # Log and re-raise for other errors
            logger.error(f"Workflow failed: {e}")
            raise
```
Best Practices
Always Catch Specific Errors
Catch `WorkflowStepError` specifically rather than a generic `Exception`, so workflow failures are handled appropriately while unexpected errors still bubble up.

```python
try:
    result = workflow.start("Task")
except WorkflowStepError as e:
    # Handle workflow-specific failures
    handle_workflow_error(e)
except Exception as e:
    # Handle unexpected errors
    logger.exception("Unexpected error")
    raise
```
Inspect Root Causes

Use the `cause` and `errors` attributes to understand what specifically went wrong and implement targeted recovery strategies.

```python
except WorkflowStepError as e:
    if isinstance(e.cause, TimeoutError):
        # Retry with a longer timeout
        retry_with_timeout()
    elif isinstance(e.cause, ConnectionError):
        # Switch to a backup service
        use_backup_service()
```
Log Errors with Context

Include workflow context in error logs to help with debugging and monitoring.

```python
except WorkflowStepError as e:
    logger.error(
        "Workflow failed",
        extra={
            "workflow_id": workflow.id,
            "step_count": len(workflow.steps),
            "error_count": len(e.errors),
            "root_cause": str(e.cause)
        }
    )
```
Design for Partial Success
When using parallel execution, design your aggregation logic to handle partial results gracefully.

```python
def smart_aggregator(ctx):
    """Aggregate results even with some failures."""
    outputs = ctx.variables.get("parallel_outputs", [])
    valid_results = [o for o in outputs if not o.startswith("Error:")]
    if len(valid_results) >= 2:  # Minimum threshold
        return aggregate_partial_results(valid_results)
    else:
        raise WorkflowStepError("Insufficient successful results")
```
- Workflow Parallel: Parallel execution with failure strategies
- Workflow Patterns: Common workflow implementation patterns