Model 5: Event-Driven
When to Use: High-volume batch processing, decoupled architectures, or when you need async recipe execution with guaranteed delivery and retry semantics.
How It Works
The Event-Driven model decouples recipe invocation from execution. Producers publish recipe requests to a queue, and workers consume and process them asynchronously.
Note: PraisonAI does not include built-in queue support. This model documents an integration pattern using external message queues. You implement the producer/consumer logic in your application.
Pros & Cons
Pros:
Async at scale - Process thousands of recipes concurrently
Decoupled - Producers don’t wait for results
Guaranteed delivery - Queue handles retries
Horizontal scaling - Add workers as needed
Fault tolerant - Failed jobs can be retried
Backpressure handling - Queue buffers during spikes
Cons:
Complexity - Requires queue infrastructure
No real-time response - Results delivered async
Eventual consistency - Results may be delayed
Debugging harder - Distributed tracing needed
Infrastructure cost - Queue service costs
Step-by-Step Tutorial
Choose a Message Queue
| Queue | Best For | Complexity |
|-------|----------|------------|
| Redis (rq) | Simple, low volume | Low |
| Celery | Python-native, flexible | Medium |
| RabbitMQ | Enterprise, reliable | Medium |
| AWS SQS | Serverless, managed | Low |
| Kafka | High throughput, streaming | High |
Install Dependencies
# For Redis Queue (rq)
pip install rq redis praisonai
# For Celery
pip install celery praisonai
Define Worker Task
# worker.py
from rq import Worker, Queue
from redis import Redis
from praisonai import recipe
import json
def run_recipe_task(recipe_name: str, input_data: dict, callback_url: str = None):
    """Execute a recipe and optionally POST its result to a callback URL.

    Args:
        recipe_name: Name of the recipe handed to ``recipe.run``.
        input_data: Payload forwarded as the recipe's ``input``.
        callback_url: When set, the result dictionary is POSTed there as JSON.

    Returns:
        The recipe result converted with ``to_dict()``.

    Raises:
        Exception: Whatever ``recipe.run`` or ``to_dict`` raises. Callback
            delivery errors are logged and do not fail the job.
    """
    result = recipe.run(recipe_name, input=input_data)
    # Serialize once and reuse it for both the callback and the return value.
    result_dict = result.to_dict()

    if callback_url:
        import logging

        import requests

        try:
            # Bound the request so an unresponsive endpoint cannot hang the worker.
            response = requests.post(callback_url, json=result_dict, timeout=10)
            response.raise_for_status()
        except requests.RequestException:
            # Best effort: the recipe already succeeded, so a delivery problem
            # must not turn the job into a failure.
            logging.getLogger(__name__).exception(
                "Callback to %s failed", callback_url
            )

    return result_dict
if __name__ == "__main__":
    # One Redis connection is shared by the queue and the worker that drains it.
    connection = Redis()
    recipes_queue = Queue("recipes", connection=connection)
    Worker([recipes_queue], connection=connection).work()
Create Producer
Redis Queue Producer
Celery Producer
# producer.py
from rq import Queue
from redis import Redis
from worker import run_recipe_task
redis_conn = Redis()
queue = Queue("recipes", connection=redis_conn)

# Enqueue a recipe job: the two positional values become recipe_name and
# input_data of run_recipe_task, and callback_url is passed by keyword.
job = queue.enqueue(
    run_recipe_task,
    "support-reply-drafter",
    {"ticket_id": "T-123", "message": "Help needed"},
    callback_url="https://myapp.com/webhook/recipe-result",
)

print(f"Job ID: {job.id}")
print(f"Status: {job.get_status()}")
Start Workers
# Redis Queue
rq worker recipes
# Celery
celery -A tasks worker --loglevel=info
Production-Ready Example
# event_driven_recipes.py
import os
import json
import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import uuid
logging.basicConfig( level = logging. INFO )
logger = logging.getLogger( __name__ )
@dataclass
class RecipeJob:
    """Recipe job metadata.

    ``created_at`` is filled with the current UTC time in ISO-8601 form when
    the caller does not supply one.
    """

    job_id: str
    recipe_name: str
    input_data: Dict[str, Any]
    callback_url: Optional[str] = None
    # ISO-8601 timestamp; None (or empty) means "stamp it at construction".
    created_at: Optional[str] = None

    def __post_init__(self):
        """Default ``created_at`` to the current timezone-aware UTC time."""
        if not self.created_at:
            from datetime import timezone

            # datetime.utcnow() is deprecated and returns a naive value that
            # is easily mistaken for local time; emit an explicit UTC offset.
            self.created_at = datetime.now(timezone.utc).isoformat()
