Debugging Multi-Agent Systems
Debugging multi-agent systems presents unique challenges due to their distributed nature, asynchronous operations, and complex interactions. This guide provides strategies and tools for effective debugging.
Debugging Challenges
Common Issues in Multi-Agent Systems
- Race Conditions: Timing-dependent bugs
- State Inconsistencies: Agents having different views of shared state
- Communication Failures: Lost or corrupted messages between agents
- Cascading Failures: One agent’s failure affecting others
- Non-Deterministic Behavior: Different outcomes from same inputs
Debugging Infrastructure
1. Comprehensive Logging System
Implement structured logging across all agents:
import json
import logging
import os
import threading
import time
import traceback
import uuid
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
class MultiAgentDebugLogger:
    """Structured per-agent logging with thread-scoped correlation IDs.

    Every agent gets its own ``agent.<name>`` logger with a
    ``StructuredLogHandler`` attached. Events are serialized to JSON before
    being passed to that logger.
    """

    def __init__(self, log_level: str = "DEBUG"):
        """Set the level for agent loggers and configure the root logger.

        Args:
            log_level: Name of a ``logging`` level, case-insensitive.

        Raises:
            AttributeError: If ``log_level`` does not name a ``logging`` level.
        """
        self.loggers = {}
        # Thread ident -> correlation ID currently active on that thread.
        self.correlation_ids = {}
        self.log_level = getattr(logging, log_level.upper())
        # Configure root logger
        logging.basicConfig(
            level=self.log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def get_logger(self, agent_name: str) -> logging.Logger:
        """Get or create logger for an agent"""
        if agent_name not in self.loggers:
            logger = logging.getLogger(f"agent.{agent_name}")
            logger.setLevel(self.log_level)
            # logging.getLogger returns the same object for the same name, so
            # another MultiAgentDebugLogger may already have attached a
            # structured handler. Adding a second one would emit every event
            # twice.
            already_attached = any(
                isinstance(handler, StructuredLogHandler)
                for handler in logger.handlers
            )
            if not already_attached:
                logger.addHandler(StructuredLogHandler())
            self.loggers[agent_name] = logger
        return self.loggers[agent_name]

    @contextmanager
    def correlation_context(self, correlation_id: str):
        """Attach ``correlation_id`` to events logged by the current thread.

        Contexts may be nested: leaving an inner context restores the ID of
        the enclosing one instead of clearing it.
        """
        thread_id = threading.current_thread().ident
        previous = self.correlation_ids.get(thread_id)
        self.correlation_ids[thread_id] = correlation_id
        try:
            yield
        finally:
            if previous is None:
                self.correlation_ids.pop(thread_id, None)
            else:
                self.correlation_ids[thread_id] = previous

    def log_agent_event(self, agent_name: str, event_type: str,
                        data: Dict[str, Any], level: str = "INFO"):
        """Log a structured agent event.

        Args:
            agent_name: Agent whose logger receives the event.
            event_type: Short label stored under ``"event_type"``.
            data: Event payload stored under ``"data"``.
            level: Name of the logger method to use (``"INFO"``, ...).

        Raises:
            AttributeError: If ``level`` does not name a logger method.
        """
        logger = self.get_logger(agent_name)
        # Get correlation ID if available
        thread_id = threading.current_thread().ident
        correlation_id = self.correlation_ids.get(thread_id)
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent": agent_name,
            "event_type": event_type,
            "correlation_id": correlation_id,
            "data": data
        }
        log_method = getattr(logger, level.lower())
        # default=str: a debug logger must not raise because the payload
        # holds something JSON cannot encode (datetime, set, custom object).
        log_method(json.dumps(event, default=str))

    def log_agent_interaction(self, from_agent: str, to_agent: str,
                              message_type: str, content: Any):
        """Log interaction between agents on the sender's logger."""
        interaction = {
            "from": from_agent,
            "to": to_agent,
            "message_type": message_type,
            "content": str(content)[:1000]  # Truncate large messages
        }
        self.log_agent_event(
            from_agent,
            "agent_interaction",
            interaction
        )
class StructuredLogHandler(logging.Handler):
    """Custom handler for structured logging.

    Prints each record to stdout as an indented JSON object. If the record's
    message is itself a JSON object, its keys are kept and record metadata is
    merged on top. Any other message is stored under ``"message"``.
    """

    def emit(self, record):
        """Format ``record`` as JSON and print it.

        Failures are routed to ``handleError`` rather than raised.
        """
        try:
            message = record.getMessage()
            # Parse JSON if message is JSON
            try:
                log_data = json.loads(message)
            except ValueError:  # json.JSONDecodeError is a ValueError
                log_data = None
            # Only a JSON object can take the metadata keys added below.
            # Scalars and arrays (e.g. the message "42") are plain text.
            if not isinstance(log_data, dict):
                log_data = {"message": message}
            # Add metadata
            log_data.update({
                "level": record.levelname,
                "logger": record.name,
                "timestamp": datetime.fromtimestamp(record.created).isoformat(),
                "thread": record.thread,
                "process": record.process
            })
            # Add exception info if present. exc_info is (None, None, None)
            # when exc_info=True was requested with no active exception.
            if record.exc_info and record.exc_info[0] is not None:
                log_data["exception"] = {
                    "type": record.exc_info[0].__name__,
                    "message": str(record.exc_info[1]),
                    "traceback": traceback.format_exception(*record.exc_info)
                }
            # Output formatted log
            print(json.dumps(log_data, indent=2))
        except Exception:
            self.handleError(record)
