Monitor and optimise performance with comprehensive latency tracking in PraisonAI
from praisonaiagents import Agent
from examples.custom_tools import LatencyTrackerTool

# Create latency tracker
latency_tracker = LatencyTrackerTool()

# Create agent with latency tracking
agent = Agent(
    name="TrackedAgent",
    instructions="Process requests with performance monitoring",
    tools=[latency_tracker]  # append any other tools your agent needs
)

# Use the agent
response = agent.chat("Analyse this data and generate a report")

# Get metrics
metrics = latency_tracker.get_metrics()
print(f"Total requests: {metrics['total_requests']}")
print(f"Average latency: {metrics['average_latency_ms']}ms")

# Track specific phases
with latency_tracker.track("data_processing"):
    # Process data
    processed_data = process_large_dataset()

with latency_tracker.track("llm_generation"):
    # Generate response
    response = agent.chat(processed_data)

# Get phase-specific metrics
phase_metrics = latency_tracker.get_metrics_by_phase()
for phase, stats in phase_metrics.items():
    print(f"{phase}: avg={stats['avg']:.2f}ms, max={stats['max']:.2f}ms")
import time
from contextlib import contextmanager

class EnhancedLatencyTracker:
    def __init__(self):
        self.metrics = {}        # phase name -> list of measurements
        self.active_timers = {}  # timers for phases still in flight

    @contextmanager
    def track_phase(self, phase_name, metadata=None):
        start_time = time.time()
        timer_id = f"{phase_name}_{id(start_time)}"
        self.active_timers[timer_id] = {
            "phase": phase_name,
            "start": start_time,
            "metadata": metadata or {}
        }
        try:
            yield timer_id
        finally:
            duration = (time.time() - start_time) * 1000  # ms
            if phase_name not in self.metrics:
                self.metrics[phase_name] = []
            self.metrics[phase_name].append({
                "duration": duration,
                "timestamp": start_time,
                "metadata": self.active_timers[timer_id]["metadata"]
            })
            del self.active_timers[timer_id]
# Usage
tracker = EnhancedLatencyTracker()

with tracker.track_phase("api_call", {"endpoint": "/analyse"}):
    # Make API call
    result = make_api_call()

with tracker.track_phase("post_processing", {"size": len(result)}):
    # Process results
    final_result = process_results(result)
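To inspect what the tracker has collected, iterate over its metrics dictionary. A minimal sketch; the summarise helper below is illustrative, not part of the class above:

# Summarise collected measurements (hypothetical helper)
def summarise(tracker):
    for phase, entries in tracker.metrics.items():
        durations = [e["duration"] for e in entries]
        print(f"{phase}: n={len(durations)}, "
              f"avg={sum(durations) / len(durations):.2f}ms, "
              f"max={max(durations):.2f}ms")

summarise(tracker)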
from functools import wraps
import asyncio

def track_latency(phase_name=None):
    def decorator(func):
        actual_phase = phase_name or func.__name__

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            tracker = get_global_tracker()  # Get tracker instance
            with tracker.track(actual_phase):
                return func(*args, **kwargs)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            tracker = get_global_tracker()
            with tracker.track(actual_phase):
                return await func(*args, **kwargs)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator

# Usage
@track_latency("database_query")
def fetch_user_data(user_id):
    # Database operation; use a parameterised query rather than string formatting
    return db.query("SELECT * FROM users WHERE id = %s", (user_id,))

@track_latency()
async def process_request(request):
    # Async processing
    result = await async_operation(request)
    return result
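The decorator above calls a get_global_tracker() helper that isn't defined in this guide. A minimal sketch, assuming the LatencyTrackerTool from the quick-start example (any tracker exposing track() as a context manager works):

# Minimal global-tracker accessor (a sketch, not a PraisonAI API)
_global_tracker = None

def get_global_tracker():
    global _global_tracker
    if _global_tracker is None:
        _global_tracker = LatencyTrackerTool()
    return _global_tracker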
class TrackedAgent:
    """Wraps an agent so every chat request is tracked phase by phase."""

    def __init__(self, agent, tracker):
        self.agent = agent
        self.tracker = tracker
        self.request_count = 0

    def chat(self, message, **kwargs):
        self.request_count += 1
        request_id = f"req_{self.request_count}"

        # Track overall request
        with self.tracker.track(request_id):
            # Track planning phase
            with self.tracker.track("planning"):
                plan = self.agent.plan(message)

            # Track execution phase
            with self.tracker.track("execution"):
                result = self.agent.execute(plan, **kwargs)

            # Track formatting phase
            with self.tracker.track("formatting"):
                response = self.agent.format_response(result)

        return response

    def get_performance_report(self):
        metrics = self.tracker.get_metrics()
        return {
            "total_requests": self.request_count,
            "average_request_time": metrics.get("avg", 0),
            "phase_breakdown": self.tracker.get_metrics_by_phase(),
            "slowest_phase": self._identify_slowest_phase()
        }

    def _identify_slowest_phase(self):
        phase_metrics = self.tracker.get_metrics_by_phase()
        slowest = max(
            phase_metrics.items(),
            key=lambda x: x[1]["avg"],
            default=(None, {"avg": 0})
        )
        return slowest[0]
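A usage sketch. It assumes the wrapped agent exposes the plan/execute/format_response methods the class above calls, and a tracker such as LatencyTrackerTool:

# Wrap an agent and pull a performance report (sketch)
base_agent = Agent(name="Analyst", instructions="Analyse data on request")
tracked = TrackedAgent(base_agent, LatencyTrackerTool())

tracked.chat("Summarise last week's metrics")
report = tracked.get_performance_report()
print(f"Slowest phase: {report['slowest_phase']}")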
import time

class MCPLatencyTracker:
    """Per-tool latency and error tracking for MCP server calls."""

    def __init__(self):
        self.mcp_metrics = {}

    def track_mcp_request(self, server_name, tool_name):
        key = f"{server_name}.{tool_name}"
        if key not in self.mcp_metrics:
            self.mcp_metrics[key] = {
                "count": 0,
                "total_ms": 0,
                "errors": 0
            }
        start_time = time.time()

        def record_result(success=True):
            duration = (time.time() - start_time) * 1000
            self.mcp_metrics[key]["count"] += 1
            self.mcp_metrics[key]["total_ms"] += duration
            if not success:
                self.mcp_metrics[key]["errors"] += 1

        return record_result

# Track MCP server calls
tracker = MCPLatencyTracker()

# Example with filesystem MCP
record = tracker.track_mcp_request("filesystem", "read_file")
try:
    content = mcp_filesystem.read_file("data.txt")
    record(success=True)
except Exception:
    record(success=False)
    raise

# Get MCP-specific metrics
for key, stats in tracker.mcp_metrics.items():
    avg_latency = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0
    error_rate = stats["errors"] / stats["count"] if stats["count"] > 0 else 0
    print(f"{key}: {avg_latency:.2f}ms avg, {error_rate:.1%} error rate")
import time
import numpy as np
from collections import defaultdict

class LatencyAnalyser:
    def __init__(self, tracker):
        self.tracker = tracker

    def analyse_phase(self, phase_name):
        metrics = self.tracker.get_metrics_by_phase().get(phase_name, {})
        if not metrics.get("count"):
            return None

        # Get all latency values
        latencies = metrics.get("all_values", [])
        if not latencies:
            return None

        return {
            "mean": np.mean(latencies),
            "median": np.median(latencies),
            "std_dev": np.std(latencies),
            "percentiles": {
                "p50": np.percentile(latencies, 50),
                "p90": np.percentile(latencies, 90),
                "p95": np.percentile(latencies, 95),
                "p99": np.percentile(latencies, 99)
            },
            "outliers": self._detect_outliers(latencies)
        }

    def _detect_outliers(self, values):
        # Standard IQR rule: flag values beyond 1.5 * IQR from the quartiles
        q1 = np.percentile(values, 25)
        q3 = np.percentile(values, 75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        return [v for v in values if v < lower_bound or v > upper_bound]

    def generate_report(self):
        report = {
            "summary": self.tracker.get_metrics(),
            "phases": {}
        }
        for phase in self.tracker.get_metrics_by_phase():
            analysis = self.analyse_phase(phase)
            if analysis:
                report["phases"][phase] = analysis
        return report
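A usage sketch; it assumes the tracker stores raw per-phase values under "all_values", as analyse_phase expects:

# Analyse one phase of a tracked workload (sketch)
analyser = LatencyAnalyser(tracker)
analysis = analyser.analyse_phase("llm_generation")
if analysis:
    print(f"p95: {analysis['percentiles']['p95']:.2f}ms, "
          f"{len(analysis['outliers'])} outliers")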
class LatencyTrendDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.history = defaultdict(list)

    def add_measurement(self, phase, latency):
        self.history[phase].append({
            "timestamp": time.time(),
            "latency": latency
        })
        # Keep only recent measurements
        if len(self.history[phase]) > self.window_size:
            self.history[phase].pop(0)

    def detect_trend(self, phase):
        if len(self.history[phase]) < 10:
            return "insufficient_data"

        latencies = [m["latency"] for m in self.history[phase]]

        # Simple linear regression; the slope gives the trend direction
        x = np.arange(len(latencies))
        coefficients = np.polyfit(x, latencies, 1)
        slope = coefficients[0]

        # Determine trend
        if abs(slope) < 0.1:
            return "stable"
        elif slope > 0:
            return "increasing"
        else:
            return "decreasing"

    def get_trend_report(self):
        return {
            phase: {
                "trend": self.detect_trend(phase),
                "recent_avg": np.mean([m["latency"] for m in self.history[phase][-10:]])
                if len(self.history[phase]) >= 10 else None
            }
            for phase in self.history
        }
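Feeding the detector a synthetic, steadily degrading series shows the intended behaviour:

# Usage sketch with synthetic measurements
detector = LatencyTrendDetector(window_size=50)
for i in range(30):
    detector.add_measurement("llm_generation", 200 + i * 2)  # steadily slower

print(detector.detect_trend("llm_generation"))  # "increasing"
print(detector.get_trend_report())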
from prometheus_client import Counter, Histogram, Gauge
import time

class PrometheusLatencyTracker:
    def __init__(self):
        # Define metrics
        self.request_count = Counter(
            'praisonai_requests_total',
            'Total number of requests',
            ['agent', 'phase']
        )
        self.request_duration = Histogram(
            'praisonai_request_duration_seconds',
            'Request duration in seconds',
            ['agent', 'phase'],
            buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
        )
        self.active_requests = Gauge(
            'praisonai_active_requests',
            'Number of active requests',
            ['agent']
        )

    def track_request(self, agent_name, phase):
        self.request_count.labels(agent=agent_name, phase=phase).inc()
        self.active_requests.labels(agent=agent_name).inc()
        start_time = time.time()

        def complete():
            duration = time.time() - start_time
            self.request_duration.labels(
                agent=agent_name,
                phase=phase
            ).observe(duration)
            self.active_requests.labels(agent=agent_name).dec()

        return complete

# Usage with agent
prom_tracker = PrometheusLatencyTracker()

class MonitoredAgent(Agent):
    def chat(self, message):
        # Track overall request
        complete_request = prom_tracker.track_request(self.name, "total")
        try:
            # Track planning
            complete_planning = prom_tracker.track_request(self.name, "planning")
            plan = self.plan(message)
            complete_planning()

            # Track execution
            complete_execution = prom_tracker.track_request(self.name, "execution")
            result = self.execute(plan)
            complete_execution()

            return result
        finally:
            complete_request()
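For Prometheus to scrape these metrics, expose them over HTTP with prometheus_client's built-in server (port 8000 here is an arbitrary choice):

from prometheus_client import start_http_server

# Serve /metrics for Prometheus to scrape
start_http_server(8000)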
import boto3
from datetime import datetime, timezone

class CloudWatchLatencyTracker:
    def __init__(self, namespace='PraisonAI'):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace
        self.metrics_buffer = []

    def record_latency(self, agent_name, phase, latency_ms):
        metric = {
            'MetricName': 'Latency',
            'Value': latency_ms,
            'Unit': 'Milliseconds',
            'Timestamp': datetime.now(timezone.utc),
            'Dimensions': [
                {
                    'Name': 'Agent',
                    'Value': agent_name
                },
                {
                    'Name': 'Phase',
                    'Value': phase
                }
            ]
        }
        self.metrics_buffer.append(metric)

        # Batch send when buffer is full
        if len(self.metrics_buffer) >= 20:
            self.flush_metrics()

    def flush_metrics(self):
        if self.metrics_buffer:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=self.metrics_buffer
            )
            self.metrics_buffer = []

    def create_alarm(self, agent_name, threshold_ms=1000):
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{agent_name}-HighLatency',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='Latency',
            Namespace=self.namespace,
            Period=300,
            Statistic='Average',
            Threshold=threshold_ms,
            ActionsEnabled=True,
            AlarmDescription=f'Alarm when {agent_name} latency exceeds {threshold_ms}ms',
            Dimensions=[
                {
                    'Name': 'Agent',
                    'Value': agent_name
                }
            ]
        )
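A usage sketch; it assumes AWS credentials with CloudWatch permissions are configured in the environment:

# Record, flush, and alarm on latency (sketch)
cw_tracker = CloudWatchLatencyTracker(namespace='PraisonAI')
cw_tracker.record_latency("TrackedAgent", "llm_generation", 412.5)
cw_tracker.flush_metrics()  # send anything still buffered
cw_tracker.create_alarm("TrackedAgent", threshold_ms=1000)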
class PerformanceOptimiser:
    def __init__(self, tracker):
        self.tracker = tracker

    def identify_bottlenecks(self, threshold_percent=90):
        phase_metrics = self.tracker.get_metrics_by_phase()
        bottlenecks = []

        # Sum the per-phase averages to approximate total request time
        total_avg = sum(m["avg"] for m in phase_metrics.values())

        for phase, metrics in phase_metrics.items():
            # Flag phases that take a disproportionate share of the total
            phase_percentage = (metrics["avg"] / total_avg) * 100
            if phase_percentage > threshold_percent:
                bottlenecks.append({
                    "phase": phase,
                    "percentage": phase_percentage,
                    "avg_latency": metrics["avg"],
                    "recommendation": self._get_recommendation(phase, metrics)
                })

        return sorted(bottlenecks, key=lambda x: x["percentage"], reverse=True)

    def _get_recommendation(self, phase, metrics):
        recommendations = {
            "llm_generation": "Consider using a faster model or implementing caching",
            "tool_execution": "Optimise tool implementations or add parallelisation",
            "planning": "Simplify agent instructions or add planning caches",
            "memory_search": "Optimise embeddings or implement better indexing"
        }
        return recommendations.get(phase, "Investigate implementation for optimisation opportunities")
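A usage sketch, lowering the threshold so that any phase consuming more than half the total time is flagged:

# Report bottlenecks with recommendations (sketch)
optimiser = PerformanceOptimiser(tracker)
for b in optimiser.identify_bottlenecks(threshold_percent=50):
    print(f"{b['phase']}: {b['percentage']:.0f}% of request time "
          f"-> {b['recommendation']}")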
from functools import lru_cache

class CachedLatencyTracker:
    """Measures how much latency an LRU cache saves on an expensive operation."""

    def __init__(self, tracker):
        self.tracker = tracker

    @lru_cache(maxsize=1000)
    def cached_operation(self, operation_key):
        # Only cache misses reach this body; repeat keys are served by lru_cache
        # (note: the cache is keyed on (self, operation_key))
        with self.tracker.track("cached_operation"):
            return expensive_operation(operation_key)

    def get_with_tracking(self, key):
        # Time every lookup; misses additionally appear under "cached_operation",
        # so the gap between the two phases shows the cache's saving
        with self.tracker.track("cache_lookup"):
            return self.cached_operation(key)

    def get_cache_statistics(self):
        # lru_cache maintains its own hit/miss counters
        info = self.cached_operation.cache_info()
        total = info.hits + info.misses
        return {
            "hit_rate": info.hits / total if total > 0 else 0,
            "hits": info.hits,
            "misses": info.misses,
            "cache_size": info.currsize,
            "max_size": info.maxsize
        }
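A usage sketch; expensive_operation stands in for whatever slow call you are caching, and tracker is any tracker exposing track():

# First call misses, second is served from the cache
cached = CachedLatencyTracker(tracker)
cached.get_with_tracking("report_2024")  # miss: runs expensive_operation
cached.get_with_tracking("report_2024")  # hit: near-instant
print(cached.get_cache_statistics())     # hit_rate, hits, misses, sizes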
# Track at appropriate granularity
with tracker.track("api_request"):
    with tracker.track("api_request.auth"):
        authenticate()
    with tracker.track("api_request.fetch"):
        data = fetch_data()
    with tracker.track("api_request.process"):
        result = process_data(data)
# Only track in production or when debugging
import os

if os.getenv("ENABLE_LATENCY_TRACKING", "false").lower() == "true":
    tracker = LatencyTrackerTool()
    agent = TrackedAgent(base_agent, tracker)
else:
    agent = base_agent
import random
from contextlib import nullcontext

class SampledTracker:
    """Tracks only a fraction of calls to keep overhead low."""

    def __init__(self, base_tracker, sample_rate=0.1):
        self.base_tracker = base_tracker
        self.sample_rate = sample_rate

    def track(self, phase):
        if random.random() < self.sample_rate:
            return self.base_tracker.track(phase)
        else:
            # Return no-op context manager
            return nullcontext()
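A usage sketch: with a sample rate of 0.1, roughly one in ten calls is actually measured.

# Sampled tracking around an agent call
sampled = SampledTracker(LatencyTrackerTool(), sample_rate=0.1)
with sampled.track("llm_generation"):
    response = agent.chat("Summarise the report")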
import uuid

def trace_request(request_id=None):
    request_id = request_id or str(uuid.uuid4())

    # Add request ID to all tracking
    def track_with_id(phase):
        return tracker.track(f"{request_id}.{phase}")

    return track_with_id, request_id

# Usage
track, request_id = trace_request()

with track("total"):
    with track("phase1"):
        do_phase1()
    with track("phase2"):
        do_phase2()

print(f"Request {request_id} completed")
class PerformanceBudget:
    def __init__(self, budgets):
        self.budgets = budgets  # {"phase": max_ms}
        self.violations = []

    def check(self, phase, latency_ms):
        if phase in self.budgets and latency_ms > self.budgets[phase]:
            self.violations.append({
                "phase": phase,
                "latency": latency_ms,
                "budget": self.budgets[phase],
                "exceeded_by": latency_ms - self.budgets[phase]
            })
            return False
        return True

    def get_violations(self):
        return self.violations
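A usage sketch; it checks tracked phase averages against per-phase budgets, assuming a tracker exposing get_metrics_by_phase such as LatencyTrackerTool:

# Enforce budgets against tracked phase metrics
budget = PerformanceBudget({"planning": 500, "llm_generation": 2000})
for phase, stats in tracker.get_metrics_by_phase().items():
    budget.check(phase, stats["avg"])

for v in budget.get_violations():
    print(f"{v['phase']} exceeded its {v['budget']}ms budget "
          f"by {v['exceeded_by']:.0f}ms")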