class RecipeJobProducer:
    """Producer for recipe jobs.

    Enqueues recipe executions on the RQ queue named ``"recipes"`` and lets
    callers inspect or wait for the outcome of a submitted job.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Connect to Redis at ``redis_url`` and bind the ``recipes`` queue."""
        from redis import Redis
        from rq import Queue

        self.redis = Redis.from_url(redis_url)
        self.queue = Queue("recipes", connection=self.redis)

    def submit(
        self,
        recipe_name: str,
        input_data: Dict[str, Any],
        callback_url: Optional[str] = None,
        priority: str = "normal",
    ) -> str:
        """Submit a recipe job to the queue.

        Args:
            recipe_name: Recipe to execute.
            input_data: Payload passed to the recipe.
            callback_url: Optional URL the worker task receives for result
                delivery.
            priority: ``"high"`` places the job at the front of the queue;
                any other value appends it as usual.

        Returns:
            The generated job id (a UUID4 string).
        """
        job_id = str(uuid.uuid4())
        self.queue.enqueue(
            # Referenced by dotted path, so the worker process must be able
            # to import ``worker.run_recipe_task``.
            "worker.run_recipe_task",
            recipe_name,
            input_data,
            callback_url,
            job_id=job_id,
            job_timeout=300,  # 5 minutes
            result_ttl=86400,  # Keep results for 24 hours
            at_front=(priority == "high"),
        )
        logger.info("Submitted job %s for recipe %s", job_id, recipe_name)
        return job_id

    def get_status(self, job_id: str) -> Dict[str, Any]:
        """Get job status.

        Returns:
            A dict with ``job_id``, ``status``, ``result`` (only when the job
            finished) and ``error`` (only when it failed), or
            ``{"job_id": ..., "status": "not_found"}`` for an unknown job.

        Raises:
            Exception: Errors other than an unknown job id (for example a
                Redis connection failure) propagate instead of being reported
                as ``not_found``.
        """
        from rq.exceptions import NoSuchJobError
        from rq.job import Job

        try:
            job = Job.fetch(job_id, connection=self.redis)
        except NoSuchJobError:
            return {"job_id": job_id, "status": "not_found"}

        return {
            "job_id": job_id,
            "status": job.get_status(),
            "result": job.result if job.is_finished else None,
            "error": str(job.exc_info) if job.is_failed else None,
        }

    def get_result(self, job_id: str, timeout: int = 60) -> Optional[Dict]:
        """Wait for and return job result.

        Polls the job roughly every half second until it finishes, fails, or
        ``timeout`` seconds have elapsed. The job is always checked at least
        once, and once more after the final sleep.

        Raises:
            RuntimeError: The job failed.
            TimeoutError: The job did not complete within ``timeout`` seconds.
        """
        import time

        from rq.job import Job

        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        while True:
            job = Job.fetch(job_id, connection=self.redis)
            if job.is_finished:
                return job.result
            if job.is_failed:
                raise RuntimeError(f"Job failed: {job.exc_info}")

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.5, remaining))

        raise TimeoutError(f"Job {job_id} did not complete in {timeout}s")
class RecipeJobWorker:
    """Worker for processing recipe jobs."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Open the Redis connection the worker will use."""
        from redis import Redis

        self.redis = Redis.from_url(redis_url)

    def run(self, queues: list = None):
        """Start the worker.

        Args:
            queues: Names of the queues to consume; falls back to
                ``["recipes"]`` when omitted or empty.
        """
        from rq import Worker, Queue

        if not queues:
            queues = ["recipes"]
        listened = [Queue(name, connection=self.redis) for name in queues]
        Worker(listened, connection=self.redis).work()
# Worker task function
def run_recipe_task(
    recipe_name: str,
    input_data: dict,
    callback_url: Optional[str] = None,
) -> dict:
    """Execute a recipe and handle results.

    Args:
        recipe_name: Recipe to execute.
        input_data: Payload passed to the recipe as ``input``.
        callback_url: When set, the result dictionary is POSTed there as JSON.

    Returns:
        The recipe result converted with ``to_dict()``.

    Raises:
        Exception: Any error from running or serializing the recipe is logged
            and re-raised. Callback delivery errors are logged only.
    """
    from praisonai import recipe
    import requests

    logger.info("Processing recipe: %s", recipe_name)

    try:
        result = recipe.run(recipe_name, input=input_data)
        result_dict = result.to_dict()
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception("Recipe execution failed: %s", e)
        raise

    if callback_url:
        try:
            response = requests.post(
                callback_url,
                json=result_dict,
                timeout=10,
            )
            # Without this a 4xx/5xx reply would be logged as a success.
            response.raise_for_status()
        except Exception as e:
            # Deliberately broad: delivery is best effort and must never fail
            # a job whose recipe already completed.
            logger.error("Callback failed: %s", e)
        else:
            logger.info("Sent result to callback: %s", callback_url)

    return result_dict
