Performance Tuning Guidelines

Optimizing performance in multi-agent systems requires a systematic approach: measure to identify bottlenecks, then implement targeted improvements. This guide covers profiling tools, concrete optimization strategies (caching, batching, connection pooling, memory management), and load testing.

Performance Analysis Framework

Key Performance Indicators (KPIs)

  1. Response Time: End-to-end request latency
  2. Throughput: Requests processed per second
  3. Resource Utilization: CPU, memory, and I/O usage
  4. Concurrency: Parallel agent execution efficiency
  5. Token Efficiency: Tokens used per task
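
These indicators can be recorded together per measurement window. A minimal sketch of such a snapshot (field names are illustrative, not from any particular library):
from dataclasses import dataclass

@dataclass
class KPISnapshot:
    """KPIs for one measurement window of a single agent (illustrative)."""
    window_seconds: float
    requests_handled: int
    total_latency_s: float   # summed end-to-end request latencies
    cpu_percent: float       # resource utilization during the window
    memory_mb: float
    peak_concurrency: int    # max agents executing in parallel
    tokens_used: int

    @property
    def avg_response_time(self) -> float:
        return self.total_latency_s / max(self.requests_handled, 1)

    @property
    def throughput_rps(self) -> float:
        return self.requests_handled / max(self.window_seconds, 1e-9)

    @property
    def tokens_per_request(self) -> float:
        return self.tokens_used / max(self.requests_handled, 1)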

Performance Profiling

1. Agent Performance Profiler

Comprehensive profiling for multi-agent systems:
import time
import psutil
import cProfile
import pstats
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import threading
from contextlib import contextmanager
import io

@dataclass
class PerformanceMetrics:
    agent_name: str
    operation: str
    start_time: float
    end_time: Optional[float] = None
    cpu_percent_start: float = 0
    cpu_percent_end: float = 0
    memory_mb_start: float = 0
    memory_mb_end: float = 0
    tokens_used: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    custom_metrics: Dict[str, Any] = field(default_factory=dict)

class PerformanceProfiler:
    def __init__(self):
        self.metrics: List[PerformanceMetrics] = []
        self.active_profiles: Dict[str, cProfile.Profile] = {}
        self._lock = threading.Lock()
        self.process = psutil.Process()
    
    @contextmanager
    def profile_operation(self, agent_name: str, operation: str):
        """Profile a specific operation"""
        # Start profiling
        profile = cProfile.Profile()
        profile_key = f"{agent_name}:{operation}"
        
        # Collect initial metrics
        metric = PerformanceMetrics(
            agent_name=agent_name,
            operation=operation,
            start_time=time.time(),
            cpu_percent_start=self.process.cpu_percent(),
            memory_mb_start=self.process.memory_info().rss / 1024 / 1024
        )
        
        with self._lock:
            self.active_profiles[profile_key] = profile
        
        profile.enable()
        
        try:
            yield metric
        finally:
            profile.disable()
            
            # Collect final metrics
            metric.end_time = time.time()
            metric.cpu_percent_end = self.process.cpu_percent()
            metric.memory_mb_end = self.process.memory_info().rss / 1024 / 1024
            # Attach the cProfile data so get_profile_stats can print a breakdown
            metric._profile_stats = profile
            
            with self._lock:
                self.metrics.append(metric)
                self.active_profiles.pop(profile_key, None)
    
    def get_profile_stats(self, agent_name: str, operation: str, 
                         top_n: int = 20) -> str:
        """Get detailed profile statistics"""
        profile_key = f"{agent_name}:{operation}"
        
        # Find all metrics for this operation
        operation_metrics = [
            m for m in self.metrics 
            if m.agent_name == agent_name and m.operation == operation
        ]
        
        if not operation_metrics:
            return f"No profiling data for {profile_key}"
        
        # Aggregate statistics (only completed executions have an end_time)
        completed = [m for m in operation_metrics if m.end_time]
        total_time = sum(m.end_time - m.start_time for m in completed)
        avg_time = total_time / len(completed) if completed else 0
        
        # Memory statistics
        memory_deltas = [
            m.memory_mb_end - m.memory_mb_start 
            for m in operation_metrics 
            if m.memory_mb_end > 0
        ]
        avg_memory_delta = sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0
        
        stats = f"""
Performance Profile: {profile_key}
===================================
Executions: {len(operation_metrics)}
Total Time: {total_time:.2f}s
Average Time: {avg_time:.2f}s
Average Memory Delta: {avg_memory_delta:.2f}MB

Top Time-Consuming Operations:
"""
        
        # Add detailed timing breakdown if available
        if hasattr(operation_metrics[-1], '_profile_stats'):
            s = io.StringIO()
            ps = pstats.Stats(operation_metrics[-1]._profile_stats, stream=s)
            ps.strip_dirs().sort_stats('cumulative').print_stats(top_n)
            stats += s.getvalue()
        
        return stats
    
    def identify_bottlenecks(self, threshold_seconds: float = 1.0) -> List[Dict[str, Any]]:
        """Identify performance bottlenecks"""
        bottlenecks = []
        
        # Group metrics by agent and operation
        operation_groups = {}
        for metric in self.metrics:
            if metric.end_time is None:
                continue
                
            key = f"{metric.agent_name}:{metric.operation}"
            if key not in operation_groups:
                operation_groups[key] = []
            operation_groups[key].append(metric)
        
        # Analyze each operation
        for key, metrics in operation_groups.items():
            execution_times = [m.end_time - m.start_time for m in metrics]
            avg_time = sum(execution_times) / len(execution_times)
            
            if avg_time > threshold_seconds:
                memory_deltas = [
                    m.memory_mb_end - m.memory_mb_start 
                    for m in metrics 
                    if m.memory_mb_end > 0
                ]
                
                bottlenecks.append({
                    "operation": key,
                    "avg_execution_time": avg_time,
                    "max_execution_time": max(execution_times),
                    "total_executions": len(metrics),
                    "avg_memory_delta": sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0,
                    "severity": "high" if avg_time > threshold_seconds * 2 else "medium"
                })
        
        # Sort by average execution time
        bottlenecks.sort(key=lambda x: x["avg_execution_time"], reverse=True)
        
        return bottlenecks
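
A quick usage sketch (the agent name, operation name, and run_inference call are illustrative placeholders):
profiler = PerformanceProfiler()

def handle_request(query: str) -> str:
    # The context manager records timing, CPU, and memory deltas automatically
    with profiler.profile_operation("research_agent", "inference") as metric:
        result = run_inference(query)             # assumed: your model call
        metric.tokens_used = len(result.split())  # attach custom metrics
    return result

# After some traffic, inspect the slowest operations
for bottleneck in profiler.identify_bottlenecks(threshold_seconds=0.5):
    print(bottleneck["operation"], f"{bottleneck['avg_execution_time']:.2f}s")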

2. Async Performance Monitor

Monitor async operations and concurrency:
import asyncio
from contextlib import asynccontextmanager
from typing import Set

class AsyncPerformanceMonitor:
    def __init__(self):
        self.active_tasks: Set[str] = set()
        self.task_metrics: Dict[str, Dict[str, Any]] = {}
        self._lock = asyncio.Lock()
        self.max_concurrent_tasks = 0
    
    @asynccontextmanager
    async def monitor_async_operation(self, operation_name: str):
        """Monitor an async operation"""
        task_id = f"{operation_name}_{id(asyncio.current_task())}"
        
        async with self._lock:
            self.active_tasks.add(task_id)
            self.max_concurrent_tasks = max(
                self.max_concurrent_tasks, 
                len(self.active_tasks)
            )
            
            self.task_metrics[task_id] = {
                "operation": operation_name,
                "start_time": time.time(),
                "status": "running"
            }
        
        try:
            yield
            
            async with self._lock:
                self.task_metrics[task_id]["status"] = "completed"
                self.task_metrics[task_id]["end_time"] = time.time()
                
        except Exception as e:
            async with self._lock:
                self.task_metrics[task_id]["status"] = "failed"
                self.task_metrics[task_id]["error"] = str(e)
                self.task_metrics[task_id]["end_time"] = time.time()
            raise
            
        finally:
            async with self._lock:
                self.active_tasks.discard(task_id)
    
    def get_concurrency_report(self) -> Dict[str, Any]:
        """Get concurrency performance report"""
        completed_tasks = [
            m for m in self.task_metrics.values() 
            if m["status"] == "completed" and "end_time" in m
        ]
        
        if not completed_tasks:
            return {"message": "No completed tasks"}
        
        # Calculate concurrency metrics
        execution_times = [
            t["end_time"] - t["start_time"] 
            for t in completed_tasks
        ]
        
        # Group by operation
        operation_stats = {}
        for task in completed_tasks:
            op = task["operation"]
            if op not in operation_stats:
                operation_stats[op] = {
                    "count": 0,
                    "total_time": 0
                }
            
            operation_stats[op]["count"] += 1
            operation_stats[op]["total_time"] += task["end_time"] - task["start_time"]
        
        return {
            "max_concurrent_tasks": self.max_concurrent_tasks,
            "current_active_tasks": len(self.active_tasks),
            "total_completed_tasks": len(completed_tasks),
            "avg_execution_time": sum(execution_times) / len(execution_times),
            "operation_stats": operation_stats
        }
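
Because the monitor tracks overlapping tasks, it pays off when operations run under asyncio.gather. A usage sketch (fetch_context stands in for any awaitable agent step):
monitor = AsyncPerformanceMonitor()

async def fetch_context(doc_id: int) -> str:
    async with monitor.monitor_async_operation("fetch_context"):
        await asyncio.sleep(0.1)  # stand-in for real I/O
        return f"doc-{doc_id}"

async def main():
    await asyncio.gather(*(fetch_context(i) for i in range(10)))
    report = monitor.get_concurrency_report()
    print(report["max_concurrent_tasks"], report["avg_execution_time"])

asyncio.run(main())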

Optimization Strategies

1. Caching Strategy

Implement intelligent caching for expensive operations:
from functools import wraps
import hashlib
import pickle

class IntelligentCache:
    def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.access_count: Dict[str, int] = {}
        self.computation_time: Dict[str, float] = {}
        self._lock = threading.Lock()
    
    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate cache key from function arguments"""
        key_data = {
            "func": func_name,
            "args": args,
            "kwargs": kwargs
        }
        
        # Create hash of the data
        key_str = pickle.dumps(key_data)
        return hashlib.sha256(key_str).hexdigest()
    
    def cached(self, ttl_override: Optional[int] = None):
        """Decorator for caching function results"""
        def decorator(func):
            @wraps(func)  # preserve the wrapped function's name and docstring
            def wrapper(*args, **kwargs):
                cache_key = self._generate_key(func.__name__, args, kwargs)
                
                # Check cache
                with self._lock:
                    if cache_key in self.cache:
                        entry = self.cache[cache_key]
                        
                        # Check TTL
                        age = time.time() - entry["timestamp"]
                        ttl = ttl_override or self.ttl_seconds
                        
                        if age < ttl:
                            self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
                            return entry["value"]
                
                # Compute value
                start_time = time.time()
                result = func(*args, **kwargs)
                computation_time = time.time() - start_time
                
                # Store in cache
                with self._lock:
                    # Evict if necessary
                    if len(self.cache) >= self.max_size:
                        self._evict_least_valuable()
                    
                    self.cache[cache_key] = {
                        "value": result,
                        "timestamp": time.time()
                    }
                    self.access_count[cache_key] = 1
                    self.computation_time[cache_key] = computation_time
                
                return result
            
            return wrapper
        return decorator
    
    def _evict_least_valuable(self):
        """Evict least valuable cache entry"""
        if not self.cache:
            return
        
        # Calculate value score for each entry
        scores = {}
        current_time = time.time()
        
        for key, entry in self.cache.items():
            age = current_time - entry["timestamp"]
            access_count = self.access_count.get(key, 1)
            comp_time = self.computation_time.get(key, 0.1)
            
            # Value = (access_count * computation_time) / age
            value_score = (access_count * comp_time) / max(age, 1)
            scores[key] = value_score
        
        # Evict lowest score
        evict_key = min(scores.keys(), key=lambda k: scores[k])
        del self.cache[evict_key]
        self.access_count.pop(evict_key, None)
        self.computation_time.pop(evict_key, None)
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics"""
        with self._lock:
            if not self.access_count:
                return {"cache_size": len(self.cache)}
            
            total_hits = sum(count - 1 for count in self.access_count.values())
            total_misses = len(self.access_count)
            hit_rate = total_hits / (total_hits + total_misses) if (total_hits + total_misses) > 0 else 0
            
            # Calculate time saved
            time_saved = sum(
                (count - 1) * self.computation_time.get(key, 0)
                for key, count in self.access_count.items()
            )
            
            return {
                "cache_size": len(self.cache),
                "hit_rate": hit_rate,
                "total_hits": total_hits,
                "total_misses": total_misses,
                "estimated_time_saved": time_saved,
                "avg_computation_time": sum(self.computation_time.values()) / len(self.computation_time)
            }
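
Applying it to an expensive agent step (the embedding call is a placeholder; arguments must be picklable for key generation):
cache = IntelligentCache(max_size=500, ttl_seconds=600)

@cache.cached(ttl_override=60)  # shorter TTL for fast-changing data
def embed_document(text: str) -> list:
    return expensive_embedding_call(text)  # assumed: your provider call

embed_document("hello")  # miss: computed and stored
embed_document("hello")  # hit: served from cache
print(cache.get_cache_stats()["hit_rate"])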

2. Batch Processing Optimization

Optimize batch operations for better throughput:
from typing import TypeVar, Generic, List, Tuple, Callable

T = TypeVar('T')
R = TypeVar('R')

class BatchProcessor(Generic[T, R]):
    def __init__(self,
                 batch_size: int = 100,
                 max_wait_time: float = 0.1,
                 max_workers: int = 4):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.max_workers = max_workers
        
        self.pending_items: List[Tuple[T, asyncio.Future]] = []
        self.processing = False
        self._lock = asyncio.Lock()
    
    async def process(self, item: T, processor_func: Callable[[List[T]], List[R]]) -> R:
        """Add item for batch processing"""
        future = asyncio.get_running_loop().create_future()
        
        async with self._lock:
            self.pending_items.append((item, future))
            
            # Claim the processing flag before scheduling so concurrent
            # callers cannot spawn duplicate batch loops
            if not self.processing:
                self.processing = True
                asyncio.create_task(self._process_batches(processor_func))
        
        return await future
    
    async def _process_batches(self, processor_func: Callable[[List[T]], List[R]]):
        """Process items in batches"""
        # self.processing was already set to True by process() before scheduling
        try:
            while True:
                # Wait for batch to fill or timeout
                start_wait = time.time()
                
                while len(self.pending_items) < self.batch_size:
                    if time.time() - start_wait > self.max_wait_time:
                        break
                    
                    if not self.pending_items:
                        await asyncio.sleep(0.01)
                        continue
                    
                    await asyncio.sleep(0.001)
                
                # Get batch
                async with self._lock:
                    if not self.pending_items:
                        break
                    
                    batch_items = self.pending_items[:self.batch_size]
                    self.pending_items = self.pending_items[self.batch_size:]
                
                # Process batch
                items = [item for item, _ in batch_items]
                futures = [future for _, future in batch_items]
                
                try:
                    # Process in parallel if supported
                    if asyncio.iscoroutinefunction(processor_func):
                        results = await processor_func(items)
                    else:
                        # Run in thread pool for CPU-bound operations
                        loop = asyncio.get_running_loop()
                        results = await loop.run_in_executor(None, processor_func, items)
                    
                    # Distribute results
                    for future, result in zip(futures, results):
                        future.set_result(result)
                        
                except Exception as e:
                    # Set exception for all futures in batch
                    for future in futures:
                        future.set_exception(e)
        
        finally:
            self.processing = False
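
A sketch of how per-item calls become one batched backend call (token_count_batch is a stand-in for any API that is cheaper per item when invoked with many items):
batcher = BatchProcessor[str, int](batch_size=32, max_wait_time=0.05)

async def token_count_batch(texts: List[str]) -> List[int]:
    # Stand-in for a batched backend call: one round-trip per batch
    await asyncio.sleep(0.02)
    return [len(t.split()) for t in texts]

async def count_tokens(text: str) -> int:
    # Callers submit single items; batching happens transparently
    return await batcher.process(text, token_count_batch)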

3. Connection Pooling

Optimize resource connections:
import asyncio
from asyncio import Queue
import aiohttp

class ConnectionPool:
    def __init__(self,
                 min_connections: int = 5,
                 max_connections: int = 20,
                 connection_timeout: float = 30.0):
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.connection_timeout = connection_timeout
        
        self.available_connections: Queue = Queue()
        self.active_connections = 0
        self.total_connections = 0
        
        self._lock = asyncio.Lock()
        self._connector = None
        self._stats = {
            "connections_created": 0,
            "connections_reused": 0,
            "connection_errors": 0,
            "wait_time_total": 0
        }
    
    async def initialize(self):
        """Initialize connection pool"""
        self._connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_connections
        )
        
        # Create minimum connections
        for _ in range(self.min_connections):
            conn = await self._create_connection()
            await self.available_connections.put(conn)
    
    async def acquire(self) -> aiohttp.ClientSession:
        """Acquire a connection from the pool"""
        start_time = time.time()
        
        # Try to get available connection
        try:
            connection = await asyncio.wait_for(
                self.available_connections.get(),
                timeout=0.1
            )
            
            self._stats["connections_reused"] += 1
            
        except asyncio.TimeoutError:
            # Create a new connection if under the limit; check the limit
            # under the lock but wait outside it, so release() is not blocked
            async with self._lock:
                can_create = self.total_connections < self.max_connections
            
            if can_create:
                connection = await self._create_connection()
            else:
                connection = await self.available_connections.get()
                self._stats["connections_reused"] += 1
        
        self._stats["wait_time_total"] += time.time() - start_time
        
        async with self._lock:
            self.active_connections += 1
        
        return connection
    
    async def release(self, connection: aiohttp.ClientSession):
        """Release connection back to pool"""
        async with self._lock:
            self.active_connections -= 1
        
        # Check if connection is still valid
        if not connection.closed:
            await self.available_connections.put(connection)
        else:
            async with self._lock:
                self.total_connections -= 1
            
            # Create replacement if below minimum
            if self.total_connections < self.min_connections:
                try:
                    new_conn = await self._create_connection()
                    await self.available_connections.put(new_conn)
                except Exception:
                    pass
    
    async def _create_connection(self) -> aiohttp.ClientSession:
        """Create a new connection"""
        try:
            session = aiohttp.ClientSession(
                connector=self._connector,
                connector_owner=False,  # sessions share the pool's connector
                timeout=aiohttp.ClientTimeout(total=self.connection_timeout)
            )
            
            async with self._lock:
                self.total_connections += 1
                self._stats["connections_created"] += 1
            
            return session
            
        except Exception:
            self._stats["connection_errors"] += 1
            raise
    
    def get_pool_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics"""
        return {
            "total_connections": self.total_connections,
            "active_connections": self.active_connections,
            "available_connections": self.available_connections.qsize(),
            "connection_reuse_rate": (
                self._stats["connections_reused"] / 
                max(self._stats["connections_created"] + self._stats["connections_reused"], 1)
            ),
            **self._stats
        }
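
A usage sketch; wrapping acquire/release in try/finally guarantees connections return to the pool even when requests fail (the URL is a placeholder):
pool = ConnectionPool(min_connections=2, max_connections=10)

async def fetch_tool_result(url: str) -> str:
    session = await pool.acquire()
    try:
        async with session.get(url) as resp:
            return await resp.text()
    finally:
        await pool.release(session)

async def main():
    await pool.initialize()
    body = await fetch_tool_result("https://example.com/api")
    print(len(body), pool.get_pool_stats()["connection_reuse_rate"])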

4. Memory Optimization

Optimize memory usage patterns:
import gc
import weakref
from pympler import tracker

class MemoryOptimizer:
    def __init__(self):
        self.memory_tracker = tracker.SummaryTracker()
        self.large_objects = weakref.WeakValueDictionary()
        self.gc_stats = []
    
    def track_large_object(self, obj_id: str, obj: Any):
        """Track large objects for memory optimization"""
        self.large_objects[obj_id] = obj
    
    def optimize_memory(self) -> Dict[str, Any]:
        """Perform memory optimization"""
        stats = {}
        
        # Get current memory usage
        process = psutil.Process()
        stats["memory_before_mb"] = process.memory_info().rss / 1024 / 1024
        
        # Clear weak references
        stats["weak_refs_cleared"] = len(self.large_objects)
        self.large_objects.clear()
        
        # Run a full garbage collection (generation 2 includes all younger generations)
        collected = gc.collect(2)
        
        stats["objects_collected"] = collected
        stats["memory_after_mb"] = process.memory_info().rss / 1024 / 1024
        stats["memory_freed_mb"] = stats["memory_before_mb"] - stats["memory_after_mb"]
        
        # Track GC statistics
        self.gc_stats.append({
            "timestamp": time.time(),
            "collected": collected,
            "memory_freed": stats["memory_freed_mb"]
        })
        
        return stats
    
    def get_memory_report(self) -> Dict[str, Any]:
        """Get detailed memory usage report"""
        # Get memory summary
        summary = self.memory_tracker.create_summary()
        
        # Find top memory consumers (pympler summary rows are [type, count, total_size])
        top_consumers = []
        for entry in sorted(summary, key=lambda x: x[2], reverse=True)[:10]:
            type_name, count, size = entry
            top_consumers.append({
                "type": type_name,
                "size_mb": size / 1024 / 1024,
                "count": count
            })
        
        # Process memory info
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            "current_memory_mb": memory_info.rss / 1024 / 1024,
            "peak_memory_mb": memory_info.peak_wset / 1024 / 1024 if hasattr(memory_info, 'peak_wset') else 0,
            "top_consumers": top_consumers,
            "gc_stats": self.gc_stats[-10:],  # Last 10 GC runs
            "large_objects_tracked": len(self.large_objects)
        }
    
    @staticmethod
    def optimize_data_structure(data: Any) -> Any:
        """Optimize data structures for memory efficiency"""
        if isinstance(data, list):
            # Use array for homogeneous numeric data
            if all(isinstance(x, (int, float)) for x in data):
                import array
                return array.array('d' if any(isinstance(x, float) for x in data) else 'l', data)
        
        elif isinstance(data, dict):
            # Use __slots__ for objects if possible
            if len(data) < 10:  # Small dicts
                class SlottedDict:
                    __slots__ = list(data.keys())
                    
                    def __init__(self, **kwargs):
                        for k, v in kwargs.items():
                            setattr(self, k, v)
                
                return SlottedDict(**data)
        
        return data
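
A sketch of running the optimizer periodically in a long-lived agent process (the interval and 10MB threshold are arbitrary):
optimizer = MemoryOptimizer()

async def memory_maintenance(interval_seconds: float = 300):
    # Periodically reclaim memory and log what was freed
    while True:
        await asyncio.sleep(interval_seconds)
        stats = optimizer.optimize_memory()
        if stats["memory_freed_mb"] > 10:
            print(f"Freed {stats['memory_freed_mb']:.1f}MB "
                  f"({stats['objects_collected']} objects collected)")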

Performance Testing

Load Testing Framework

import asyncio
import statistics
from typing import List, Callable

class LoadTester:
    def __init__(self):
        self.results = []
        
    async def run_load_test(self,
                          test_func: Callable,
                          concurrent_users: int = 10,
                          requests_per_user: int = 100,
                          ramp_up_time: float = 10.0) -> Dict[str, Any]:
        """Run load test with gradual ramp-up"""
        
        # Calculate ramp-up delay
        ramp_up_delay = ramp_up_time / concurrent_users
        
        # Create user tasks
        user_tasks = []
        
        for user_id in range(concurrent_users):
            # Stagger user start times
            start_delay = user_id * ramp_up_delay
            
            user_task = asyncio.create_task(
                self._simulate_user(
                    user_id,
                    test_func,
                    requests_per_user,
                    start_delay
                )
            )
            user_tasks.append(user_task)
        
        # Wait for all users to complete
        await asyncio.gather(*user_tasks)
        
        # Calculate statistics
        return self._calculate_statistics()
    
    async def _simulate_user(self,
                           user_id: int,
                           test_func: Callable,
                           num_requests: int,
                           start_delay: float):
        """Simulate a single user"""
        
        # Wait for ramp-up
        await asyncio.sleep(start_delay)
        
        for request_id in range(num_requests):
            start_time = time.time()
            success = False
            error = None
            
            try:
                await test_func(user_id, request_id)
                success = True
            except Exception as e:
                error = str(e)
            
            response_time = time.time() - start_time
            
            self.results.append({
                "user_id": user_id,
                "request_id": request_id,
                "response_time": response_time,
                "success": success,
                "error": error,
                "timestamp": start_time
            })
    
    def _calculate_statistics(self) -> Dict[str, Any]:
        """Calculate load test statistics"""
        if not self.results:
            return {"error": "No results collected"}
        
        # Response times
        response_times = [r["response_time"] for r in self.results]
        successful_times = [r["response_time"] for r in self.results if r["success"]]
        
        # Error rate
        total_requests = len(self.results)
        failed_requests = sum(1 for r in self.results if not r["success"])
        error_rate = failed_requests / total_requests
        
        # Throughput
        test_duration = max(r["timestamp"] for r in self.results) - min(r["timestamp"] for r in self.results)
        throughput = total_requests / test_duration if test_duration > 0 else 0
        
        return {
            "total_requests": total_requests,
            "successful_requests": total_requests - failed_requests,
            "failed_requests": failed_requests,
            "error_rate": error_rate,
            "throughput_rps": throughput,
            "response_time": {
                "min": min(response_times),
                "max": max(response_times),
                "mean": statistics.mean(response_times),
                "median": statistics.median(response_times),
                "p95": statistics.quantiles(response_times, n=20)[18] if len(response_times) > 20 else max(response_times),
                "p99": statistics.quantiles(response_times, n=100)[98] if len(response_times) > 100 else max(response_times)
            }
        }
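
Running it against an agent endpoint might look like this (simulate_agent_call stands in for your real request):
async def simulate_agent_call(user_id: int, request_id: int):
    # Stand-in for a real agent request
    await asyncio.sleep(0.05)

async def main():
    tester = LoadTester()
    stats = await tester.run_load_test(
        simulate_agent_call,
        concurrent_users=20,
        requests_per_user=50,
        ramp_up_time=5.0,
    )
    print(f"p95: {stats['response_time']['p95']:.3f}s, "
          f"throughput: {stats['throughput_rps']:.1f} rps, "
          f"errors: {stats['error_rate']:.1%}")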

Optimization Checklist

1. Code-Level Optimizations

class OptimizationChecker:
    """Check for common optimization opportunities"""
    
    @staticmethod
    def check_n_plus_one_queries(operations: List[Dict]) -> List[str]:
        """Detect N+1 query patterns"""
        issues = []
        
        # Group operations by type and timing
        operation_groups = {}
        
        for op in operations:
            key = op["type"]
            if key not in operation_groups:
                operation_groups[key] = []
            operation_groups[key].append(op["timestamp"])
        
        # Check for repeated operations in tight loops
        for op_type, timestamps in operation_groups.items():
            if len(timestamps) > 10:
                # Check if operations are clustered
                timestamps.sort()
                
                clusters = []
                current_cluster = [timestamps[0]]
                
                for ts in timestamps[1:]:
                    if ts - current_cluster[-1] < 0.1:  # Within 100ms
                        current_cluster.append(ts)
                    else:
                        if len(current_cluster) > 5:
                            clusters.append(current_cluster)
                        current_cluster = [ts]
                
                # Don't drop the final cluster
                if len(current_cluster) > 5:
                    clusters.append(current_cluster)
                
                if clusters:
                    issues.append(
                        f"Potential N+1 query pattern detected for {op_type}: "
                        f"{len(clusters)} clusters with avg size {sum(len(c) for c in clusters) / len(clusters):.1f}"
                    )
        
        return issues
    
    @staticmethod
    def check_synchronous_io(code_metrics: Dict) -> List[str]:
        """Check for synchronous I/O in async context"""
        issues = []
        
        sync_io_patterns = [
            "time.sleep",
            "requests.get",
            "open(",
            "file.read",
            "file.write"
        ]
        
        function_calls = code_metrics.get("function_calls", [])
        for pattern in sync_io_patterns:
            # Substring match: patterns like "open(" won't equal a call name exactly
            if any(pattern in call for call in function_calls):
                issues.append(
                    f"Synchronous I/O detected: {pattern}. "
                    f"Consider using async alternative."
                )
        
        return issues
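
Feeding the checker recorded operations, for example timestamps captured by the profiler (the dict shape is what check_n_plus_one_queries expects):
checker = OptimizationChecker()

# Eleven queries fired within ~100ms of each other, then one much later:
# the classic signature of a query-per-item loop
operations = [{"type": "db_query", "timestamp": i * 0.01} for i in range(11)]
operations.append({"type": "db_query", "timestamp": 5.0})

for issue in checker.check_n_plus_one_queries(operations):
    print("WARNING:", issue)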

Best Practices

  1. Profile Before Optimizing: Always measure before making changes
    with profiler.profile_operation("agent", "inference"):
        result = await agent.process_request(request)
    
  2. Set Performance Budgets: Define acceptable performance thresholds and enforce them automatically (see the sketch after this list)
    PERFORMANCE_BUDGETS = {
        "api_response_time_p95": 1.0,  # seconds
        "memory_per_request": 50,       # MB
        "tokens_per_request": 1000,     # max tokens
    }
    
  3. Monitor Production Performance: Track real-world metrics
    @app.middleware("http")
    async def add_performance_monitoring(request, call_next):
        start_time = time.time()
        response = await call_next(request)
        
        # Record metrics
        latency = time.time() - start_time
        metrics.record("http_request_duration", latency, {
            "method": request.method,
            "endpoint": request.url.path,
            "status": response.status_code
        })
        
        return response
    
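To enforce budgets automatically, a regression test can run a small load test and fail the build when a budget is exceeded. A minimal sketch reusing LoadTester (the workload and budget values are placeholders):
import pytest

PERFORMANCE_BUDGETS = {
    "api_response_time_p95": 1.0,  # seconds
    "error_rate": 0.01,            # max 1% failures
}

async def smoke_request(user_id: int, request_id: int):
    await asyncio.sleep(0.01)  # stand-in for a real staging request

@pytest.mark.asyncio
async def test_performance_budgets():
    tester = LoadTester()
    stats = await tester.run_load_test(
        smoke_request,
        concurrent_users=5,
        requests_per_user=20,
        ramp_up_time=1.0,
    )
    
    # Fail the build if any budget is exceeded
    assert stats["response_time"]["p95"] <= PERFORMANCE_BUDGETS["api_response_time_p95"]
    assert stats["error_rate"] <= PERFORMANCE_BUDGETS["error_rate"]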

Testing Performance Optimizations

import pytest
import asyncio

@pytest.mark.asyncio
async def test_batch_processor():
    processor = BatchProcessor[int, int](batch_size=5, max_wait_time=0.05)
    
    # Process function that doubles values
    def double_batch(items: List[int]) -> List[int]:
        return [x * 2 for x in items]
    
    # Submit multiple items
    tasks = []
    for i in range(10):
        task = processor.process(i, double_batch)
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    
    assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

def test_intelligent_cache():
    cache = IntelligentCache(max_size=3)
    
    call_count = 0
    
    @cache.cached()
    def expensive_function(x):
        nonlocal call_count
        call_count += 1
        time.sleep(0.1)  # Simulate expensive operation
        return x * 2
    
    # First call - cache miss
    result1 = expensive_function(5)
    assert result1 == 10
    assert call_count == 1
    
    # Second call - cache hit
    result2 = expensive_function(5)
    assert result2 == 10
    assert call_count == 1
    
    # Check cache stats
    stats = cache.get_cache_stats()
    assert stats["hit_rate"] == 0.5
    assert stats["estimated_time_saved"] >= 0.1

@pytest.mark.asyncio
async def test_connection_pool():
    pool = ConnectionPool(min_connections=2, max_connections=5)
    await pool.initialize()
    
    # Acquire multiple connections
    connections = []
    for _ in range(3):
        conn = await pool.acquire()
        connections.append(conn)
    
    stats = pool.get_pool_stats()
    assert stats["active_connections"] == 3
    
    # Release connections
    for conn in connections:
        await pool.release(conn)
    
    stats = pool.get_pool_stats()
    assert stats["active_connections"] == 0

Conclusion

Performance tuning in multi-agent systems requires a systematic approach combining profiling, analysis, and targeted optimizations. By following these guidelines and continuously monitoring performance metrics, you can build systems that scale efficiently while maintaining responsiveness.