2. Distributed Tracing
Implement tracing across agent interactions:
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
import time
@dataclass
class Span:
    """A single timed operation performed by one agent within a trace."""

    span_id: str
    parent_id: Optional[str]
    trace_id: str
    operation: str
    agent_name: str
    start_time: float
    end_time: Optional[float] = None
    tags: Dict[str, Any] = field(default_factory=dict)
    logs: List[Dict[str, Any]] = field(default_factory=list)
    status: str = "in_progress"

    def finish(self, status: str = "success"):
        """Record the final status and stamp the end time with the current clock."""
        self.status = status
        self.end_time = time.time()

    def add_tag(self, key: str, value: Any):
        """Set tag ``key`` to ``value``, replacing any earlier value."""
        self.tags.update({key: value})

    def log(self, message: str, **kwargs):
        """Append a timestamped entry; extra keyword arguments become extra keys."""
        entry = {"timestamp": time.time(), "message": message}
        entry.update(kwargs)
        self.logs.append(entry)
class DistributedTracer:
    """In-memory tracer that groups spans by trace ID.

    ``traces`` and ``active_spans`` are only modified while holding the
    internal re-entrant lock.
    """

    def __init__(self):
        # trace_id -> spans of that trace, in creation order
        self.traces = {}
        # span_id -> span, for spans that have been started but not finished
        self.active_spans = {}
        self._lock = threading.RLock()

    def start_trace(self, operation: str, agent_name: str) -> "Span":
        """Start a new trace and return its root span."""
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        span = Span(
            span_id=span_id,
            parent_id=None,
            trace_id=trace_id,
            operation=operation,
            agent_name=agent_name,
            start_time=time.time()
        )
        with self._lock:
            self.traces[trace_id] = [span]
            self.active_spans[span_id] = span
        return span

    def start_span(self, parent_span: "Span", operation: str,
                   agent_name: str) -> "Span":
        """Start a child span in the same trace as ``parent_span``."""
        span_id = str(uuid.uuid4())
        span = Span(
            span_id=span_id,
            parent_id=parent_span.span_id,
            trace_id=parent_span.trace_id,
            operation=operation,
            agent_name=agent_name,
            start_time=time.time()
        )
        with self._lock:
            # setdefault: the parent may belong to a trace this tracer has
            # not recorded. Keep the child instead of raising KeyError.
            self.traces.setdefault(parent_span.trace_id, []).append(span)
            self.active_spans[span_id] = span
        return span

    def finish_span(self, span: "Span", status: str = "success"):
        """Finish ``span`` and drop it from ``active_spans``.

        Use this for spans created with ``start_trace``/``start_span``
        directly. The ``span`` context manager calls it for you.
        """
        span.finish(status)
        with self._lock:
            self.active_spans.pop(span.span_id, None)

    @contextmanager
    def span(self, operation: str, agent_name: str,
             parent_span: Optional["Span"] = None):
        """Context manager for spans.

        Starts a child of ``parent_span`` when one is given, otherwise a new
        trace. The span finishes as ``"success"`` on normal exit and as
        ``"error"`` (with the error logged on the span) when the body raises.
        The exception is re-raised.
        """
        if parent_span:
            span = self.start_span(parent_span, operation, agent_name)
        else:
            span = self.start_trace(operation, agent_name)
        try:
            yield span
        except BaseException as e:
            # BaseException, not Exception: a cancelled or interrupted body
            # must still close its span instead of leaving it "in_progress".
            span.log(f"Error: {str(e)}", error_type=type(e).__name__)
            self.finish_span(span, "error")
            raise
        else:
            self.finish_span(span, "success")

    def get_trace(self, trace_id: str) -> List["Span"]:
        """Get all spans for a trace.

        Returns a new list, so callers can sort or edit it without touching
        the tracer's own record. Unknown IDs yield an empty list.
        """
        with self._lock:
            return list(self.traces.get(trace_id, []))

    def visualize_trace(self, trace_id: str) -> str:
        """Generate a visual representation of the trace.

        Returns an indented tree, one line per span, or ``"Trace not found"``
        when the trace has no spans.
        """
        spans = self.get_trace(trace_id)
        if not spans:
            return "Trace not found"
        # Sort a copy by start time. The stored trace keeps creation order.
        spans = sorted(spans, key=lambda s: s.start_time)
        # Build visualization
        lines = [f"Trace ID: {trace_id}\n"]
        known_ids = {s.span_id for s in spans}
        # Create a mapping of span_id to children
        children = {}
        roots = []
        for s in spans:
            if s.parent_id is None:
                roots.append(s)
            elif s.parent_id in known_ids:
                children.setdefault(s.parent_id, []).append(s)
            else:
                # Parent is not part of this trace: show the span at top
                # level rather than silently dropping it.
                roots.append(s)

        # Recursively print spans
        def render(node, indent: int = 0):
            duration = (node.end_time or time.time()) - node.start_time
            status_symbol = "✓" if node.status == "success" else "✗"
            line = f"{' ' * indent}{status_symbol} {node.agent_name}: {node.operation} ({duration:.3f}s)"
            if node.tags:
                line += f" tags={node.tags}"
            lines.append(line)
            # Print children
            for child in children.get(node.span_id, []):
                render(child, indent + 1)

        # Print root spans
        for root in roots:
            render(root)
        return "\n".join(lines)
