Skip to main content

Debugging Multi-Agent Systems

Debugging multi-agent systems presents unique challenges due to their distributed nature, asynchronous operations, and complex interactions. This guide provides strategies and tools for effective debugging.

Debugging Challenges

Common Issues in Multi-Agent Systems

  1. Race Conditions: Timing-dependent bugs
  2. State Inconsistencies: Agents having different views of shared state
  3. Communication Failures: Lost or corrupted messages between agents
  4. Cascading Failures: One agent’s failure affecting others
  5. Non-Deterministic Behavior: Different outcomes from same inputs

Debugging Infrastructure

1. Comprehensive Logging System

Implement structured logging across all agents:
import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional
import traceback
from contextlib import contextmanager

class MultiAgentDebugLogger:
    def __init__(self, log_level: str = "DEBUG"):
        self.loggers = {}
        self.correlation_ids = {}
        self.log_level = getattr(logging, log_level.upper())
        
        # Configure root logger
        logging.basicConfig(
            level=self.log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    
    def get_logger(self, agent_name: str) -> logging.Logger:
        """Get or create logger for an agent"""
        if agent_name not in self.loggers:
            logger = logging.getLogger(f"agent.{agent_name}")
            logger.setLevel(self.log_level)
            
            # Add custom handler for structured logging
            handler = StructuredLogHandler()
            logger.addHandler(handler)
            
            self.loggers[agent_name] = logger
        
        return self.loggers[agent_name]
    
    @contextmanager
    def correlation_context(self, correlation_id: str):
        """Context manager for correlation ID"""
        import threading
        thread_id = threading.current_thread().ident
        
        self.correlation_ids[thread_id] = correlation_id
        try:
            yield
        finally:
            if thread_id in self.correlation_ids:
                del self.correlation_ids[thread_id]
    
    def log_agent_event(self, agent_name: str, event_type: str, 
                       data: Dict[str, Any], level: str = "INFO"):
        """Log a structured agent event"""
        logger = self.get_logger(agent_name)
        
        # Get correlation ID if available
        import threading
        thread_id = threading.current_thread().ident
        correlation_id = self.correlation_ids.get(thread_id)
        
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent": agent_name,
            "event_type": event_type,
            "correlation_id": correlation_id,
            "data": data
        }
        
        log_method = getattr(logger, level.lower())
        log_method(json.dumps(event))
    
    def log_agent_interaction(self, from_agent: str, to_agent: str,
                            message_type: str, content: Any):
        """Log interaction between agents"""
        interaction = {
            "from": from_agent,
            "to": to_agent,
            "message_type": message_type,
            "content": str(content)[:1000]  # Truncate large messages
        }
        
        self.log_agent_event(
            from_agent,
            "agent_interaction",
            interaction
        )

class StructuredLogHandler(logging.Handler):
    """Custom handler for structured logging"""
    
    def emit(self, record):
        try:
            # Parse JSON if message is JSON
            try:
                log_data = json.loads(record.getMessage())
            except:
                log_data = {"message": record.getMessage()}
            
            # Add metadata
            log_data.update({
                "level": record.levelname,
                "logger": record.name,
                "timestamp": datetime.fromtimestamp(record.created).isoformat(),
                "thread": record.thread,
                "process": record.process
            })
            
            # Add exception info if present
            if record.exc_info:
                log_data["exception"] = {
                    "type": record.exc_info[0].__name__,
                    "message": str(record.exc_info[1]),
                    "traceback": traceback.format_exception(*record.exc_info)
                }
            
            # Output formatted log
            print(json.dumps(log_data, indent=2))
            
        except Exception:
            self.handleError(record)

2. Distributed Tracing

Implement tracing across agent interactions:
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
import time

