Skip to main content

Workflow API

PraisonAI provides a simple, powerful workflow system for chaining agents and functions.

Quick Start

from praisonaiagents import Workflow, WorkflowContext, StepResult

def validate(ctx: WorkflowContext) -> StepResult:
    return StepResult(output=f"Valid: {ctx.input}")

def process(ctx: WorkflowContext) -> StepResult:
    return StepResult(output=f"Done: {ctx.previous_result}")

workflow = Workflow(steps=[validate, process])
result = workflow.start("Hello World")
print(result["output"])  # "Done: Valid: Hello World"

Import

from praisonaiagents import Workflow, WorkflowStep, WorkflowContext, StepResult
# Pipeline is an alias for Workflow (same thing)
from praisonaiagents import Pipeline
# Or from workflows module
from praisonaiagents.workflows import WorkflowManager, Workflow, WorkflowStep
# Pattern helpers
from praisonaiagents.workflows import route, parallel, loop, repeat
Pipeline and Workflow are interchangeable - they refer to the same class. Use whichever term fits your mental model better.

Callbacks

Workflow supports callbacks for monitoring and custom logic:
def on_start(workflow, input_text):
    print(f"Starting workflow with: {input_text}")

def on_complete(workflow, result):
    print(f"Workflow completed: {result['status']}")

def on_step_start(step_name, context):
    print(f"Starting step: {step_name}")

def on_step_complete(step_name, result):
    print(f"Step {step_name} completed: {result.output[:50]}...")

def on_step_error(step_name, error):
    print(f"Step {step_name} failed: {error}")

workflow = Workflow(
    steps=[step1, step2],
    on_workflow_start=on_start,
    on_workflow_complete=on_complete,
    on_step_start=on_step_start,
    on_step_complete=on_step_complete,
    on_step_error=on_step_error
)

Guardrails

Add validation to steps with automatic retry:
def validate_output(result):
    if "error" in result.output.lower():
        return (False, "Output contains error, please fix")
    return (True, None)

workflow = Workflow(steps=[
    WorkflowStep(
        name="generator",
        handler=my_generator,
        guardrail=validate_output,
        max_retries=3
    )
])
When validation fails:
  1. The step is retried (up to max_retries)
  2. Validation feedback is passed to the step via ctx.variables["validation_feedback"]
  3. For agent steps, feedback is appended to the prompt

Status Tracking

Track workflow and step execution status:
workflow = Workflow(steps=[step1, step2])
print(workflow.status)  # "not_started"

result = workflow.start("input")
print(workflow.status)  # "completed"
print(workflow.step_statuses)  # {"step1": "completed", "step2": "completed"}

# Result includes status
print(result["status"])  # "completed"
print(result["steps"][0]["status"])  # "completed"
print(result["steps"][0]["retries"])  # 0

WorkflowContext

Context passed to step handlers containing workflow state.

Constructor

WorkflowContext(
    input: str = "",
    previous_result: Optional[str] = None,
    current_step: str = "",
    variables: Dict[str, Any] = {}
)

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| input | str | Original workflow input |
| previous_result | Optional[str] | Output from previous step |
| current_step | str | Current step name |
| variables | Dict[str, Any] | All workflow variables |

StepResult

Result returned from step handlers.

Constructor

StepResult(
    output: str = "",
    stop_workflow: bool = False,
    variables: Dict[str, Any] = {}
)

Attributes

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| output | str | "" | Step output content |
| stop_workflow | bool | False | If True, stop the entire workflow |
| variables | Dict[str, Any] | {} | Variables to add/update |

Example

def validate(ctx: WorkflowContext) -> StepResult:
    if "error" in ctx.input:
        return StepResult(output="Invalid", stop_workflow=True)
    return StepResult(output="Valid", variables={"validated": True})

Workflow

A complete workflow with multiple steps.

Constructor

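Workflow(
    name: str = "Workflow",
    description: str = "",
    steps: List = [],
    variables: Dict[str, Any] = {},
    default_llm: Optional[str] = None,
    default_agent_config: Optional[Dict[str, Any]] = None,
    planning: bool = False,
    planning_llm: Optional[str] = None,
    reasoning: bool = False,
    verbose: bool = False,
    memory_config: Optional[Dict] = None
)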

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | "Workflow" | Workflow name |
| description | str | "" | Workflow description |
| steps | List | [] | List of steps (Agent, function, or WorkflowStep) |
| variables | Dict[str, Any] | {} | Initial variables |
| default_llm | Optional[str] | None | Default LLM for action-based steps |
| default_agent_config | Optional[Dict] | None | Default agent config |
| planning | bool | False | Enable planning mode |
| planning_llm | Optional[str] | None | LLM for planning |
| reasoning | bool | False | Enable chain-of-thought reasoning |
| verbose | bool | False | Enable verbose output |
| memory_config | Optional[Dict] | None | Memory configuration |

Methods

start()

Run the workflow with the given input.
def start(
    input: str = "",
    llm: Optional[str] = None,
    verbose: bool = False
) -> Dict[str, Any]
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| input | str | "" | Input text for the workflow |
| llm | Optional[str] | None | LLM model override |
| verbose | bool | False | Print step progress |
Returns: Dict with output, steps, variables, and status

astart() / arun()

Async counterpart of start(): it takes the same parameters and returns the same result dict, but must be awaited.
async def astart(
    input: str = "",
    llm: Optional[str] = None,
    verbose: bool = False
) -> Dict[str, Any]
Example:
import asyncio

async def main():
    workflow = Workflow(steps=[step1, step2])
    result = await workflow.astart("Hello World")
    print(result["output"])

asyncio.run(main())

Step Types

Workflows accept three types of steps:
  1. Functions - Automatically wrapped as handlers
  2. Agents - Executed with the input
  3. WorkflowStep - Full configuration
from praisonaiagents import Workflow, Agent, WorkflowStep

workflow = Workflow(
    steps=[
        my_function,                    # Function
        Agent(name="Writer", ...),      # Agent
        WorkflowStep(name="custom", handler=my_handler)  # WorkflowStep
    ]
)

WorkflowStep

A dataclass representing a single step in a workflow.

Constructor

WorkflowStep(
    name: str,
    description: str = "",
    action: str = "",
    handler: Optional[Callable] = None,
    should_run: Optional[Callable] = None,
    agent: Optional[Agent] = None,
    agent_config: Optional[Dict[str, Any]] = None,
    condition: Optional[str] = None,
    on_error: Literal["stop", "continue", "retry"] = "stop",
    max_retries: int = 1,
    context_from: Optional[List[str]] = None,
    retain_full_context: bool = True,
    output_variable: Optional[str] = None,
    tools: Optional[List[Any]] = None,
    next_steps: Optional[List[str]] = None,
    branch_condition: Optional[Dict[str, List[str]]] = None,
    loop_over: Optional[str] = None,
    loop_var: str = "item",
    guardrail: Optional[Callable] = None,
    output_file: Optional[str] = None,
    output_json: Optional[Any] = None,
    output_pydantic: Optional[Any] = None,
    images: Optional[List[str]] = None,
    async_execution: bool = False,
    quality_check: bool = True,
    rerun: bool = True
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Step name |
| description | str | "" | Step description |
| action | str | "" | The action/prompt to execute |
| handler | Optional[Callable] | None | Custom function (ctx) -> StepResult |
| should_run | Optional[Callable] | None | Condition function (ctx) -> bool |
| agent | Optional[Agent] | None | Direct Agent instance |
| agent_config | Optional[Dict] | None | Per-step agent configuration |
| condition | Optional[str] | None | Condition string for execution |
| on_error | Literal[...] | "stop" | Error handling: "stop", "continue", "retry" |
| max_retries | int | 1 | Maximum retry attempts |
| context_from | Optional[List[str]] | None | Steps to include context from |
| retain_full_context | bool | True | Include all previous outputs |
| output_variable | Optional[str] | None | Custom variable name for output |
| tools | Optional[List[Any]] | None | Tools for this step |
| next_steps | Optional[List[str]] | None | Next step names for branching |
| branch_condition | Optional[Dict] | None | Conditional branching rules |
| loop_over | Optional[str] | None | Variable name to iterate over |
| loop_var | str | "item" | Variable name for current item |
| guardrail | Optional[Callable] | None | Validation function (result) -> (bool, feedback) |
| output_file | Optional[str] | None | Save step output to file |
| output_json | Optional[Any] | None | Pydantic model for JSON output |
| output_pydantic | Optional[Any] | None | Pydantic model for structured output |
| images | Optional[List[str]] | None | Image paths/URLs for vision tasks |
| async_execution | bool | False | Mark step for async execution |
| quality_check | bool | True | Enable quality validation |
| rerun | bool | True | Allow step to be rerun |

Handler Function

Custom handler functions receive WorkflowContext and return StepResult:
def my_handler(ctx: WorkflowContext) -> StepResult:
    # Access context
    print(f"Input: {ctx.input}")
    print(f"Previous: {ctx.previous_result}")
    print(f"Variables: {ctx.variables}")
    
    # Return result
    return StepResult(
        output="Step completed",
        stop_workflow=False,  # Set True to stop workflow
        variables={"key": "value"}  # Add/update variables
    )

should_run Function

Conditional execution - return True to run the step, False to skip:
def is_sensitive(ctx: WorkflowContext) -> bool:
    return "legal" in ctx.input.lower()

step = WorkflowStep(
    name="compliance",
    handler=check_compliance,
    should_run=is_sensitive  # Only runs for sensitive content
)

Agent Config Options

When using agent_config, you can specify:
| Key | Type | Description |
| --- | --- | --- |
| role | str | Agent role (e.g., "Researcher") |
| goal | str | Agent goal |
| backstory | str | Agent backstory |
| llm | str | LLM model override |
| verbose | bool | Enable verbose output |

Example

step = WorkflowStep(
    name="research",
    action="Research {{topic}}",
    agent_config={
        "role": "Researcher",
        "goal": "Find comprehensive information",
        "backstory": "Expert researcher"
    },
    tools=["tavily_search"],
    output_variable="research_data"
)

Branching Example

# Decision step with conditional branching
decision_step = WorkflowStep(
    name="evaluate",
    action="Evaluate if the task is complete. Reply with 'success' or 'failure'.",
    next_steps=["success_handler", "failure_handler"],
    branch_condition={
        "success": ["success_handler"],
        "failure": ["failure_handler"]
    }
)

Loop Example

# Loop step that iterates over a list
loop_step = WorkflowStep(
    name="process_items",
    action="Process item: {{current_item}}",
    loop_over="items",      # Variable containing the list
    loop_var="current_item" # Variable name for each item
)

# Execute with items (manager is a WorkflowManager instance)
manager = WorkflowManager()
result = manager.execute(
    "my_workflow",
    variables={"items": ["item1", "item2", "item3"]}
)

Workflow

A dataclass representing a complete workflow with multiple steps.

Constructor

Workflow(
    name: str = "Workflow",
    description: str = "",
    steps: List[WorkflowStep] = [],
    variables: Dict[str, Any] = {},
    file_path: Optional[str] = None,
    default_agent_config: Optional[Dict[str, Any]] = None,
    default_llm: Optional[str] = None,
    memory_config: Optional[Dict[str, Any]] = None,
    planning: bool = False,
    planning_llm: Optional[str] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | "Workflow" | Workflow name |
| description | str | "" | Workflow description |
| steps | List[WorkflowStep] | [] | List of workflow steps |
| variables | Dict[str, Any] | {} | Default variables |
| file_path | Optional[str] | None | Source file path |
| default_agent_config | Optional[Dict[str, Any]] | None | Default agent config for all steps |
| default_llm | Optional[str] | None | Default LLM model |
| memory_config | Optional[Dict[str, Any]] | None | Memory configuration |
| planning | bool | False | Enable planning mode |
| planning_llm | Optional[str] | None | LLM for planning |

Example

workflow = Workflow(
    name="research_pipeline",
    description="Multi-agent research workflow",
    default_llm="gpt-4o-mini",
    planning=True,
    steps=[
        WorkflowStep(name="research", action="Research AI"),
        WorkflowStep(name="write", action="Write report")
    ],
    variables={"topic": "AI trends"}
)

WorkflowManager

The main class for managing and executing workflows.

Constructor

WorkflowManager(
    workspace_path: Optional[str] = None,
    verbose: int = 0
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| workspace_path | Optional[str] | None | Path to workspace (defaults to cwd) |
| verbose | int | 0 | Verbosity level (0-3) |

Methods

execute()

Execute a workflow synchronously.
def execute(
    workflow_name: str,
    executor: Optional[Callable[[str], str]] = None,
    variables: Optional[Dict[str, Any]] = None,
    on_step: Optional[Callable[[WorkflowStep, int], None]] = None,
    on_result: Optional[Callable[[WorkflowStep, str], None]] = None,
    default_agent: Optional[Any] = None,
    default_llm: Optional[str] = None,
    memory: Optional[Any] = None,
    planning: bool = False,
    stream: bool = False,
    verbose: int = 0,
    checkpoint: Optional[str] = None,
    resume: Optional[str] = None
) -> Dict[str, Any]

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| workflow_name | str | required | Name of workflow to execute |
| executor | Optional[Callable] | None | Function to execute each step |
| variables | Optional[Dict] | None | Variables to substitute |
| on_step | Optional[Callable] | None | Callback before each step |
| on_result | Optional[Callable] | None | Callback after each step |
| default_agent | Optional[Any] | None | Default agent for steps |
| default_llm | Optional[str] | None | Default LLM model |
| memory | Optional[Any] | None | Shared memory instance |
| planning | bool | False | Enable planning mode |
| stream | bool | False | Enable streaming output |
| verbose | int | 0 | Verbosity level |
| checkpoint | Optional[str] | None | Save checkpoint after each step with this name |
| resume | Optional[str] | None | Resume from checkpoint with this name |

Returns

{
    "success": bool,
    "workflow": str,
    "results": [
        {
            "step": str,
            "status": "success" | "failed" | "skipped",
            "output": str | None,
            "error": str | None
        }
    ],
    "variables": Dict[str, Any]
}

Example

from praisonaiagents import Agent
from praisonaiagents.workflows import WorkflowManager

agent = Agent(name="Assistant", llm="gpt-4o-mini")
manager = WorkflowManager()

result = manager.execute(
    "deploy",
    default_agent=agent,
    variables={"environment": "production"},
    on_step=lambda step, i: print(f"Starting: {step.name}"),
    on_result=lambda step, output: print(f"Done: {step.name}")
)

if result["success"]:
    print("Workflow completed!")

aexecute()

Execute a workflow asynchronously.
async def aexecute(
    workflow_name: str,
    executor: Optional[Callable[[str], str]] = None,
    variables: Optional[Dict[str, Any]] = None,
    on_step: Optional[Callable[[WorkflowStep, int], None]] = None,
    on_result: Optional[Callable[[WorkflowStep, str], None]] = None,
    default_agent: Optional[Any] = None,
    default_llm: Optional[str] = None,
    memory: Optional[Any] = None,
    planning: bool = False,
    stream: bool = False,
    verbose: int = 0
) -> Dict[str, Any]

Parameters

Same as execute(), minus checkpoint and resume, which are not part of the aexecute() signature shown above.

Example

import asyncio
from praisonaiagents.workflows import WorkflowManager

manager = WorkflowManager()

async def main():
    # Run multiple workflows concurrently
    results = await asyncio.gather(
        manager.aexecute("research", default_llm="gpt-4o-mini"),
        manager.aexecute("analysis", default_llm="gpt-4o-mini"),
    )
    return results

results = asyncio.run(main())

list_workflows()

List all available workflows.
def list_workflows() -> List[Workflow]

Returns

List of Workflow objects.

Example

manager = WorkflowManager()
workflows = manager.list_workflows()

for workflow in workflows:
    print(f"{workflow.name}: {len(workflow.steps)} steps")

get_workflow()

Get a specific workflow by name.
def get_workflow(name: str) -> Optional[Workflow]

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Workflow name (case-insensitive) |

Returns

Workflow object or None if not found.

create_workflow()

Create a new workflow file.
def create_workflow(
    name: str,
    description: str = "",
    steps: Optional[List[Dict[str, str]]] = None,
    variables: Optional[Dict[str, Any]] = None
) -> Workflow

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Workflow name |
| description | str | "" | Workflow description |
| steps | Optional[List[Dict]] | None | List of step definitions |
| variables | Optional[Dict] | None | Default variables |

Example

manager = WorkflowManager()

workflow = manager.create_workflow(
    name="Code Review",
    description="Review code changes",
    steps=[
        {"name": "Lint", "action": "Run linting"},
        {"name": "Test", "action": "Run tests"},
        {"name": "Review", "action": "Review code"}
    ],
    variables={"branch": "main"}
)

get_stats()

Get workflow statistics.
def get_stats() -> Dict[str, Any]

Returns

{
    "total_workflows": int,
    "total_steps": int,
    "workflows_dir": str
}

reload()

Reload workflows from disk.
def reload() -> None

Variable Substitution

Workflows support variable substitution using {{variable}} syntax:
| Variable | Description |
| --- | --- |
| {{variable_name}} | User-defined variable |
| {{previous_output}} | Output from previous step |
| {{step_name_output}} | Output from specific step |

Example

workflow = Workflow(
    name="pipeline",
    variables={"topic": "AI"},
    steps=[
        WorkflowStep(
            name="research",
            action="Research {{topic}}",
            output_variable="research_data"
        ),
        WorkflowStep(
            name="analyze",
            action="Analyze: {{research_data}}"
        ),
        WorkflowStep(
            name="write",
            action="Write about {{previous_output}}"
        )
    ]
)

list_checkpoints()

List all saved workflow checkpoints. This is a WorkflowManager method.
def list_checkpoints() -> List[Dict[str, Any]]

Returns

List of checkpoint info dicts with keys: name, workflow, completed_steps, saved_at.

Example

manager = WorkflowManager()
checkpoints = manager.list_checkpoints()

for cp in checkpoints:
    print(f"{cp['name']}: {cp['completed_steps']} steps completed")

delete_checkpoint()

Delete a saved checkpoint. This is a WorkflowManager method.
def delete_checkpoint(name: str) -> bool

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Checkpoint name to delete |

Returns

True if deleted successfully, False if not found.

Example

manager = WorkflowManager()

# Execute with checkpoint
result = manager.execute("deploy", checkpoint="deploy-v1")

# Resume if interrupted
result = manager.execute("deploy", resume="deploy-v1")

# Clean up
manager.delete_checkpoint("deploy-v1")

Workflow Patterns

PraisonAI provides helper functions for common workflow patterns.

Import

from praisonaiagents import Workflow, WorkflowContext, StepResult
from praisonaiagents.workflows import route, parallel, loop, repeat

route() - Decision-Based Branching

Routes to different steps based on the previous output.
route(
    routes: Dict[str, List],  # Key: pattern to match, Value: steps to execute
    default: Optional[List] = None  # Fallback steps
) -> Route
Example:
workflow = Workflow(steps=[
    classify_request,  # Returns "approve" or "reject"
    route({
        "approve": [approve_handler, notify_user],
        "reject": [reject_handler],
        "default": [fallback_handler]
    })
])

parallel() - Concurrent Execution

Execute multiple steps concurrently and combine results.
parallel(steps: List) -> Parallel
Example:
workflow = Workflow(steps=[
    parallel([research_market, research_competitors, research_customers]),
    summarize_results  # Access via ctx.variables["parallel_outputs"]
])

loop() - Iterate Over Data

Execute a step for each item in a list, CSV file, or text file.
loop(
    step: Any,                    # Step to execute for each item
    over: Optional[str] = None,   # Variable name containing list
    from_csv: Optional[str] = None,  # CSV file path
    from_file: Optional[str] = None, # Text file path
    var_name: str = "item"        # Variable name for current item
) -> Loop
Examples:
# Loop over list variable
workflow = Workflow(
    steps=[loop(process_item, over="items")],
    variables={"items": ["a", "b", "c"]}
)

# Loop over CSV file
workflow = Workflow(steps=[
    loop(process_row, from_csv="data.csv")
])
In your handler, access the current item:
def process_item(ctx: WorkflowContext) -> StepResult:
    item = ctx.variables["item"]  # Current item
    index = ctx.variables["loop_index"]  # Current index
    return StepResult(output=f"Processed: {item}")

repeat() - Evaluator-Optimizer Pattern

Repeat a step until a condition is met.
repeat(
    step: Any,                                    # Step to repeat
    until: Optional[Callable[[WorkflowContext], bool]] = None,  # Stop condition
    max_iterations: int = 10                      # Maximum iterations
) -> Repeat
Example:
def is_complete(ctx: WorkflowContext) -> bool:
    # previous_result is Optional[str]; treat a missing result as "not done"
    return "done" in (ctx.previous_result or "").lower()

workflow = Workflow(steps=[
    repeat(
        generator,
        until=is_complete,
        max_iterations=5
    )
])

Pattern Combinations

Patterns can be combined for complex workflows:
workflow = Workflow(steps=[
    # Step 1: Parallel research
    parallel([research_a, research_b]),
    
    # Step 2: Route based on findings
    route({
        "positive": [expand_research],
        "negative": [summarize_and_stop]
    }),
    
    # Step 3: Iterate over results
    loop(process_finding, over="findings"),
    
    # Step 4: Repeat until quality threshold
    repeat(refine_output, until=is_high_quality, max_iterations=3)
])

See Also