from praisonaiagents import Agent, Task, Process
# Create specialized agents
data_agent = Agent(role="Data Processor", goal="Process customer data")
email_agent = Agent(role="Email Sender", goal="Send personalized emails")
validator = Agent(role="Validator", goal="Ensure quality")
manager = Agent(role="Manager", goal="Coordinate workflow")
# Create workflow tasks
tasks = {
"load": Task(
description="Load customer data",
agent=data_agent,
task_type="loop",
loop_data="customers.csv"
),
"personalize": Task(
description="Create personalized content for {customer_name}",
agent=email_agent,
context=["load"]
),
"validate": Task(
description="Check email quality",
agent=validator,
task_type="decision",
condition={
"approved": "send",
"rejected": "revise"
},
context=["personalize"]
),
"revise": Task(
description="Improve email content",
agent=email_agent,
next_tasks=["validate"] # Loop back to validation
),
"send": Task(
description="Send the email",
agent=email_agent,
context=["personalize"]
)
}
# Execute with hierarchical process for dynamic coordination
process = Process(
tasks=tasks,
agents=[data_agent, email_agent, validator, manager],
manager_llm="gpt-4o"
)
# Run the process
process.hierarchical()