@dataclass
class Span:
    span_id: str
    parent_id: Optional[str]
    trace_id: str
    operation: str
    agent_name: str
    start_time: float
    end_time: Optional[float] = None
    tags: Dict[str, Any] = field(default_factory=dict)
    logs: List[Dict[str, Any]] = field(default_factory=list)
    status: str = "in_progress"
    
    def finish(self, status: str = "success"):
        """Finish the span"""
        self.end_time = time.time()
        self.status = status
    
    def add_tag(self, key: str, value: Any):
        """Add a tag to the span"""
        self.tags[key] = value
    
    def log(self, message: str, **kwargs):
        """Add a log entry to the span"""
        self.logs.append({
            "timestamp": time.time(),
            "message": message,
            **kwargs
        })

class DistributedTracer:
    def __init__(self):
        self.traces = {}
        self.active_spans = {}
        self._lock = threading.RLock()
    
    def start_trace(self, operation: str, agent_name: str) -> Span:
        """Start a new trace"""
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        
        span = Span(
            span_id=span_id,
            parent_id=None,
            trace_id=trace_id,
            operation=operation,
            agent_name=agent_name,
            start_time=time.time()
        )
        
        with self._lock:
            self.traces[trace_id] = [span]
            self.active_spans[span_id] = span
        
        return span
    
    def start_span(self, parent_span: Span, operation: str, 
                   agent_name: str) -> Span:
        """Start a child span"""
        span_id = str(uuid.uuid4())
        
        span = Span(
            span_id=span_id,
            parent_id=parent_span.span_id,
            trace_id=parent_span.trace_id,
            operation=operation,
            agent_name=agent_name,
            start_time=time.time()
        )
        
        with self._lock:
            self.traces[parent_span.trace_id].append(span)
            self.active_spans[span_id] = span
        
        return span
    
    @contextmanager
    def span(self, operation: str, agent_name: str, parent_span: Optional[Span] = None):
        """Context manager for spans"""
        if parent_span:
            span = self.start_span(parent_span, operation, agent_name)
        else:
            span = self.start_trace(operation, agent_name)
        
        try:
            yield span
            span.finish("success")
        except Exception as e:
            span.log(f"Error: {str(e)}", error_type=type(e).__name__)
            span.finish("error")
            raise
        finally:
            with self._lock:
                if span.span_id in self.active_spans:
                    del self.active_spans[span.span_id]
    
    def get_trace(self, trace_id: str) -> List[Span]:
        """Get all spans for a trace"""
        with self._lock:
            return self.traces.get(trace_id, [])
    
    def visualize_trace(self, trace_id: str) -> str:
        """Generate a visual representation of the trace"""
        spans = self.get_trace(trace_id)
        if not spans:
            return "Trace not found"
        
        # Sort spans by start time
        spans.sort(key=lambda s: s.start_time)
        
        # Build visualization
        lines = [f"Trace ID: {trace_id}\n"]
        
        # Create a mapping of span_id to children
        children = {}
        for span in spans:
            if span.parent_id:
                if span.parent_id not in children:
                    children[span.parent_id] = []
                children[span.parent_id].append(span)
        
        # Recursively print spans
        def print_span(span: Span, indent: int = 0):
            duration = (span.end_time or time.time()) - span.start_time
            status_symbol = "✓" if span.status == "success" else "✗"
            
            line = f"{'  ' * indent}{status_symbol} {span.agent_name}: {span.operation} ({duration:.3f}s)"
            
            if span.tags:
                line += f" tags={span.tags}"
            
            lines.append(line)
            
            # Print children
            for child in children.get(span.span_id, []):
                print_span(child, indent + 1)
        
        # Print root spans
        for span in spans:
            if span.parent_id is None:
                print_span(span)
        
        return "\n".join(lines)

3. State Inspection Tools

