Skip to main content

Model 5: Event-Driven

When to Use: High-volume batch processing, decoupled architectures, or when you need async recipe execution with guaranteed delivery and retry semantics.

How It Works

The Event-Driven model decouples recipe invocation from execution. Producers publish recipe requests to a queue, and workers consume and process them asynchronously.
Note: PraisonAI does not include built-in queue support. This model documents an integration pattern using external message queues. You implement the producer/consumer logic in your application.

Pros & Cons

  • Async at scale - Process thousands of recipes concurrently
  • Decoupled - Producers don’t wait for results
  • Guaranteed delivery - Queue handles retries
  • Horizontal scaling - Add workers as needed
  • Fault tolerant - Failed jobs can be retried
  • Backpressure handling - Queue buffers during spikes

Step-by-Step Tutorial

1

Choose a Message Queue

QueueBest ForComplexity
Redis (rq)Simple, low volumeLow
CeleryPython-native, flexibleMedium
RabbitMQEnterprise, reliableMedium
AWS SQSServerless, managedLow
KafkaHigh throughput, streamingHigh
2

Install Dependencies

# For Redis Queue (rq)
pip install rq redis praisonai

# For Celery
pip install celery praisonai
3

Define Worker Task

# worker.py
from rq import Worker, Queue
from redis import Redis
from praisonai import recipe
import json

def run_recipe_task(recipe_name: str, input_data: dict, callback_url: str = None):
    """Execute a recipe and optionally send results to callback."""
    result = recipe.run(recipe_name, input=input_data)
    
    if callback_url:
        import requests
        requests.post(callback_url, json=result.to_dict())
    
    return result.to_dict()

if __name__ == "__main__":
    redis_conn = Redis()
    queue = Queue("recipes", connection=redis_conn)
    worker = Worker([queue], connection=redis_conn)
    worker.work()
4

Create Producer

# producer.py
from rq import Queue
from redis import Redis
from worker import run_recipe_task

redis_conn = Redis()
queue = Queue("recipes", connection=redis_conn)

# Enqueue a recipe job
job = queue.enqueue(
    run_recipe_task,
    "support-reply-drafter",
    {"ticket_id": "T-123", "message": "Help needed"},
    callback_url="https://myapp.com/webhook/recipe-result"
)

print(f"Job ID: {job.id}")
print(f"Status: {job.get_status()}")
5

Start Workers

# Redis Queue
rq worker recipes

# Celery
celery -A tasks worker --loglevel=info

Production-Ready Example

# event_driven_recipes.py
import os
import json
import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import uuid

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class RecipeJob:
    """Recipe job metadata."""
    job_id: str
    recipe_name: str
    input_data: Dict[str, Any]
    callback_url: Optional[str] = None
    created_at: str = None
    
    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.utcnow().isoformat()

class RecipeJobProducer:
    """Producer for recipe jobs."""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        from rq import Queue
        from redis import Redis
        
        self.redis = Redis.from_url(redis_url)
        self.queue = Queue("recipes", connection=self.redis)
    
    def submit(
        self,
        recipe_name: str,
        input_data: Dict[str, Any],
        callback_url: str = None,
        priority: str = "normal"
    ) -> str:
        """Submit a recipe job to the queue."""
        job_id = str(uuid.uuid4())
        
        job = self.queue.enqueue(
            "worker.run_recipe_task",
            recipe_name,
            input_data,
            callback_url,
            job_id=job_id,
            job_timeout=300,  # 5 minutes
            result_ttl=86400,  # Keep results for 24 hours
        )
        
        logger.info(f"Submitted job {job_id} for recipe {recipe_name}")
        return job_id
    
    def get_status(self, job_id: str) -> Dict[str, Any]:
        """Get job status."""
        from rq.job import Job
        
        try:
            job = Job.fetch(job_id, connection=self.redis)
            return {
                "job_id": job_id,
                "status": job.get_status(),
                "result": job.result if job.is_finished else None,
                "error": str(job.exc_info) if job.is_failed else None,
            }
        except Exception:
            return {"job_id": job_id, "status": "not_found"}
    
    def get_result(self, job_id: str, timeout: int = 60) -> Optional[Dict]:
        """Wait for and return job result."""
        from rq.job import Job
        import time
        
        start = time.time()
        while time.time() - start < timeout:
            job = Job.fetch(job_id, connection=self.redis)
            if job.is_finished:
                return job.result
            if job.is_failed:
                raise RuntimeError(f"Job failed: {job.exc_info}")
            time.sleep(0.5)
        
        raise TimeoutError(f"Job {job_id} did not complete in {timeout}s")


