Latency Tracking
Implement comprehensive performance monitoring to track, analyse, and optimise latency across your PraisonAI applications.
Overview
Latency tracking in PraisonAI enables:
- Phase-specific performance monitoring
- Request-level metrics collection
- Thread-safe concurrent tracking
- Integration with monitoring systems
- Performance optimisation insights
Basic Latency Tracking
Using the Latency Tracker Tool
```python
from praisonaiagents import Agent
from examples.custom_tools import LatencyTrackerTool

# Create latency tracker
latency_tracker = LatencyTrackerTool()

# Create agent with latency tracking
agent = Agent(
    name="TrackedAgent",
    instructions="Process requests with performance monitoring",
    tools=[latency_tracker]  # add any other tools alongside the tracker
)

# Use the agent
response = agent.chat("Analyse this data and generate a report")

# Get metrics
metrics = latency_tracker.get_metrics()
print(f"Total requests: {metrics['total_requests']}")
print(f"Average latency: {metrics['average_latency_ms']}ms")
```
Direct Phase Tracking
```python
# Track specific phases
with latency_tracker.track("data_processing"):
    # Process data
    processed_data = process_large_dataset()

with latency_tracker.track("llm_generation"):
    # Generate response
    response = agent.chat(processed_data)

# Get phase-specific metrics
phase_metrics = latency_tracker.get_metrics_by_phase()
for phase, stats in phase_metrics.items():
    print(f"{phase}: avg={stats['avg']:.2f}ms, max={stats['max']:.2f}ms")
```
Advanced Tracking Patterns
Context Manager Pattern
```python
import time
from contextlib import contextmanager

class EnhancedLatencyTracker:
    def __init__(self):
        self.metrics = {}
        self.active_timers = {}

    @contextmanager
    def track_phase(self, phase_name, metadata=None):
        start_time = time.time()
        timer_id = f"{phase_name}_{id(start_time)}"
        self.active_timers[timer_id] = {
            "phase": phase_name,
            "start": start_time,
            "metadata": metadata or {}
        }
        try:
            yield timer_id
        finally:
            duration = (time.time() - start_time) * 1000  # ms
            if phase_name not in self.metrics:
                self.metrics[phase_name] = []
            self.metrics[phase_name].append({
                "duration": duration,
                "timestamp": start_time,
                "metadata": self.active_timers[timer_id]["metadata"]
            })
            del self.active_timers[timer_id]

# Usage
tracker = EnhancedLatencyTracker()

with tracker.track_phase("api_call", {"endpoint": "/analyse"}):
    # Make API call
    result = make_api_call()

with tracker.track_phase("post_processing", {"size": len(result)}):
    # Process results
    final_result = process_results(result)
```
Decorator Pattern
```python
from functools import wraps
import asyncio

def track_latency(phase_name=None):
    def decorator(func):
        actual_phase = phase_name or func.__name__

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            tracker = get_global_tracker()  # Get tracker instance
            with tracker.track(actual_phase):
                return func(*args, **kwargs)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            tracker = get_global_tracker()
            with tracker.track(actual_phase):
                return await func(*args, **kwargs)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator

# Usage
@track_latency("database_query")
def fetch_user_data(user_id):
    # Parameterised query avoids SQL injection
    return db.query("SELECT * FROM users WHERE id = ?", (user_id,))

@track_latency()
async def process_request(request):
    # Async processing
    result = await async_operation(request)
    return result
```
Agent Wrapper Pattern
```python
class TrackedAgent:
    def __init__(self, agent, tracker):
        self.agent = agent
        self.tracker = tracker
        self.request_count = 0

    def chat(self, message, **kwargs):
        self.request_count += 1
        request_id = f"req_{self.request_count}"

        # Track overall request
        with self.tracker.track(f"request_{request_id}"):
            # Track planning phase
            with self.tracker.track("planning"):
                plan = self.agent.plan(message)

            # Track execution phase
            with self.tracker.track("execution"):
                result = self.agent.execute(plan, **kwargs)

            # Track formatting phase
            with self.tracker.track("formatting"):
                response = self.agent.format_response(result)

        return response

    def get_performance_report(self):
        metrics = self.tracker.get_metrics()
        return {
            "total_requests": self.request_count,
            "average_request_time": metrics.get("avg", 0),
            "phase_breakdown": self.tracker.get_metrics_by_phase(),
            "slowest_phase": self._identify_slowest_phase()
        }

    def _identify_slowest_phase(self):
        phase_metrics = self.tracker.get_metrics_by_phase()
        slowest = max(
            phase_metrics.items(),
            key=lambda x: x[1]["avg"],
            default=(None, {"avg": 0})
        )
        return slowest[0]
```
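A minimal wiring sketch follows; it assumes the wrapped agent exposes the `plan`, `execute`, and `format_response` hooks used above, which are illustrative rather than guaranteed Agent methods:

```python
# Hypothetical wiring: wrap an existing agent and inspect the report
tracker = LatencyTrackerTool()
tracked = TrackedAgent(agent, tracker)

tracked.chat("Summarise the quarterly results")

report = tracked.get_performance_report()
print(f"Slowest phase: {report['slowest_phase']}")
```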
MCP Server Integration
Tracking MCP Requests
```python
import time

class MCPLatencyTracker:
    def __init__(self):
        self.mcp_metrics = {}

    def track_mcp_request(self, server_name, tool_name):
        key = f"{server_name}.{tool_name}"
        if key not in self.mcp_metrics:
            self.mcp_metrics[key] = {
                "count": 0,
                "total_ms": 0,
                "errors": 0
            }
        start_time = time.time()

        def record_result(success=True):
            duration = (time.time() - start_time) * 1000
            self.mcp_metrics[key]["count"] += 1
            self.mcp_metrics[key]["total_ms"] += duration
            if not success:
                self.mcp_metrics[key]["errors"] += 1

        return record_result

# Track MCP server calls
tracker = MCPLatencyTracker()

# Example with filesystem MCP
record = tracker.track_mcp_request("filesystem", "read_file")
try:
    content = mcp_filesystem.read_file("data.txt")
    record(success=True)
except Exception:
    record(success=False)
    raise

# Get MCP-specific metrics
for key, stats in tracker.mcp_metrics.items():
    avg_latency = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0
    error_rate = stats["errors"] / stats["count"] if stats["count"] > 0 else 0
    print(f"{key}: {avg_latency:.2f}ms avg, {error_rate:.1%} error rate")
```
Metrics Analysis
Statistical Analysis
```python
import numpy as np

class LatencyAnalyser:
    def __init__(self, tracker):
        self.tracker = tracker

    def analyse_phase(self, phase_name):
        metrics = self.tracker.get_metrics_by_phase().get(phase_name, {})
        if not metrics.get("count"):
            return None

        # Get all latency values
        latencies = metrics.get("all_values", [])
        if not latencies:
            return None

        return {
            "mean": np.mean(latencies),
            "median": np.median(latencies),
            "std_dev": np.std(latencies),
            "percentiles": {
                "p50": np.percentile(latencies, 50),
                "p90": np.percentile(latencies, 90),
                "p95": np.percentile(latencies, 95),
                "p99": np.percentile(latencies, 99)
            },
            "outliers": self._detect_outliers(latencies)
        }

    def _detect_outliers(self, values):
        # Flag values beyond 1.5 * IQR from the interquartile range
        q1 = np.percentile(values, 25)
        q3 = np.percentile(values, 75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        return [v for v in values if v < lower_bound or v > upper_bound]

    def generate_report(self):
        report = {
            "summary": self.tracker.get_metrics(),
            "phases": {}
        }
        for phase in self.tracker.get_metrics_by_phase():
            analysis = self.analyse_phase(phase)
            if analysis:
                report["phases"][phase] = analysis
        return report
```
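A short usage sketch, assuming the tracker's per-phase metrics expose the `all_values` list the analyser reads:

```python
analyser = LatencyAnalyser(latency_tracker)
report = analyser.generate_report()

for phase, stats in report["phases"].items():
    print(f"{phase}: p95={stats['percentiles']['p95']:.1f}ms, "
          f"{len(stats['outliers'])} outlier(s)")
```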
Trend Detection
```python
import time
import numpy as np
from collections import defaultdict

class LatencyTrendDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.history = defaultdict(list)

    def add_measurement(self, phase, latency):
        self.history[phase].append({
            "timestamp": time.time(),
            "latency": latency
        })
        # Keep only recent measurements
        if len(self.history[phase]) > self.window_size:
            self.history[phase].pop(0)

    def detect_trend(self, phase):
        if len(self.history[phase]) < 10:
            return "insufficient_data"

        latencies = [m["latency"] for m in self.history[phase]]

        # Simple linear regression over the measurement index
        x = np.arange(len(latencies))
        coefficients = np.polyfit(x, latencies, 1)
        slope = coefficients[0]

        # Determine trend
        if abs(slope) < 0.1:
            return "stable"
        elif slope > 0:
            return "increasing"
        else:
            return "decreasing"

    def get_trend_report(self):
        return {
            phase: {
                "trend": self.detect_trend(phase),
                "recent_avg": np.mean([m["latency"] for m in self.history[phase][-10:]])
                if len(self.history[phase]) >= 10 else None
            }
            for phase in self.history
        }
```
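For example, feeding in a run of measurements and polling the report (the latency values are illustrative):

```python
detector = LatencyTrendDetector(window_size=50)

# Record a sample after each tracked request
for latency_ms in [120, 125, 131, 140, 152, 160, 171, 180, 195, 210]:
    detector.add_measurement("llm_generation", latency_ms)

report = detector.get_trend_report()
print(report["llm_generation"]["trend"])       # increasing
print(report["llm_generation"]["recent_avg"])  # 158.4
```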
Integration with Monitoring Systems
Prometheus Integration
```python
from prometheus_client import Counter, Histogram, Gauge
import time

class PrometheusLatencyTracker:
    def __init__(self):
        # Define metrics
        self.request_count = Counter(
            'praisonai_requests_total',
            'Total number of requests',
            ['agent', 'phase']
        )
        self.request_duration = Histogram(
            'praisonai_request_duration_seconds',
            'Request duration in seconds',
            ['agent', 'phase'],
            buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
        )
        self.active_requests = Gauge(
            'praisonai_active_requests',
            'Number of active requests',
            ['agent']
        )

    def track_request(self, agent_name, phase):
        self.request_count.labels(agent=agent_name, phase=phase).inc()
        self.active_requests.labels(agent=agent_name).inc()
        start_time = time.time()

        def complete():
            duration = time.time() - start_time
            self.request_duration.labels(
                agent=agent_name,
                phase=phase
            ).observe(duration)
            self.active_requests.labels(agent=agent_name).dec()

        return complete

# Usage with agent
prom_tracker = PrometheusLatencyTracker()

class MonitoredAgent(Agent):
    def chat(self, message):
        # Track overall request
        complete_request = prom_tracker.track_request(self.name, "total")
        try:
            # Track planning
            complete_planning = prom_tracker.track_request(self.name, "planning")
            plan = self.plan(message)
            complete_planning()

            # Track execution
            complete_execution = prom_tracker.track_request(self.name, "execution")
            result = self.execute(plan)
            complete_execution()

            return result
        finally:
            complete_request()
```
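Prometheus still needs an endpoint to scrape. A minimal sketch using prometheus_client's built-in exporter (the port is an arbitrary choice):

```python
from prometheus_client import start_http_server

# Expose /metrics on port 8000 for Prometheus to scrape
start_http_server(8000)

agent = MonitoredAgent(name="TrackedAgent", instructions="Process requests")
agent.chat("Generate the weekly summary")
```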
CloudWatch Integration
```python
import boto3
from datetime import datetime

class CloudWatchLatencyTracker:
    def __init__(self, namespace='PraisonAI'):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace
        self.metrics_buffer = []

    def record_latency(self, agent_name, phase, latency_ms):
        metric = {
            'MetricName': 'Latency',
            'Value': latency_ms,
            'Unit': 'Milliseconds',
            'Timestamp': datetime.utcnow(),
            'Dimensions': [
                {'Name': 'Agent', 'Value': agent_name},
                {'Name': 'Phase', 'Value': phase}
            ]
        }
        self.metrics_buffer.append(metric)

        # Batch send when buffer is full
        if len(self.metrics_buffer) >= 20:
            self.flush_metrics()

    def flush_metrics(self):
        if self.metrics_buffer:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=self.metrics_buffer
            )
            self.metrics_buffer = []

    def create_alarm(self, agent_name, threshold_ms=1000):
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{agent_name}-HighLatency',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='Latency',
            Namespace=self.namespace,
            Period=300,
            Statistic='Average',
            Threshold=threshold_ms,
            ActionsEnabled=True,
            AlarmDescription=f'Alarm when {agent_name} latency exceeds {threshold_ms}ms',
            Dimensions=[
                {'Name': 'Agent', 'Value': agent_name}
            ]
        )
```
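Because metrics are sent in batches of 20, anything still in the buffer is lost if the process exits first. One way to guard against that, sketched with the standard-library atexit hook:

```python
import atexit

cw_tracker = CloudWatchLatencyTracker(namespace='PraisonAI')
atexit.register(cw_tracker.flush_metrics)  # flush leftovers on shutdown

cw_tracker.record_latency("TrackedAgent", "llm_generation", 412.5)
cw_tracker.create_alarm("TrackedAgent", threshold_ms=2000)
```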
Performance Optimisation
Identifying Bottlenecks
```python
class PerformanceOptimiser:
    def __init__(self, tracker):
        self.tracker = tracker

    def identify_bottlenecks(self, threshold_percentage=90):
        phase_metrics = self.tracker.get_metrics_by_phase()
        bottlenecks = []

        # Calculate total average time across phases
        total_avg = sum(m["avg"] for m in phase_metrics.values())

        for phase, metrics in phase_metrics.items():
            # Flag phases that consume a disproportionate share of total time
            phase_percentage = (metrics["avg"] / total_avg) * 100
            if phase_percentage > threshold_percentage:
                bottlenecks.append({
                    "phase": phase,
                    "percentage": phase_percentage,
                    "avg_latency": metrics["avg"],
                    "recommendation": self._get_recommendation(phase, metrics)
                })

        return sorted(bottlenecks, key=lambda x: x["percentage"], reverse=True)

    def _get_recommendation(self, phase, metrics):
        recommendations = {
            "llm_generation": "Consider using a faster model or implementing caching",
            "tool_execution": "Optimise tool implementations or add parallelisation",
            "planning": "Simplify agent instructions or add planning caches",
            "memory_search": "Optimise embeddings or implement better indexing"
        }
        return recommendations.get(phase, "Investigate implementation for optimisation opportunities")
```
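A quick usage sketch, lowering the threshold so any phase consuming over 40% of total time is flagged (the threshold is an arbitrary example):

```python
optimiser = PerformanceOptimiser(latency_tracker)

for bottleneck in optimiser.identify_bottlenecks(threshold_percentage=40):
    print(f"{bottleneck['phase']}: {bottleneck['percentage']:.0f}% of total time")
    print(f"  -> {bottleneck['recommendation']}")
```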
Caching Strategy
```python
from functools import lru_cache

class CachedLatencyTracker:
    def __init__(self, tracker):
        self.tracker = tracker
        self.cache_hits = 0
        self.cache_misses = 0

    @lru_cache(maxsize=1000)
    def cached_operation(self, operation_key):
        # Only executed on a cache miss
        with self.tracker.track("cached_operation"):
            return expensive_operation(operation_key)

    def get_with_tracking(self, key):
        # lru_cache has no membership test; compare hit counts
        # before and after the call to classify it
        hits_before = self.cached_operation.cache_info().hits
        result = self.cached_operation(key)
        if self.cached_operation.cache_info().hits > hits_before:
            self.cache_hits += 1
        else:
            self.cache_misses += 1
        return result

    def get_cache_statistics(self):
        cache_info = self.cached_operation.cache_info()
        total = self.cache_hits + self.cache_misses
        return {
            "hit_rate": self.cache_hits / total if total > 0 else 0,
            "hits": self.cache_hits,
            "misses": self.cache_misses,
            "cache_size": cache_info.currsize,
            "max_size": cache_info.maxsize
        }
```
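Usage sketch, where `expensive_operation` stands in for whatever call you want to memoise:

```python
cached = CachedLatencyTracker(latency_tracker)

cached.get_with_tracking("report:2024-q1")  # miss: runs expensive_operation
cached.get_with_tracking("report:2024-q1")  # hit: served from the cache

print(cached.get_cache_statistics())
# {'hit_rate': 0.5, 'hits': 1, 'misses': 1, ...}
```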
Best Practices
1. Granular Tracking
```python
# Track at appropriate granularity: a parent span with nested sub-phases
with tracker.track("api_request"):
    with tracker.track("api_request.auth"):
        authenticate()
    with tracker.track("api_request.fetch"):
        data = fetch_data()
    with tracker.track("api_request.process"):
        result = process_data(data)
```
2. Conditional Tracking
```python
import os

# Only track when explicitly enabled (e.g. in production or while debugging)
if os.getenv("ENABLE_LATENCY_TRACKING", "false").lower() == "true":
    tracker = LatencyTrackerTool()
    agent = TrackedAgent(base_agent, tracker)
else:
    agent = base_agent
```
3. Sampling
```python
import random
from contextlib import nullcontext

class SampledTracker:
    def __init__(self, base_tracker, sample_rate=0.1):
        self.base_tracker = base_tracker
        self.sample_rate = sample_rate

    def track(self, phase):
        if random.random() < self.sample_rate:
            return self.base_tracker.track(phase)
        # Return a no-op context manager for unsampled requests
        return nullcontext()
```
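Wrapping the tracker this way keeps call sites unchanged. For example, sampling roughly 5% of requests:

```python
sampled = SampledTracker(LatencyTrackerTool(), sample_rate=0.05)

with sampled.track("llm_generation"):  # recorded ~5% of the time
    response = agent.chat("Draft a status update")
```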
Common Patterns
Request Tracing
```python
import uuid

def trace_request(request_id=None):
    request_id = request_id or str(uuid.uuid4())

    # Prefix every phase with the request ID so spans can be correlated
    def track_with_id(phase):
        return tracker.track(f"{request_id}.{phase}")

    return track_with_id, request_id

# Usage
track, request_id = trace_request()

with track("total"):
    with track("phase1"):
        do_phase1()
    with track("phase2"):
        do_phase2()

print(f"Request {request_id} completed")
```
Performance Budgets
```python
class PerformanceBudget:
    def __init__(self, budgets):
        self.budgets = budgets  # {"phase": max_ms}
        self.violations = []

    def check(self, phase, latency_ms):
        if phase in self.budgets and latency_ms > self.budgets[phase]:
            self.violations.append({
                "phase": phase,
                "latency": latency_ms,
                "budget": self.budgets[phase],
                "exceeded_by": latency_ms - self.budgets[phase]
            })
            return False
        return True

    def get_violations(self):
        return self.violations
```
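A usage sketch pairing the budget with recorded phase latencies (the budget figures are arbitrary examples):

```python
budget = PerformanceBudget({"planning": 200, "llm_generation": 1500})

if not budget.check("llm_generation", 1820.0):
    for violation in budget.get_violations():
        print(f"{violation['phase']} over budget by {violation['exceeded_by']:.0f}ms")
```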
Troubleshooting
High Latency Issues
- Check the phase breakdown to identify slow components
- Look for outliers that skew averages (see the sketch below)
- Monitor trends to detect gradual degradation
- Review concurrent request handling
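For instance, the analyser and trend detector from earlier can drive a quick diagnosis:

```python
# Outliers that skew the average show up as mean >> median
analysis = LatencyAnalyser(latency_tracker).analyse_phase("llm_generation")
if analysis and analysis["outliers"]:
    print(f"{len(analysis['outliers'])} outlier(s): "
          f"mean {analysis['mean']:.0f}ms vs median {analysis['median']:.0f}ms")

# Gradual degradation shows up as an increasing trend
if detector.detect_trend("llm_generation") == "increasing":
    print("Latency is trending upwards; investigate recent changes")
```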
Memory Leaks
- Limit metric history size (see the sketch below)
- Implement cleanup for old metrics
- Use weak references where appropriate
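A bounded history is the simplest guard. A sketch using collections.deque, which discards the oldest entry automatically:

```python
from collections import defaultdict, deque

class BoundedHistory:
    def __init__(self, max_entries=1000):
        # deque with maxlen evicts the oldest measurement automatically
        self.history = defaultdict(lambda: deque(maxlen=max_entries))

    def add(self, phase, latency_ms):
        self.history[phase].append(latency_ms)
```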
Threading Issues
- Use thread-safe data structures
- Implement proper locking for shared state (see the sketch below)
- Test with concurrent workloads
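As a sketch, serialising writes to the shared metrics store with a lock:

```python
import threading
import time
from collections import defaultdict
from contextlib import contextmanager

class ThreadSafeTracker:
    def __init__(self):
        self._lock = threading.Lock()
        self._metrics = defaultdict(list)

    @contextmanager
    def track(self, phase):
        start = time.time()
        try:
            yield
        finally:
            duration_ms = (time.time() - start) * 1000
            with self._lock:  # only the write to shared state is locked
                self._metrics[phase].append(duration_ms)
```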