3. State Inspection Tools
Tools for inspecting agent and system state:
class AgentStateInspector:
    """Records snapshots of agent state and answers questions about them.

    ``snapshots`` holds the latest snapshot per agent. ``state_history``
    holds every snapshot per agent in capture order.
    """

    def __init__(self):
        self.snapshots = {}
        self.state_history = defaultdict(list)

    def capture_state(self, agent_name: str, state: Dict[str, Any],
                      timestamp: Optional[float] = None):
        """Capture agent state snapshot.

        Args:
            agent_name: Agent the state belongs to.
            state: State mapping. A copy is stored, not the object itself.
            timestamp: Capture time; defaults to ``time.time()``.
        """
        if timestamp is None:
            timestamp = time.time()
        snapshot = {
            "timestamp": timestamp,
            "state": self._deep_copy_state(state)
        }
        self.snapshots[agent_name] = snapshot
        self.state_history[agent_name].append(snapshot)

    def _deep_copy_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Deep copy state, handling non-copyable objects.

        If the mapping cannot be deep-copied as a whole, each top-level value
        is copied on its own. Values that still fail are replaced by a
        ``"<TypeName object>"`` placeholder string.
        """
        import copy
        try:
            return copy.deepcopy(state)
        except Exception:
            # Fallback for non-copyable objects
            copied = {}
            for key, value in state.items():
                try:
                    copied[key] = copy.deepcopy(value)
                except Exception:
                    copied[key] = f"<{type(value).__name__} object>"
            return copied

    def compare_states(self, agent_name: str, time1: float, time2: float) -> Dict[str, Any]:
        """Compare agent states at two different times.

        The snapshot closest to each requested time is used.

        Raises:
            ValueError: If no state was ever captured for ``agent_name``.
        """
        # .get() so that asking about an unknown agent does not create an
        # empty history entry in the defaultdict.
        history = self.state_history.get(agent_name)
        if not history:
            raise ValueError(f"No state history recorded for agent '{agent_name}'")
        # Find closest snapshots to requested times
        snapshot1 = min(history, key=lambda s: abs(s["timestamp"] - time1))
        snapshot2 = min(history, key=lambda s: abs(s["timestamp"] - time2))
        return self._diff_states(snapshot1["state"], snapshot2["state"])

    def _diff_states(self, state1: Dict[str, Any], state2: Dict[str, Any]) -> Dict[str, Any]:
        """Compute difference between two states.

        Returns a dict with ``"added"``, ``"removed"`` and ``"changed"``
        mappings. Only top-level keys are compared.
        """
        diff = {
            "added": {},
            "removed": {},
            "changed": {}
        }
        for key in set(state1) | set(state2):
            if key not in state1:
                diff["added"][key] = state2[key]
            elif key not in state2:
                diff["removed"][key] = state1[key]
            elif state1[key] != state2[key]:
                diff["changed"][key] = {
                    "from": state1[key],
                    "to": state2[key]
                }
        return diff

    def get_state_timeline(self, agent_name: str,
                           key_path: str) -> List[Tuple[float, Any]]:
        """Get timeline of values for a specific state key.

        Returns ``(timestamp, value)`` pairs in capture order. Snapshots
        where the key is missing, or where its value is ``None``, are skipped.
        """
        timeline = []
        for snapshot in self.state_history.get(agent_name, []):
            value = self._get_nested_value(snapshot["state"], key_path)
            if value is not None:
                timeline.append((snapshot["timestamp"], value))
        return timeline

    def _get_nested_value(self, state: Dict[str, Any], key_path: str) -> Any:
        """Get nested value using dot notation; ``None`` if any step is missing."""
        value = state
        for key in key_path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return None
        return value
4. Debug Command Interface
Interactive debugging interface:
import cmd
import pprint
class AgentDebugger(cmd.Cmd):
    """Interactive command-line debugger for a multi-agent system.

    Each ``do_<name>`` method is a command. Its docstring is the text shown
    by ``help <name>``.
    """

    intro = "Multi-Agent System Debugger. Type help or ? to list commands."
    prompt = "(debug) "

    def __init__(self, agent_system):
        super().__init__()
        self.agent_system = agent_system
        self.tracer = DistributedTracer()
        self.inspector = AgentStateInspector()
        # NOTE(review): breakpoints are recorded here, but nothing in this
        # class consults them. Presumably the agent system does; confirm.
        self.breakpoints = set()
        self.watch_expressions = {}

    def do_agents(self, arg):
        """List all agents in the system"""
        agents = self.agent_system.get_all_agents()
        for agent in agents:
            status = "active" if agent.is_active else "inactive"
            print(f"{agent.name} ({status})")

    def do_state(self, arg):
        """Show state of an agent: state <agent_name>"""
        if not arg:
            print("Usage: state <agent_name>")
            return
        agent = self.agent_system.get_agent(arg)
        if not agent:
            print(f"Agent '{arg}' not found")
            return
        state = agent.get_state()
        pprint.pprint(state)

    def do_trace(self, arg):
        """Start tracing: trace <on|off>"""
        if arg == "on":
            self.agent_system.enable_tracing(self.tracer)
            print("Tracing enabled")
        elif arg == "off":
            self.agent_system.disable_tracing()
            print("Tracing disabled")
        else:
            print("Usage: trace <on|off>")

    def do_break(self, arg):
        """Set breakpoint: break <agent_name>.<method_name>"""
        if not arg:
            print("Usage: break <agent_name>.<method_name>")
            return
        self.breakpoints.add(arg)
        print(f"Breakpoint set at {arg}")

    def do_watch(self, arg):
        """Watch expression: watch <expression>"""
        if not arg:
            print("Usage: watch <expression>")
            return
        watch_id = len(self.watch_expressions) + 1
        self.watch_expressions[watch_id] = arg
        print(f"Watch {watch_id}: {arg}")

    def do_step(self, arg):
        """Step through execution"""
        self.agent_system.step()
        self._check_watches()

    def do_continue(self, arg):
        """Continue execution"""
        self.agent_system.resume()

    def do_messages(self, arg):
        """Show message queue: messages [agent_name]"""
        if arg:
            messages = self.agent_system.get_agent_messages(arg)
        else:
            messages = self.agent_system.get_all_messages()
        for msg in messages:
            # str(): content is not guaranteed to be a string, and slicing
            # e.g. a dict would raise out of the command loop.
            preview = str(msg['content'])[:50]
            print(f"{msg['from']} -> {msg['to']}: {msg['type']} - {preview}...")

    def do_history(self, arg):
        """Show execution history: history [limit]"""
        if arg:
            try:
                limit = int(arg)
            except ValueError:
                # A typo must not terminate the interactive session.
                print("Usage: history [limit]")
                return
        else:
            limit = 20
        history = self.agent_system.get_execution_history(limit)
        for entry in history:
            print(f"[{entry['timestamp']}] {entry['agent']}: {entry['action']}")

    def do_quit(self, arg):
        """Exit the debugger"""
        # Returning a true value is how cmd.Cmd commands end cmdloop().
        return True

    def do_EOF(self, arg):
        """Exit the debugger (Ctrl-D)"""
        print()
        return True

    def _check_watches(self):
        """Check and display watch expressions"""
        for watch_id, expression in self.watch_expressions.items():
            try:
                # Evaluate expression in agent context.
                # SECURITY: eval() runs arbitrary Python typed at the prompt.
                # That is acceptable only while the debugger is driven by a
                # trusted local operator. Never feed it expressions from a
                # network client or a file.
                value = eval(expression, {"agents": self.agent_system.agents})
                print(f"Watch {watch_id}: {expression} = {value}")
            except Exception as e:
                print(f"Watch {watch_id}: {expression} - Error: {e}")
Debugging Strategies
1. Deterministic Replay
Capture and replay agent interactions:
import pickle
class InteractionRecorder:
    """Records agent interactions so a run can be saved and replayed later."""

    def __init__(self):
        # recording_id -> finished recording
        self.recordings = {}
        # Recording currently being filled, or None when not recording.
        self.current_recording = None

    def start_recording(self, name: str):
        """Start recording interactions.

        Any recording still in progress is replaced without being stored.
        """
        self.current_recording = {
            "name": name,
            "start_time": time.time(),
            "interactions": [],
            "random_seeds": [],
            "external_calls": []
        }

    def record_interaction(self, interaction: Dict[str, Any]):
        """Record an agent interaction; ignored when not recording."""
        if self.current_recording:
            self.current_recording["interactions"].append({
                "timestamp": time.time(),
                "data": interaction
            })

    def record_random_seed(self, seed: int):
        """Record random seed for deterministic replay; ignored when not recording."""
        if self.current_recording:
            self.current_recording["random_seeds"].append(seed)

    def stop_recording(self) -> Optional[str]:
        """Stop recording and store the result.

        Returns:
            The new recording's ID, or ``None`` if nothing was being recorded.
        """
        if not self.current_recording:
            return None
        recording_id = str(uuid.uuid4())
        self.recordings[recording_id] = self.current_recording
        self.current_recording = None
        return recording_id

    def save_recording(self, recording_id: str, filepath: str):
        """Save recording to file (pickle format).

        Raises:
            ValueError: If ``recording_id`` is unknown.
        """
        if recording_id not in self.recordings:
            raise ValueError(f"Recording {recording_id} not found")
        with open(filepath, 'wb') as f:
            pickle.dump(self.recordings[recording_id], f)

    def load_recording(self, filepath: str) -> str:
        """Load recording from file and return its new ID.

        SECURITY: unpickling can execute arbitrary code. Only load files
        written by ``save_recording`` from a source you trust.

        Raises:
            ValueError: If the file does not contain a recording dict.
        """
        with open(filepath, 'rb') as f:
            recording = pickle.load(f)
        # Fail here, with a clear message, rather than later inside a replay.
        if not isinstance(recording, dict) or "interactions" not in recording:
            raise ValueError(f"{filepath} does not contain a recording")
        recording_id = str(uuid.uuid4())
        self.recordings[recording_id] = recording
        return recording_id
class InteractionReplayer:
    """Feeds a recorded sequence of interactions back into an agent system."""

    def __init__(self, agent_system):
        self.agent_system = agent_system
        self.current_replay = None
        # Position of the next interaction for replay_next().
        self.replay_index = 0

    def start_replay(self, recording: Dict[str, Any]):
        """Start replaying a recording.

        Seeds ``random`` (and NumPy, when installed) with the first recorded
        seed so the replay draws the same random numbers.
        """
        self.current_replay = recording
        self.replay_index = 0
        # Set random seeds for determinism
        seeds = recording.get("random_seeds")
        if seeds:
            import random
            random.seed(seeds[0])
            try:
                import numpy as np
            except ImportError:
                # NumPy is optional: agents that do not use it need no
                # NumPy seeding, and replay must still work without it.
                pass
            else:
                np.random.seed(seeds[0])

    def replay_next(self) -> bool:
        """Replay next interaction.

        Returns:
            ``True`` if an interaction was replayed, ``False`` when there is
            no active replay or it is exhausted.
        """
        if not self.current_replay or self.replay_index >= len(self.current_replay["interactions"]):
            return False
        interaction = self.current_replay["interactions"][self.replay_index]
        # Replay the interaction
        self._execute_interaction(interaction["data"])
        self.replay_index += 1
        return True

    def replay_all(self, speed: float = 1.0):
        """Replay all interactions, keeping their original spacing.

        Args:
            speed: Time-compression factor; ``2.0`` replays twice as fast.

        Raises:
            ValueError: If ``speed`` is not positive.
        """
        if not self.current_replay:
            return
        if speed <= 0:
            raise ValueError("speed must be positive")
        previous_timestamp = None
        for interaction in self.current_replay["interactions"]:
            # Sleep only for the gap since the previous interaction. Sleeping
            # for the offset from the start on every step would add the
            # offsets up and stretch the replay.
            if previous_timestamp is not None:
                gap = (interaction["timestamp"] - previous_timestamp) / speed
                time.sleep(max(0, gap))
            previous_timestamp = interaction["timestamp"]
            self._execute_interaction(interaction["data"])

    def _execute_interaction(self, interaction: Dict[str, Any]):
        """Execute a recorded interaction.

        ``"message"`` interactions are re-sent through the agent system.
        ``"state_change"`` interactions set the state on the named agent when
        it exists. Other types are ignored.
        """
        # Route interaction to appropriate agent
        if interaction["type"] == "message":
            self.agent_system.send_message(
                from_agent=interaction["from"],
                to_agent=interaction["to"],
                content=interaction["content"]
            )
        elif interaction["type"] == "state_change":
            agent = self.agent_system.get_agent(interaction["agent"])
            if agent:
                agent.set_state(interaction["new_state"])
2. Chaos Engineering
Test system resilience:
import random
class ChaosMonkey:
    """Randomly injects failures into an agent system to test resilience."""

    def __init__(self, agent_system, chaos_level: float = 0.1,
                 logger: Optional["MultiAgentDebugLogger"] = None):
        """
        Args:
            agent_system: System that receives the injected failures.
            chaos_level: Probability (0.0-1.0) that ``inject_chaos`` acts.
            logger: Logger used to report chaos events. When omitted, one
                ``MultiAgentDebugLogger`` is created on first use and reused.
        """
        self.agent_system = agent_system
        self.chaos_level = chaos_level  # Probability of chaos
        self.chaos_events = []
        self._logger = logger

    def inject_chaos(self):
        """Randomly inject chaos into the system"""
        # random.random() is in [0.0, 1.0). ">=" makes level 0.0 never fire
        # and level 1.0 always fire.
        if random.random() >= self.chaos_level:
            return
        chaos_type = random.choice([
            "kill_agent",
            "delay_message",
            "corrupt_message",
            "network_partition",
            "resource_exhaustion"
        ])
        self._execute_chaos(chaos_type)

    def _execute_chaos(self, chaos_type: str):
        """Execute specific chaos event, record it and log it.

        The event is recorded even when the action could not be carried out
        (for example ``kill_agent`` with no agents). Its ``details`` are then
        empty.
        """
        event = {
            "timestamp": time.time(),
            "type": chaos_type,
            "details": {}
        }
        if chaos_type == "kill_agent":
            agents = self.agent_system.get_all_agents()
            if agents:
                victim = random.choice(agents)
                self.agent_system.kill_agent(victim.name)
                event["details"]["agent"] = victim.name
        elif chaos_type == "delay_message":
            delay = random.uniform(1, 5)  # 1-5 second delay
            self.agent_system.add_message_delay(delay)
            event["details"]["delay"] = delay
        elif chaos_type == "corrupt_message":
            self.agent_system.corrupt_next_message()
            event["details"]["corruption"] = "next_message"
        elif chaos_type == "network_partition":
            agents = self.agent_system.get_all_agents()
            if len(agents) >= 2:
                partition_size = len(agents) // 2
                partition = random.sample(agents, partition_size)
                partition_names = [a.name for a in partition]
                self.agent_system.create_network_partition(partition_names)
                event["details"]["partition"] = partition_names
        elif chaos_type == "resource_exhaustion":
            resource = random.choice(["memory", "cpu", "tokens"])
            self.agent_system.simulate_resource_exhaustion(resource)
            event["details"]["resource"] = resource
        self.chaos_events.append(event)
        # Log chaos event. The logger is created once and reused: building a
        # new MultiAgentDebugLogger for every event repeats its setup (root
        # logger configuration, handler attachment) each time.
        if self._logger is None:
            self._logger = MultiAgentDebugLogger()
        self._logger.log_agent_event(
            "chaos_monkey",
            "chaos_injected",
            event,
            level="WARNING"
        )
3. Performance Profiling
Profile agent performance:
import cProfile
import pstats
from io import StringIO
class AgentPerformanceProfiler:
    """Collects per-agent cProfile data, execution times and memory deltas."""

    def __init__(self):
        # agent_name -> most recent cProfile.Profile for that agent
        self.profiles = {}
        self.metrics = defaultdict(lambda: {
            "execution_times": [],
            "memory_usage": [],
            "message_latency": []
        })

    @contextmanager
    def profile_agent(self, agent_name: str):
        """Profile agent execution for the duration of the ``with`` body.

        Records wall-clock time and keeps the cProfile result. The change in
        process RSS (in MB) is recorded as well when ``psutil`` is installed.
        """
        profiler = cProfile.Profile()
        # Memory before. psutil is optional: without it, timing and
        # profiling still work and only the memory sample is skipped.
        try:
            import psutil
        except ImportError:
            process = None
        else:
            process = psutil.Process()
        mem_before = None
        if process is not None:
            mem_before = process.memory_info().rss / 1024 / 1024  # MB
        start_time = time.time()
        profiler.enable()
        try:
            yield
        finally:
            profiler.disable()
            # Execution time
            execution_time = time.time() - start_time
            self.metrics[agent_name]["execution_times"].append(execution_time)
            # Memory after
            if process is not None:
                mem_after = process.memory_info().rss / 1024 / 1024  # MB
                self.metrics[agent_name]["memory_usage"].append(mem_after - mem_before)
            # Store profile
            self.profiles[agent_name] = profiler

    def get_profile_stats(self, agent_name: str, top_n: int = 10) -> str:
        """Get profile statistics for an agent.

        Returns the ``top_n`` entries sorted by cumulative time, or a
        "No profile found" message when the agent was never profiled.
        """
        if agent_name not in self.profiles:
            return f"No profile found for agent {agent_name}"
        s = StringIO()
        ps = pstats.Stats(self.profiles[agent_name], stream=s)
        ps.strip_dirs().sort_stats('cumulative').print_stats(top_n)
        return s.getvalue()

    def get_performance_summary(self, agent_name: str) -> Dict[str, Any]:
        """Get performance summary for an agent.

        Returns ``{"error": "No metrics available"}`` when no execution time
        has been recorded. Numeric values are plain Python floats.
        """
        import numpy as np
        # .get() so that asking about an unknown agent does not create an
        # empty metrics entry in the defaultdict.
        metrics = self.metrics.get(agent_name)
        if not metrics or not metrics["execution_times"]:
            return {"error": "No metrics available"}
        times = metrics["execution_times"]
        memory = metrics["memory_usage"]
        return {
            "execution_time": {
                "avg": float(np.mean(times)),
                "min": float(np.min(times)),
                "max": float(np.max(times)),
                "p95": float(np.percentile(times, 95))
            },
            "memory_usage": {
                "avg": float(np.mean(memory)) if memory else 0,
                "max": float(np.max(memory)) if memory else 0
            },
            "samples": len(times)
        }

    def identify_bottlenecks(self) -> List[Dict[str, Any]]:
        """Identify performance bottlenecks.

        Flags agents whose average execution time exceeds 1 second, and
        agents whose recorded memory deltas rise by more than 1 MB per
        execution (slope of a straight-line fit).
        """
        import numpy as np
        bottlenecks = []
        for agent_name, metrics in self.metrics.items():
            if not metrics["execution_times"]:
                continue
            avg_time = float(np.mean(metrics["execution_times"]))
            # Check for slow agents
            if avg_time > 1.0:  # More than 1 second average
                bottlenecks.append({
                    "type": "slow_agent",
                    "agent": agent_name,
                    "avg_execution_time": avg_time,
                    "severity": "high" if avg_time > 5.0 else "medium"
                })
            # Check for memory leaks. A trend needs at least two samples;
            # a single point does not determine a slope.
            memory = metrics["memory_usage"]
            if len(memory) >= 2:
                memory_growth = float(np.polyfit(range(len(memory)), memory, 1)[0])
                if memory_growth > 1.0:  # Growing > 1MB per execution
                    bottlenecks.append({
                        "type": "memory_leak",
                        "agent": agent_name,
                        "growth_rate_mb": memory_growth,
                        "severity": "high"
                    })
        return bottlenecks
Debugging Tools Integration
1. Visual Debugger
Web-based visual debugging interface:
from flask import Flask, render_template_string, jsonify
import json
class VisualDebugger:
    """Small Flask app that shows agents and queued messages in a browser.

    Routes:
        ``/``                     HTML page that polls the state API each second.
        ``/api/system-state``     JSON: all agents plus the message queue.
        ``/api/agent/<name>``     JSON: one agent's state, history and metrics.
    """

    def __init__(self, agent_system):
        self.agent_system = agent_system
        self.app = Flask(__name__)
        self.setup_routes()

    def setup_routes(self):
        """Register the page and API routes on ``self.app``."""

        @self.app.route('/')
        def index():
            # The page previously called renderAgents/renderMessages without
            # defining them, so every poll raised a ReferenceError. Both are
            # defined below. Text is set with .text(), which does not
            # interpret agent-supplied strings as HTML.
            return render_template_string('''
<!DOCTYPE html>
<html>
<head>
<title>Multi-Agent System Debugger</title>
<script src="https://cdn.jsdelivr.net/npm/d3@7"></script>
<style>
.agent { fill: #69b3a2; }
.agent.error { fill: #ff6b6b; }
.link { stroke: #999; stroke-opacity: 0.6; }
.message { fill: #feca57; }
</style>
</head>
<body>
<h1>Multi-Agent System Debugger</h1>
<div id="graph"></div>
<div id="logs"></div>
<script>
// D3.js visualization code
const width = 800;
const height = 600;
const svg = d3.select("#graph")
    .append("svg")
    .attr("width", width)
    .attr("height", height);
// One circle and one label per agent, spread along a horizontal line.
// NOTE(review): inactive agents reuse the ".agent.error" style - confirm
// that is the intended colouring.
function renderAgents(agents) {
    const xOf = function (d, i) {
        return (i + 1) * width / (agents.length + 1);
    };
    const nodes = svg.selectAll("circle.agent").data(agents, d => d.name);
    nodes.exit().remove();
    nodes.enter()
        .append("circle")
        .attr("r", 20)
        .attr("cy", height / 2)
        .merge(nodes)
        .attr("class", d => d.status === "active" ? "agent" : "agent error")
        .attr("cx", xOf);
    const labels = svg.selectAll("text.agent-label").data(agents, d => d.name);
    labels.exit().remove();
    labels.enter()
        .append("text")
        .attr("class", "agent-label")
        .attr("text-anchor", "middle")
        .attr("y", height / 2 + 40)
        .merge(labels)
        .attr("x", xOf)
        .text(d => d.name);
}
// One line of text per queued message.
function renderMessages(messages) {
    const rows = d3.select("#logs").selectAll("div.message-entry").data(messages);
    rows.exit().remove();
    rows.enter()
        .append("div")
        .attr("class", "message-entry")
        .merge(rows)
        .text(d => d.from + " -> " + d.to + ": " + d.type);
}
// Update function
function updateGraph() {
    fetch('/api/system-state')
        .then(response => response.json())
        .then(data => {
            // Update visualization
            renderAgents(data.agents);
            renderMessages(data.messages);
        });
}
// Update every second
setInterval(updateGraph, 1000);
</script>
</body>
</html>
''')

        @self.app.route('/api/system-state')
        def system_state():
            agents = []
            for agent in self.agent_system.get_all_agents():
                agents.append({
                    "name": agent.name,
                    "status": "active" if agent.is_active else "inactive",
                    "state": agent.get_state()
                })
            messages = []
            for msg in self.agent_system.get_message_queue():
                messages.append({
                    "from": msg["from"],
                    "to": msg["to"],
                    "type": msg["type"]
                })
            return jsonify({
                "agents": agents,
                "messages": messages,
                "timestamp": time.time()
            })

        @self.app.route('/api/agent/<agent_name>')
        def agent_detail(agent_name):
            agent = self.agent_system.get_agent(agent_name)
            if not agent:
                return jsonify({"error": "Agent not found"}), 404
            return jsonify({
                "name": agent.name,
                "state": agent.get_state(),
                "history": agent.get_history(),
                "metrics": agent.get_metrics()
            })

    def run(self, host='localhost', port=5000, debug=False):
        """Run the visual debugger.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
            debug: Enable Flask debug mode. Off by default because debug mode
                serves an interactive debugger that can execute code. Turn it
                on only on a machine you trust.
        """
        self.app.run(host=host, port=port, debug=debug)
Best Practices
-
Use Correlation IDs: Track requests across agents
Copy
def generate_correlation_id() -> str:
    """Return a new request ID of the form ``req_<8 hex characters>``."""
    return f"req_{uuid.uuid4().hex[:8]}"


def propagate_correlation_id(correlation_id: str, message: Dict):
    """Store ``correlation_id`` on ``message`` and return the same dict.

    The message is modified in place. It is returned only for convenience.
    """
    message["correlation_id"] = correlation_id
    return message
Implement Health Checks: Regular system health monitoring
Copy
class HealthChecker:
    """Collects a point-in-time health report for an agent."""

    def check_agent_health(self, agent) -> Dict[str, Any]:
        """Return the agent's responsiveness, memory, queue size and last activity.

        Args:
            agent: Object providing ``ping()``, ``get_memory_usage()``,
                ``message_queue`` and ``last_activity_time``.
        """
        return {
            "responsive": agent.ping(),
            "memory_usage": agent.get_memory_usage(),
            "queue_size": len(agent.message_queue),
            "last_activity": agent.last_activity_time
        }
Use Debug Assertions: Add assertions that can be enabled in debug mode
Copy
# Debug assertions are active only when the DEBUG environment variable is
# "true" (any letter case). It is read once, when this module is imported.
DEBUG_MODE = os.environ.get('DEBUG', 'false').lower() == 'true'


def debug_assert(condition: bool, message: str):
    """Raise ``AssertionError`` if ``condition`` is false and debug mode is on.

    Does nothing when ``DEBUG_MODE`` is off.
    """
    if DEBUG_MODE and not condition:
        raise AssertionError(f"Debug assertion failed: {message}")
Testing Debugging Tools
Copy
import pytest
def test_distributed_tracing():
    """A span opened under a parent is recorded in the parent's trace."""
    tracer = DistributedTracer()

    # Build a two-level trace: agent1's operation wraps agent2's.
    with tracer.span("main_operation", "agent1") as outer:
        outer.add_tag("user_id", "123")
        with tracer.span("sub_operation", "agent2", outer) as inner:
            inner.log("Processing data")
            time.sleep(0.1)

    # Both spans must be stored, root first, and linked parent -> child.
    recorded = tracer.get_trace(outer.trace_id)
    assert len(recorded) == 2
    root_span = recorded[0]
    child_span = recorded[1]
    assert root_span.operation == "main_operation"
    assert child_span.parent_id == root_span.span_id
