Production Deployment Guide

This guide covers best practices, configurations, and strategies for deploying PraisonAI agents in production environments.

Overview

Deploying AI agents in production requires careful consideration of:
  • Performance and scalability
  • Security and compliance
  • Monitoring and observability
  • Cost optimization
  • Error handling and recovery

Pre-Deployment Checklist

1. Code Preparation: pin dependencies in requirements.txt, make sure the test suite passes, and expose the /health and /ready endpoints the probes below expect (a minimal sketch of these endpoints follows this list).

2. Infrastructure Setup: provision the container and Kubernetes resources described in the sections below, and configure secrets before the first deploy.
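
The Dockerfile health check and the Kubernetes probes below expect the service to expose /health and /ready endpoints. A minimal sketch, assuming a Flask app (the Dockerfile runs gunicorn app:app); the route names match the probes, everything else is illustrative:

# app.py (minimal sketch)
from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/health")
def health():
    # Liveness: the process is up and serving requests
    return jsonify(status="healthy"), 200

@app.route("/ready")
def ready():
    # Readiness: verify dependencies (API keys, cache) before accepting traffic
    return jsonify(status="ready"), 200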

Container Deployment

# Dockerfile
FROM python:3.11-slim

# Security: Run as non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .
RUN chown -R appuser:appuser /app

# Switch to non-root user
USER appuser

# Health check (requests must be listed in requirements.txt; raise_for_status fails the check on non-2xx)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import requests; requests.get('http://localhost:8000/health').raise_for_status()"

EXPOSE 8000

CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:8000", "--workers", "4"]

Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: praisonai-agents
  labels:
    app: praisonai-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: praisonai-agents
  template:
    metadata:
      labels:
        app: praisonai-agents
    spec:
      containers:
      - name: praisonai
        image: your-registry/praisonai-agents:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: praisonai-service
spec:
  selector:
    app: praisonai-agents
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
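
The Deployment above reads OPENAI_API_KEY from a Secret named api-keys, which must exist before the pods can start. Create it and apply the manifests (the literal value is a placeholder):

kubectl create secret generic api-keys --from-literal=openai-key="..."
kubectl apply -f k8s/deployment.yaml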

Performance Optimization

1. Caching Strategy

# cache/agent_cache.py
import hashlib
import json
from typing import Any, Optional

import redis

class AgentCache:
    def __init__(self, redis_url: Optional[str] = None):
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.local_cache = {}
    
    def cache_key(self, agent_name: str, input_text: str) -> str:
        """Generate cache key"""
        content = f"{agent_name}:{input_text}"
        return hashlib.md5(content.encode()).hexdigest()
    
    async def get(self, agent_name: str, input_text: str) -> Optional[Any]:
        """Get cached result"""
        key = self.cache_key(agent_name, input_text)
        
        # Try local cache first
        if key in self.local_cache:
            return self.local_cache[key]
        
        # Try Redis
        if self.redis_client:
            cached = self.redis_client.get(key)
            if cached:
                return json.loads(cached)
        
        return None
    
    async def set(self, agent_name: str, input_text: str, result: Any, ttl: int = 3600):
        """Cache result"""
        key = self.cache_key(agent_name, input_text)
        
        # Local cache
        self.local_cache[key] = result
        
        # Redis cache
        if self.redis_client:
            self.redis_client.setex(key, ttl, json.dumps(result))
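
A usage sketch; agent.name and the arun() coroutine mirror the agent interface used elsewhere in this guide:

# Usage sketch
cache = AgentCache(redis_url="redis://localhost:6379/0")

async def cached_run(agent, input_text: str):
    cached = await cache.get(agent.name, input_text)
    if cached is not None:
        return cached
    result = await agent.arun(input_text)
    await cache.set(agent.name, input_text, result, ttl=3600)
    return result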

2. Connection Pooling

# utils/connection_pool.py
from typing import Any, Dict

import aiohttp

class ConnectionPool:
    def __init__(self, max_connections: int = 100):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=30,
            ttl_dns_cache=300
        )
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(connector=self.connector)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        async with self.session.request(method, url, **kwargs) as response:
            response.raise_for_status()  # fail fast on HTTP errors instead of parsing error bodies
            return await response.json()
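
Usage; the URL is illustrative:

# Usage sketch
async def fetch_status():
    async with ConnectionPool(max_connections=50) as pool:
        return await pool.request("GET", "https://api.example.com/status")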

3. Load Balancing

# load_balancer/agent_balancer.py
from collections import defaultdict
from typing import Any, List, Optional

from praisonaiagents import Agent

class AgentLoadBalancer:
    def __init__(self, agents: List[Agent]):
        self.agents = agents
        self.agent_loads = defaultdict(int)
        self.agent_errors = defaultdict(int)
    
    def select_agent(self) -> Optional[Agent]:
        """Select agent with least load"""
        available_agents = [
            agent for agent in self.agents
            if self.agent_errors[agent.name] < 3  # Skip agents with many errors
        ]
        
        if not available_agents:
            return None
        
        # Select agent with minimum load
        return min(available_agents, key=lambda a: self.agent_loads[a.name])
    
    async def execute_with_balancing(self, task: str) -> Any:
        agent = self.select_agent()
        if not agent:
            raise Exception("No available agents")
        
        self.agent_loads[agent.name] += 1
        try:
            result = await agent.arun(task)
            self.agent_errors[agent.name] = 0  # Reset error count on success
            return result
        except Exception:
            self.agent_errors[agent.name] += 1
            raise
        finally:
            self.agent_loads[agent.name] -= 1
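
A usage sketch, assuming research_agent and summary_agent are existing Agent instances:

# Usage sketch
balancer = AgentLoadBalancer([research_agent, summary_agent])

async def handle_task(task: str):
    return await balancer.execute_with_balancing(task)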

Monitoring and Observability

1. Metrics Collection

# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
agent_requests = Counter('agent_requests_total', 'Total agent requests', ['agent_name', 'status'])
agent_latency = Histogram('agent_request_duration_seconds', 'Agent request latency', ['agent_name'])
active_agents = Gauge('active_agents', 'Number of active agents')
token_usage = Counter('token_usage_total', 'Total tokens used', ['model', 'agent_name'])

class MetricsCollector:
    @staticmethod
    def record_request(agent_name: str, status: str):
        agent_requests.labels(agent_name=agent_name, status=status).inc()
    
    @staticmethod
    def record_latency(agent_name: str, duration: float):
        agent_latency.labels(agent_name=agent_name).observe(duration)
    
    @staticmethod
    def record_tokens(model: str, agent_name: str, tokens: int):
        token_usage.labels(model=model, agent_name=agent_name).inc(tokens)
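
To expose these metrics and instrument a request end to end (the arun() call mirrors the agent interface used in this guide):

# Usage sketch
import time
from prometheus_client import start_http_server

start_http_server(9090)  # Prometheus scrapes http://<host>:9090/metrics

async def instrumented_run(agent, task: str):
    start = time.time()
    try:
        result = await agent.arun(task)
        MetricsCollector.record_request(agent.name, "success")
        return result
    except Exception:
        MetricsCollector.record_request(agent.name, "error")
        raise
    finally:
        MetricsCollector.record_latency(agent.name, time.time() - start)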

2. Logging Configuration

# logging_config.py
import logging

from pythonjsonlogger import jsonlogger

def setup_production_logging():
    # JSON formatter for structured logs
    logHandler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logHandler.setFormatter(formatter)
    
    # Configure root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(logHandler)
    
    # Configure specific loggers
    logging.getLogger('praisonaiagents').setLevel(logging.INFO)
    logging.getLogger('httpx').setLevel(logging.WARNING)
    
    return logging.getLogger(__name__)
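
Fields passed via extra become top-level JSON keys, which keeps logs queryable:

# Usage sketch
logger = setup_production_logging()
logger.info("agent request completed", extra={"agent": "researcher", "duration_ms": 420})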

3. Health Checks

# health/health_check.py
from typing import Any, Dict, List

from praisonaiagents import Agent

class HealthChecker:
    def __init__(self, agents: List[Agent], dependencies: Dict[str, Any]):
        self.agents = agents
        self.dependencies = dependencies
    
    async def check_health(self) -> Dict[str, Any]:
        """Comprehensive health check"""
        health_status = {
            "status": "healthy",
            "checks": {}
        }
        
        # Check agents
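        # Note: arun("test") makes a real model call per agent; use a cheaper no-op check if probes run frequently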
        for agent in self.agents:
            try:
                await agent.arun("test")
                health_status["checks"][f"agent_{agent.name}"] = "healthy"
            except Exception as e:
                health_status["status"] = "unhealthy"
                health_status["checks"][f"agent_{agent.name}"] = str(e)
        
        # Check dependencies
        for name, dependency in self.dependencies.items():
            try:
                await dependency.ping()
                health_status["checks"][name] = "healthy"
            except Exception as e:
                health_status["status"] = "unhealthy"
                health_status["checks"][name] = str(e)
        
        return health_status

Security Best Practices

1. API Key Management

# security/key_manager.py
import logging
import os
from typing import Optional

import boto3
from cryptography.fernet import Fernet

logger = logging.getLogger(__name__)

class SecureKeyManager:
    def __init__(self, use_aws_secrets: bool = False):
        self.use_aws_secrets = use_aws_secrets
        # Note: falling back to a freshly generated key means values encrypted by a
        # previous process cannot be decrypted; always set ENCRYPTION_KEY in production
        self.fernet = Fernet(os.getenv("ENCRYPTION_KEY", Fernet.generate_key()))
        
        if use_aws_secrets:
            self.secrets_client = boto3.client('secretsmanager')
    
    def get_api_key(self, key_name: str) -> Optional[str]:
        """Securely retrieve API key"""
        if self.use_aws_secrets:
            try:
                response = self.secrets_client.get_secret_value(SecretId=key_name)
                return response['SecretString']
            except Exception as e:
                logger.error(f"Failed to retrieve secret: {e}")
                return None
        else:
            # Get from environment and decrypt
            encrypted = os.getenv(key_name)
            if encrypted:
                return self.fernet.decrypt(encrypted.encode()).decode()
            return None

2. Rate Limiting

# security/rate_limiter.py
import time
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.requests: Dict[str, List[float]] = defaultdict(list)
    
    def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
        """Check if request is allowed"""
        now = time.time()
        minute_ago = now - 60
        
        # Clean old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > minute_ago
        ]
        
        if len(self.requests[client_id]) >= self.requests_per_minute:
            # Calculate wait time
            oldest_request = min(self.requests[client_id])
            wait_time = 60 - (now - oldest_request)
            return False, wait_time
        
        self.requests[client_id].append(now)
        return True, None
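
Callers should surface the wait time to clients, for example as a Retry-After header:

# Usage sketch
limiter = RateLimiter(requests_per_minute=60)

allowed, wait_time = limiter.is_allowed("client-123")
if not allowed:
    print(f"Rate limited; retry in {wait_time:.1f}s")  # e.g. return HTTP 429 with Retry-After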

Cost Optimization

1. Model Selection Strategy

# optimization/model_selector.py
from typing import Dict, Optional

class CostOptimizedModelSelector:
    def __init__(self):
        # Approximate USD per 1K input tokens; verify against current provider pricing
        self.model_costs = {
            "gpt-4": 0.03,
            "gpt-3.5-turbo": 0.002,
            "claude-3-opus": 0.015,
            "claude-3-sonnet": 0.003
        }
        self.model_capabilities = {
            "gpt-4": ["complex_reasoning", "code", "analysis"],
            "gpt-3.5-turbo": ["general", "conversation"],
            "claude-3-opus": ["complex_reasoning", "long_context"],
            "claude-3-sonnet": ["general", "fast"]
        }
    
    def select_model(self, task_type: str, max_cost: Optional[float] = None) -> str:
        """Select most cost-effective model for task"""
        suitable_models = [
            model for model, capabilities in self.model_capabilities.items()
            if task_type in capabilities or "general" in capabilities
        ]
        
        if max_cost is not None:
            suitable_models = [
                model for model in suitable_models
                if self.model_costs[model] <= max_cost
            ]
        
        if not suitable_models:
            raise ValueError(f"No model fits task_type={task_type!r} within max_cost={max_cost}")
        
        # Return cheapest suitable model
        return min(suitable_models, key=lambda m: self.model_costs[m])
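
For example, a code task with a budget of $0.01 per 1K tokens filters out gpt-4 and falls through to the cheapest remaining option:

# Usage sketch
selector = CostOptimizedModelSelector()
model = selector.select_model("code", max_cost=0.01)
print(model)  # "gpt-3.5-turbo": cheapest suitable model within budget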

2. Token Usage Optimization

# optimization/token_optimizer.py
import tiktoken

class TokenOptimizer:
    def __init__(self, model: str = "gpt-4"):
        self.encoder = tiktoken.encoding_for_model(model)
        self.max_tokens = 8000  # Leave room for response
    
    def optimize_prompt(self, prompt: str, context: str) -> str:
        """Optimize prompt to fit within token limits"""
        prompt_tokens = len(self.encoder.encode(prompt))
        context_tokens = len(self.encoder.encode(context))
        
        total_tokens = prompt_tokens + context_tokens
        
        if total_tokens <= self.max_tokens:
            return f"{prompt}\n\nContext: {context}"
        
        # Truncate context to fit
        available_tokens = self.max_tokens - prompt_tokens - 50  # Buffer
        context_parts = context.split('. ')
        
        optimized_context = ""
        current_tokens = 0
        
        for part in context_parts:
            part_tokens = len(self.encoder.encode(part))
            if current_tokens + part_tokens <= available_tokens:
                optimized_context += part + ". "
                current_tokens += part_tokens
            else:
                break
        
        return f"{prompt}\n\nContext (truncated): {optimized_context}"

Deployment Checklist

  • All tests passing
  • Security scan completed
  • Performance benchmarks met
  • Documentation updated
  • Environment variables configured
  • Secrets management setup
  • Monitoring dashboards created
  • Alerting rules configured
  • Backup strategy defined
  • Rollback plan prepared

Troubleshooting Production Issues

Common Issues and Solutions

  • Rate-limit (429) errors from providers: apply the RateLimiter above and honor the returned wait time before retrying
  • Context-length errors: truncate oversized inputs with the TokenOptimizer before calling the model
  • Failing liveness or readiness probes: inspect the HealthChecker output to identify the unhealthy agent or dependency
  • Repeatedly failing agents: the AgentLoadBalancer skips agents after 3 errors and resets the count on the next success; investigate and restart persistent offenders

Next Steps

  1. Review the Monitoring Guide
  2. Set up Telemetry
  3. Configure Multi-Provider Setup
  4. Implement Cost Optimization

Additional Resources