Skip to main content
Nested workflows allow you to combine workflow patterns like loops, parallel execution, and routing to create sophisticated data processing pipelines.

Quick Start

from praisonaiagents import AgentFlow, WorkflowContext, StepResult
from praisonaiagents.workflows import loop

def process_cell(ctx: WorkflowContext) -> StepResult:
    row = ctx.variables.get("row")
    col = ctx.variables.get("col")
    return StepResult(output=f"Processed {row}-{col}")

# Inner loop processes columns
inner = loop(process_cell, over="columns", var_name="col")

# Outer loop processes rows
outer = loop(steps=[inner], over="rows", var_name="row")

workflow = AgentFlow(
    steps=[outer],
    variables={
        "rows": ["A", "B"],
        "columns": ["1", "2", "3"]
    }
)
workflow.start("Process matrix")

Supported Nesting Patterns

Loop in Loop

Process multi-dimensional data like matrices or hierarchical structures

Parallel in Loop

Run concurrent tasks for each item in a collection

Route in Loop

Make routing decisions for each item based on conditions

If in Loop

Apply conditional logic to each item during iteration

Loop Inside Loop

Process nested data structures by placing one loop inside another.
from praisonaiagents import AgentFlow, WorkflowContext, StepResult
from praisonaiagents.workflows import loop

results = []

def process_member(ctx: WorkflowContext) -> StepResult:
    team = ctx.variables.get("team", {})
    member = ctx.variables.get("member")
    results.append(f"{team['name']}/{member}")
    return StepResult(output=f"Processed {member}")

# Inner: process each member
member_loop = loop(process_member, over="members", var_name="member")

# Outer: process each team
team_loop = loop(steps=[member_loop], over="teams", var_name="team")

workflow = AgentFlow(
    steps=[team_loop],
    variables={
        "teams": [
            {"name": "Frontend", "members": ["Alice", "Bob"]},
            {"name": "Backend", "members": ["Charlie"]}
        ],
        "members": ["Dev1", "Dev2"]  # Default members
    }
)
workflow.start("Process organization")
# Results: ['Frontend/Dev1', 'Frontend/Dev2', 'Backend/Dev1', 'Backend/Dev2']

Parallel Inside Loop

Run multiple tasks concurrently for each item in a loop.
from praisonaiagents import AgentFlow, WorkflowContext, StepResult
from praisonaiagents.workflows import loop, parallel

def security_scan(ctx: WorkflowContext) -> StepResult:
    project = ctx.variables.get("project")
    return StepResult(output=f"Security scan: {project}")

def run_tests(ctx: WorkflowContext) -> StepResult:
    project = ctx.variables.get("project")
    return StepResult(output=f"Tests passed: {project}")

def code_review(ctx: WorkflowContext) -> StepResult:
    project = ctx.variables.get("project")
    return StepResult(output=f"Review complete: {project}")

# Parallel tasks for each project
parallel_tasks = parallel([security_scan, run_tests, code_review])

workflow = AgentFlow(
    steps=[
        loop(
            steps=[parallel_tasks],
            over="projects",
            var_name="project"
        )
    ],
    variables={"projects": ["api", "web", "mobile"]}
)
workflow.start("Analyze all projects")

Route Inside Loop

Apply different processing logic to each item based on conditions.
from praisonaiagents import AgentFlow, WorkflowContext, StepResult
from praisonaiagents.workflows import loop, route

def process_image(ctx: WorkflowContext) -> StepResult:
    return StepResult(output="Image processed")

def process_video(ctx: WorkflowContext) -> StepResult:
    return StepResult(output="Video processed")

def get_file_type(ctx: WorkflowContext) -> StepResult:
    file = ctx.variables.get("file", {})
    return StepResult(output=file.get("type", "unknown"))

# Route based on file type
type_router = route({
    "image": [process_image],
    "video": [process_video]
})

workflow = AgentFlow(
    steps=[
        loop(
            steps=[get_file_type, type_router],
            over="files",
            var_name="file"
        )
    ],
    variables={
        "files": [
            {"name": "photo.jpg", "type": "image"},
            {"name": "movie.mp4", "type": "video"}
        ]
    }
)
workflow.start("Process files")

Depth Limit Protection

Nested workflows have a maximum depth of 5 levels to prevent infinite recursion and stack overflow errors.
from praisonaiagents.workflows import MAX_NESTING_DEPTH

print(f"Maximum nesting depth: {MAX_NESTING_DEPTH}")  # 5

# This will raise ValueError if depth exceeds 5
try:
    # Creating 6+ levels of nesting
    inner = loop(handler, over="items")
    for _ in range(5):
        inner = loop(steps=[inner], over="items")
    
    workflow = AgentFlow(steps=[inner], variables={"items": ["a"]})
    workflow.start("test")
except ValueError as e:
    print(f"Error: {e}")
    # "Maximum nesting depth (5) exceeded..."

Best Practices

Aim for 2-3 levels of nesting maximum. If you need more, consider restructuring your workflow or breaking it into sub-workflows.
Use descriptive var_name values like customer, order, item instead of generic names like x, y, z.
When processing independent items, use parallel=True in loops to speed up execution.
Add error handling in your step functions to prevent one failed item from stopping the entire workflow.