class RecipeJobWorker:
    """Worker for processing recipe jobs."""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        from redis import Redis
        self.redis = Redis.from_url(redis_url)
    
    def run(self, queues: list = None):
        """Start the worker."""
        from rq import Worker, Queue
        
        queues = queues or ["recipes"]
        queue_objs = [Queue(q, connection=self.redis) for q in queues]
        
        worker = Worker(queue_objs, connection=self.redis)
        worker.work()


# Worker task function
def run_recipe_task(
    recipe_name: str,
    input_data: dict,
    callback_url: str = None
) -> dict:
    """Execute a recipe and handle results."""
    from praisonai import recipe
    import requests
    
    logger.info(f"Processing recipe: {recipe_name}")
    
    try:
        result = recipe.run(recipe_name, input=input_data)
        result_dict = result.to_dict()
        
        if callback_url:
            try:
                requests.post(
                    callback_url,
                    json=result_dict,
                    timeout=10
                )
                logger.info(f"Sent result to callback: {callback_url}")
            except Exception as e:
                logger.error(f"Callback failed: {e}")
        
        return result_dict
        
    except Exception as e:
        logger.error(f"Recipe execution failed: {e}")
        raise


# Usage example
if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1 and sys.argv[1] == "worker":
        # Start worker
        worker = RecipeJobWorker()
        worker.run()
    else:
        # Submit job
        producer = RecipeJobProducer()
        
        job_id = producer.submit(
            "support-reply-drafter",
            {"ticket_id": "T-123", "message": "I need help"},
            callback_url="https://myapp.com/webhook/result"
        )
        
        print(f"Submitted job: {job_id}")
        
        # Wait for result
        result = producer.get_result(job_id, timeout=60)
        print(f"Result: {result}")

AWS SQS Integration

# sqs_worker.py
import boto3
import json
from praisonai import recipe

sqs = boto3.client('sqs')
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/recipe-jobs"

def process_messages():
    """Poll SQS and process recipe jobs."""
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        
        for message in response.get("Messages", []):
            try:
                body = json.loads(message["Body"])
                
                result = recipe.run(
                    body["recipe_name"],
                    input=body["input_data"]
                )
                
                # Store result or send to callback
                print(f"Completed: {result.run_id}")
                
                # Delete processed message
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=message["ReceiptHandle"]
                )
                
            except Exception as e:
                print(f"Error processing message: {e}")
                # Message will be retried after visibility timeout

if __name__ == "__main__":
    process_messages()

Troubleshooting

Check worker status and logs:
# Redis Queue
rq info

# Celery
celery -A tasks inspect active
Enable detailed logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Check failed job queue:
rq info --failed
Store large data externally and pass references:
# Instead of:
queue.enqueue(task, large_data)

# Do:
data_id = store_in_s3(large_data)
queue.enqueue(task, data_id)
Implement retry logic for callbacks:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5)
session.mount("https://", HTTPAdapter(max_retries=retries))

Security & Ops Notes

Security Considerations
  • Message encryption - Encrypt sensitive data in messages
  • Queue authentication - Secure queue access with credentials
  • Callback validation - Validate callback URLs before sending
  • Dead letter queues - Handle failed jobs properly
  • Rate limiting - Prevent queue flooding
# Validate callback URL
from urllib.parse import urlparse

def is_valid_callback(url: str) -> bool:
    parsed = urlparse(url)
    return (
        parsed.scheme in ("https",) and
        parsed.netloc and
        not parsed.netloc.startswith("localhost")
    )

Monitoring

Key metrics to track:
  • Queue depth - Number of pending jobs
  • Processing time - Job execution duration
  • Failure rate - Percentage of failed jobs
  • Worker utilization - Active vs idle workers
# Prometheus metrics example
from prometheus_client import Counter, Histogram

JOBS_TOTAL = Counter('recipe_jobs_total', 'Total jobs', ['recipe', 'status'])
JOB_DURATION = Histogram('recipe_job_duration_seconds', 'Job duration')

@JOB_DURATION.time()
def run_recipe_task(recipe_name, input_data):
    try:
        result = recipe.run(recipe_name, input=input_data)
        JOBS_TOTAL.labels(recipe=recipe_name, status='success').inc()
        return result
    except Exception:
        JOBS_TOTAL.labels(recipe=recipe_name, status='failed').inc()
        raise

Next Steps