# Usage example
if __name__ == "__main__":
    import sys

    # "worker" as the first CLI argument starts a consumer; anything else
    # submits one demo job and waits for its result.
    if len(sys.argv) > 1 and sys.argv[1] == "worker":
        # Start worker
        worker = RecipeJobWorker()
        worker.run()
    else:
        # Submit job
        producer = RecipeJobProducer()
        job_id = producer.submit(
            "support-reply-drafter",
            {"ticket_id": "T-123", "message": "I need help"},
            callback_url="https://myapp.com/webhook/result",
        )
        print(f"Submitted job: {job_id}")

        # Wait for result
        result = producer.get_result(job_id, timeout=60)
        print(f"Result: {result}")
AWS SQS Integration
# sqs_worker.py
import boto3
import json
from praisonai import recipe
sqs = boto3.client( 'sqs' )
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/recipe-jobs"
def process_messages():
    """Poll SQS and process recipe jobs.

    Loops forever: long-polls ``QUEUE_URL`` for up to 10 messages, runs the
    recipe named in each message body, and deletes the message only after
    the recipe completed. Each body is expected to be JSON with
    ``recipe_name`` and ``input_data`` keys.
    """
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )

        for message in response.get("Messages", []):
            try:
                body = json.loads(message["Body"])
                result = recipe.run(
                    body["recipe_name"],
                    input=body["input_data"],
                )
                # Store result or send to callback
                print(f"Completed: {result.run_id}")

                # Delete processed message
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=message["ReceiptHandle"],
                )
            except Exception as e:
                print(f"Error processing message: {e}")
                # Message will be retried after visibility timeout
                # NOTE(review): a malformed body fails the same way on every
                # retry; presumably the queue needs a dead-letter/redrive
                # policy to stop that loop - confirm the queue configuration.


if __name__ == "__main__":
    process_messages()
Troubleshooting
Check worker status and logs:
# Redis Queue
rq info
# Celery
celery -A tasks inspect active
Enable detailed logging:
import logging
logging.basicConfig( level = logging. DEBUG )
Check failed job queue:
Memory issues with large payloads
Store large data externally and pass references:
# Instead of:
queue.enqueue(task, large_data)
# Do:
data_id = store_in_s3(large_data)
queue.enqueue(task, data_id)
Implement retry logic for callbacks:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
# Callbacks are POSTs. urllib3 only retries idempotent methods on a bad
# status by default, so POST and the retryable status codes are opted in
# explicitly. The receiver should tolerate duplicate deliveries.
# NOTE(review): `allowed_methods` needs urllib3 >= 1.26 - confirm the pinned version.
retries = Retry(
    total=3,
    backoff_factor=0.5,
    status_forcelist=(500, 502, 503, 504),
    allowed_methods=frozenset({"POST"}),
)
session.mount("https://", HTTPAdapter(max_retries=retries))
Security & Ops Notes
Message encryption - Encrypt sensitive data in messages
Queue authentication - Secure queue access with credentials
Callback validation - Validate callback URLs before sending
Dead letter queues - Handle failed jobs properly
Rate limiting - Prevent queue flooding
# Validate callback URL
from urllib.parse import urlparse
def is_valid_callback(url: str) -> bool:
    """Return True when ``url`` is an HTTPS URL pointing at a public host.

    Rejects non-HTTPS schemes, URLs without a host, ``localhost`` (in any
    letter case, with or without credentials or a port), and IP literals
    that are not globally routable (loopback, private, link-local, ...).

    Host names are not resolved, so a DNS name that points at an internal
    address still passes; enforce that at connection time if it matters.
    """
    import ipaddress

    try:
        parsed = urlparse(url)
        # hostname strips userinfo and port and is lower-cased, unlike netloc.
        host = parsed.hostname
    except ValueError:
        # Malformed authority, e.g. an unterminated IPv6 bracket.
        return False

    if parsed.scheme != "https" or not host:
        return False

    host = host.rstrip(".")  # "localhost." is the same host as "localhost"
    if host == "localhost" or host.endswith(".localhost"):
        return False

    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal, so treat it as a DNS name.
        return True
    return address.is_global
Monitoring
Key metrics to track:
Queue depth - Number of pending jobs
Processing time - Job execution duration
Failure rate - Percentage of failed jobs
Worker utilization - Active vs idle workers
# Prometheus metrics example
from prometheus_client import Counter, Histogram
# Job counter labelled by recipe name and outcome, plus a duration histogram.
JOBS_TOTAL = Counter("recipe_jobs_total", "Total jobs", ["recipe", "status"])
JOB_DURATION = Histogram("recipe_job_duration_seconds", "Job duration")


@JOB_DURATION.time()
def run_recipe_task(recipe_name, input_data):
    """Run a recipe, counting the outcome as ``success`` or ``failed``.

    The decorator records the call's duration in ``JOB_DURATION``. Any
    exception is counted as a failure and re-raised unchanged.
    """

    def _count(status):
        JOBS_TOTAL.labels(recipe=recipe_name, status=status).inc()

    try:
        outcome = recipe.run(recipe_name, input=input_data)
        _count("success")
    except Exception:
        _count("failed")
        raise
    return outcome
Next Steps