Workflow API
PraisonAI provides a simple, powerful workflow system for chaining agents and functions.
Quick Start
from praisonaiagents import Workflow, WorkflowContext, StepResult
def validate(ctx: WorkflowContext) -> StepResult:
    return StepResult(output=f"Valid: {ctx.input}")
def process(ctx: WorkflowContext) -> StepResult:
    return StepResult(output=f"Done: {ctx.previous_result}")
workflow = Workflow(steps=[validate, process])
result = workflow.start("Hello World")
print(result["output"]) # "Done: Valid: Hello World"
Import
from praisonaiagents import Workflow, WorkflowStep, WorkflowContext, StepResult
# Pipeline is an alias for Workflow (same thing)
from praisonaiagents import Pipeline
# Or from workflows module
from praisonaiagents.workflows import WorkflowManager, Workflow, WorkflowStep
# Pattern helpers
from praisonaiagents.workflows import route, parallel, loop, repeat
Pipeline and Workflow are interchangeable - they refer to the same class.
Use whichever term fits your mental model better.
Callbacks
Workflow supports callbacks for monitoring and custom logic:
def on_start(workflow, input_text):
    print(f"Starting workflow with: {input_text}")
def on_complete(workflow, result):
    print(f"Workflow completed: {result['status']}")
def on_step_start(step_name, context):
    print(f"Starting step: {step_name}")
def on_step_complete(step_name, result):
    print(f"Step {step_name} completed: {result.output[:50]}...")
def on_step_error(step_name, error):
    print(f"Step {step_name} failed: {error}")
workflow = Workflow(
steps=[step1, step2],
on_workflow_start=on_start,
on_workflow_complete=on_complete,
on_step_start=on_step_start,
on_step_complete=on_step_complete,
on_step_error=on_step_error
)
Guardrails
Add validation to steps with automatic retry:
def validate_output(result):
    if "error" in result.output.lower():
        return (False, "Output contains error, please fix")
    return (True, None)
workflow = Workflow(steps=[
WorkflowStep(
name="generator",
handler=my_generator,
guardrail=validate_output,
max_retries=3
)
])
When validation fails:
- The step is retried (up to max_retries)
- Validation feedback is passed to the step via ctx.variables["validation_feedback"]
- For agent steps, feedback is appended to the prompt
Status Tracking
Track workflow and step execution status:
workflow = Workflow(steps=[step1, step2])
print(workflow.status) # "not_started"
result = workflow.start("input")
print(workflow.status) # "completed"
print(workflow.step_statuses) # {"step1": "completed", "step2": "completed"}
# Result includes status
print(result["status"]) # "completed"
print(result["steps"][0]["status"]) # "completed"
print(result["steps"][0]["retries"]) # 0
WorkflowContext
Context passed to step handlers containing workflow state.
Constructor
WorkflowContext(
input: str = "",
previous_result: Optional[str] = None,
current_step: str = "",
variables: Dict[str, Any] = {}
)
Attributes
| Attribute | Type | Description |
|---|---|---|
| input | str | Original workflow input |
| previous_result | Optional[str] | Output from previous step |
| current_step | str | Current step name |
| variables | Dict[str, Any] | All workflow variables |
StepResult
Result returned from step handlers.
Constructor
StepResult(
output: str = "",
stop_workflow: bool = False,
variables: Dict[str, Any] = {}
)
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| output | str | "" | Step output content |
| stop_workflow | bool | False | If True, stop the entire workflow |
| variables | Dict[str, Any] | {} | Variables to add/update |
Example
def validate(ctx: WorkflowContext) -> StepResult:
    if "error" in ctx.input:
        return StepResult(output="Invalid", stop_workflow=True)
    return StepResult(output="Valid", variables={"validated": True})
Workflow
A complete workflow with multiple steps.
Constructor
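Workflow(
    name: str = "Workflow",
    description: str = "",
    steps: List = [],
    variables: Dict[str, Any] = {},
    default_llm: Optional[str] = None,
    default_agent_config: Optional[Dict[str, Any]] = None,
    planning: bool = False,
    planning_llm: Optional[str] = None,
    reasoning: bool = False,
    verbose: bool = False,
    memory_config: Optional[Dict[str, Any]] = None
)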
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | "Workflow" | Workflow name |
| description | str | "" | Workflow description |
| steps | List | [] | List of steps (Agent, function, or WorkflowStep) |
| variables | Dict[str, Any] | {} | Initial variables |
| default_llm | Optional[str] | None | Default LLM for action-based steps |
| default_agent_config | Optional[Dict] | None | Default agent config |
| planning | bool | False | Enable planning mode |
| planning_llm | Optional[str] | None | LLM for planning |
| reasoning | bool | False | Enable chain-of-thought reasoning |
| verbose | bool | False | Enable verbose output |
| memory_config | Optional[Dict] | None | Memory configuration |
Methods
start()
Run the workflow with the given input.
def start(
input: str = "",
llm: Optional[str] = None,
verbose: bool = False
) -> Dict[str, Any]
| Parameter | Type | Default | Description |
|---|---|---|---|
| input | str | "" | Input text for the workflow |
| llm | Optional[str] | None | LLM model override |
| verbose | bool | False | Print step progress |
Returns: Dict with output, steps, variables, and status
astart() / arun()
Async version of start() for async workflow execution.
async def astart(
input: str = "",
llm: Optional[str] = None,
verbose: bool = False
) -> Dict[str, Any]
Example:
import asyncio
async def main():
    workflow = Workflow(steps=[step1, step2])
    result = await workflow.astart("Hello World")
    print(result["output"])
asyncio.run(main())
Step Types
Workflows accept three types of steps:
- Functions - Automatically wrapped as handlers
- Agents - Executed with the input
- WorkflowStep - Full configuration
from praisonaiagents import Workflow, Agent, WorkflowStep
workflow = Workflow(
steps=[
my_function, # Function
Agent(name="Writer", ...), # Agent
WorkflowStep(name="custom", handler=my_handler) # WorkflowStep
]
)
WorkflowStep
A dataclass representing a single step in a workflow.
Constructor
WorkflowStep(
    name: str,
    description: str = "",
    action: str = "",
    handler: Optional[Callable] = None,
    should_run: Optional[Callable] = None,
    agent: Optional[Agent] = None,
    agent_config: Optional[Dict[str, Any]] = None,
    condition: Optional[str] = None,
    on_error: Literal["stop", "continue", "retry"] = "stop",
    max_retries: int = 1,
    context_from: Optional[List[str]] = None,
    retain_full_context: bool = True,
    output_variable: Optional[str] = None,
    tools: Optional[List[Any]] = None,
    next_steps: Optional[List[str]] = None,
    branch_condition: Optional[Dict[str, List[str]]] = None,
    loop_over: Optional[str] = None,
    loop_var: str = "item",
    guardrail: Optional[Callable] = None,
    output_file: Optional[str] = None,
    output_json: Optional[Any] = None,
    output_pydantic: Optional[Any] = None,
    images: Optional[List[str]] = None,
    async_execution: bool = False,
    quality_check: bool = True,
    rerun: bool = True
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Step name |
| description | str | "" | Step description |
| action | str | "" | The action/prompt to execute |
| handler | Optional[Callable] | None | Custom function (ctx) -> StepResult |
| should_run | Optional[Callable] | None | Condition function (ctx) -> bool |
| agent | Optional[Agent] | None | Direct Agent instance |
| agent_config | Optional[Dict] | None | Per-step agent configuration |
| condition | Optional[str] | None | Condition string for execution |
| on_error | Literal[...] | "stop" | Error handling: "stop", "continue", "retry" |
| max_retries | int | 1 | Maximum retry attempts |
| context_from | Optional[List[str]] | None | Steps to include context from |
| retain_full_context | bool | True | Include all previous outputs |
| output_variable | Optional[str] | None | Custom variable name for output |
| tools | Optional[List[Any]] | None | Tools for this step |
| next_steps | Optional[List[str]] | None | Next step names for branching |
| branch_condition | Optional[Dict] | None | Conditional branching rules |
| loop_over | Optional[str] | None | Variable name to iterate over |
| loop_var | str | "item" | Variable name for current item |
| guardrail | Optional[Callable] | None | Validation function (result) -> (bool, feedback) |
| output_file | Optional[str] | None | Save step output to file |
| output_json | Optional[Any] | None | Pydantic model for JSON output |
| output_pydantic | Optional[Any] | None | Pydantic model for structured output |
| images | Optional[List[str]] | None | Image paths/URLs for vision tasks |
| async_execution | bool | False | Mark step for async execution |
| quality_check | bool | True | Enable quality validation |
| rerun | bool | True | Allow step to be rerun |
Handler Function
Custom handler functions receive WorkflowContext and return StepResult:
def my_handler(ctx: WorkflowContext) -> StepResult:
    # Access context
    print(f"Input: {ctx.input}")
    print(f"Previous: {ctx.previous_result}")
    print(f"Variables: {ctx.variables}")
    # Return result
    return StepResult(
        output="Step completed",
        stop_workflow=False,  # Set True to stop workflow
        variables={"key": "value"}  # Add/update variables
    )
should_run Function
Conditional execution - return True to run the step, False to skip:
def is_sensitive(ctx: WorkflowContext) -> bool:
    return "legal" in ctx.input.lower()
step = WorkflowStep(
name="compliance",
handler=check_compliance,
should_run=is_sensitive # Only runs for sensitive content
)
Agent Config Options
When using agent_config, you can specify:
| Key | Type | Description |
|---|---|---|
| role | str | Agent role (e.g., "Researcher") |
| goal | str | Agent goal |
| backstory | str | Agent backstory |
| llm | str | LLM model override |
| verbose | bool | Enable verbose output |
Example
step = WorkflowStep(
name="research",
action="Research {{topic}}",
agent_config={
"role": "Researcher",
"goal": "Find comprehensive information",
"backstory": "Expert researcher"
},
tools=["tavily_search"],
output_variable="research_data"
)
Branching Example
# Decision step with conditional branching
decision_step = WorkflowStep(
name="evaluate",
action="Evaluate if the task is complete. Reply with 'success' or 'failure'.",
next_steps=["success_handler", "failure_handler"],
branch_condition={
"success": ["success_handler"],
"failure": ["failure_handler"]
}
)
Loop Example
# Loop step that iterates over a list
loop_step = WorkflowStep(
name="process_items",
action="Process item: {{current_item}}",
loop_over="items", # Variable containing the list
loop_var="current_item" # Variable name for each item
)
# Execute with items
result = manager.execute(
"my_workflow",
variables={"items": ["item1", "item2", "item3"]}
)
Workflow
A dataclass representing a complete workflow with multiple steps.
Constructor
Workflow(
name: str,
description: str = "",
steps: List[WorkflowStep] = [],
variables: Dict[str, Any] = {},
file_path: Optional[str] = None,
default_agent_config: Optional[Dict[str, Any]] = None,
default_llm: Optional[str] = None,
memory_config: Optional[Dict[str, Any]] = None,
planning: bool = False,
planning_llm: Optional[str] = None
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Workflow name |
| description | str | "" | Workflow description |
| steps | List[WorkflowStep] | [] | List of workflow steps |
| variables | Dict[str, Any] | {} | Default variables |
| file_path | Optional[str] | None | Source file path |
| default_agent_config | Optional[Dict[str, Any]] | None | Default agent config for all steps |
| default_llm | Optional[str] | None | Default LLM model |
| memory_config | Optional[Dict[str, Any]] | None | Memory configuration |
| planning | bool | False | Enable planning mode |
| planning_llm | Optional[str] | None | LLM for planning |
Example
workflow = Workflow(
name="research_pipeline",
description="Multi-agent research workflow",
default_llm="gpt-4o-mini",
planning=True,
steps=[
WorkflowStep(name="research", action="Research AI"),
WorkflowStep(name="write", action="Write report")
],
variables={"topic": "AI trends"}
)
WorkflowManager
The main class for managing and executing workflows.
Constructor
WorkflowManager(
workspace_path: Optional[str] = None,
verbose: int = 0
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| workspace_path | Optional[str] | None | Path to workspace (defaults to cwd) |
| verbose | int | 0 | Verbosity level (0-3) |
Methods
execute()
Execute a workflow synchronously.
def execute(
workflow_name: str,
executor: Optional[Callable[[str], str]] = None,
variables: Optional[Dict[str, Any]] = None,
on_step: Optional[Callable[[WorkflowStep, int], None]] = None,
on_result: Optional[Callable[[WorkflowStep, str], None]] = None,
default_agent: Optional[Any] = None,
default_llm: Optional[str] = None,
memory: Optional[Any] = None,
planning: bool = False,
stream: bool = False,
verbose: int = 0,
checkpoint: Optional[str] = None,
resume: Optional[str] = None
) -> Dict[str, Any]
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| workflow_name | str | required | Name of workflow to execute |
| executor | Optional[Callable] | None | Function to execute each step |
| variables | Optional[Dict] | None | Variables to substitute |
| on_step | Optional[Callable] | None | Callback before each step |
| on_result | Optional[Callable] | None | Callback after each step |
| default_agent | Optional[Any] | None | Default agent for steps |
| default_llm | Optional[str] | None | Default LLM model |
| memory | Optional[Any] | None | Shared memory instance |
| planning | bool | False | Enable planning mode |
| stream | bool | False | Enable streaming output |
| verbose | int | 0 | Verbosity level |
| checkpoint | Optional[str] | None | Save checkpoint after each step with this name |
| resume | Optional[str] | None | Resume from checkpoint with this name |
Returns
{
"success": bool,
"workflow": str,
"results": [
{
"step": str,
"status": "success" | "failed" | "skipped",
"output": str | None,
"error": str | None
}
],
"variables": Dict[str, Any]
}
Example
from praisonaiagents import Agent
from praisonaiagents.workflows import WorkflowManager
agent = Agent(name="Assistant", llm="gpt-4o-mini")
manager = WorkflowManager()
result = manager.execute(
"deploy",
default_agent=agent,
variables={"environment": "production"},
on_step=lambda step, i: print(f"Starting: {step.name}"),
on_result=lambda step, output: print(f"Done: {step.name}")
)
if result["success"]:
    print("Workflow completed!")
aexecute()
Execute a workflow asynchronously.
async def aexecute(
workflow_name: str,
executor: Optional[Callable[[str], str]] = None,
variables: Optional[Dict[str, Any]] = None,
on_step: Optional[Callable[[WorkflowStep, int], None]] = None,
on_result: Optional[Callable[[WorkflowStep, str], None]] = None,
default_agent: Optional[Any] = None,
default_llm: Optional[str] = None,
memory: Optional[Any] = None,
planning: bool = False,
stream: bool = False,
verbose: int = 0
) -> Dict[str, Any]
Parameters
Same as execute(), minus checkpoint and resume, which are not part of the aexecute() signature shown above.
Example
import asyncio
from praisonaiagents.workflows import WorkflowManager
manager = WorkflowManager()
async def main():
    # Run multiple workflows concurrently
    results = await asyncio.gather(
        manager.aexecute("research", default_llm="gpt-4o-mini"),
        manager.aexecute("analysis", default_llm="gpt-4o-mini"),
    )
    return results
results = asyncio.run(main())
list_workflows()
List all available workflows.
def list_workflows() -> List[Workflow]
Returns
List of Workflow objects.
Example
manager = WorkflowManager()
workflows = manager.list_workflows()
for workflow in workflows:
    print(f"{workflow.name}: {len(workflow.steps)} steps")
get_workflow()
Get a specific workflow by name.
def get_workflow(name: str) -> Optional[Workflow]
Parameters
| Parameter | Type | Description |
|---|---|---|
| name | str | Workflow name (case-insensitive) |
Returns
Workflow object or None if not found.
create_workflow()
Create a new workflow file.
def create_workflow(
name: str,
description: str = "",
steps: Optional[List[Dict[str, str]]] = None,
variables: Optional[Dict[str, Any]] = None
) -> Workflow
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Workflow name |
| description | str | "" | Workflow description |
| steps | Optional[List[Dict]] | None | List of step definitions |
| variables | Optional[Dict] | None | Default variables |
Example
manager = WorkflowManager()
workflow = manager.create_workflow(
name="Code Review",
description="Review code changes",
steps=[
{"name": "Lint", "action": "Run linting"},
{"name": "Test", "action": "Run tests"},
{"name": "Review", "action": "Review code"}
],
variables={"branch": "main"}
)
get_stats()
Get workflow statistics.
def get_stats() -> Dict[str, Any]
Returns
{
"total_workflows": int,
"total_steps": int,
"workflows_dir": str
}
reload()
Reload workflows from disk.
Variable Substitution
Workflows support variable substitution using {{variable}} syntax:
| Variable | Description |
|---|---|
| {{variable_name}} | User-defined variable |
| {{previous_output}} | Output from previous step |
| {{step_name_output}} | Output from specific step |
Example
workflow = Workflow(
name="pipeline",
variables={"topic": "AI"},
steps=[
WorkflowStep(
name="research",
action="Research {{topic}}",
output_variable="research_data"
),
WorkflowStep(
name="analyze",
action="Analyze: {{research_data}}"
),
WorkflowStep(
name="write",
action="Write about {{previous_output}}"
)
]
)
list_checkpoints()
List all saved workflow checkpoints.
def list_checkpoints() -> List[Dict[str, Any]]
Returns
List of checkpoint info dicts with keys: name, workflow, completed_steps, saved_at.
Example
manager = WorkflowManager()
checkpoints = manager.list_checkpoints()
for cp in checkpoints:
    print(f"{cp['name']}: {cp['completed_steps']} steps completed")
delete_checkpoint()
Delete a saved checkpoint.
def delete_checkpoint(name: str) -> bool
Parameters
| Parameter | Type | Description |
|---|---|---|
| name | str | Checkpoint name to delete |
Returns
True if deleted successfully, False if not found.
Example
manager = WorkflowManager()
# Execute with checkpoint
result = manager.execute("deploy", checkpoint="deploy-v1")
# Resume if interrupted
result = manager.execute("deploy", resume="deploy-v1")
# Clean up
manager.delete_checkpoint("deploy-v1")
Workflow Patterns
PraisonAI provides helper functions for common workflow patterns.
Import
from praisonaiagents import Workflow, WorkflowContext, StepResult
from praisonaiagents.workflows import route, parallel, loop, repeat
route() - Decision-Based Branching
Routes to different steps based on the previous output.
route(
routes: Dict[str, List], # Key: pattern to match, Value: steps to execute
default: Optional[List] = None # Fallback steps
) -> Route
Example:
workflow = Workflow(steps=[
classify_request, # Returns "approve" or "reject"
route({
"approve": [approve_handler, notify_user],
"reject": [reject_handler],
"default": [fallback_handler]
})
])
parallel() - Concurrent Execution
Execute multiple steps concurrently and combine results.
parallel(steps: List) -> Parallel
Example:
workflow = Workflow(steps=[
parallel([research_market, research_competitors, research_customers]),
summarize_results # Access via ctx.variables["parallel_outputs"]
])
loop() - Iterate Over Data
Execute a step for each item in a list, CSV file, or text file.
loop(
step: Any, # Step to execute for each item
over: Optional[str] = None, # Variable name containing list
from_csv: Optional[str] = None, # CSV file path
from_file: Optional[str] = None, # Text file path
var_name: str = "item" # Variable name for current item
) -> Loop
Examples:
# Loop over list variable
workflow = Workflow(
steps=[loop(process_item, over="items")],
variables={"items": ["a", "b", "c"]}
)
# Loop over CSV file
workflow = Workflow(steps=[
loop(process_row, from_csv="data.csv")
])
In your handler, access the current item:
def process_item(ctx: WorkflowContext) -> StepResult:
    item = ctx.variables["item"]  # Current item
    index = ctx.variables["loop_index"]  # Current index
    return StepResult(output=f"Processed: {item}")
repeat() - Evaluator-Optimizer Pattern
Repeat a step until a condition is met.
repeat(
step: Any, # Step to repeat
until: Optional[Callable[[WorkflowContext], bool]] = None, # Stop condition
max_iterations: int = 10 # Maximum iterations
) -> Repeat
Example:
def is_complete(ctx: WorkflowContext) -> bool:
    return "done" in ctx.previous_result.lower()
workflow = Workflow(steps=[
repeat(
generator,
until=is_complete,
max_iterations=5
)
])
Pattern Combinations
Patterns can be combined for complex workflows:
workflow = Workflow(steps=[
# Step 1: Parallel research
parallel([research_a, research_b]),
# Step 2: Route based on findings
route({
"positive": [expand_research],
"negative": [summarize_and_stop]
}),
# Step 3: Iterate over results
loop(process_finding, over="findings"),
# Step 4: Repeat until quality threshold
repeat(refine_output, until=is_high_quality, max_iterations=3)
])
See Also