Task Configuration

This page documents the task configuration parameters: task types, conditional execution, task chaining, and workflow control.

Core Task Parameters

Task Type Configuration

task_type

  • Type: str
  • Default: "normal"
  • Options: "normal", "decision", "loop", "parallel", "sequential"
  • Description: Defines the execution behavior and flow control of the task
The task_type parameter determines how a task is executed within the workflow and how it interacts with other tasks.
from praisonaiagents import Task

# Normal task - standard execution
task = Task(
    description="Analyze data",
    task_type="normal",
    expected_output="Data analysis report"
)

# Decision task - conditional branching
decision_task = Task(
    description="Evaluate results",
    task_type="decision",
    condition={
        "field": "score",
        "operator": ">=",
        "value": 0.8
    },
    # Map each condition outcome to the task that should run next
    next_tasks={"true": "success_task", "false": "retry_task"}
)

Task Type Details

Normal Tasks

Standard tasks that execute sequentially with defined inputs and outputs.
normal_task = Task(
    name="data_processing",
    description="Process customer data",
    task_type="normal",
    agent=data_agent,  # an Agent instance defined elsewhere
    expected_output="Processed data summary"
)

Decision Tasks

Tasks that evaluate conditions and determine workflow paths.
decision_task = Task(
    name="quality_check",
    description="Check data quality",
    task_type="decision",
    condition={
        "type": "compound",
        "operator": "AND",
        "conditions": [
            {"field": "completeness", "operator": ">=", "value": 0.95},
            {"field": "accuracy", "operator": ">=", "value": 0.98}
        ]
    },
    next_tasks={
        "true": "publish_task",
        "false": "cleanup_task"
    }
)

Loop Tasks

Tasks that repeat based on conditions or iterations.
loop_task = Task(
    name="iterative_improvement",
    description="Improve results iteratively",
    task_type="loop",
    loop_state={
        "max_iterations": 5,
        "condition": {
            "field": "quality_score",
            "operator": "<",
            "value": 0.9
        },
        "increment_field": "iteration_count"
    }
)
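The loop semantics above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: `run_loop` and `step` are hypothetical names, and the sketch only handles the `"<"` operator used in the example.

```python
from typing import Any, Callable, Dict

State = Dict[str, Any]


def run_loop(step: Callable[[State], State], loop_state: Dict[str, Any], state: State) -> State:
    """Call `step` until the condition stops holding or max_iterations is hit.

    Assumption: the condition uses the "<" operator, as in the example above.
    """
    condition = loop_state["condition"]
    counter = loop_state["increment_field"]
    state = dict(state)
    state.setdefault(counter, 0)

    while (
        state[counter] < loop_state["max_iterations"]
        and state[condition["field"]] < condition["value"]
    ):
        state = step(state)
        state[counter] += 1
    return state


loop_state = {
    "max_iterations": 5,
    "condition": {"field": "quality_score", "operator": "<", "value": 0.9},
    "increment_field": "iteration_count",
}


def improve(state: State) -> State:
    # Stand-in for real work: raise the score by a fixed amount.
    return {**state, "quality_score": state["quality_score"] + 0.25}


final = run_loop(improve, loop_state, {"quality_score": 0.5})
print(final)  # {'quality_score': 1.0, 'iteration_count': 2}
```

The iteration cap is checked before the condition, so a step that never improves the score still terminates after `max_iterations` passes.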

Workflow Control Parameters

condition

  • Type: Dict[str, Any]
  • Default: None
  • Description: Defines conditions for task execution or branching
A condition can be a simple comparison, a compound expression that nests other conditions under AND/OR, or a reference to a named function:
# Simple condition
simple_condition = {
    "field": "status",
    "operator": "==",
    "value": "completed"
}

# Compound condition with AND/OR logic
compound_condition = {
    "type": "compound",
    "operator": "OR",
    "conditions": [
        {
            "type": "compound",
            "operator": "AND",
            "conditions": [
                {"field": "priority", "operator": "==", "value": "high"},
                {"field": "deadline", "operator": "<", "value": "2024-12-31"}
            ]
        },
        {"field": "override", "operator": "==", "value": True}
    ]
}

# Function-based condition
function_condition = {
    "type": "function",
    "function": "evaluate_complex_criteria",
    "params": {
        "threshold": 0.85,
        "include_history": True
    }
}
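To make the three condition shapes concrete, here is a minimal sketch of how such dictionaries could be evaluated against a record. `evaluate_condition`, the `functions` registry, and the `(data, **params)` call convention for function conditions are all assumptions made for illustration; the library may resolve conditions differently.

```python
import operator
from typing import Any, Callable, Dict, Optional

# Comparison operators for simple conditions (illustrative subset).
COMPARATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def evaluate_condition(
    condition: Dict[str, Any],
    data: Dict[str, Any],
    functions: Optional[Dict[str, Callable[..., Any]]] = None,
) -> bool:
    """Hypothetical evaluator: True if `data` satisfies `condition`."""
    kind = condition.get("type", "simple")

    if kind == "compound":
        # Evaluate children lazily so AND/OR can short-circuit.
        results = (
            evaluate_condition(child, data, functions)
            for child in condition["conditions"]
        )
        if condition["operator"] == "AND":
            return all(results)
        if condition["operator"] == "OR":
            return any(results)
        raise ValueError(f"unknown logical operator: {condition['operator']!r}")

    if kind == "function":
        # Look the function up by name in a caller-supplied registry (assumption).
        func = (functions or {})[condition["function"]]
        return bool(func(data, **condition.get("params", {})))

    compare = COMPARATORS[condition["operator"]]
    return compare(data[condition["field"]], condition["value"])


nested = {
    "type": "compound",
    "operator": "OR",
    "conditions": [
        {
            "type": "compound",
            "operator": "AND",
            "conditions": [
                {"field": "priority", "operator": "==", "value": "high"},
                {"field": "deadline", "operator": "<", "value": "2024-12-31"},
            ],
        },
        {"field": "override", "operator": "==", "value": True},
    ],
}
record = {"priority": "high", "deadline": "2024-06-30", "override": False}
print(evaluate_condition(nested, record))  # True
```

The date comparison works here only because ISO-formatted date strings sort lexicographically; other date formats would need parsing first.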

next_tasks

  • Type: Union[List[str], Dict[str, Any]]
  • Default: []
  • Description: Defines subsequent tasks in the workflow
Controls task sequencing and branching:
# Sequential flow
sequential_task = Task(
    name="step1",
    description="Run the first step",
    next_tasks=["step2", "step3"]  # Execute both after completion
)

# Conditional branching
branching_task = Task(
    name="router",
    description="Route to a handler based on the outcome",
    task_type="decision",
    next_tasks={
        "success": "success_handler",
        "failure": "error_handler",
        "retry": "retry_task"
    }
)

# Dynamic task selection
dynamic_task = Task(
    name="analyzer",
    description="Analyze results and select the next task",
    next_tasks={
        "type": "dynamic",
        "selector": "select_next_task",
        "params": {"strategy": "priority"}
    }
)
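The three `next_tasks` shapes above (list, outcome mapping, dynamic selector) could be resolved to concrete task names roughly as follows. This is a sketch under assumptions: `resolve_next_tasks` and the `selectors` registry are hypothetical and not part of the library.

```python
from typing import Any, Callable, Dict, List, Optional, Union

NextTasks = Union[List[str], Dict[str, Any]]


def resolve_next_tasks(
    next_tasks: NextTasks,
    outcome: Optional[str] = None,
    selectors: Optional[Dict[str, Callable[..., List[str]]]] = None,
) -> List[str]:
    """Return the names of the tasks to run next (hypothetical helper)."""
    if isinstance(next_tasks, list):
        # Sequential flow: every listed task follows.
        return list(next_tasks)

    if next_tasks.get("type") == "dynamic":
        # Dynamic selection: call the named selector with its params.
        selector = (selectors or {})[next_tasks["selector"]]
        return list(selector(**next_tasks.get("params", {})))

    # Conditional branching: pick the single task mapped to the outcome.
    if outcome not in next_tasks:
        raise KeyError(f"no branch defined for outcome {outcome!r}")
    return [next_tasks[outcome]]


branches = {"success": "success_handler", "failure": "error_handler", "retry": "retry_task"}
print(resolve_next_tasks(["step2", "step3"]))          # ['step2', 'step3']
print(resolve_next_tasks(branches, outcome="failure"))  # ['error_handler']
```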

is_start

  • Type: bool
  • Default: False
  • Description: Marks the task as a workflow entry point
Identifies tasks that can initiate workflow execution:
# Entry point task
start_task = Task(
    name="initialization",
    description="Initialize workflow",
    is_start=True,
    next_tasks=["validation", "setup"]
)

# Multiple entry points
alternative_start = Task(
    name="quick_start",
    description="Fast initialization path",
    is_start=True,
    config={"skip_validation": True}
)

Advanced Task Configuration

Complex Workflow Patterns

Parallel Execution Pattern

# Parent task that spawns parallel tasks
parallel_parent = Task(
    name="parallel_processor",
    description="Process multiple data sources",
    task_type="parallel",
    config={
        "parallel_tasks": [
            {"name": "source_a", "timeout": 30},
            {"name": "source_b", "timeout": 45},
            {"name": "source_c", "timeout": 60}
        ],
        "wait_for_all": True,
        "failure_threshold": 1  # Allow 1 failure
    }
)
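One way to read the `wait_for_all` and `failure_threshold` settings is sketched below with the standard library's thread pool. `run_parallel` and the `workers` mapping are hypothetical names. The per-task `timeout` is approximated by `Future.result(timeout=...)`, which bounds the wait for a result rather than cancelling the running thread.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Tuple


def run_parallel(
    workers: Dict[str, Callable[[], Any]],
    config: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Run every configured task concurrently; return (results, failures)."""
    specs = config["parallel_tasks"]
    results: Dict[str, Any] = {}
    failures: Dict[str, Exception] = {}

    with ThreadPoolExecutor(max_workers=len(specs)) as pool:
        # Submit everything first so the tasks actually overlap.
        futures = {spec["name"]: pool.submit(workers[spec["name"]]) for spec in specs}
        for spec in specs:
            try:
                results[spec["name"]] = futures[spec["name"]].result(timeout=spec["timeout"])
            except Exception as exc:  # includes timeouts
                failures[spec["name"]] = exc

    # Assumption: the threshold is the number of failures that is tolerated.
    if len(failures) > config["failure_threshold"]:
        raise RuntimeError(f"too many failures: {sorted(failures)}")
    return results, failures


def failing_source() -> str:
    raise ValueError("source_b unavailable")


config = {
    "parallel_tasks": [
        {"name": "source_a", "timeout": 30},
        {"name": "source_b", "timeout": 45},
        {"name": "source_c", "timeout": 60},
    ],
    "wait_for_all": True,
    "failure_threshold": 1,
}
workers = {"source_a": lambda: "a", "source_b": failing_source, "source_c": lambda: "c"}

results, failures = run_parallel(workers, config)
print(results)         # {'source_a': 'a', 'source_c': 'c'}
print(list(failures))  # ['source_b']
```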

Sequential Pipeline Pattern

# Pipeline configuration
pipeline_task = Task(
    name="data_pipeline",
    description="Multi-stage data processing",
    task_type="sequential",
    config={
        "stages": [
            {"name": "extract", "retry": 3},
            {"name": "transform", "timeout": 120},
            {"name": "load", "checkpoint": True}
        ],
        "stop_on_failure": True
    }
)
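The stage list and `stop_on_failure` flag could drive a runner like the one below. It is a sketch, not the library's behavior: `run_pipeline` and `stage_funcs` are hypothetical, `retry` is assumed to mean extra attempts after the first, and the `timeout` and `checkpoint` keys are ignored for brevity.

```python
from typing import Any, Callable, Dict, List, Tuple


def run_pipeline(
    stage_funcs: Dict[str, Callable[[], None]],
    config: Dict[str, Any],
) -> Tuple[List[str], List[str]]:
    """Run stages in order; return (completed_names, failed_names)."""
    completed: List[str] = []
    failed: List[str] = []
    for stage in config["stages"]:
        name = stage["name"]
        attempts = 1 + stage.get("retry", 0)  # assumption: retry = extra attempts
        succeeded = False
        for _ in range(attempts):
            try:
                stage_funcs[name]()
                succeeded = True
                break
            except Exception:
                continue  # try again if attempts remain
        if succeeded:
            completed.append(name)
        else:
            failed.append(name)
            if config.get("stop_on_failure", False):
                break  # skip the remaining stages
    return completed, failed


attempts_seen = {"extract": 0}


def extract() -> None:
    # Fails twice, then succeeds, to exercise the retry path.
    attempts_seen["extract"] += 1
    if attempts_seen["extract"] < 3:
        raise IOError("source not ready")


def transform() -> None:
    raise ValueError("bad record")


def load() -> None:
    pass


pipeline_config = {
    "stages": [
        {"name": "extract", "retry": 3},
        {"name": "transform", "timeout": 120},
        {"name": "load", "checkpoint": True},
    ],
    "stop_on_failure": True,
}
stage_funcs = {"extract": extract, "transform": transform, "load": load}
print(run_pipeline(stage_funcs, pipeline_config))  # (['extract'], ['transform'])
```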

Task Dependencies and Context

# Task with complex dependencies
dependent_task = Task(
    name="aggregator",
    description="Aggregate results from multiple sources",
    context=["analyzer_task", "validator_task"],  # Require outputs
    config={
        "wait_timeout": 300,  # 5 minutes
        "partial_results_allowed": True,
        "minimum_dependencies": 1
    }
)

Error Handling and Recovery

# Robust task configuration
robust_task = Task(
    name="critical_operation",
    description="Critical business logic",
    config={
        "error_handling": {
            "retry_count": 3,
            "retry_delay": 5,  # seconds
            "exponential_backoff": True,
            "fallback_task": "emergency_handler",
            "error_types": {
                "NetworkError": {"retry": True, "delay": 10},
                "ValidationError": {"retry": False, "escalate": True}
            }
        },
        "timeout": 180,
        "checkpoint": True,
        "rollback_on_failure": True
    }
)
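As an illustration of how the `error_handling` block might be interpreted, the sketch below derives the retry schedule and the per-error policy. Both helper names are hypothetical, and doubling the delay on each attempt is an assumed meaning of `exponential_backoff`.

```python
from typing import Any, Dict, List


def retry_delays(error_handling: Dict[str, Any]) -> List[float]:
    """Delay in seconds before each retry attempt."""
    base = error_handling["retry_delay"]
    count = error_handling["retry_count"]
    if error_handling.get("exponential_backoff", False):
        # Assumption: the delay doubles on every attempt.
        return [base * 2 ** attempt for attempt in range(count)]
    return [base] * count


def policy_for(error_name: str, error_handling: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a per-error override with the task-level defaults."""
    override = error_handling.get("error_types", {}).get(error_name, {})
    return {
        "retry": override.get("retry", True),
        "delay": override.get("delay", error_handling["retry_delay"]),
        "escalate": override.get("escalate", False),
    }


error_handling = {
    "retry_count": 3,
    "retry_delay": 5,
    "exponential_backoff": True,
    "fallback_task": "emergency_handler",
    "error_types": {
        "NetworkError": {"retry": True, "delay": 10},
        "ValidationError": {"retry": False, "escalate": True},
    },
}

print(retry_delays(error_handling))                   # [5, 10, 20]
print(policy_for("ValidationError", error_handling))  # {'retry': False, 'delay': 5, 'escalate': True}
```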

Task State Management

Loop State Configuration

# Advanced loop configuration
iterative_task = Task(
    name="optimizer",
    description="Optimize the result until improvement stalls",
    task_type="loop",
    loop_state={
        "variables": {
            "iteration": 0,
            "best_score": 0.0,
            "history": []
        },
        "conditions": {
            "continue": {
                "type": "compound",
                "operator": "AND",
                "conditions": [
                    {"field": "iteration", "operator": "<", "value": 10},
                    {"field": "improvement", "operator": ">", "value": 0.01}
                ]
            }
        },
        "updates": {
            "iteration": {"operation": "increment", "value": 1},
            "history": {"operation": "append", "field": "current_score"},
            "best_score": {"operation": "max", "field": "current_score"}
        }
    }
)
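The `updates` mapping reads as a small set of operations applied to the loop variables after each iteration. A minimal sketch of that interpretation follows; `apply_updates` is a hypothetical helper, and `outputs` stands for whatever the iteration produced (here, a `current_score`).

```python
from typing import Any, Dict


def apply_updates(
    variables: Dict[str, Any],
    updates: Dict[str, Dict[str, Any]],
    outputs: Dict[str, Any],
) -> Dict[str, Any]:
    """Return a new variables dict with each update operation applied."""
    new_vars = dict(variables)
    for name, spec in updates.items():
        operation = spec["operation"]
        if operation == "increment":
            new_vars[name] = variables[name] + spec["value"]
        elif operation == "append":
            # Build a new list so the previous state is left untouched.
            new_vars[name] = variables[name] + [outputs[spec["field"]]]
        elif operation == "max":
            new_vars[name] = max(variables[name], outputs[spec["field"]])
        else:
            raise ValueError(f"unknown operation: {operation!r}")
    return new_vars


variables = {"iteration": 0, "best_score": 0.0, "history": []}
updates = {
    "iteration": {"operation": "increment", "value": 1},
    "history": {"operation": "append", "field": "current_score"},
    "best_score": {"operation": "max", "field": "current_score"},
}

after_first = apply_updates(variables, updates, {"current_score": 0.7})
after_second = apply_updates(after_first, updates, {"current_score": 0.5})
print(after_second)  # {'iteration': 2, 'best_score': 0.7, 'history': [0.7, 0.5]}
```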

Configuration Best Practices

Task Naming Conventions

# Good naming practices
task = Task(
    name="validate_user_input",  # Descriptive, action-oriented
    description="Validate and sanitize user-provided data",
    config={
        "naming_convention": "verb_noun_qualifier",
        "log_prefix": "[VALIDATION]"
    }
)

Performance Optimization

# Optimized task configuration
optimized_task = Task(
    name="batch_processor",
    description="Process records in batches",
    config={
        "batch_size": 100,
        "parallel_workers": 4,
        "memory_limit": "2GB",
        "cpu_affinity": [0, 1, 2, 3],
        "priority": "high",
        "preload_resources": True
    }
)

Environment Variables

Task behavior can be configured via environment variables:
# Task execution settings
export PRAISONAI_TASK_DEFAULT_TIMEOUT=300
export PRAISONAI_TASK_MAX_RETRIES=3
export PRAISONAI_TASK_RETRY_DELAY=5

# Workflow settings
export PRAISONAI_WORKFLOW_MAX_DEPTH=10
export PRAISONAI_WORKFLOW_PARALLEL_LIMIT=5

# Debug settings
export PRAISONAI_TASK_DEBUG=true
export PRAISONAI_TASK_TRACE_EXECUTION=true
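If an application wants to read the same variables itself, a sketch like the following would do it. `load_task_settings` is a hypothetical helper, and the fallback values simply mirror the example exports above; they are not confirmed library defaults.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_task_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Parse task-related settings from an environment mapping."""
    env = os.environ if env is None else env

    def flag(name: str) -> bool:
        # Treat "true" in any letter case as enabled; anything else is off.
        return env.get(name, "false").strip().lower() == "true"

    return {
        # Fallbacks below are assumptions copied from the example values.
        "default_timeout": int(env.get("PRAISONAI_TASK_DEFAULT_TIMEOUT", "300")),
        "max_retries": int(env.get("PRAISONAI_TASK_MAX_RETRIES", "3")),
        "retry_delay": int(env.get("PRAISONAI_TASK_RETRY_DELAY", "5")),
        "max_depth": int(env.get("PRAISONAI_WORKFLOW_MAX_DEPTH", "10")),
        "parallel_limit": int(env.get("PRAISONAI_WORKFLOW_PARALLEL_LIMIT", "5")),
        "debug": flag("PRAISONAI_TASK_DEBUG"),
        "trace_execution": flag("PRAISONAI_TASK_TRACE_EXECUTION"),
    }


settings = load_task_settings({"PRAISONAI_TASK_MAX_RETRIES": "7", "PRAISONAI_TASK_DEBUG": "TRUE"})
print(settings["max_retries"], settings["debug"])  # 7 True
```

Passing an explicit mapping instead of reading `os.environ` directly keeps the parsing easy to test.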

Validation Rules

Parameter Constraints

| Parameter | Validation | Constraints |
| --- | --- | --- |
| task_type | Must be a valid type | One of: normal, decision, loop, parallel, sequential |
| condition | Valid structure | Must have field, operator, and value, or be a compound or function condition |
| next_tasks | Valid task names | Tasks must exist in the workflow |
| is_start | Boolean | At least one task must be marked as start |
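The constraints above can be checked before a workflow runs. The sketch below does so over plain dictionaries standing in for tasks; `validate_workflow` and `condition_is_valid` are hypothetical helpers, and accepting function-type conditions follows the condition examples earlier on this page.

```python
from typing import Any, Dict, List

VALID_TASK_TYPES = {"normal", "decision", "loop", "parallel", "sequential"}


def condition_is_valid(condition: Dict[str, Any]) -> bool:
    """Check the shape of a simple, compound, or function condition."""
    kind = condition.get("type", "simple")
    if kind == "compound":
        children = condition.get("conditions", [])
        return (
            condition.get("operator") in {"AND", "OR"}
            and bool(children)
            and all(condition_is_valid(child) for child in children)
        )
    if kind == "function":
        return "function" in condition
    return {"field", "operator", "value"} <= condition.keys()


def validate_workflow(tasks: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; empty means valid."""
    errors: List[str] = []
    names = {task["name"] for task in tasks}
    for task in tasks:
        label = task["name"]
        if task.get("task_type", "normal") not in VALID_TASK_TYPES:
            errors.append(f"{label}: invalid task_type")
        condition = task.get("condition")
        if condition is not None and not condition_is_valid(condition):
            errors.append(f"{label}: malformed condition")
        next_tasks = task.get("next_tasks", [])
        if isinstance(next_tasks, dict):
            # Dynamic selection names a selector, not tasks, so skip it here.
            dynamic = next_tasks.get("type") == "dynamic"
            targets = [] if dynamic else list(next_tasks.values())
        else:
            targets = list(next_tasks)
        for target in targets:
            if target not in names:
                errors.append(f"{label}: unknown next task {target!r}")
    if not any(task.get("is_start", False) for task in tasks):
        errors.append("workflow: no task has is_start=True")
    return errors


workflow = [
    {"name": "initialization", "is_start": True, "next_tasks": ["quality_check"]},
    {
        "name": "quality_check",
        "task_type": "decision",
        "condition": {"field": "score", "operator": ">="},  # missing "value"
        "next_tasks": {"true": "publish_task", "false": "initialization"},
    },
]
for problem in validate_workflow(workflow):
    print(problem)
# quality_check: malformed condition
# quality_check: unknown next task 'publish_task'
```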

Condition Operators

| Operator | Description | Example |
| --- | --- | --- |
| == | Equals | {"field": "status", "operator": "==", "value": "done"} |
| != | Not equals | {"field": "type", "operator": "!=", "value": "error"} |
| >, < | Greater than / less than | {"field": "score", "operator": ">", "value": 0.8} |
| >=, <= | Greater than or equal / less than or equal | {"field": "count", "operator": ">=", "value": 10} |
| in | Field value is one of the listed values | {"field": "category", "operator": "in", "value": ["A", "B"]} |
| regex | Pattern match | {"field": "email", "operator": "regex", "value": ".*@company\\.com"} |
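The `in` and `regex` operators are the two whose semantics are easiest to misread, so here is a small sketch of one plausible interpretation. Both function names are hypothetical, and matching the whole field value with `re.fullmatch` (rather than searching for a substring) is an assumption. The last call shows why the dot in a pattern such as `.*@company.com` should be escaped: unescaped, it matches any character.

```python
import re
from typing import Any, Collection


def matches_in(actual: Any, allowed: Collection[Any]) -> bool:
    """True if the field value is one of the allowed values."""
    return actual in allowed


def matches_regex(actual: str, pattern: str) -> bool:
    """True if the whole field value matches the pattern (assumed semantics)."""
    return re.fullmatch(pattern, actual) is not None


print(matches_in("A", ["A", "B"]))                             # True
print(matches_regex("bob@company.com", r".*@company\.com"))    # True
print(matches_regex("bob@companyXcom", r".*@company\.com"))    # False
print(matches_regex("bob@companyXcom", ".*@company.com"))      # True (unescaped dot)
```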