def test_state_inspector():
    """Successive captures show up, in order, in the key's timeline."""
    inspector = AgentStateInspector()

    # Two snapshots of the same agent with a changing counter.
    first_state = {"counter": 1, "status": "active"}
    second_state = {"counter": 2, "status": "active"}
    inspector.capture_state("agent1", first_state)
    time.sleep(0.1)
    inspector.capture_state("agent1", second_state)

    # Each timeline entry is (timestamp, value); check the values.
    timeline = inspector.get_state_timeline("agent1", "counter")
    assert len(timeline) == 2
    counter_values = [value for _, value in timeline]
    assert counter_values[0] == 1
    assert counter_values[1] == 2
def test_chaos_monkey():
    """With chaos_level=1.0, one call injects exactly one known chaos event."""
    from unittest.mock import Mock

    # Mock agent system
    agent_system = Mock()
    # Mock(name="agent1") only labels the mock for its repr; it does not set
    # a .name attribute. Assign the attribute after construction so that
    # agent.name is the plain string the code under test expects.
    agents = []
    for agent_name in ("agent1", "agent2"):
        agent = Mock()
        agent.name = agent_name
        agents.append(agent)
    agent_system.get_all_agents.return_value = agents

    chaos = ChaosMonkey(agent_system, chaos_level=1.0)  # Always inject chaos
    chaos.inject_chaos()

    # Verify chaos was injected
    assert len(chaos.chaos_events) == 1
    assert chaos.chaos_events[0]["type"] in [
        "kill_agent", "delay_message", "corrupt_message",
        "network_partition", "resource_exhaustion"
    ]

