Documentation Index
Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
Use this file to discover all available pages before exploring further.
Production Deployment Guide
This guide covers best practices, configurations, and strategies for deploying PraisonAI agents in production environments.Overview
Deploying AI agents in production requires careful consideration of:- Performance and scalability
- Security and compliance
- Monitoring and observability
- Cost optimization
- Error handling and recovery
Pre-Deployment Checklist
1. Code Preparation
Environment Configuration
Environment Configuration
# config/production.py
import os
from typing import Optional
class ProductionConfig:
# API Keys (from environment)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
# Model Configuration
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4")
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
# Performance
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "300"))
MAX_CONCURRENT_AGENTS = int(os.getenv("MAX_CONCURRENT_AGENTS", "10"))
# Security
ENABLE_CONTENT_FILTERING = os.getenv("ENABLE_CONTENT_FILTERING", "true").lower() == "true"
ALLOWED_TOOLS = os.getenv("ALLOWED_TOOLS", "").split(",")
# Monitoring
TELEMETRY_ENABLED = os.getenv("TELEMETRY_ENABLED", "true").lower() == "true"
TELEMETRY_ENDPOINT = os.getenv("TELEMETRY_ENDPOINT")
Error Handling
Error Handling
# utils/error_handler.py
import logging
from functools import wraps
from typing import Callable, Any
import traceback
logger = logging.getLogger(__name__)
class ProductionError(Exception):
"""Base exception for production errors"""
pass
def production_error_handler(
fallback_result: Any = None,
log_errors: bool = True,
raise_errors: bool = False
):
"""Decorator for production error handling"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
if log_errors:
logger.error(f"Error in {func.__name__}: {str(e)}")
logger.debug(traceback.format_exc())
if raise_errors:
raise ProductionError(f"Production error in {func.__name__}: {str(e)}")
return fallback_result
@wraps(func)
def sync_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
if log_errors:
logger.error(f"Error in {func.__name__}: {str(e)}")
logger.debug(traceback.format_exc())
if raise_errors:
raise ProductionError(f"Production error in {func.__name__}: {str(e)}")
return fallback_result
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
Security Measures
Security Measures
# security/validator.py
from typing import List, Dict, Any
import re
class SecurityValidator:
def __init__(self, config: ProductionConfig):
self.config = config
self.blocked_patterns = [
r"(api_key|password|secret)",
r"(eval|exec|__import__)",
r"(system|popen|subprocess)"
]
def validate_input(self, input_text: str) -> bool:
"""Validate user input for security threats"""
# Check for injection attempts
for pattern in self.blocked_patterns:
if re.search(pattern, input_text, re.IGNORECASE):
return False
# Check input length
if len(input_text) > 10000: # 10KB limit
return False
return True
def sanitize_output(self, output: str) -> str:
"""Remove sensitive information from output"""
# Remove potential secrets
output = re.sub(r'(sk-|api_key=)[a-zA-Z0-9]+', '[REDACTED]', output)
return output
2. Infrastructure Setup
Container Deployment
# Dockerfile
FROM python:3.11-slim
# Security: Run as non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
RUN chown -R appuser:appuser /app
# Switch to non-root user
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')"
EXPOSE 8000
CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:8000", "--workers", "4"]
Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: praisonai-agents
labels:
app: praisonai-agents
spec:
replicas: 3
selector:
matchLabels:
app: praisonai-agents
template:
metadata:
labels:
app: praisonai-agents
spec:
containers:
- name: praisonai
image: your-registry/praisonai-agents:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-keys
key: openai-key
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: praisonai-service
spec:
selector:
app: praisonai-agents
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
Performance Optimization
1. Caching Strategy
# cache/agent_cache.py
from functools import lru_cache
import hashlib
import redis
import json
from typing import Optional, Any
class AgentCache:
def __init__(self, redis_url: Optional[str] = None):
self.redis_client = redis.from_url(redis_url) if redis_url else None
self.local_cache = {}
def cache_key(self, agent_name: str, input_text: str) -> str:
"""Generate cache key"""
content = f"{agent_name}:{input_text}"
return hashlib.md5(content.encode()).hexdigest()
async def get(self, agent_name: str, input_text: str) -> Optional[Any]:
"""Get cached result"""
key = self.cache_key(agent_name, input_text)
# Try local cache first
if key in self.local_cache:
return self.local_cache[key]
# Try Redis
if self.redis_client:
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
return None
async def set(self, agent_name: str, input_text: str, result: Any, ttl: int = 3600):
"""Cache result"""
key = self.cache_key(agent_name, input_text)
# Local cache
self.local_cache[key] = result
# Redis cache
if self.redis_client:
self.redis_client.setex(key, ttl, json.dumps(result))
2. Connection Pooling
# utils/connection_pool.py
import asyncio
from typing import Dict, Any
import aiohttp
class ConnectionPool:
def __init__(self, max_connections: int = 100):
self.connector = aiohttp.TCPConnector(
limit=max_connections,
limit_per_host=30,
ttl_dns_cache=300
)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(connector=self.connector)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
async with self.session.request(method, url, **kwargs) as response:
return await response.json()
3. Load Balancing
# load_balancer/agent_balancer.py
from typing import List, Optional
import random
from collections import defaultdict
class AgentLoadBalancer:
def __init__(self, agents: List[Agent]):
self.agents = agents
self.agent_loads = defaultdict(int)
self.agent_errors = defaultdict(int)
def select_agent(self) -> Optional[Agent]:
"""Select agent with least load"""
available_agents = [
agent for agent in self.agents
if self.agent_errors[agent.name] < 3 # Skip agents with many errors
]
if not available_agents:
return None
# Select agent with minimum load
return min(available_agents, key=lambda a: self.agent_loads[a.name])
async def execute_with_balancing(self, task: str) -> Any:
agent = self.select_agent()
if not agent:
raise Exception("No available agents")
self.agent_loads[agent.name] += 1
try:
result = await agent.arun(task)
self.agent_errors[agent.name] = 0 # Reset error count on success
return result
except Exception as e:
self.agent_errors[agent.name] += 1
raise
finally:
self.agent_loads[agent.name] -= 1
Monitoring and Observability
1. Metrics Collection
# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# Define metrics
agent_requests = Counter('agent_requests_total', 'Total agent requests', ['agent_name', 'status'])
agent_latency = Histogram('agent_request_duration_seconds', 'Agent request latency', ['agent_name'])
active_agents = Gauge('active_agents', 'Number of active agents')
token_usage = Counter('token_usage_total', 'Total tokens used', ['model', 'agent_name'])
class MetricsCollector:
@staticmethod
def record_request(agent_name: str, status: str):
agent_requests.labels(agent_name=agent_name, status=status).inc()
@staticmethod
def record_latency(agent_name: str, duration: float):
agent_latency.labels(agent_name=agent_name).observe(duration)
@staticmethod
def record_tokens(model: str, agent_name: str, tokens: int):
token_usage.labels(model=model, agent_name=agent_name).inc(tokens)
2. Logging Configuration
# logging_config.py
import logging
import json
from pythonjsonlogger import jsonlogger
def setup_production_logging():
# JSON formatter for structured logs
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logHandler.setFormatter(formatter)
# Configure root logger
logging.root.setLevel(logging.INFO)
logging.root.addHandler(logHandler)
# Configure specific loggers
logging.getLogger('praisonaiagents').setLevel(logging.INFO)
logging.getLogger('httpx').setLevel(logging.WARNING)
return logging.getLogger(__name__)
3. Health Checks
# health/health_check.py
from typing import Dict, List
import asyncio
class HealthChecker:
def __init__(self, agents: List[Agent], dependencies: Dict[str, Any]):
self.agents = agents
self.dependencies = dependencies
async def check_health(self) -> Dict[str, Any]:
"""Comprehensive health check"""
health_status = {
"status": "healthy",
"checks": {}
}
# Check agents
for agent in self.agents:
try:
await agent.arun("test")
health_status["checks"][f"agent_{agent.name}"] = "healthy"
except Exception as e:
health_status["status"] = "unhealthy"
health_status["checks"][f"agent_{agent.name}"] = str(e)
# Check dependencies
for name, dependency in self.dependencies.items():
try:
await dependency.ping()
health_status["checks"][name] = "healthy"
except Exception as e:
health_status["status"] = "unhealthy"
health_status["checks"][name] = str(e)
return health_status
Security Best Practices
1. API Key Management
# security/key_manager.py
import os
from cryptography.fernet import Fernet
import boto3
from typing import Optional
class SecureKeyManager:
def __init__(self, use_aws_secrets: bool = False):
self.use_aws_secrets = use_aws_secrets
self.fernet = Fernet(os.getenv("ENCRYPTION_KEY", Fernet.generate_key()))
if use_aws_secrets:
self.secrets_client = boto3.client('secretsmanager')
def get_api_key(self, key_name: str) -> Optional[str]:
"""Securely retrieve API key"""
if self.use_aws_secrets:
try:
response = self.secrets_client.get_secret_value(SecretId=key_name)
return response['SecretString']
except Exception as e:
logger.error(f"Failed to retrieve secret: {e}")
return None
else:
# Get from environment and decrypt
encrypted = os.getenv(key_name)
if encrypted:
return self.fernet.decrypt(encrypted.encode()).decode()
return None
2. Rate Limiting
# security/rate_limiter.py
from collections import defaultdict
import time
from typing import Dict, Tuple
class RateLimiter:
def __init__(self, requests_per_minute: int = 60):
self.requests_per_minute = requests_per_minute
self.requests: Dict[str, List[float]] = defaultdict(list)
def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
"""Check if request is allowed"""
now = time.time()
minute_ago = now - 60
# Clean old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if req_time > minute_ago
]
if len(self.requests[client_id]) >= self.requests_per_minute:
# Calculate wait time
oldest_request = min(self.requests[client_id])
wait_time = 60 - (now - oldest_request)
return False, wait_time
self.requests[client_id].append(now)
return True, None
Cost Optimization
1. Model Selection Strategy
# optimization/model_selector.py
from typing import Dict, Optional
class CostOptimizedModelSelector:
def __init__(self):
self.model_costs = {
"gpt-4": 0.03,
"gpt-3.5-turbo": 0.002,
"claude-3-opus": 0.015,
"claude-3-sonnet": 0.003
}
self.model_capabilities = {
"gpt-4": ["complex_reasoning", "code", "analysis"],
"gpt-3.5-turbo": ["general", "conversation"],
"claude-3-opus": ["complex_reasoning", "long_context"],
"claude-3-sonnet": ["general", "fast"]
}
def select_model(self, task_type: str, max_cost: Optional[float] = None) -> str:
"""Select most cost-effective model for task"""
suitable_models = [
model for model, capabilities in self.model_capabilities.items()
if task_type in capabilities or "general" in capabilities
]
if max_cost:
suitable_models = [
model for model in suitable_models
if self.model_costs[model] <= max_cost
]
# Return cheapest suitable model
return min(suitable_models, key=lambda m: self.model_costs[m])
2. Token Usage Optimization
# optimization/token_optimizer.py
import tiktoken
class TokenOptimizer:
def __init__(self, model: str = "gpt-4"):
self.encoder = tiktoken.encoding_for_model(model)
self.max_tokens = 8000 # Leave room for response
def optimize_prompt(self, prompt: str, context: str) -> str:
"""Optimize prompt to fit within token limits"""
prompt_tokens = len(self.encoder.encode(prompt))
context_tokens = len(self.encoder.encode(context))
total_tokens = prompt_tokens + context_tokens
if total_tokens <= self.max_tokens:
return f"{prompt}\n\nContext: {context}"
# Truncate context to fit
available_tokens = self.max_tokens - prompt_tokens - 50 # Buffer
context_parts = context.split('. ')
optimized_context = ""
current_tokens = 0
for part in context_parts:
part_tokens = len(self.encoder.encode(part))
if current_tokens + part_tokens <= available_tokens:
optimized_context += part + ". "
current_tokens += part_tokens
else:
break
return f"{prompt}\n\nContext (truncated): {optimized_context}"
Deployment Checklist
- Pre-Deployment
- Deployment
- Post-Deployment
- All tests passing
- Security scan completed
- Performance benchmarks met
- Documentation updated
- Environment variables configured
- Secrets management setup
- Monitoring dashboards created
- Alerting rules configured
- Backup strategy defined
- Rollback plan prepared
- Deploy to staging environment
- Run smoke tests
- Verify monitoring metrics
- Check error rates
- Validate API responses
- Test failover scenarios
- Deploy to production (blue-green)
- Monitor initial traffic
- Verify all health checks
- Update status page
- Monitor error rates
- Check performance metrics
- Review cost analysis
- Collect user feedback
- Document lessons learned
- Update runbooks
- Schedule post-mortem
- Plan optimizations
Troubleshooting Production Issues
Common Issues and Solutions
High Latency
High Latency
Symptoms: Slow response times, timeoutsDiagnostics:Solutions:
# Check agent performance
async def diagnose_latency():
start = time.time()
result = await agent.arun("test")
latency = time.time() - start
if latency > 5: # 5 seconds threshold
logger.warning(f"High latency detected: {latency}s")
# Check cache hit rate
# Check model response time
# Check network latency
- Enable caching for common queries
- Use faster models for simple tasks
- Implement request queuing
- Add more agent instances
Memory Leaks
Memory Leaks
Symptoms: Increasing memory usage, OOM errorsDiagnostics:Solutions:
import tracemalloc
import gc
# Track memory usage
tracemalloc.start()
# ... run agents ...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
print(stat)
- Implement proper cleanup in agents
- Use context managers
- Limit conversation history
- Regular garbage collection
API Rate Limits
API Rate Limits
Symptoms: 429 errors, rejected requestsSolutions:
- Implement exponential backoff
- Use multiple API keys
- Add request queuing
- Cache frequent requests
Next Steps
- Review the Monitoring Guide
- Set up Telemetry
- Configure Multi-Provider Setup
- Implement Cost Optimization

