Comprehensive guide for deploying PraisonAI agents in production environments
## Environment Configuration
```python
# config/production.py
import os

class ProductionConfig:
    # API keys (from environment)
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

    # Model configuration
    DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4")
    TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
    MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))

    # Performance
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "300"))
    MAX_CONCURRENT_AGENTS = int(os.getenv("MAX_CONCURRENT_AGENTS", "10"))

    # Security
    ENABLE_CONTENT_FILTERING = os.getenv("ENABLE_CONTENT_FILTERING", "true").lower() == "true"
    # Filter out the empty string that "".split(",") would otherwise produce
    ALLOWED_TOOLS = [t for t in os.getenv("ALLOWED_TOOLS", "").split(",") if t]

    # Monitoring
    TELEMETRY_ENABLED = os.getenv("TELEMETRY_ENABLED", "true").lower() == "true"
    TELEMETRY_ENDPOINT = os.getenv("TELEMETRY_ENDPOINT")
```
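Validate required settings once at startup so misconfiguration fails fast instead of surfacing mid-request. A minimal sketch; the `validate_config` helper is illustrative, not part of the config above:

```python
# config/validate.py -- illustrative startup check
from config.production import ProductionConfig

def validate_config(config: type) -> None:
    """Fail fast if required settings are missing."""
    required = ["OPENAI_API_KEY"]
    missing = [name for name in required if not getattr(config, name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {missing}")

validate_config(ProductionConfig)  # call once at process startup
```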
## Error Handling
```python
# utils/error_handler.py
import asyncio
import logging
import traceback
from functools import wraps
from typing import Callable, Any

logger = logging.getLogger(__name__)

class ProductionError(Exception):
    """Base exception for production errors"""
    pass

def production_error_handler(
    fallback_result: Any = None,
    log_errors: bool = True,
    raise_errors: bool = False
):
    """Decorator for production error handling"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if log_errors:
                    logger.error(f"Error in {func.__name__}: {e}")
                    logger.debug(traceback.format_exc())
                if raise_errors:
                    raise ProductionError(f"Production error in {func.__name__}: {e}") from e
                return fallback_result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if log_errors:
                    logger.error(f"Error in {func.__name__}: {e}")
                    logger.debug(traceback.format_exc())
                if raise_errors:
                    raise ProductionError(f"Production error in {func.__name__}: {e}") from e
                return fallback_result

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator
```
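Applying the decorator to an agent call looks like this (the agent's instructions and the function name are illustrative):

```python
from praisonaiagents import Agent
from utils.error_handler import production_error_handler

agent = Agent(instructions="You are a research assistant.")

@production_error_handler(fallback_result="Service temporarily unavailable")
async def run_research(query: str) -> str:
    return await agent.arun(query)
```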
## Security Measures
```python
# security/validator.py
import re

from config.production import ProductionConfig

class SecurityValidator:
    def __init__(self, config: ProductionConfig):
        self.config = config
        self.blocked_patterns = [
            r"(api_key|password|secret)",
            r"(eval|exec|__import__)",
            r"(system|popen|subprocess)"
        ]

    def validate_input(self, input_text: str) -> bool:
        """Validate user input for security threats"""
        # Check for injection attempts
        for pattern in self.blocked_patterns:
            if re.search(pattern, input_text, re.IGNORECASE):
                return False
        # Enforce a 10,000-character input limit
        if len(input_text) > 10000:
            return False
        return True

    def sanitize_output(self, output: str) -> str:
        """Remove sensitive information from output"""
        # Redact strings that look like API keys or secrets
        output = re.sub(r'(sk-|api_key=)[a-zA-Z0-9]+', '[REDACTED]', output)
        return output
```
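A sketch of how the validator might wrap an agent call, rejecting unsafe input and redacting output (the `safe_run` wrapper is illustrative):

```python
from config.production import ProductionConfig
from security.validator import SecurityValidator

validator = SecurityValidator(ProductionConfig)

async def safe_run(agent, user_input: str) -> str:
    if not validator.validate_input(user_input):
        raise ValueError("Input rejected by security validation")
    result = await agent.arun(user_input)
    return validator.sanitize_output(result)
```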
## Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

# Security: run as a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .
RUN chown -R appuser:appuser /app

# Switch to the non-root user
USER appuser

# Health check: raise_for_status() makes the check fail on non-2xx responses
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

EXPOSE 8000

CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:8000", "--workers", "4"]
```
```yaml
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: praisonai-agents
  labels:
    app: praisonai-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: praisonai-agents
  template:
    metadata:
      labels:
        app: praisonai-agents
    spec:
      containers:
        - name: praisonai
          image: your-registry/praisonai-agents:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: openai-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: praisonai-service
spec:
  selector:
    app: praisonai-agents
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
```
## Scaling and Performance

```python
# cache/agent_cache.py
import hashlib
import json
from typing import Optional, Any

import redis

class AgentCache:
    def __init__(self, redis_url: Optional[str] = None):
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.local_cache = {}  # in-process cache; unbounded, so size it to your workload

    def cache_key(self, agent_name: str, input_text: str) -> str:
        """Generate a deterministic cache key"""
        content = f"{agent_name}:{input_text}"
        return hashlib.md5(content.encode()).hexdigest()

    async def get(self, agent_name: str, input_text: str) -> Optional[Any]:
        """Get a cached result"""
        key = self.cache_key(agent_name, input_text)
        # Try the local cache first
        if key in self.local_cache:
            return self.local_cache[key]
        # Fall back to Redis
        if self.redis_client:
            cached = self.redis_client.get(key)
            if cached:
                return json.loads(cached)
        return None

    async def set(self, agent_name: str, input_text: str, result: Any, ttl: int = 3600):
        """Cache a result locally and in Redis"""
        key = self.cache_key(agent_name, input_text)
        self.local_cache[key] = result
        if self.redis_client:
            self.redis_client.setex(key, ttl, json.dumps(result))
```
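Typical usage is a cache-aside pattern around the agent call (the Redis URL and `cached_run` wrapper are illustrative):

```python
from cache.agent_cache import AgentCache

cache = AgentCache(redis_url="redis://localhost:6379/0")

async def cached_run(agent, task: str):
    cached = await cache.get(agent.name, task)
    if cached is not None:
        return cached
    result = await agent.arun(task)
    await cache.set(agent.name, task, result)
    return result
```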
```python
# utils/connection_pool.py
from typing import Dict, Any

import aiohttp

class ConnectionPool:
    def __init__(self, max_connections: int = 100):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=30,
            ttl_dns_cache=300
        )
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(connector=self.connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        async with self.session.request(method, url, **kwargs) as response:
            return await response.json()
```
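Use the pool as an async context manager so the session is always closed cleanly (the URL is a placeholder):

```python
import asyncio
from utils.connection_pool import ConnectionPool

async def main():
    async with ConnectionPool(max_connections=50) as pool:
        data = await pool.request("GET", "https://api.example.com/status")
        print(data)

asyncio.run(main())
```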
```python
# load_balancer/agent_balancer.py
from collections import defaultdict
from typing import Any, List, Optional

from praisonaiagents import Agent

class AgentLoadBalancer:
    def __init__(self, agents: List[Agent]):
        self.agents = agents
        self.agent_loads = defaultdict(int)
        self.agent_errors = defaultdict(int)

    def select_agent(self) -> Optional[Agent]:
        """Select the agent with the least load"""
        available_agents = [
            agent for agent in self.agents
            if self.agent_errors[agent.name] < 3  # Skip agents with repeated errors
        ]
        if not available_agents:
            return None
        # Select the agent with the fewest in-flight requests
        return min(available_agents, key=lambda a: self.agent_loads[a.name])

    async def execute_with_balancing(self, task: str) -> Any:
        agent = self.select_agent()
        if not agent:
            raise Exception("No available agents")
        self.agent_loads[agent.name] += 1
        try:
            result = await agent.arun(task)
            self.agent_errors[agent.name] = 0  # Reset error count on success
            return result
        except Exception:
            self.agent_errors[agent.name] += 1
            raise
        finally:
            self.agent_loads[agent.name] -= 1
```
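A sketch of fanning tasks out across a pool of identically configured agents (the agent names, instructions, and task are illustrative):

```python
import asyncio
from praisonaiagents import Agent
from load_balancer.agent_balancer import AgentLoadBalancer

async def main():
    agents = [Agent(name=f"worker-{i}", instructions="You are a helpful assistant.") for i in range(3)]
    balancer = AgentLoadBalancer(agents)
    print(await balancer.execute_with_balancing("Summarize the latest deployment logs"))

asyncio.run(main())
```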
## Monitoring and Observability

```python
# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
agent_requests = Counter('agent_requests_total', 'Total agent requests', ['agent_name', 'status'])
agent_latency = Histogram('agent_request_duration_seconds', 'Agent request latency', ['agent_name'])
active_agents = Gauge('active_agents', 'Number of active agents')
token_usage = Counter('token_usage_total', 'Total tokens used', ['model', 'agent_name'])

class MetricsCollector:
    @staticmethod
    def record_request(agent_name: str, status: str):
        agent_requests.labels(agent_name=agent_name, status=status).inc()

    @staticmethod
    def record_latency(agent_name: str, duration: float):
        agent_latency.labels(agent_name=agent_name).observe(duration)

    @staticmethod
    def record_tokens(model: str, agent_name: str, tokens: int):
        token_usage.labels(model=model, agent_name=agent_name).inc(tokens)
```
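To expose these metrics and record a timed request, something like the following works (`start_http_server` is the standard prometheus_client helper; the port and `instrumented_run` wrapper are illustrative):

```python
import time
from prometheus_client import start_http_server
from monitoring.metrics import MetricsCollector

start_http_server(9090)  # serves /metrics for Prometheus to scrape

async def instrumented_run(agent, task: str):
    start = time.time()
    try:
        result = await agent.arun(task)
        MetricsCollector.record_request(agent.name, "success")
        return result
    except Exception:
        MetricsCollector.record_request(agent.name, "error")
        raise
    finally:
        MetricsCollector.record_latency(agent.name, time.time() - start)
```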
```python
# logging_config.py
import logging
from pythonjsonlogger import jsonlogger

def setup_production_logging():
    # JSON formatter for structured logs
    log_handler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    log_handler.setFormatter(formatter)

    # Configure the root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(log_handler)

    # Quiet noisy third-party loggers
    logging.getLogger('praisonaiagents').setLevel(logging.INFO)
    logging.getLogger('httpx').setLevel(logging.WARNING)

    return logging.getLogger(__name__)
```
```python
# health/health_check.py
from typing import Any, Dict, List

from praisonaiagents import Agent

class HealthChecker:
    def __init__(self, agents: List[Agent], dependencies: Dict[str, Any]):
        self.agents = agents
        self.dependencies = dependencies

    async def check_health(self) -> Dict[str, Any]:
        """Comprehensive health check"""
        health_status = {
            "status": "healthy",
            "checks": {}
        }
        # Check agents (note: this issues a real model call per agent,
        # so run it on a schedule rather than on every probe)
        for agent in self.agents:
            try:
                await agent.arun("test")
                health_status["checks"][f"agent_{agent.name}"] = "healthy"
            except Exception as e:
                health_status["status"] = "unhealthy"
                health_status["checks"][f"agent_{agent.name}"] = str(e)
        # Check dependencies
        for name, dependency in self.dependencies.items():
            try:
                await dependency.ping()
                health_status["checks"][name] = "healthy"
            except Exception as e:
                health_status["status"] = "unhealthy"
                health_status["checks"][name] = str(e)
        return health_status
```
## Security Hardening

```python
# security/key_manager.py
import logging
import os
from typing import Optional

import boto3
from cryptography.fernet import Fernet

logger = logging.getLogger(__name__)

class SecureKeyManager:
    def __init__(self, use_aws_secrets: bool = False):
        self.use_aws_secrets = use_aws_secrets
        # Note: falling back to a freshly generated key means previously
        # encrypted values cannot be decrypted; set ENCRYPTION_KEY in production.
        self.fernet = Fernet(os.getenv("ENCRYPTION_KEY") or Fernet.generate_key())
        if use_aws_secrets:
            self.secrets_client = boto3.client('secretsmanager')

    def get_api_key(self, key_name: str) -> Optional[str]:
        """Securely retrieve an API key"""
        if self.use_aws_secrets:
            try:
                response = self.secrets_client.get_secret_value(SecretId=key_name)
                return response['SecretString']
            except Exception as e:
                logger.error(f"Failed to retrieve secret: {e}")
                return None
        else:
            # Read the encrypted value from the environment and decrypt it
            encrypted = os.getenv(key_name)
            if encrypted:
                return self.fernet.decrypt(encrypted.encode()).decode()
            return None
```
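To produce the encrypted value the environment-variable path expects, encrypt the raw key once with the same Fernet key (a one-off sketch; the variable names and placeholder key are illustrative):

```python
from cryptography.fernet import Fernet

encryption_key = Fernet.generate_key()                  # store as ENCRYPTION_KEY
token = Fernet(encryption_key).encrypt(b"sk-raw-api-key")  # placeholder secret
print(encryption_key.decode())  # value for ENCRYPTION_KEY
print(token.decode())           # encrypted value to store in the key's env var
```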
```python
# security/rate_limiter.py
import time
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.requests: Dict[str, List[float]] = defaultdict(list)

    def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
        """Check whether a request is allowed; returns (allowed, wait_seconds)"""
        now = time.time()
        minute_ago = now - 60
        # Drop requests that have left the sliding one-minute window
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > minute_ago
        ]
        if len(self.requests[client_id]) >= self.requests_per_minute:
            # Time until the oldest request leaves the window
            oldest_request = min(self.requests[client_id])
            wait_time = 60 - (now - oldest_request)
            return False, wait_time
        self.requests[client_id].append(now)
        return True, None
```
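In a request handler this typically maps to an HTTP 429 response with a Retry-After hint (a sketch; `run_task` is a placeholder for the actual agent call):

```python
from security.rate_limiter import RateLimiter

limiter = RateLimiter(requests_per_minute=60)

def handle_request(client_id: str, task: str):
    allowed, wait_time = limiter.is_allowed(client_id)
    if not allowed:
        # Equivalent to HTTP 429 with a Retry-After header
        return {"error": "rate limit exceeded", "retry_after": round(wait_time, 1)}, 429
    return run_task(task), 200  # run_task is a placeholder for the agent call
```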
## Cost Optimization

```python
# optimization/model_selector.py
from typing import Optional

class CostOptimizedModelSelector:
    def __init__(self):
        # Illustrative prices (USD per 1K tokens); check current provider pricing
        self.model_costs = {
            "gpt-4": 0.03,
            "gpt-3.5-turbo": 0.002,
            "claude-3-opus": 0.015,
            "claude-3-sonnet": 0.003
        }
        self.model_capabilities = {
            "gpt-4": ["complex_reasoning", "code", "analysis"],
            "gpt-3.5-turbo": ["general", "conversation"],
            "claude-3-opus": ["complex_reasoning", "long_context"],
            "claude-3-sonnet": ["general", "fast"]
        }

    def select_model(self, task_type: str, max_cost: Optional[float] = None) -> str:
        """Select the most cost-effective model for a task"""
        suitable_models = [
            model for model, capabilities in self.model_capabilities.items()
            if task_type in capabilities
        ]
        # Fall back to general-purpose models only if nothing matches the task type
        if not suitable_models:
            suitable_models = [
                model for model, capabilities in self.model_capabilities.items()
                if "general" in capabilities
            ]
        if max_cost is not None:
            suitable_models = [
                model for model in suitable_models
                if self.model_costs[model] <= max_cost
            ]
        if not suitable_models:
            raise ValueError(f"No model satisfies task_type={task_type!r} within max_cost={max_cost}")
        # Return the cheapest suitable model
        return min(suitable_models, key=lambda m: self.model_costs[m])
```
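Usage is a single call per task (results follow from the illustrative tables above):

```python
from optimization.model_selector import CostOptimizedModelSelector

selector = CostOptimizedModelSelector()
print(selector.select_model("complex_reasoning"))             # -> "claude-3-opus"
print(selector.select_model("conversation", max_cost=0.005))  # -> "gpt-3.5-turbo"
```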
```python
# optimization/token_optimizer.py
import tiktoken

class TokenOptimizer:
    def __init__(self, model: str = "gpt-4"):
        self.encoder = tiktoken.encoding_for_model(model)
        self.max_tokens = 8000  # Leave room for the response within the context window

    def optimize_prompt(self, prompt: str, context: str) -> str:
        """Truncate context so prompt plus context fits the token budget"""
        prompt_tokens = len(self.encoder.encode(prompt))
        context_tokens = len(self.encoder.encode(context))
        total_tokens = prompt_tokens + context_tokens

        if total_tokens <= self.max_tokens:
            return f"{prompt}\n\nContext: {context}"

        # Truncate context sentence by sentence until it fits
        available_tokens = self.max_tokens - prompt_tokens - 50  # Safety buffer
        context_parts = context.split('. ')
        optimized_context = ""
        current_tokens = 0
        for part in context_parts:
            part_tokens = len(self.encoder.encode(part))
            if current_tokens + part_tokens <= available_tokens:
                optimized_context += part + ". "
                current_tokens += part_tokens
            else:
                break
        return f"{prompt}\n\nContext (truncated): {optimized_context}"
```
## Troubleshooting

### High Latency
```python
# Check agent performance (agent is assumed to be a configured Agent)
import logging
import time

logger = logging.getLogger(__name__)

async def diagnose_latency(agent):
    start = time.time()
    await agent.arun("test")
    latency = time.time() - start
    if latency > 5:  # 5-second threshold
        logger.warning(f"High latency detected: {latency:.1f}s")
        # Next steps: check cache hit rate, model response time, and network latency
```
### Memory Leaks
```python
import tracemalloc

# Track memory allocations while agents run
tracemalloc.start()

# ... run agents ...

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:  # Ten largest allocation sites
    print(stat)
```
### API Rate Limits
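When the model provider returns rate-limit errors (HTTP 429), retrying with exponential backoff usually resolves them. A minimal sketch, assuming any exception from the agent call may be a transient rate-limit error worth retrying:

```python
import asyncio
import random

async def run_with_backoff(agent, task: str, max_retries: int = 5):
    for attempt in range(max_retries):
        try:
            return await agent.arun(task)
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter: ~1s, 2s, 4s, ...
            await asyncio.sleep(2 ** attempt + random.random())
```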