Tools for inspecting agent and system state:
class AgentStateInspector:
    def __init__(self):
        self.snapshots = {}
        self.state_history = defaultdict(list)
    
    def capture_state(self, agent_name: str, state: Dict[str, Any], 
                     timestamp: Optional[float] = None):
        """Capture agent state snapshot"""
        if timestamp is None:
            timestamp = time.time()
        
        snapshot = {
            "timestamp": timestamp,
            "state": self._deep_copy_state(state)
        }
        
        self.snapshots[agent_name] = snapshot
        self.state_history[agent_name].append(snapshot)
    
    def _deep_copy_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Deep copy state, handling non-serializable objects"""
        import copy
        
        try:
            return copy.deepcopy(state)
        except:
            # Fallback for non-copyable objects
            copied = {}
            for key, value in state.items():
                try:
                    copied[key] = copy.deepcopy(value)
                except:
                    copied[key] = f"<{type(value).__name__} object>"
            return copied
    
    def compare_states(self, agent_name: str, time1: float, time2: float) -> Dict[str, Any]:
        """Compare agent states at two different times"""
        history = self.state_history[agent_name]
        
        # Find closest snapshots to requested times
        snapshot1 = min(history, key=lambda s: abs(s["timestamp"] - time1))
        snapshot2 = min(history, key=lambda s: abs(s["timestamp"] - time2))
        
        return self._diff_states(snapshot1["state"], snapshot2["state"])
    
    def _diff_states(self, state1: Dict[str, Any], state2: Dict[str, Any]) -> Dict[str, Any]:
        """Compute difference between two states"""
        diff = {
            "added": {},
            "removed": {},
            "changed": {}
        }
        
        all_keys = set(state1.keys()) | set(state2.keys())
        
        for key in all_keys:
            if key not in state1:
                diff["added"][key] = state2[key]
            elif key not in state2:
                diff["removed"][key] = state1[key]
            elif state1[key] != state2[key]:
                diff["changed"][key] = {
                    "from": state1[key],
                    "to": state2[key]
                }
        
        return diff
    
    def get_state_timeline(self, agent_name: str, 
                          key_path: str) -> List[Tuple[float, Any]]:
        """Get timeline of changes for a specific state key"""
        timeline = []
        
        for snapshot in self.state_history[agent_name]:
            value = self._get_nested_value(snapshot["state"], key_path)
            if value is not None:
                timeline.append((snapshot["timestamp"], value))
        
        return timeline
    
    def _get_nested_value(self, state: Dict[str, Any], key_path: str) -> Any:
        """Get nested value using dot notation"""
        keys = key_path.split('.')
        value = state
        
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return None
        
        return value

4. Debug Command Interface

Interactive debugging interface:
import cmd
import pprint

class AgentDebugger(cmd.Cmd):
    intro = "Multi-Agent System Debugger. Type help or ? to list commands."
    prompt = "(debug) "
    
    def __init__(self, agent_system):
        super().__init__()
        self.agent_system = agent_system
        self.tracer = DistributedTracer()
        self.inspector = AgentStateInspector()
        self.breakpoints = set()
        self.watch_expressions = {}
    
    def do_agents(self, arg):
        """List all agents in the system"""
        agents = self.agent_system.get_all_agents()
        for agent in agents:
            status = "active" if agent.is_active else "inactive"
            print(f"{agent.name} ({status})")
    
    def do_state(self, arg):
        """Show state of an agent: state <agent_name>"""
        if not arg:
            print("Usage: state <agent_name>")
            return
        
        agent = self.agent_system.get_agent(arg)
        if not agent:
            print(f"Agent '{arg}' not found")
            return
        
        state = agent.get_state()
        pprint.pprint(state)
    
    def do_trace(self, arg):
        """Start tracing: trace <on|off>"""
        if arg == "on":
            self.agent_system.enable_tracing(self.tracer)
            print("Tracing enabled")
        elif arg == "off":
            self.agent_system.disable_tracing()
            print("Tracing disabled")
        else:
            print("Usage: trace <on|off>")
    
    def do_break(self, arg):
        """Set breakpoint: break <agent_name>.<method_name>"""
        if not arg:
            print("Usage: break <agent_name>.<method_name>")
            return
        
        self.breakpoints.add(arg)
        print(f"Breakpoint set at {arg}")
    
    def do_watch(self, arg):
        """Watch expression: watch <expression>"""
        if not arg:
            print("Usage: watch <expression>")
            return
        
        watch_id = len(self.watch_expressions) + 1
        self.watch_expressions[watch_id] = arg
        print(f"Watch {watch_id}: {arg}")
    
    def do_step(self, arg):
        """Step through execution"""
        self.agent_system.step()
        self._check_watches()
    
    def do_continue(self, arg):
        """Continue execution"""
        self.agent_system.resume()
    
    def do_messages(self, arg):
        """Show message queue: messages [agent_name]"""
        if arg:
            messages = self.agent_system.get_agent_messages(arg)
        else:
            messages = self.agent_system.get_all_messages()
        
        for msg in messages:
            print(f"{msg['from']} -> {msg['to']}: {msg['type']} - {msg['content'][:50]}...")
    
    def do_history(self, arg):
        """Show execution history: history [limit]"""
        limit = int(arg) if arg else 20
        history = self.agent_system.get_execution_history(limit)
        
        for entry in history:
            print(f"[{entry['timestamp']}] {entry['agent']}: {entry['action']}")
    
    def _check_watches(self):
        """Check and display watch expressions"""
        for watch_id, expression in self.watch_expressions.items():
            try:
                # Evaluate expression in agent context
                value = eval(expression, {"agents": self.agent_system.agents})
                print(f"Watch {watch_id}: {expression} = {value}")
            except Exception as e:
                print(f"Watch {watch_id}: {expression} - Error: {e}")

Debugging Strategies

1. Deterministic Replay

Capture and replay agent interactions:
import pickle

class InteractionRecorder:
    def __init__(self):
        self.recordings = {}
        self.current_recording = None
    
    def start_recording(self, name: str):
        """Start recording interactions"""
        self.current_recording = {
            "name": name,
            "start_time": time.time(),
            "interactions": [],
            "random_seeds": [],
            "external_calls": []
        }
    
    def record_interaction(self, interaction: Dict[str, Any]):
        """Record an agent interaction"""
        if self.current_recording:
            self.current_recording["interactions"].append({
                "timestamp": time.time(),
                "data": interaction
            })
    
    def record_random_seed(self, seed: int):
        """Record random seed for deterministic replay"""
        if self.current_recording:
            self.current_recording["random_seeds"].append(seed)
    
    def stop_recording(self) -> str:
        """Stop recording and save"""
        if not self.current_recording:
            return None
        
        recording_id = str(uuid.uuid4())
        self.recordings[recording_id] = self.current_recording
        self.current_recording = None
        
        return recording_id
    
    def save_recording(self, recording_id: str, filepath: str):
        """Save recording to file"""
        if recording_id not in self.recordings:
            raise ValueError(f"Recording {recording_id} not found")
        
        with open(filepath, 'wb') as f:
            pickle.dump(self.recordings[recording_id], f)
    
    def load_recording(self, filepath: str) -> str:
        """Load recording from file"""
        with open(filepath, 'rb') as f:
            recording = pickle.load(f)
        
        recording_id = str(uuid.uuid4())
        self.recordings[recording_id] = recording
        
        return recording_id

class InteractionReplayer:
    def __init__(self, agent_system):
        self.agent_system = agent_system
        self.current_replay = None
        self.replay_index = 0
    
    def start_replay(self, recording: Dict[str, Any]):
        """Start replaying a recording"""
        self.current_replay = recording
        self.replay_index = 0
        
        # Set random seeds for determinism
        if recording["random_seeds"]:
            import random
            import numpy as np
            
            random.seed(recording["random_seeds"][0])
            np.random.seed(recording["random_seeds"][0])
    
    def replay_next(self) -> bool:
        """Replay next interaction"""
        if not self.current_replay or self.replay_index >= len(self.current_replay["interactions"]):
            return False
        
        interaction = self.current_replay["interactions"][self.replay_index]
        
        # Replay the interaction
        self._execute_interaction(interaction["data"])
        
        self.replay_index += 1
        return True
    
    def replay_all(self, speed: float = 1.0):
        """Replay all interactions"""
        if not self.current_replay:
            return
        
        start_time = self.current_replay["interactions"][0]["timestamp"]
        
        for interaction in self.current_replay["interactions"]:
            # Calculate delay
            delay = (interaction["timestamp"] - start_time) / speed
            time.sleep(max(0, delay))
            
            self._execute_interaction(interaction["data"])
    
    def _execute_interaction(self, interaction: Dict[str, Any]):
        """Execute a recorded interaction"""
        # Route interaction to appropriate agent
        if interaction["type"] == "message":
            self.agent_system.send_message(
                from_agent=interaction["from"],
                to_agent=interaction["to"],
                content=interaction["content"]
            )
        elif interaction["type"] == "state_change":
            agent = self.agent_system.get_agent(interaction["agent"])
            if agent:
                agent.set_state(interaction["new_state"])

2. Chaos Engineering

Test system resilience:
import random

class ChaosMonkey:
    def __init__(self, agent_system, chaos_level: float = 0.1):
        self.agent_system = agent_system
        self.chaos_level = chaos_level  # Probability of chaos
        self.chaos_events = []
    
    def inject_chaos(self):
        """Randomly inject chaos into the system"""
        if random.random() > self.chaos_level:
            return
        
        chaos_type = random.choice([
            "kill_agent",
            "delay_message",
            "corrupt_message",
            "network_partition",
            "resource_exhaustion"
        ])
        
        self._execute_chaos(chaos_type)
    
    def _execute_chaos(self, chaos_type: str):
        """Execute specific chaos event"""
        event = {
            "timestamp": time.time(),
            "type": chaos_type,
            "details": {}
        }
        
        if chaos_type == "kill_agent":
            agents = self.agent_system.get_all_agents()
            if agents:
                victim = random.choice(agents)
                self.agent_system.kill_agent(victim.name)
                event["details"]["agent"] = victim.name
        
        elif chaos_type == "delay_message":
            delay = random.uniform(1, 5)  # 1-5 second delay
            self.agent_system.add_message_delay(delay)
            event["details"]["delay"] = delay
        
        elif chaos_type == "corrupt_message":
            self.agent_system.corrupt_next_message()
            event["details"]["corruption"] = "next_message"
        
        elif chaos_type == "network_partition":
            agents = self.agent_system.get_all_agents()
            if len(agents) >= 2:
                partition_size = len(agents) // 2
                partition = random.sample(agents, partition_size)
                self.agent_system.create_network_partition(
                    [a.name for a in partition]
                )
                event["details"]["partition"] = [a.name for a in partition]
        
        elif chaos_type == "resource_exhaustion":
            resource = random.choice(["memory", "cpu", "tokens"])
            self.agent_system.simulate_resource_exhaustion(resource)
            event["details"]["resource"] = resource
        
        self.chaos_events.append(event)
        
        # Log chaos event
        logger = MultiAgentDebugLogger()
        logger.log_agent_event(
            "chaos_monkey",
            "chaos_injected",
            event,
            level="WARNING"
        )

3. Performance Profiling

Profile agent performance:
import cProfile
import pstats
from io import StringIO

class AgentPerformanceProfiler:
    def __init__(self):
        self.profiles = {}
        self.metrics = defaultdict(lambda: {
            "execution_times": [],
            "memory_usage": [],
            "message_latency": []
        })
    
    @contextmanager
    def profile_agent(self, agent_name: str):
        """Profile agent execution"""
        profiler = cProfile.Profile()
        
        # Memory before
        import psutil
        process = psutil.Process()
        mem_before = process.memory_info().rss / 1024 / 1024  # MB
        
        start_time = time.time()
        
        profiler.enable()
        try:
            yield
        finally:
            profiler.disable()
            
            # Execution time
            execution_time = time.time() - start_time
            self.metrics[agent_name]["execution_times"].append(execution_time)
            
            # Memory after
            mem_after = process.memory_info().rss / 1024 / 1024  # MB
            memory_delta = mem_after - mem_before
            self.metrics[agent_name]["memory_usage"].append(memory_delta)
            
            # Store profile
            self.profiles[agent_name] = profiler
    
    def get_profile_stats(self, agent_name: str, top_n: int = 10) -> str:
        """Get profile statistics for an agent"""
        if agent_name not in self.profiles:
            return f"No profile found for agent {agent_name}"
        
        s = StringIO()
        ps = pstats.Stats(self.profiles[agent_name], stream=s)
        ps.strip_dirs().sort_stats('cumulative').print_stats(top_n)
        
        return s.getvalue()
    
    def get_performance_summary(self, agent_name: str) -> Dict[str, Any]:
        """Get performance summary for an agent"""
        metrics = self.metrics[agent_name]
        
        if not metrics["execution_times"]:
            return {"error": "No metrics available"}
        
        return {
            "execution_time": {
                "avg": np.mean(metrics["execution_times"]),
                "min": np.min(metrics["execution_times"]),
                "max": np.max(metrics["execution_times"]),
                "p95": np.percentile(metrics["execution_times"], 95)
            },
            "memory_usage": {
                "avg": np.mean(metrics["memory_usage"]) if metrics["memory_usage"] else 0,
                "max": np.max(metrics["memory_usage"]) if metrics["memory_usage"] else 0
            },
            "samples": len(metrics["execution_times"])
        }
    
    def identify_bottlenecks(self) -> List[Dict[str, Any]]:
        """Identify performance bottlenecks"""
        bottlenecks = []
        
        for agent_name, metrics in self.metrics.items():
            if not metrics["execution_times"]:
                continue
            
            avg_time = np.mean(metrics["execution_times"])
            
            # Check for slow agents
            if avg_time > 1.0:  # More than 1 second average
                bottlenecks.append({
                    "type": "slow_agent",
                    "agent": agent_name,
                    "avg_execution_time": avg_time,
                    "severity": "high" if avg_time > 5.0 else "medium"
                })
            
            # Check for memory leaks
            if metrics["memory_usage"]:
                memory_growth = np.polyfit(
                    range(len(metrics["memory_usage"])),
                    metrics["memory_usage"],
                    1
                )[0]
                
                if memory_growth > 1.0:  # Growing > 1MB per execution
                    bottlenecks.append({
                        "type": "memory_leak",
                        "agent": agent_name,
                        "growth_rate_mb": memory_growth,
                        "severity": "high"
                    })
        
        return bottlenecks

Debugging Tools Integration

1. Visual Debugger

Web-based visual debugging interface:
from flask import Flask, render_template_string, jsonify
import json

class VisualDebugger:
    def __init__(self, agent_system):
        self.agent_system = agent_system
        self.app = Flask(__name__)
        self.setup_routes()
    
    def setup_routes(self):
        @self.app.route('/')
        def index():
            return render_template_string('''
            <!DOCTYPE html>
            <html>
            <head>
                <title>Multi-Agent System Debugger</title>
                <script src="https://cdn.jsdelivr.net/npm/d3@7"></script>
                <style>
                    .agent { fill: #69b3a2; }
                    .agent.error { fill: #ff6b6b; }
                    .link { stroke: #999; stroke-opacity: 0.6; }
                    .message { fill: #feca57; }
                </style>
            </head>
            <body>
                <h1>Multi-Agent System Debugger</h1>
                <div id="graph"></div>
                <div id="logs"></div>
                <script>
                    // D3.js visualization code
                    const width = 800;
                    const height = 600;
                    
                    const svg = d3.select("#graph")
                        .append("svg")
                        .attr("width", width)
                        .attr("height", height);
                    
                    // Update function
                    function updateGraph() {
                        fetch('/api/system-state')
                            .then(response => response.json())
                            .then(data => {
                                // Update visualization
                                renderAgents(data.agents);
                                renderMessages(data.messages);
                            });
                    }
                    
                    // Update every second
                    setInterval(updateGraph, 1000);
                </script>
            </body>
            </html>
            ''')
        
        @self.app.route('/api/system-state')
        def system_state():
            agents = []
            for agent in self.agent_system.get_all_agents():
                agents.append({
                    "name": agent.name,
                    "status": "active" if agent.is_active else "inactive",
                    "state": agent.get_state()
                })
            
            messages = []
            for msg in self.agent_system.get_message_queue():
                messages.append({
                    "from": msg["from"],
                    "to": msg["to"],
                    "type": msg["type"]
                })
            
            return jsonify({
                "agents": agents,
                "messages": messages,
                "timestamp": time.time()
            })
        
        @self.app.route('/api/agent/<agent_name>')
        def agent_detail(agent_name):
            agent = self.agent_system.get_agent(agent_name)
            if not agent:
                return jsonify({"error": "Agent not found"}), 404
            
            return jsonify({
                "name": agent.name,
                "state": agent.get_state(),
                "history": agent.get_history(),
                "metrics": agent.get_metrics()
            })
    
    def run(self, host='localhost', port=5000):
        """Run the visual debugger"""
        self.app.run(host=host, port=port, debug=True)

Best Practices

  1. Use Correlation IDs: Track requests across agents
    def generate_correlation_id() -> str:
        return f"req_{uuid.uuid4().hex[:8]}"
    
    def propagate_correlation_id(correlation_id: str, message: Dict):
        message["correlation_id"] = correlation_id
        return message
    
  2. Implement Health Checks: Regular system health monitoring
    class HealthChecker:
        def check_agent_health(self, agent) -> Dict[str, Any]:
            return {
                "responsive": agent.ping(),
                "memory_usage": agent.get_memory_usage(),
                "queue_size": len(agent.message_queue),
                "last_activity": agent.last_activity_time
            }
    
  3. Use Debug Assertions: Add assertions that can be enabled in debug mode
    DEBUG_MODE = os.environ.get('DEBUG', 'false').lower() == 'true'
    
    def debug_assert(condition: bool, message: str):
        if DEBUG_MODE and not condition:
            raise AssertionError(f"Debug assertion failed: {message}")
    

Testing Debugging Tools

import pytest

def test_distributed_tracing():
    tracer = DistributedTracer()
    
    # Create trace
    with tracer.span("main_operation", "agent1") as span1:
        span1.add_tag("user_id", "123")
        
        with tracer.span("sub_operation", "agent2", span1) as span2:
            span2.log("Processing data")
            time.sleep(0.1)
    
    # Verify trace
    trace = tracer.get_trace(span1.trace_id)
    assert len(trace) == 2
    assert trace[0].operation == "main_operation"
    assert trace[1].parent_id == trace[0].span_id

def test_state_inspector():
    inspector = AgentStateInspector()
    
    # Capture states
    inspector.capture_state("agent1", {"counter": 1, "status": "active"})
    time.sleep(0.1)
    inspector.capture_state("agent1", {"counter": 2, "status": "active"})
    
    # Get timeline
    timeline = inspector.get_state_timeline("agent1", "counter")
    assert len(timeline) == 2
    assert timeline[0][1] == 1
    assert timeline[1][1] == 2

def test_chaos_monkey():
    # Mock agent system
    agent_system = Mock()
    agent_system.get_all_agents.return_value = [
        Mock(name="agent1"),
        Mock(name="agent2")
    ]
    
    chaos = ChaosMonkey(agent_system, chaos_level=1.0)  # Always inject chaos
    chaos.inject_chaos()
    
    # Verify chaos was injected
    assert len(chaos.chaos_events) == 1
    assert chaos.chaos_events[0]["type"] in [
        "kill_agent", "delay_message", "corrupt_message",
        "network_partition", "resource_exhaustion"
    ]

Conclusion

Effective debugging of multi-agent systems requires a combination of comprehensive logging, distributed tracing, state inspection, and specialized debugging tools. By implementing these debugging strategies and tools, you can quickly identify and resolve issues in complex multi-agent applications.