Latency Tracking

Implement comprehensive performance monitoring to track, analyse, and optimise latency across your PraisonAI applications.

Overview

Latency tracking in PraisonAI enables:

  • Phase-specific performance monitoring
  • Request-level metrics collection
  • Thread-safe concurrent tracking (see the sketch after this list)
  • Integration with monitoring systems
  • Performance optimisation insights
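
The thread-safety claim above matters in practice: a single tracker can be shared across workers. A minimal sketch, assuming LatencyTrackerTool's track() and get_metrics() are safe to call concurrently as stated:

import concurrent.futures

from examples.custom_tools import LatencyTrackerTool

tracker = LatencyTrackerTool()

def handle(request_id):
    # Each worker thread records into the shared tracker
    with tracker.track("request_handling"):
        return f"processed {request_id}"

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(handle, range(8)))

print(tracker.get_metrics())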

Basic Latency Tracking

Using the Latency Tracker Tool

from praisonaiagents import Agent
from examples.custom_tools import LatencyTrackerTool

# Create latency tracker
latency_tracker = LatencyTrackerTool()

# Create agent with latency tracking
agent = Agent(
    name="TrackedAgent",
    instructions="Process requests with performance monitoring",
    tools=[latency_tracker]  # add any other tools your agent needs
)

# Use the agent
response = agent.chat("Analyse this data and generate a report")

# Get metrics
metrics = latency_tracker.get_metrics()
print(f"Total requests: {metrics['total_requests']}")
print(f"Average latency: {metrics['average_latency_ms']}ms")

Direct Phase Tracking

# Track specific phases
with latency_tracker.track("data_processing"):
    # Process data
    processed_data = process_large_dataset()

with latency_tracker.track("llm_generation"):
    # Generate response
    response = agent.chat(processed_data)

# Get phase-specific metrics
phase_metrics = latency_tracker.get_metrics_by_phase()
for phase, stats in phase_metrics.items():
    print(f"{phase}: avg={stats['avg']:.2f}ms, max={stats['max']:.2f}ms")

Advanced Tracking Patterns

Context Manager Pattern

import time
import uuid
from contextlib import contextmanager

class EnhancedLatencyTracker:
    def __init__(self):
        self.metrics = {}
        self.active_timers = {}
    
    @contextmanager
    def track_phase(self, phase_name, metadata=None):
        start_time = time.time()
        timer_id = f"{phase_name}_{uuid.uuid4().hex[:8]}"  # unique per invocation
        
        self.active_timers[timer_id] = {
            "phase": phase_name,
            "start": start_time,
            "metadata": metadata or {}
        }
        
        try:
            yield timer_id
        finally:
            duration = (time.time() - start_time) * 1000  # ms
            
            if phase_name not in self.metrics:
                self.metrics[phase_name] = []
            
            self.metrics[phase_name].append({
                "duration": duration,
                "timestamp": start_time,
                "metadata": self.active_timers[timer_id]["metadata"]
            })
            
            del self.active_timers[timer_id]

# Usage
tracker = EnhancedLatencyTracker()

with tracker.track_phase("api_call", {"endpoint": "/analyse"}):
    # Make API call
    result = make_api_call()

with tracker.track_phase("post_processing", {"size": len(result)}):
    # Process results
    final_result = process_results(result)
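
A quick way to inspect what the tracker has collected, reading the metrics dict built by the class above:

# Summarise collected measurements per phase
for phase, samples in tracker.metrics.items():
    durations = [s["duration"] for s in samples]
    print(f"{phase}: n={len(durations)}, avg={sum(durations) / len(durations):.2f}ms")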

Decorator Pattern

from functools import wraps
import asyncio

def track_latency(phase_name=None):
    def decorator(func):
        actual_phase = phase_name or func.__name__
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            tracker = get_global_tracker()  # Get tracker instance
            
            with tracker.track(actual_phase):
                return func(*args, **kwargs)
        
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            tracker = get_global_tracker()
            
            with tracker.track(actual_phase):
                return await func(*args, **kwargs)
        
        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    
    return decorator

# Usage
@track_latency("database_query")
def fetch_user_data(user_id):
    # Parameterised query avoids building SQL from user input
    return db.query("SELECT * FROM users WHERE id = %s", (user_id,))

@track_latency()
async def process_request(request):
    # Async processing
    result = await async_operation(request)
    return result
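
The decorator assumes a get_global_tracker() helper. One minimal sketch, using the LatencyTrackerTool shown earlier as the process-wide instance:

_global_tracker = None

def get_global_tracker():
    # Lazily create a single shared tracker for the whole process
    global _global_tracker
    if _global_tracker is None:
        _global_tracker = LatencyTrackerTool()
    return _global_tracker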

Agent Wrapper Pattern

class TrackedAgent:
    def __init__(self, agent, tracker):
        self.agent = agent
        self.tracker = tracker
        self.request_count = 0
    
    def chat(self, message, **kwargs):
        self.request_count += 1
        
        # Track the overall request under a single phase so per-phase
        # averages aggregate across requests
        with self.tracker.track("total_request"):
            
            # Track planning phase
            with self.tracker.track("planning"):
                plan = self.agent.plan(message)
            
            # Track execution phase
            with self.tracker.track("execution"):
                result = self.agent.execute(plan, **kwargs)
            
            # Track formatting phase
            with self.tracker.track("formatting"):
                response = self.agent.format_response(result)
        
        return response
    
    def get_performance_report(self):
        metrics = self.tracker.get_metrics()
        
        return {
            "total_requests": self.request_count,
            "average_request_time": metrics.get("avg", 0),
            "phase_breakdown": self.tracker.get_metrics_by_phase(),
            "slowest_phase": self._identify_slowest_phase()
        }
    
    def _identify_slowest_phase(self):
        phase_metrics = self.tracker.get_metrics_by_phase()
        
        slowest = max(
            phase_metrics.items(),
            key=lambda x: x[1]["avg"],
            default=(None, {"avg": 0})
        )
        
        return slowest[0]
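
Usage, assuming the wrapped agent exposes the plan, execute, and format_response methods referenced above:

tracked = TrackedAgent(agent, LatencyTrackerTool())
tracked.chat("Summarise the latest sales figures")
print(tracked.get_performance_report())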

MCP Server Integration

Tracking MCP Requests

import time

class MCPLatencyTracker:
    def __init__(self):
        self.mcp_metrics = {}
    
    def track_mcp_request(self, server_name, tool_name):
        key = f"{server_name}.{tool_name}"
        
        if key not in self.mcp_metrics:
            self.mcp_metrics[key] = {
                "count": 0,
                "total_ms": 0,
                "errors": 0
            }
        
        start_time = time.time()
        
        def record_result(success=True):
            duration = (time.time() - start_time) * 1000
            self.mcp_metrics[key]["count"] += 1
            self.mcp_metrics[key]["total_ms"] += duration
            
            if not success:
                self.mcp_metrics[key]["errors"] += 1
        
        return record_result

# Track MCP server calls
tracker = MCPLatencyTracker()

# Example with filesystem MCP
record = tracker.track_mcp_request("filesystem", "read_file")
try:
    content = mcp_filesystem.read_file("data.txt")
    record(success=True)
except Exception:
    record(success=False)
    raise

# Get MCP-specific metrics
for key, stats in tracker.mcp_metrics.items():
    avg_latency = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0
    error_rate = stats["errors"] / stats["count"] if stats["count"] > 0 else 0
    
    print(f"{key}: {avg_latency:.2f}ms avg, {error_rate:.1%} error rate")

Metrics Analysis

Statistical Analysis

import time
from collections import defaultdict

import numpy as np

class LatencyAnalyser:
    def __init__(self, tracker):
        self.tracker = tracker
    
    def analyse_phase(self, phase_name):
        metrics = self.tracker.get_metrics_by_phase().get(phase_name, {})
        
        if not metrics.get("count"):
            return None
        
        # Get all latency values
        latencies = metrics.get("all_values", [])
        
        if not latencies:
            return None
        
        return {
            "mean": np.mean(latencies),
            "median": np.median(latencies),
            "std_dev": np.std(latencies),
            "percentiles": {
                "p50": np.percentile(latencies, 50),
                "p90": np.percentile(latencies, 90),
                "p95": np.percentile(latencies, 95),
                "p99": np.percentile(latencies, 99)
            },
            "outliers": self._detect_outliers(latencies)
        }
    
    def _detect_outliers(self, values):
        q1 = np.percentile(values, 25)
        q3 = np.percentile(values, 75)
        iqr = q3 - q1
        
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        
        return [v for v in values if v < lower_bound or v > upper_bound]
    
    def generate_report(self):
        report = {
            "summary": self.tracker.get_metrics(),
            "phases": {}
        }
        
        for phase in self.tracker.get_metrics_by_phase():
            analysis = self.analyse_phase(phase)
            if analysis:
                report["phases"][phase] = analysis
        
        return report
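
Running the analyser against an existing tracker, assuming its phase metrics include the all_values list used above:

analyser = LatencyAnalyser(latency_tracker)
report = analyser.generate_report()

for phase, stats in report["phases"].items():
    print(f"{phase}: p95={stats['percentiles']['p95']:.2f}ms, "
          f"{len(stats['outliers'])} outlier(s)")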

Trend Detection

class LatencyTrendDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.history = defaultdict(list)
    
    def add_measurement(self, phase, latency):
        self.history[phase].append({
            "timestamp": time.time(),
            "latency": latency
        })
        
        # Keep only recent measurements
        if len(self.history[phase]) > self.window_size:
            self.history[phase].pop(0)
    
    def detect_trend(self, phase):
        if len(self.history[phase]) < 10:
            return "insufficient_data"
        
        latencies = [m["latency"] for m in self.history[phase]]
        
        # Simple linear regression
        x = np.arange(len(latencies))
        coefficients = np.polyfit(x, latencies, 1)
        slope = coefficients[0]
        
        # Determine trend
        if abs(slope) < 0.1:
            return "stable"
        elif slope > 0:
            return "increasing"
        else:
            return "decreasing"
    
    def get_trend_report(self):
        return {
            phase: {
                "trend": self.detect_trend(phase),
                "recent_avg": np.mean([m["latency"] for m in self.history[phase][-10:]])
                if len(self.history[phase]) >= 10 else None
            }
            for phase in self.history
        }
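
Feed the detector one value per completed measurement, for example from a tracking context manager's cleanup; a sketch with illustrative values:

detector = LatencyTrendDetector(window_size=50)

# Record a measurement each time a phase completes
detector.add_measurement("llm_generation", 420.0)
detector.add_measurement("llm_generation", 455.0)

# Returns "insufficient_data" until at least 10 measurements exist
print(detector.detect_trend("llm_generation"))
print(detector.get_trend_report())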

Integration with Monitoring Systems

Prometheus Integration

import time

from praisonaiagents import Agent
from prometheus_client import Counter, Gauge, Histogram

class PrometheusLatencyTracker:
    def __init__(self):
        # Define metrics
        self.request_count = Counter(
            'praisonai_requests_total',
            'Total number of requests',
            ['agent', 'phase']
        )
        
        self.request_duration = Histogram(
            'praisonai_request_duration_seconds',
            'Request duration in seconds',
            ['agent', 'phase'],
            buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
        )
        
        self.active_requests = Gauge(
            'praisonai_active_requests',
            'Number of active requests',
            ['agent']
        )
    
    def track_request(self, agent_name, phase):
        self.request_count.labels(agent=agent_name, phase=phase).inc()
        self.active_requests.labels(agent=agent_name).inc()
        
        start_time = time.time()
        
        def complete():
            duration = time.time() - start_time
            self.request_duration.labels(
                agent=agent_name,
                phase=phase
            ).observe(duration)
            self.active_requests.labels(agent=agent_name).dec()
        
        return complete

# Usage with agent
prom_tracker = PrometheusLatencyTracker()

class MonitoredAgent(Agent):
    def chat(self, message):
        # Track overall request
        complete_request = prom_tracker.track_request(self.name, "total")
        
        try:
            # Track planning
            complete_planning = prom_tracker.track_request(self.name, "planning")
            plan = self.plan(message)
            complete_planning()
            
            # Track execution
            complete_execution = prom_tracker.track_request(self.name, "execution")
            result = self.execute(plan)
            complete_execution()
            
            return result
        finally:
            complete_request()
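
To make these metrics scrapeable, expose them over HTTP with prometheus_client's built-in server (the port is an example):

from prometheus_client import start_http_server

# Prometheus can now scrape http://<host>:8000/metrics
start_http_server(8000)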

CloudWatch Integration

import boto3
from datetime import datetime, timezone

class CloudWatchLatencyTracker:
    def __init__(self, namespace='PraisonAI'):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace
        self.metrics_buffer = []
    
    def record_latency(self, agent_name, phase, latency_ms):
        metric = {
            'MetricName': 'Latency',
            'Value': latency_ms,
            'Unit': 'Milliseconds',
            'Timestamp': datetime.now(timezone.utc),
            'Dimensions': [
                {
                    'Name': 'Agent',
                    'Value': agent_name
                },
                {
                    'Name': 'Phase',
                    'Value': phase
                }
            ]
        }
        
        self.metrics_buffer.append(metric)
        
        # Batch send when buffer is full
        if len(self.metrics_buffer) >= 20:
            self.flush_metrics()
    
    def flush_metrics(self):
        if self.metrics_buffer:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=self.metrics_buffer
            )
            self.metrics_buffer = []
    
    def create_alarm(self, agent_name, threshold_ms=1000):
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{agent_name}-HighLatency',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='Latency',
            Namespace=self.namespace,
            Period=300,
            Statistic='Average',
            Threshold=threshold_ms,
            ActionsEnabled=True,
            AlarmDescription=f'Alarm when {agent_name} latency exceeds {threshold_ms}ms',
            Dimensions=[
                {
                    'Name': 'Agent',
                    'Value': agent_name
                }
            ]
        )
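
Because metrics are buffered, flush any remainder on shutdown; registering the flush with atexit is one simple approach:

import atexit

cw_tracker = CloudWatchLatencyTracker()

# Send any buffered metrics when the process exits
atexit.register(cw_tracker.flush_metrics)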

Performance Optimisation

Identifying Bottlenecks

class PerformanceOptimiser:
    def __init__(self, tracker):
        self.tracker = tracker
    
    def identify_bottlenecks(self, threshold_percent=50):
        phase_metrics = self.tracker.get_metrics_by_phase()
        bottlenecks = []
        
        # Total average time across all phases
        total_avg = sum(m["avg"] for m in phase_metrics.values())
        if total_avg == 0:
            return bottlenecks
        
        for phase, metrics in phase_metrics.items():
            # Flag phases that consume a disproportionate share of total time
            phase_percentage = (metrics["avg"] / total_avg) * 100
            
            if phase_percentage > threshold_percent:
                bottlenecks.append({
                    "phase": phase,
                    "percentage": phase_percentage,
                    "avg_latency": metrics["avg"],
                    "recommendation": self._get_recommendation(phase, metrics)
                })
        
        return sorted(bottlenecks, key=lambda x: x["percentage"], reverse=True)
    
    def _get_recommendation(self, phase, metrics):
        recommendations = {
            "llm_generation": "Consider using a faster model or implementing caching",
            "tool_execution": "Optimise tool implementations or add parallelisation",
            "planning": "Simplify agent instructions or add planning caches",
            "memory_search": "Optimise embeddings or implement better indexing"
        }
        
        return recommendations.get(phase, "Investigate implementation for optimisation opportunities")
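
A sketch of running the optimiser against an existing tracker:

optimiser = PerformanceOptimiser(latency_tracker)

for bottleneck in optimiser.identify_bottlenecks():
    print(f"{bottleneck['phase']}: {bottleneck['percentage']:.1f}% of total time. "
          f"{bottleneck['recommendation']}")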

Caching Strategy

from functools import lru_cache

class CachedLatencyTracker:
    def __init__(self, tracker):
        self.tracker = tracker
        self.cache_hits = 0
        self.cache_misses = 0
    
    @lru_cache(maxsize=1000)
    def cached_operation(self, operation_key):
        # Body only runs on a cache miss; lru_cache returns hits directly
        self.cache_misses += 1
        
        with self.tracker.track("cached_operation"):
            result = expensive_operation(operation_key)
        
        return result
    
    def get_with_tracking(self, key):
        # lru_cache exposes no membership test, so detect a hit by checking
        # whether cached_operation recorded a new miss
        misses_before = self.cache_misses
        
        with self.tracker.track("cache_lookup"):
            result = self.cached_operation(key)
        
        if self.cache_misses == misses_before:
            self.cache_hits += 1
        
        return result
    
    def get_cache_statistics(self):
        cache_info = self.cached_operation.cache_info()
        
        return {
            "hit_rate": self.cache_hits / (self.cache_hits + self.cache_misses)
            if (self.cache_hits + self.cache_misses) > 0 else 0,
            "hits": self.cache_hits,
            "misses": self.cache_misses,
            "cache_size": cache_info.currsize,
            "max_size": cache_info.maxsize
        }

Best Practices

1. Granular Tracking

# Track at appropriate granularity
with tracker.track("api_request"):
    with tracker.track("api_request.auth"):
        authenticate()
    
    with tracker.track("api_request.fetch"):
        data = fetch_data()
    
    with tracker.track("api_request.process"):
        result = process_data(data)

2. Conditional Tracking

import os

# Only track when explicitly enabled, e.g. in production or while debugging
if os.getenv("ENABLE_LATENCY_TRACKING", "false").lower() == "true":
    tracker = LatencyTrackerTool()
    agent = TrackedAgent(base_agent, tracker)
else:
    agent = base_agent

3. Sampling

import random
from contextlib import nullcontext

class SampledTracker:
    def __init__(self, base_tracker, sample_rate=0.1):
        self.base_tracker = base_tracker
        self.sample_rate = sample_rate
    
    def track(self, phase):
        if random.random() < self.sample_rate:
            return self.base_tracker.track(phase)
        else:
            # Return no-op context manager
            return nullcontext()
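
Usage, sampling 10% of requests to keep tracking overhead low:

sampled = SampledTracker(LatencyTrackerTool(), sample_rate=0.1)

with sampled.track("llm_generation"):
    response = agent.chat("Generate the weekly summary")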

Common Patterns

Request Tracing

import uuid

def trace_request(request_id=None):
    request_id = request_id or str(uuid.uuid4())
    
    # Prefix every phase with the request ID so traces can be correlated
    # (assumes a tracker such as LatencyTrackerTool is in scope)
    def track_with_id(phase):
        return tracker.track(f"{request_id}.{phase}")
    
    return track_with_id, request_id

# Usage
track, request_id = trace_request()

with track("total"):
    with track("phase1"):
        do_phase1()
    
    with track("phase2"):
        do_phase2()

print(f"Request {request_id} completed")

Performance Budgets

class PerformanceBudget:
    def __init__(self, budgets):
        self.budgets = budgets  # {"phase": max_ms}
        self.violations = []
    
    def check(self, phase, latency_ms):
        if phase in self.budgets and latency_ms > self.budgets[phase]:
            self.violations.append({
                "phase": phase,
                "latency": latency_ms,
                "budget": self.budgets[phase],
                "exceeded_by": latency_ms - self.budgets[phase]
            })
            return False
        return True
    
    def get_violations(self):
        return self.violations
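
A sketch of checking recorded phase maxima against their budgets (values in milliseconds):

budget = PerformanceBudget({"llm_generation": 2000, "tool_execution": 500})

for phase, stats in latency_tracker.get_metrics_by_phase().items():
    budget.check(phase, stats["max"])

for violation in budget.get_violations():
    print(f"{violation['phase']} exceeded its budget by {violation['exceeded_by']:.0f}ms")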

Troubleshooting

High Latency Issues

  1. Check phase breakdown to identify slow components
  2. Look for outliers that skew averages
  3. Monitor trends to detect degradation
  4. Review concurrent request handling

Memory Leaks

  1. Limit metric history size
  2. Implement cleanup for old metrics
  3. Use weak references where appropriate

Threading Issues

  1. Use thread-safe data structures
  2. Implement proper locking for shared state
  3. Test with concurrent workloads