Skip to main content

Parallel Workflow

Execute multiple independent tasks simultaneously for faster results.

Overview

           ┌─────────────┐
           │   Input     │
           └──────┬──────┘
    ┌─────────────┼─────────────┐
    ▼             ▼             ▼
┌───────┐    ┌───────┐    ┌───────┐
│Agent 1│    │Agent 2│    │Agent 3│
└───┬───┘    └───┬───┘    └───┬───┘
    └─────────────┼─────────────┘
                  ▼
           ┌─────────────┐
           │  Aggregate  │
           └─────────────┘

Implementation

from praisonaiagents import Agent, Task, PraisonAIAgents

# One specialist agent per research area.
ai_researcher = Agent(name="AI Researcher", instructions="Research AI developments")
ml_researcher = Agent(name="ML Researcher", instructions="Research ML developments")
nlp_researcher = Agent(name="NLP Researcher", instructions="Research NLP developments")

# One task per agent; no task refers to another, so they are independent.
ai_task = Task(description="Research AI trends", agent=ai_researcher)
ml_task = Task(description="Research ML trends", agent=ml_researcher)
nlp_task = Task(description="Research NLP trends", agent=nlp_researcher)
tasks = [ai_task, ml_task, nlp_task]

# Hand every agent and task to the orchestrator and request the
# "parallel" process.
agents = PraisonAIAgents(
    process="parallel",
    tasks=tasks,
    agents=[ai_researcher, ml_researcher, nlp_researcher],
)

# Kick off the run and keep whatever start() returns.
results = agents.start()

Using Workflows Module

from praisonaiagents.workflows import ParallelWorkflow

def _join_sections(results):
    """Join the collected results into one string, separated by blank lines."""
    return "\n\n".join(results)


# Same agents as before; the aggregator callable receives the results and
# returns the combined text.
workflow = ParallelWorkflow(
    aggregator=_join_sections,
    agents=[ai_researcher, ml_researcher, nlp_researcher],
)

result = workflow.run("Research latest developments")

Async Parallel Execution

import asyncio
from praisonaiagents import Agent

async def run_parallel():
    """Start three agents on the same prompt concurrently and return their results.

    Returns:
        The list produced by ``asyncio.gather``: one entry per agent, in the
        order Agent1, Agent2, Agent3.
    """
    agent_names = ("Agent1", "Agent2", "Agent3")
    workers = [Agent(name=agent_name) for agent_name in agent_names]

    # Create one astart() awaitable per agent, then await them all together
    # so the agents run concurrently instead of one after another.
    pending = [worker.astart("Research topic") for worker in workers]
    return await asyncio.gather(*pending)


# Drive the coroutine to completion from synchronous code.
results = asyncio.run(run_parallel())

With Aggregation

# Task is imported here as well so this example runs on its own.
from praisonaiagents import Agent, Task, PraisonAIAgents

# Parallel researchers
researchers = [
    Agent(name="Researcher1"),
    Agent(name="Researcher2"),
    Agent(name="Researcher3")
]

# Aggregator agent
aggregator = Agent(
    name="Aggregator",
    instructions="Combine and summarize all research findings."
)

# Independent research tasks, one per researcher. They are kept in a named
# list so the aggregation task below can refer to the Task objects themselves.
research_tasks = [
    Task(description="Research topic A", agent=researchers[0]),
    Task(description="Research topic B", agent=researchers[1]),
    Task(description="Research topic C", agent=researchers[2]),
]

# Aggregation task (depends on all research tasks). The context is the list of
# Task objects rather than their description strings, so the summary is tied
# to those tasks instead of to three literal strings.
# NOTE(review): confirm against the installed praisonaiagents version that
# Task objects in `context` supply the upstream task outputs.
summary_task = Task(
    description="Summarize all findings",
    agent=aggregator,
    context=list(research_tasks),
)

# Run the research tasks, then aggregate
agents = PraisonAIAgents(
    agents=researchers + [aggregator],
    tasks=[*research_tasks, summary_task],
)