Performance Tuning Guidelines
Optimizing performance in multi-agent systems requires a systematic approach to identify bottlenecks and implement targeted improvements. This guide provides strategies for achieving optimal performance.Performance Analysis Framework
Key Performance Indicators (KPIs)
- Response Time: End-to-end request latency
- Throughput: Requests processed per second
- Resource Utilization: CPU, memory, and I/O usage
- Concurrency: Parallel agent execution efficiency
- Token Efficiency: Tokens used per task
Performance Profiling
1. Agent Performance Profiler
Comprehensive profiling for multi-agent systems:Copy
import time
import psutil
import cProfile
import pstats
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import threading
from contextlib import contextmanager
import io
@dataclass
class PerformanceMetrics:
agent_name: str
operation: str
start_time: float
end_time: Optional[float] = None
cpu_percent_start: float = 0
cpu_percent_end: float = 0
memory_mb_start: float = 0
memory_mb_end: float = 0
tokens_used: int = 0
cache_hits: int = 0
cache_misses: int = 0
custom_metrics: Dict[str, Any] = field(default_factory=dict)
class PerformanceProfiler:
def __init__(self):
self.metrics: List[PerformanceMetrics] = []
self.active_profiles: Dict[str, cProfile.Profile] = {}
self._lock = threading.Lock()
self.process = psutil.Process()
@contextmanager
def profile_operation(self, agent_name: str, operation: str):
"""Profile a specific operation"""
# Start profiling
profile = cProfile.Profile()
profile_key = f"{agent_name}:{operation}"
# Collect initial metrics
metric = PerformanceMetrics(
agent_name=agent_name,
operation=operation,
start_time=time.time(),
cpu_percent_start=self.process.cpu_percent(),
memory_mb_start=self.process.memory_info().rss / 1024 / 1024
)
with self._lock:
self.active_profiles[profile_key] = profile
profile.enable()
try:
yield metric
finally:
profile.disable()
# Collect final metrics
metric.end_time = time.time()
metric.cpu_percent_end = self.process.cpu_percent()
metric.memory_mb_end = self.process.memory_info().rss / 1024 / 1024
with self._lock:
self.metrics.append(metric)
if profile_key in self.active_profiles:
del self.active_profiles[profile_key]
def get_profile_stats(self, agent_name: str, operation: str,
top_n: int = 20) -> str:
"""Get detailed profile statistics"""
profile_key = f"{agent_name}:{operation}"
# Find all metrics for this operation
operation_metrics = [
m for m in self.metrics
if m.agent_name == agent_name and m.operation == operation
]
if not operation_metrics:
return f"No profiling data for {profile_key}"
# Aggregate statistics
total_time = sum(m.end_time - m.start_time for m in operation_metrics if m.end_time)
avg_time = total_time / len(operation_metrics)
# Memory statistics
memory_deltas = [
m.memory_mb_end - m.memory_mb_start
for m in operation_metrics
if m.memory_mb_end > 0
]
avg_memory_delta = sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0
stats = f"""
Performance Profile: {profile_key}
===================================
Executions: {len(operation_metrics)}
Total Time: {total_time:.2f}s
Average Time: {avg_time:.2f}s
Average Memory Delta: {avg_memory_delta:.2f}MB
Top Time-Consuming Operations:
"""
# Add detailed timing breakdown if available
if hasattr(operation_metrics[-1], '_profile_stats'):
s = io.StringIO()
ps = pstats.Stats(operation_metrics[-1]._profile_stats, stream=s)
ps.strip_dirs().sort_stats('cumulative').print_stats(top_n)
stats += s.getvalue()
return stats
def identify_bottlenecks(self, threshold_seconds: float = 1.0) -> List[Dict[str, Any]]:
"""Identify performance bottlenecks"""
bottlenecks = []
# Group metrics by agent and operation
operation_groups = {}
for metric in self.metrics:
if metric.end_time is None:
continue
key = f"{metric.agent_name}:{metric.operation}"
if key not in operation_groups:
operation_groups[key] = []
operation_groups[key].append(metric)
# Analyze each operation
for key, metrics in operation_groups.items():
execution_times = [m.end_time - m.start_time for m in metrics]
avg_time = sum(execution_times) / len(execution_times)
if avg_time > threshold_seconds:
memory_deltas = [
m.memory_mb_end - m.memory_mb_start
for m in metrics
if m.memory_mb_end > 0
]
bottlenecks.append({
"operation": key,
"avg_execution_time": avg_time,
"max_execution_time": max(execution_times),
"total_executions": len(metrics),
"avg_memory_delta": sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0,
"severity": "high" if avg_time > threshold_seconds * 2 else "medium"
})
# Sort by average execution time
bottlenecks.sort(key=lambda x: x["avg_execution_time"], reverse=True)
return bottlenecks
2. Async Performance Monitor
Monitor async operations and concurrency:Copy
import asyncio
from typing import Set
class AsyncPerformanceMonitor:
def __init__(self):
self.active_tasks: Set[str] = set()
self.task_metrics: Dict[str, Dict[str, Any]] = {}
self._lock = asyncio.Lock()
self.max_concurrent_tasks = 0
@contextmanager
async def monitor_async_operation(self, operation_name: str):
"""Monitor an async operation"""
task_id = f"{operation_name}_{id(asyncio.current_task())}"
async with self._lock:
self.active_tasks.add(task_id)
self.max_concurrent_tasks = max(
self.max_concurrent_tasks,
len(self.active_tasks)
)
self.task_metrics[task_id] = {
"operation": operation_name,
"start_time": time.time(),
"status": "running"
}
try:
yield
async with self._lock:
self.task_metrics[task_id]["status"] = "completed"
self.task_metrics[task_id]["end_time"] = time.time()
except Exception as e:
async with self._lock:
self.task_metrics[task_id]["status"] = "failed"
self.task_metrics[task_id]["error"] = str(e)
self.task_metrics[task_id]["end_time"] = time.time()
raise
finally:
async with self._lock:
self.active_tasks.discard(task_id)
def get_concurrency_report(self) -> Dict[str, Any]:
"""Get concurrency performance report"""
completed_tasks = [
m for m in self.task_metrics.values()
if m["status"] == "completed" and "end_time" in m
]
if not completed_tasks:
return {"message": "No completed tasks"}
# Calculate concurrency metrics
execution_times = [
t["end_time"] - t["start_time"]
for t in completed_tasks
]
# Group by operation
operation_stats = {}
for task in completed_tasks:
op = task["operation"]
if op not in operation_stats:
operation_stats[op] = {
"count": 0,
"total_time": 0,
"max_concurrent": 0
}
operation_stats[op]["count"] += 1
operation_stats[op]["total_time"] += task["end_time"] - task["start_time"]
return {
"max_concurrent_tasks": self.max_concurrent_tasks,
"current_active_tasks": len(self.active_tasks),
"total_completed_tasks": len(completed_tasks),
"avg_execution_time": sum(execution_times) / len(execution_times),
"operation_stats": operation_stats
}
Optimization Strategies
1. Caching Strategy
Implement intelligent caching for expensive operations:Copy
from functools import lru_cache
import hashlib
import pickle
class IntelligentCache:
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.cache: Dict[str, Dict[str, Any]] = {}
self.access_count: Dict[str, int] = {}
self.computation_time: Dict[str, float] = {}
self._lock = threading.Lock()
def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""Generate cache key from function arguments"""
key_data = {
"func": func_name,
"args": args,
"kwargs": kwargs
}
# Create hash of the data
key_str = pickle.dumps(key_data)
return hashlib.sha256(key_str).hexdigest()
def cached(self, ttl_override: Optional[int] = None):
"""Decorator for caching function results"""
def decorator(func):
def wrapper(*args, **kwargs):
cache_key = self._generate_key(func.__name__, args, kwargs)
# Check cache
with self._lock:
if cache_key in self.cache:
entry = self.cache[cache_key]
# Check TTL
age = time.time() - entry["timestamp"]
ttl = ttl_override or self.ttl_seconds
if age < ttl:
self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
return entry["value"]
# Compute value
start_time = time.time()
result = func(*args, **kwargs)
computation_time = time.time() - start_time
# Store in cache
with self._lock:
# Evict if necessary
if len(self.cache) >= self.max_size:
self._evict_least_valuable()
self.cache[cache_key] = {
"value": result,
"timestamp": time.time()
}
self.access_count[cache_key] = 1
self.computation_time[cache_key] = computation_time
return result
return wrapper
return decorator
def _evict_least_valuable(self):
"""Evict least valuable cache entry"""
if not self.cache:
return
# Calculate value score for each entry
scores = {}
current_time = time.time()
for key, entry in self.cache.items():
age = current_time - entry["timestamp"]
access_count = self.access_count.get(key, 1)
comp_time = self.computation_time.get(key, 0.1)
# Value = (access_count * computation_time) / age
value_score = (access_count * comp_time) / max(age, 1)
scores[key] = value_score
# Evict lowest score
evict_key = min(scores.keys(), key=lambda k: scores[k])
del self.cache[evict_key]
self.access_count.pop(evict_key, None)
self.computation_time.pop(evict_key, None)
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
with self._lock:
if not self.access_count:
return {"cache_size": len(self.cache)}
total_hits = sum(count - 1 for count in self.access_count.values())
total_misses = len(self.access_count)
hit_rate = total_hits / (total_hits + total_misses) if (total_hits + total_misses) > 0 else 0
# Calculate time saved
time_saved = sum(
(count - 1) * self.computation_time.get(key, 0)
for key, count in self.access_count.items()
)
return {
"cache_size": len(self.cache),
"hit_rate": hit_rate,
"total_hits": total_hits,
"total_misses": total_misses,
"estimated_time_saved": time_saved,
"avg_computation_time": sum(self.computation_time.values()) / len(self.computation_time)
}
2. Batch Processing Optimization
Optimize batch operations for better throughput:Copy
from typing import TypeVar, Generic
T = TypeVar('T')
R = TypeVar('R')
class BatchProcessor(Generic[T, R]):
def __init__(self,
batch_size: int = 100,
max_wait_time: float = 0.1,
max_workers: int = 4):
self.batch_size = batch_size
self.max_wait_time = max_wait_time
self.max_workers = max_workers
self.pending_items: List[Tuple[T, asyncio.Future]] = []
self.processing = False
self._lock = asyncio.Lock()
async def process(self, item: T, processor_func: Callable[[List[T]], List[R]]) -> R:
"""Add item for batch processing"""
future = asyncio.Future()
async with self._lock:
self.pending_items.append((item, future))
# Start processing if not already running
if not self.processing:
asyncio.create_task(self._process_batches(processor_func))
return await future
async def _process_batches(self, processor_func: Callable[[List[T]], List[R]]):
"""Process items in batches"""
self.processing = True
try:
while True:
# Wait for batch to fill or timeout
start_wait = time.time()
while len(self.pending_items) < self.batch_size:
if time.time() - start_wait > self.max_wait_time:
break
if not self.pending_items:
await asyncio.sleep(0.01)
continue
await asyncio.sleep(0.001)
# Get batch
async with self._lock:
if not self.pending_items:
break
batch_items = self.pending_items[:self.batch_size]
self.pending_items = self.pending_items[self.batch_size:]
# Process batch
items = [item for item, _ in batch_items]
futures = [future for _, future in batch_items]
try:
# Process in parallel if supported
if asyncio.iscoroutinefunction(processor_func):
results = await processor_func(items)
else:
# Run in thread pool for CPU-bound operations
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(None, processor_func, items)
# Distribute results
for future, result in zip(futures, results):
future.set_result(result)
except Exception as e:
# Set exception for all futures in batch
for future in futures:
future.set_exception(e)
finally:
self.processing = False
3. Connection Pooling
Optimize resource connections:Copy
import asyncio
from asyncio import Queue
import aiohttp
class ConnectionPool:
def __init__(self,
min_connections: int = 5,
max_connections: int = 20,
connection_timeout: float = 30.0):
self.min_connections = min_connections
self.max_connections = max_connections
self.connection_timeout = connection_timeout
self.available_connections: Queue = Queue()
self.active_connections = 0
self.total_connections = 0
self._lock = asyncio.Lock()
self._connector = None
self._stats = {
"connections_created": 0,
"connections_reused": 0,
"connection_errors": 0,
"wait_time_total": 0
}
async def initialize(self):
"""Initialize connection pool"""
self._connector = aiohttp.TCPConnector(
limit=self.max_connections,
limit_per_host=self.max_connections
)
# Create minimum connections
for _ in range(self.min_connections):
conn = await self._create_connection()
await self.available_connections.put(conn)
async def acquire(self) -> aiohttp.ClientSession:
"""Acquire a connection from the pool"""
start_time = time.time()
# Try to get available connection
try:
connection = await asyncio.wait_for(
self.available_connections.get(),
timeout=0.1
)
self._stats["connections_reused"] += 1
except asyncio.TimeoutError:
# Create new connection if under limit
async with self._lock:
if self.total_connections < self.max_connections:
connection = await self._create_connection()
else:
# Wait for connection to become available
connection = await self.available_connections.get()
self._stats["connections_reused"] += 1
self._stats["wait_time_total"] += time.time() - start_time
async with self._lock:
self.active_connections += 1
return connection
async def release(self, connection: aiohttp.ClientSession):
"""Release connection back to pool"""
async with self._lock:
self.active_connections -= 1
# Check if connection is still valid
if not connection.closed:
await self.available_connections.put(connection)
else:
async with self._lock:
self.total_connections -= 1
# Create replacement if below minimum
if self.total_connections < self.min_connections:
try:
new_conn = await self._create_connection()
await self.available_connections.put(new_conn)
except Exception:
pass
async def _create_connection(self) -> aiohttp.ClientSession:
"""Create a new connection"""
try:
session = aiohttp.ClientSession(
connector=self._connector,
timeout=aiohttp.ClientTimeout(total=self.connection_timeout)
)
async with self._lock:
self.total_connections += 1
self._stats["connections_created"] += 1
return session
except Exception as e:
self._stats["connection_errors"] += 1
raise
def get_pool_stats(self) -> Dict[str, Any]:
"""Get connection pool statistics"""
return {
"total_connections": self.total_connections,
"active_connections": self.active_connections,
"available_connections": self.available_connections.qsize(),
"connection_reuse_rate": (
self._stats["connections_reused"] /
max(self._stats["connections_created"] + self._stats["connections_reused"], 1)
),
**self._stats
}
4. Memory Optimization
Optimize memory usage patterns:Copy
import gc
import weakref
from pympler import tracker
class MemoryOptimizer:
def __init__(self):
self.memory_tracker = tracker.SummaryTracker()
self.large_objects = weakref.WeakValueDictionary()
self.gc_stats = []
def track_large_object(self, obj_id: str, obj: Any):
"""Track large objects for memory optimization"""
self.large_objects[obj_id] = obj
def optimize_memory(self) -> Dict[str, Any]:
"""Perform memory optimization"""
stats = {}
# Get current memory usage
process = psutil.Process()
stats["memory_before_mb"] = process.memory_info().rss / 1024 / 1024
# Clear weak references
stats["weak_refs_cleared"] = len(self.large_objects)
self.large_objects.clear()
# Run garbage collection
gc_stats_before = gc.get_stats()
collected = gc.collect(2) # Full collection
gc_stats_after = gc.get_stats()
stats["objects_collected"] = collected
stats["memory_after_mb"] = process.memory_info().rss / 1024 / 1024
stats["memory_freed_mb"] = stats["memory_before_mb"] - stats["memory_after_mb"]
# Track GC statistics
self.gc_stats.append({
"timestamp": time.time(),
"collected": collected,
"memory_freed": stats["memory_freed_mb"]
})
return stats
def get_memory_report(self) -> Dict[str, Any]:
"""Get detailed memory usage report"""
# Get memory summary
summary = self.memory_tracker.create_summary()
# Find top memory consumers
top_consumers = []
for entry in sorted(summary, key=lambda x: x[2], reverse=True)[:10]:
filename, lineno, size, count = entry
top_consumers.append({
"location": f"{filename}:{lineno}",
"size_mb": size / 1024 / 1024,
"count": count
})
# Process memory info
process = psutil.Process()
memory_info = process.memory_info()
return {
"current_memory_mb": memory_info.rss / 1024 / 1024,
"peak_memory_mb": memory_info.peak_wset / 1024 / 1024 if hasattr(memory_info, 'peak_wset') else 0,
"top_consumers": top_consumers,
"gc_stats": self.gc_stats[-10:], # Last 10 GC runs
"large_objects_tracked": len(self.large_objects)
}
@staticmethod
def optimize_data_structure(data: Any) -> Any:
"""Optimize data structures for memory efficiency"""
if isinstance(data, list):
# Use array for homogeneous numeric data
if all(isinstance(x, (int, float)) for x in data):
import array
return array.array('d' if any(isinstance(x, float) for x in data) else 'l', data)
elif isinstance(data, dict):
# Use __slots__ for objects if possible
if len(data) < 10: # Small dicts
class SlottedDict:
__slots__ = list(data.keys())
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
return SlottedDict(**data)
return data
Performance Testing
Load Testing Framework
Copy
import asyncio
import statistics
from typing import List, Callable
class LoadTester:
def __init__(self):
self.results = []
async def run_load_test(self,
test_func: Callable,
concurrent_users: int = 10,
requests_per_user: int = 100,
ramp_up_time: float = 10.0) -> Dict[str, Any]:
"""Run load test with gradual ramp-up"""
# Calculate ramp-up delay
ramp_up_delay = ramp_up_time / concurrent_users
# Create user tasks
user_tasks = []
for user_id in range(concurrent_users):
# Stagger user start times
start_delay = user_id * ramp_up_delay
user_task = asyncio.create_task(
self._simulate_user(
user_id,
test_func,
requests_per_user,
start_delay
)
)
user_tasks.append(user_task)
# Wait for all users to complete
await asyncio.gather(*user_tasks)
# Calculate statistics
return self._calculate_statistics()
async def _simulate_user(self,
user_id: int,
test_func: Callable,
num_requests: int,
start_delay: float):
"""Simulate a single user"""
# Wait for ramp-up
await asyncio.sleep(start_delay)
for request_id in range(num_requests):
start_time = time.time()
success = False
error = None
try:
await test_func(user_id, request_id)
success = True
except Exception as e:
error = str(e)
response_time = time.time() - start_time
self.results.append({
"user_id": user_id,
"request_id": request_id,
"response_time": response_time,
"success": success,
"error": error,
"timestamp": start_time
})
def _calculate_statistics(self) -> Dict[str, Any]:
"""Calculate load test statistics"""
if not self.results:
return {"error": "No results collected"}
# Response times
response_times = [r["response_time"] for r in self.results]
successful_times = [r["response_time"] for r in self.results if r["success"]]
# Error rate
total_requests = len(self.results)
failed_requests = sum(1 for r in self.results if not r["success"])
error_rate = failed_requests / total_requests
# Throughput
test_duration = max(r["timestamp"] for r in self.results) - min(r["timestamp"] for r in self.results)
throughput = total_requests / test_duration if test_duration > 0 else 0
return {
"total_requests": total_requests,
"successful_requests": total_requests - failed_requests,
"failed_requests": failed_requests,
"error_rate": error_rate,
"throughput_rps": throughput,
"response_time": {
"min": min(response_times),
"max": max(response_times),
"mean": statistics.mean(response_times),
"median": statistics.median(response_times),
"p95": statistics.quantiles(response_times, n=20)[18] if len(response_times) > 20 else max(response_times),
"p99": statistics.quantiles(response_times, n=100)[98] if len(response_times) > 100 else max(response_times)
}
}
Optimization Checklist
1. Code-Level Optimizations
Copy
class OptimizationChecker:
"""Check for common optimization opportunities"""
@staticmethod
def check_n_plus_one_queries(operations: List[Dict]) -> List[str]:
"""Detect N+1 query patterns"""
issues = []
# Group operations by type and timing
operation_groups = {}
for op in operations:
key = op["type"]
if key not in operation_groups:
operation_groups[key] = []
operation_groups[key].append(op["timestamp"])
# Check for repeated operations in tight loops
for op_type, timestamps in operation_groups.items():
if len(timestamps) > 10:
# Check if operations are clustered
timestamps.sort()
clusters = []
current_cluster = [timestamps[0]]
for ts in timestamps[1:]:
if ts - current_cluster[-1] < 0.1: # Within 100ms
current_cluster.append(ts)
else:
if len(current_cluster) > 5:
clusters.append(current_cluster)
current_cluster = [ts]
if clusters:
issues.append(
f"Potential N+1 query pattern detected for {op_type}: "
f"{len(clusters)} clusters with avg size {sum(len(c) for c in clusters) / len(clusters):.1f}"
)
return issues
@staticmethod
def check_synchronous_io(code_metrics: Dict) -> List[str]:
"""Check for synchronous I/O in async context"""
issues = []
sync_io_patterns = [
"time.sleep",
"requests.get",
"open(",
"file.read",
"file.write"
]
for pattern in sync_io_patterns:
if pattern in code_metrics.get("function_calls", []):
issues.append(
f"Synchronous I/O detected: {pattern}. "
f"Consider using async alternative."
)
return issues
Best Practices
-
Profile Before Optimizing: Always measure before making changes
Copy
with profiler.profile_operation("agent", "inference"): result = await agent.process_request(request) -
Set Performance Budgets: Define acceptable performance thresholds
Copy
PERFORMANCE_BUDGETS = { "api_response_time_p95": 1.0, # seconds "memory_per_request": 50, # MB "tokens_per_request": 1000, # max tokens } -
Monitor Production Performance: Track real-world metrics
Copy
@app.middleware("http") async def add_performance_monitoring(request, call_next): start_time = time.time() response = await call_next(request) # Record metrics latency = time.time() - start_time metrics.record("http_request_duration", latency, { "method": request.method, "endpoint": request.url.path, "status": response.status_code }) return response
Testing Performance Optimizations
Copy
import pytest
import asyncio
@pytest.mark.asyncio
async def test_batch_processor():
processor = BatchProcessor[int, int](batch_size=5, max_wait_time=0.05)
# Process function that doubles values
def double_batch(items: List[int]) -> List[int]:
return [x * 2 for x in items]
# Submit multiple items
tasks = []
for i in range(10):
task = processor.process(i, double_batch)
tasks.append(task)
results = await asyncio.gather(*tasks)
assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
def test_intelligent_cache():
cache = IntelligentCache(max_size=3)
call_count = 0
@cache.cached()
def expensive_function(x):
nonlocal call_count
call_count += 1
time.sleep(0.1) # Simulate expensive operation
return x * 2
# First call - cache miss
result1 = expensive_function(5)
assert result1 == 10
assert call_count == 1
# Second call - cache hit
result2 = expensive_function(5)
assert result2 == 10
assert call_count == 1
# Check cache stats
stats = cache.get_cache_stats()
assert stats["hit_rate"] == 0.5
assert stats["estimated_time_saved"] >= 0.1
@pytest.mark.asyncio
async def test_connection_pool():
pool = ConnectionPool(min_connections=2, max_connections=5)
await pool.initialize()
# Acquire multiple connections
connections = []
for _ in range(3):
conn = await pool.acquire()
connections.append(conn)
stats = pool.get_pool_stats()
assert stats["active_connections"] == 3
# Release connections
for conn in connections:
await pool.release(conn)
stats = pool.get_pool_stats()
assert stats["active_connections"] == 0

