Debugging Guide
Comprehensive guide for debugging Pixell agents, including tools, techniques, and best practices for identifying and resolving issues.
Note: This documentation was generated by AI from the source code, so it may contain inaccuracies about the project. If you find one, please contact engineering@pixell.global
Overview
Debugging agent systems requires understanding the distributed nature of the architecture, the communication patterns between components, and the various failure modes that can occur. This guide covers debugging techniques for both individual agents and multi-agent systems.
Debugging Tools and Techniques
1. Logging and Monitoring
Implement comprehensive logging for debugging:
import logging
import json
import traceback
from datetime import datetime
from typing import Dict, Any, Optional
import sys
class AgentDebugger:
    """Structured JSON logging helper for a single agent.

    Every ``log_*`` method serialises one JSON object and emits it through the
    ``agent.<agent_id>`` logger. A small key/value "debug context" can be
    accumulated alongside and attached to error reports by the caller.
    """

    def __init__(self, agent_id: str, log_level: str = "DEBUG"):
        """Create the debugger and configure logging.

        Args:
            agent_id: Identifier written into every log entry and used in the
                log file name.
            log_level: Name of a ``logging`` level (case-insensitive).
        """
        self.agent_id = agent_id
        self.setup_logging(log_level)
        self.debug_context: Dict[str, Any] = {}

    def setup_logging(self, log_level: str):
        """Configure stdout + file logging and bind ``self.logger``.

        Raises:
            AttributeError: If ``log_level`` is not a ``logging`` level name.
        """
        # basicConfig configures the *root* logger and is a no-op when the
        # root logger already has handlers.
        logging.basicConfig(
            level=getattr(logging, log_level.upper()),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout),
                logging.FileHandler(f'agent_{self.agent_id}_debug.log')
            ]
        )

        self.logger = logging.getLogger(f"agent.{self.agent_id}")

    def _emit(self, level: int, payload: Dict[str, Any]) -> None:
        """Serialise ``payload`` as JSON and log it at ``level``."""
        # default=str keeps logging from raising on values json cannot encode
        # (sets, datetimes, custom objects): they are stringified instead.
        self.logger.log(level, json.dumps(payload, default=str))

    def log_debug(self, message: str, context: Optional[Dict[str, Any]] = None):
        """Log a debug message with optional context."""
        self._emit(logging.DEBUG, {
            "agent_id": self.agent_id,
            "message": message,
            "context": context or {},
            "timestamp": datetime.utcnow().isoformat()
        })

    def log_error(self, message: str, error: Exception,
                  context: Optional[Dict[str, Any]] = None):
        """Log an error together with the traceback of ``error``.

        The traceback is taken from ``error`` itself, so the entry is correct
        even when this method is called outside the ``except`` block that
        caught the exception.
        """
        formatted_traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        self._emit(logging.ERROR, {
            "agent_id": self.agent_id,
            "message": message,
            "error": str(error),
            "traceback": formatted_traceback,
            "context": context or {},
            "timestamp": datetime.utcnow().isoformat()
        })

    def log_request(self, request_id: str, request: Dict[str, Any]):
        """Log an incoming request as a ``request_received`` event."""
        self._emit(logging.INFO, {
            "event": "request_received",
            "request_id": request_id,
            "agent_id": self.agent_id,
            "request": request,
            "timestamp": datetime.utcnow().isoformat()
        })

    def log_response(self, request_id: str, response: Dict[str, Any], duration: float):
        """Log an outgoing response as a ``response_sent`` event.

        Args:
            request_id: Identifier of the request being answered.
            response: Response payload to record.
            duration: Processing time in seconds; logged in milliseconds.
        """
        self._emit(logging.INFO, {
            "event": "response_sent",
            "request_id": request_id,
            "agent_id": self.agent_id,
            "response": response,
            "duration_ms": duration * 1000,
            "timestamp": datetime.utcnow().isoformat()
        })

    def set_debug_context(self, key: str, value: Any):
        """Store ``value`` under ``key`` in the debug context."""
        self.debug_context[key] = value

    def get_debug_context(self) -> Dict[str, Any]:
        """Return a shallow copy of the current debug context."""
        return self.debug_context.copy()
# Usage
debugger = AgentDebugger("my-agent", "DEBUG")


async def process_request_with_debugging(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process a request, logging the request, the response and any failure.

    Args:
        request: Incoming request; its ``"id"`` key (default ``"unknown"``)
            is used to correlate log entries.

    Returns:
        The result of ``process_request`` on success, otherwise a dict with
        ``success=False``, the error message and the request id. Exceptions
        are logged and converted to that dict rather than re-raised.
    """
    import time  # local import: this snippet's import block does not include it

    request_id = request.get("id", "unknown")

    # Log incoming request
    debugger.log_request(request_id, request)

    # Monotonic clock so the logged duration is the real processing time.
    started = time.perf_counter()

    try:
        # Record when processing began so it can be attached to error reports
        debugger.set_debug_context("processing_start", datetime.utcnow().isoformat())

        # Process request
        result = await process_request(request)

        # Log successful response with the measured duration (seconds)
        debugger.log_response(request_id, result, time.perf_counter() - started)

        return result

    except Exception as e:
        # Log error with context
        debugger.log_error(f"Request {request_id} failed", e, {
            "request": request,
            "debug_context": debugger.get_debug_context()
        })

        return {
            "success": False,
            "error": str(e),
            "request_id": request_id
        }
2. Request Tracing
Implement request tracing for distributed debugging:
import uuid
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TraceSpan:
    """One timed operation within a request trace."""
    span_id: str  # unique identifier of this span
    parent_span_id: Optional[str]  # id of the enclosing span; None for a root span
    operation_name: str
    start_time: datetime
    end_time: Optional[datetime]  # None until the span is finished
    tags: Dict[str, Any]
    logs: List[Dict[str, Any]]
    status: str = "started"


class RequestTracer:
    """In-memory request tracer for one agent.

    Spans live in ``active_spans`` while open and are moved to
    ``trace_history`` when finished.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.active_spans: Dict[str, TraceSpan] = {}
        self.trace_history: List[TraceSpan] = []

    def start_span(self, operation_name: str, parent_span_id: Optional[str] = None,
                   tags: Optional[Dict[str, Any]] = None) -> str:
        """Open a new span and return its generated id.

        Args:
            operation_name: Name of the operation being traced.
            parent_span_id: Id of the enclosing span, or ``None`` for a root.
            tags: Initial tags; defaults to an empty dict.
        """
        span_id = str(uuid.uuid4())

        self.active_spans[span_id] = TraceSpan(
            span_id=span_id,
            parent_span_id=parent_span_id,
            operation_name=operation_name,
            start_time=datetime.utcnow(),
            end_time=None,
            tags=tags or {},
            logs=[]
        )
        return span_id

    def finish_span(self, span_id: str, status: str = "completed",
                    tags: Optional[Dict[str, Any]] = None):
        """Close an active span and move it to the history.

        Unknown or already-finished span ids are ignored.
        """
        span = self.active_spans.pop(span_id, None)
        if span is None:
            return

        span.end_time = datetime.utcnow()
        span.status = status
        if tags:
            span.tags.update(tags)

        self.trace_history.append(span)

    def add_span_log(self, span_id: str, message: str, level: str = "info",
                     fields: Optional[Dict[str, Any]] = None):
        """Append a log entry to an active span (no-op if it is not active)."""
        if span_id in self.active_spans:
            self.active_spans[span_id].logs.append({
                "message": message,
                "level": level,
                "fields": fields or {},
                "timestamp": datetime.utcnow().isoformat()
            })

    def add_span_tag(self, span_id: str, key: str, value: Any):
        """Set a tag on an active span (no-op if it is not active)."""
        if span_id in self.active_spans:
            self.active_spans[span_id].tags[key] = value

    def get_trace(self, root_span_id: str) -> List[TraceSpan]:
        """Return the finished root span and all its finished descendants.

        Only spans in ``trace_history`` are considered; spans that are still
        active are not part of the result.

        Returns:
            The spans sorted by ``start_time``, or an empty list when no
            finished span has id ``root_span_id``.
        """
        root = next(
            (span for span in self.trace_history if span.span_id == root_span_id),
            None,
        )
        if root is None:
            return []

        # Index children by parent once instead of rescanning the history for
        # every span.
        children: Dict[Optional[str], List[TraceSpan]] = {}
        for span in self.trace_history:
            children.setdefault(span.parent_span_id, []).append(span)

        # Iterative depth-first walk; reversed() keeps siblings in history order.
        trace: List[TraceSpan] = []
        pending = [root]
        while pending:
            span = pending.pop()
            trace.append(span)
            pending.extend(reversed(children.get(span.span_id, [])))

        return sorted(trace, key=lambda s: s.start_time)
# Usage with tracing
tracer = RequestTracer("my-agent")


async def traced_request_processing(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process a request under a root span with one child span per step.

    Steps are validate -> process -> send response. If any step raises, the
    child span in flight and the root span are both finished as ``"failed"``
    before the exception is re-raised, so no span is left in
    ``tracer.active_spans``.

    Returns:
        The processing result, or ``{"success": False, "error":
        "validation_failed"}`` when validation rejects the request.
    """
    # Start root span
    root_span_id = tracer.start_span(
        "process_request",
        tags={"request_id": request.get("id"), "agent": "my-agent"}
    )
    # Child span currently open, so the error path can close it.
    open_child_span = None

    try:
        tracer.add_span_log(root_span_id, "Starting request processing", "info")

        # Validate request
        open_child_span = tracer.start_span("validate_request", root_span_id)
        validation_result = await validate_request(request)
        tracer.finish_span(open_child_span, "completed" if validation_result else "failed")
        open_child_span = None

        if not validation_result:
            tracer.add_span_tag(root_span_id, "validation_failed", True)
            tracer.finish_span(root_span_id, "failed", {"error": "validation_failed"})
            return {"success": False, "error": "validation_failed"}

        # Process request
        open_child_span = tracer.start_span("process_data", root_span_id)
        result = await process_data(request)
        tracer.finish_span(open_child_span, "completed")
        open_child_span = None

        # Send response
        open_child_span = tracer.start_span("send_response", root_span_id)
        await send_response(result)
        tracer.finish_span(open_child_span, "completed")
        open_child_span = None

        tracer.finish_span(root_span_id, "completed")
        return result

    except Exception as e:
        # Close the step that was running so it reaches the trace history.
        if open_child_span is not None:
            tracer.finish_span(open_child_span, "failed", {"error": str(e)})
        tracer.add_span_log(root_span_id, f"Error: {str(e)}", "error")
        tracer.finish_span(root_span_id, "failed", {"error": str(e)})
        raise
3. Performance Profiling
Profile agent performance to identify bottlenecks:
import time
import cProfile
import pstats
from typing import Dict, Any, List
from functools import wraps
import asyncio
class PerformanceProfiler:
    """Collects per-function timings and named cProfile sessions for an agent."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # profile name -> {"profiler", "start_time", "start_counter"}
        self.profiles: Dict[str, Dict[str, Any]] = {}
        # function name -> most recent durations in seconds (at most 1000)
        self.function_times: Dict[str, List[float]] = {}

    def profile_function(self, func_name: str = None):
        """Return a decorator that records the wrapped function's run time.

        Works for both plain and ``async def`` functions. The duration is
        recorded even when the function raises.

        Args:
            func_name: Name to record timings under; defaults to the wrapped
                function's ``__name__``.
        """
        def decorator(func):
            name = func_name or func.__name__

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # perf_counter is monotonic, so clock adjustments cannot
                # distort (or make negative) the measured duration.
                started = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    self.record_function_time(name, time.perf_counter() - started)

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                started = time.perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    self.record_function_time(name, time.perf_counter() - started)

            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

        return decorator

    def record_function_time(self, func_name: str, duration: float):
        """Record one execution time, keeping only the last 1000 per function."""
        times = self.function_times.setdefault(func_name, [])
        times.append(duration)

        # Keep only last 1000 measurements
        if len(times) > 1000:
            self.function_times[func_name] = times[-1000:]

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return count/total/average/min/max and last-10 average per function."""
        stats = {}

        for func_name, times in self.function_times.items():
            if not times:
                continue
            recent = times[-10:]
            total = sum(times)
            stats[func_name] = {
                "count": len(times),
                "total_time": total,
                "average_time": total / len(times),
                "min_time": min(times),
                "max_time": max(times),
                "recent_avg": sum(recent) / len(recent)
            }

        return stats

    def start_cpu_profiling(self, profile_name: str):
        """Create and enable a cProfile session stored under ``profile_name``."""
        self.profiles[profile_name] = {
            "profiler": cProfile.Profile(),
            "start_time": time.time(),
            # Monotonic start used to compute the session duration.
            "start_counter": time.perf_counter()
        }
        self.profiles[profile_name]["profiler"].enable()

    def stop_cpu_profiling(self, profile_name: str) -> Dict[str, Any]:
        """Disable the named session and summarise it.

        Returns:
            An empty dict when ``profile_name`` is unknown; otherwise a dict
            with the session duration, call counts, total time and the ten
            functions with the highest cumulative time.
        """
        if profile_name not in self.profiles:
            return {}

        profile_data = self.profiles[profile_name]
        profile_data["profiler"].disable()

        stats = pstats.Stats(profile_data["profiler"])
        stats.sort_stats('cumulative')

        # Each stats entry is (call count, primitive calls, total time,
        # cumulative time, callers), keyed by (file, line, function name).
        functions = [
            {
                "function": f"{func[0]}:{func[1]}({func[2]})",
                "cumulative_time": ct,
                "total_time": tt,
                "calls": cc
            }
            for func, (cc, nc, tt, ct, callers) in stats.stats.items()
        ]
        functions.sort(key=lambda x: x["cumulative_time"], reverse=True)

        return {
            "profile_name": profile_name,
            "duration": time.perf_counter() - profile_data["start_counter"],
            "total_calls": stats.total_calls,
            "primitive_calls": stats.prim_calls,
            "total_time": stats.total_tt,
            "top_functions": functions[:10]
        }
# Usage
profiler = PerformanceProfiler("my-agent")


@profiler.profile_function("process_request")
async def process_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process a request; every call is timed under ``"process_request"``."""
    # Your processing logic here
    await asyncio.sleep(0.1)  # Simulate processing
    return {"success": True, "result": "processed"}


async def _process_sample_requests(count: int = 100) -> None:
    """Await ``process_request`` for ``count`` sample requests, one at a time."""
    for i in range(count):
        await process_request({"id": i, "data": f"request_{i}"})


# Start profiling
profiler.start_cpu_profiling("request_processing")

# Process some requests. ``await`` is only valid inside a coroutine, so the
# loop lives in a helper that is driven by asyncio.run().
asyncio.run(_process_sample_requests())

# Stop profiling and get results
profile_results = profiler.stop_cpu_profiling("request_processing")
performance_stats = profiler.get_performance_stats()
print("Performance Stats:", performance_stats)
print("CPU Profile:", profile_results)
Common Debugging Scenarios
1. Agent Communication Issues
Debug agent-to-agent communication problems:
class AgentCommunicationDebugger:
    """Records agent-to-agent communication events and summarises failures."""

    def __init__(self):
        self.communication_logs: List[Dict[str, Any]] = []
        self.failed_requests: List[Dict[str, Any]] = []

    def log_communication(self, event_type: str, source: str, target: str,
                          message: str, success: bool, error: str = None):
        """Record one communication event.

        Failed events (``success`` falsy) are additionally appended to
        ``failed_requests``.
        """
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "source": source,
            "target": target,
            "message": message,
            "success": success,
            "error": error
        }

        self.communication_logs.append(log_entry)

        if not success:
            self.failed_requests.append(log_entry)

    def analyze_communication_failures(self, recent_window_seconds: float = 300) -> Dict[str, Any]:
        """Summarise recorded failures.

        Args:
            recent_window_seconds: Age limit, in seconds, for a failure to be
                counted as recent.

        Returns:
            ``{"status": "no_failures"}`` when nothing failed; otherwise the
            total and recent failure counts, a count per error string
            (failures logged without an error are grouped as ``"unknown"``)
            and the most common error.
        """
        if not self.failed_requests:
            return {"status": "no_failures"}

        # Group failures by error. The "error" key is always present but may
        # be None, so a .get() default would never apply.
        failure_types: Dict[str, int] = {}
        for failure in self.failed_requests:
            error = failure.get("error") or "unknown"
            failure_types[error] = failure_types.get(error, 0) + 1

        # total_seconds() covers the whole interval; timedelta.seconds holds
        # only the sub-day remainder and would count day-old failures as recent.
        now = datetime.utcnow()
        recent_failures = [
            f for f in self.failed_requests
            if (now - datetime.fromisoformat(f["timestamp"])).total_seconds() < recent_window_seconds
        ]

        return {
            "total_failures": len(self.failed_requests),
            "recent_failures": len(recent_failures),
            "failure_types": failure_types,
            "most_common_error": max(failure_types.items(), key=lambda x: x[1])[0]
        }

    def get_communication_health(self) -> Dict[str, Any]:
        """Return success rates and a health status.

        The status is derived from the last 100 events: ``"healthy"`` above a
        0.9 success rate, ``"degraded"`` above 0.7, otherwise ``"unhealthy"``.
        Returns ``{"status": "no_data"}`` when nothing has been logged.
        """
        if not self.communication_logs:
            return {"status": "no_data"}

        total_requests = len(self.communication_logs)
        successful_requests = sum(1 for log in self.communication_logs if log["success"])
        success_rate = successful_requests / total_requests

        # Recent success rate (last 100 requests)
        recent_logs = self.communication_logs[-100:]
        recent_successful = sum(1 for log in recent_logs if log["success"])
        recent_success_rate = recent_successful / len(recent_logs)

        if recent_success_rate > 0.9:
            status = "healthy"
        elif recent_success_rate > 0.7:
            status = "degraded"
        else:
            status = "unhealthy"

        return {
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "success_rate": success_rate,
            "recent_success_rate": recent_success_rate,
            "status": status
        }
# Usage
comm_debugger = AgentCommunicationDebugger()


async def debug_agent_communication():
    """Log three sample agent-a requests, then print failure analysis and health."""
    # (target, message, success, error) for each simulated attempt
    sample_attempts = (
        ("agent-b", "process_data", True, None),
        ("agent-c", "analyze_data", False, "timeout"),
        ("agent-d", "generate_report", True, None),
    )
    for target, message, succeeded, error in sample_attempts:
        comm_debugger.log_communication(
            "request", "agent-a", target, message, succeeded, error
        )

    # Summarise what failed
    print("Failure Analysis:", comm_debugger.analyze_communication_failures())

    # Overall health derived from the logged events
    print("Communication Health:", comm_debugger.get_communication_health())
2. Memory and Resource Issues
Debug memory leaks and resource issues:
import psutil
import gc
from typing import Dict, Any, List
import tracemalloc
class ResourceDebugger:
    """Tracks process resource usage and looks for memory growth."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.memory_snapshots: List[Dict[str, Any]] = []
        self.resource_usage: List[Dict[str, Any]] = []
        # Allocation tracing must be on for take_memory_snapshot() to work.
        tracemalloc.start()

    def take_memory_snapshot(self, label: str = None):
        """Capture process metrics plus the ten largest allocation sites.

        The snapshot is appended to ``memory_snapshots`` and returned.

        Args:
            label: Optional free-form name stored with the snapshot.
        """
        process = psutil.Process()
        memory_info = process.memory_info()

        # Allocation statistics grouped by source line
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')

        snapshot_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "label": label,
            "rss_mb": memory_info.rss / 1024 / 1024,
            "vms_mb": memory_info.vms / 1024 / 1024,
            "cpu_percent": process.cpu_percent(),
            "num_threads": process.num_threads(),
            "open_files": len(process.open_files()),
            "top_memory_allocations": [
                {
                    # First formatted traceback line identifies the site.
                    "filename": stat.traceback.format()[0],
                    "size_mb": stat.size / 1024 / 1024,
                    "count": stat.count
                }
                for stat in top_stats[:10]
            ]
        }

        self.memory_snapshots.append(snapshot_data)
        return snapshot_data

    @staticmethod
    def _allocation_sizes(snapshot: Dict[str, Any]) -> Dict[str, float]:
        """Sum ``size_mb`` per allocation site in one snapshot."""
        sizes: Dict[str, float] = {}
        for stat in snapshot["top_memory_allocations"]:
            sizes[stat["filename"]] = sizes.get(stat["filename"], 0) + stat["size_mb"]
        return sizes

    def detect_memory_leaks(self, threshold_mb: float = 100) -> Dict[str, Any]:
        """Compare the first and last snapshots to flag memory growth.

        Args:
            threshold_mb: RSS growth in MB above which a leak is reported.

        Returns:
            ``{"status": "insufficient_data"}`` with fewer than two snapshots;
            otherwise RSS/VMS growth, whether growth exceeded the threshold,
            per-site allocation growth and a ``"leak_detected"``/``"normal"``
            status. ``allocation_growth`` covers the sites in the last
            snapshot; a site missing from the first snapshot's top
            allocations is reported with a baseline of zero.
        """
        if len(self.memory_snapshots) < 2:
            return {"status": "insufficient_data"}

        first_snapshot = self.memory_snapshots[0]
        last_snapshot = self.memory_snapshots[-1]

        rss_growth = last_snapshot["rss_mb"] - first_snapshot["rss_mb"]
        vms_growth = last_snapshot["vms_mb"] - first_snapshot["vms_mb"]

        significant_growth = rss_growth > threshold_mb

        # Growth is the difference between snapshots, not the final size.
        baseline = self._allocation_sizes(first_snapshot)
        allocation_growth = {
            site: size - baseline.get(site, 0)
            for site, size in self._allocation_sizes(last_snapshot).items()
        }

        return {
            "rss_growth_mb": rss_growth,
            "vms_growth_mb": vms_growth,
            "significant_growth": significant_growth,
            "allocation_growth": allocation_growth,
            "status": "leak_detected" if significant_growth else "normal"
        }

    def get_resource_usage(self) -> Dict[str, Any]:
        """Return current CPU, memory, thread, file and connection figures."""
        process = psutil.Process()

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "cpu_percent": process.cpu_percent(),
            "memory_percent": process.memory_percent(),
            "memory_mb": process.memory_info().rss / 1024 / 1024,
            "num_threads": process.num_threads(),
            "open_files": len(process.open_files()),
            # NOTE(review): newer psutil releases appear to deprecate
            # Process.connections() in favour of net_connections() - confirm
            # against the pinned psutil version.
            "connections": len(process.connections())
        }

    def force_garbage_collection(self):
        """Run a full collection and report what ``gc.collect()`` returned."""
        collected = gc.collect()
        return {"collected_objects": collected}
# Usage
resource_debugger = ResourceDebugger("my-agent")


async def debug_resource_usage():
    """Snapshot memory before and after a burst of allocations and report.

    Prints both RSS readings, the leak analysis and the result of a forced
    garbage collection.
    """
    # Baseline reading
    before = resource_debugger.take_memory_snapshot("initial")
    print("Initial Memory:", before["rss_mb"], "MB")

    # Simulated workload: build and transform a throwaway list each round
    for _ in range(1000):
        items = [f"item_{j}" for j in range(1000)]
        uppercased = [item.upper() for item in items]
        # Drop both lists straight away
        del items, uppercased

    # Reading after the workload
    after = resource_debugger.take_memory_snapshot("final")
    print("Final Memory:", after["rss_mb"], "MB")

    # Compare the first and last snapshots
    print("Leak Analysis:", resource_debugger.detect_memory_leaks())

    # Finally run an explicit collection
    print("GC Result:", resource_debugger.force_garbage_collection())
3. Configuration and Environment Issues
Debug configuration and environment problems:
import os
import sys
from typing import Dict, Any, List
import json
class ConfigurationDebugger:
    """Checks an agent's environment, configuration and dependencies.

    Each check returns ``{"status", "issues", "total_issues"}``. The names
    checked default to the class-level tuples below and can be overridden
    per call.
    """

    DEFAULT_REQUIRED_ENV_VARS = ("API_KEY", "DATABASE_URL", "REDIS_URL")
    DEFAULT_REQUIRED_CONFIG_KEYS = ("name", "version", "entry_point")
    DEFAULT_CRITICAL_DEPS = ("fastapi", "uvicorn", "grpc", "pydantic")
    DEFAULT_OPTIONAL_DEPS = ("redis", "psutil", "prometheus_client")

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.config_issues: List[Dict[str, Any]] = []

    @staticmethod
    def _report(issues: List[Dict[str, Any]], ok_status: str, bad_status: str) -> Dict[str, Any]:
        """Build the common result dict from a list of issues."""
        return {
            "status": bad_status if issues else ok_status,
            "issues": issues,
            "total_issues": len(issues)
        }

    @staticmethod
    def _import_failures(module_names, issue_type: str, severity: str) -> List[Dict[str, Any]]:
        """Return one issue for every module in ``module_names`` that fails to import."""
        import importlib  # local import: not part of this snippet's import block

        failures = []
        for dep in module_names:
            try:
                importlib.import_module(dep)
            except ImportError:
                failures.append({
                    "type": issue_type,
                    "dependency": dep,
                    "severity": severity
                })
        return failures

    def validate_environment(self, required_vars: Optional[List[str]] = None) -> Dict[str, Any]:
        """Check environment variables, Python version and available memory.

        Args:
            required_vars: Variables that must be set and non-empty; defaults
                to ``DEFAULT_REQUIRED_ENV_VARS``.

        Returns:
            A report whose status is ``"healthy"`` or ``"issues_found"``.
        """
        if required_vars is None:
            required_vars = self.DEFAULT_REQUIRED_ENV_VARS

        # An unset variable and an empty one are both treated as missing.
        issues = [
            {
                "type": "missing_environment_variable",
                "variable": var,
                "severity": "critical"
            }
            for var in required_vars
            if not os.getenv(var)
        ]

        # Check Python version
        python_version = sys.version_info
        if python_version < (3, 8):
            issues.append({
                "type": "python_version",
                "current_version": f"{python_version.major}.{python_version.minor}",
                "required_version": "3.8+",
                "severity": "critical"
            })

        # Check available memory; psutil is optional, so its absence is only
        # reported as a warning.
        try:
            import psutil
            memory = psutil.virtual_memory()
            if memory.available < 1024 * 1024 * 1024:  # 1GB
                issues.append({
                    "type": "low_memory",
                    "available_mb": memory.available / 1024 / 1024,
                    "severity": "warning"
                })
        except ImportError:
            issues.append({
                "type": "missing_dependency",
                "dependency": "psutil",
                "severity": "warning"
            })

        return self._report(issues, "healthy", "issues_found")

    def validate_configuration(self, config: Dict[str, Any],
                               required_keys: Optional[List[str]] = None) -> Dict[str, Any]:
        """Validate an agent configuration dict.

        Checks that the required keys are present, that ``entry_point`` (if
        given) exists on disk and that ``port`` (if given) is an int in
        1024-65535.

        Args:
            config: Configuration to validate.
            required_keys: Keys that must be present; defaults to
                ``DEFAULT_REQUIRED_CONFIG_KEYS``.

        Returns:
            A report whose status is ``"valid"`` or ``"invalid"``.
        """
        if required_keys is None:
            required_keys = self.DEFAULT_REQUIRED_CONFIG_KEYS

        issues = [
            {
                "type": "missing_config_key",
                "key": key,
                "severity": "critical"
            }
            for key in required_keys
            if key not in config
        ]

        # Validate entry point
        if "entry_point" in config:
            entry_point = config["entry_point"]
            if not os.path.exists(entry_point):
                issues.append({
                    "type": "invalid_entry_point",
                    "path": entry_point,
                    "severity": "critical"
                })

        # Check port configuration
        if "port" in config:
            port = config["port"]
            if not isinstance(port, int) or port < 1024 or port > 65535:
                issues.append({
                    "type": "invalid_port",
                    "port": port,
                    "severity": "critical"
                })

        return self._report(issues, "valid", "invalid")

    def check_dependencies(self, critical_deps: Optional[List[str]] = None,
                           optional_deps: Optional[List[str]] = None) -> Dict[str, Any]:
        """Check that dependencies can be imported.

        Args:
            critical_deps: Modules whose absence is critical; defaults to
                ``DEFAULT_CRITICAL_DEPS``.
            optional_deps: Modules whose absence is a warning; defaults to
                ``DEFAULT_OPTIONAL_DEPS``.

        Returns:
            A report whose status is ``"healthy"`` or
            ``"dependencies_missing"``. Critical issues come first.
        """
        if critical_deps is None:
            critical_deps = self.DEFAULT_CRITICAL_DEPS
        if optional_deps is None:
            optional_deps = self.DEFAULT_OPTIONAL_DEPS

        issues = self._import_failures(critical_deps, "missing_dependency", "critical")
        issues += self._import_failures(optional_deps, "missing_optional_dependency", "warning")

        return self._report(issues, "healthy", "dependencies_missing")

    def get_system_info(self) -> Dict[str, Any]:
        """Collect interpreter, platform and environment details.

        Only environment variables starting with ``PAK_``, ``PAR_`` or
        ``PIXELL_`` are included, with their values as-is.
        """
        return {
            "agent_id": self.agent_id,
            "python_version": sys.version,
            "platform": sys.platform,
            "working_directory": os.getcwd(),
            "environment_variables": {
                key: value for key, value in os.environ.items()
                if key.startswith(("PAK_", "PAR_", "PIXELL_"))
            },
            "python_path": sys.path,
            "loaded_modules": list(sys.modules.keys())
        }
# Usage
config_debugger = ConfigurationDebugger("my-agent")


def debug_agent_configuration():
    """Run every configuration check for the sample agent and print the results."""
    # Environment variables, Python version, memory
    print("Environment Validation:", config_debugger.validate_environment())

    # Importability of the expected packages
    print("Dependencies Check:", config_debugger.check_dependencies())

    # Interpreter and platform details, pretty-printed
    print("System Info:", json.dumps(config_debugger.get_system_info(), indent=2))

    # Sample configuration to validate
    sample_config = dict(
        name="my-agent",
        version="1.0.0",
        entry_point="src/main.py",
        port=8080,
    )
    print(
        "Configuration Validation:",
        config_debugger.validate_configuration(sample_config),
    )
Debugging Multi-Agent Systems
1. Distributed Tracing
Trace requests across multiple agents:
class DistributedTracer:
    """In-memory tracer that groups spans by trace id.

    Spans are plain dicts with ISO-format timestamps, so a trace can be
    serialised with ``json.dumps`` directly.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # trace id -> spans in the order they were started
        self.traces: Dict[str, List[Dict[str, Any]]] = {}

    def start_distributed_trace(self, trace_id: str, operation: str,
                                parent_span_id: str = None) -> str:
        """Start a span in ``trace_id`` and return its generated id.

        Args:
            trace_id: Trace the span belongs to; created on first use.
            operation: Name of the operation being traced.
            parent_span_id: Id of the span that caused this one, or ``None``
                (the default) for a span without a parent.
        """
        span_id = str(uuid.uuid4())

        span = {
            "trace_id": trace_id,
            "span_id": span_id,
            "parent_span_id": parent_span_id,
            "agent_id": self.agent_id,
            "operation": operation,
            "start_time": datetime.utcnow().isoformat(),
            "end_time": None,
            "status": "started",
            "tags": {},
            "logs": []
        }

        self.traces.setdefault(trace_id, []).append(span)
        return span_id

    def _find_span(self, trace_id: str, span_id: str):
        """Return the span dict with ``span_id`` in ``trace_id``, or ``None``."""
        for span in self.traces.get(trace_id, []):
            if span["span_id"] == span_id:
                return span
        return None

    def finish_span(self, trace_id: str, span_id: str, status: str = "completed"):
        """Set the end time and status of a span (no-op if it does not exist)."""
        span = self._find_span(trace_id, span_id)
        if span is not None:
            span["end_time"] = datetime.utcnow().isoformat()
            span["status"] = status

    def add_span_log(self, trace_id: str, span_id: str, message: str, level: str = "info"):
        """Append a timestamped log entry to a span (no-op if it does not exist)."""
        span = self._find_span(trace_id, span_id)
        if span is not None:
            span["logs"].append({
                "message": message,
                "level": level,
                "timestamp": datetime.utcnow().isoformat()
            })

    def get_trace(self, trace_id: str) -> List[Dict[str, Any]]:
        """Return the spans recorded for ``trace_id`` (empty list if unknown)."""
        return self.traces.get(trace_id, [])

    def analyze_trace(self, trace_id: str) -> Dict[str, Any]:
        """Summarise a trace: failures, overall duration and slow spans.

        Returns:
            ``{"status": "trace_not_found"}`` for an unknown or empty trace;
            otherwise the status (``"failed"`` if any span failed), the time
            in seconds from the earliest start to the latest end, the number
            of failed spans, the finished spans that took longer than one
            second and the total span count.
        """
        trace = self.get_trace(trace_id)
        if not trace:
            return {"status": "trace_not_found"}

        failed_spans = [span for span in trace if span["status"] == "failed"]

        # Parse each timestamp once; end is None for spans still running.
        timings = [
            (
                span,
                datetime.fromisoformat(span["start_time"]) if span["start_time"] else None,
                datetime.fromisoformat(span["end_time"]) if span["end_time"] else None,
            )
            for span in trace
        ]
        start_times = [start for _, start, _ in timings if start is not None]
        end_times = [end for _, _, end in timings if end is not None]

        total_duration = 0
        if start_times and end_times:
            total_duration = (max(end_times) - min(start_times)).total_seconds()

        # Find slow spans
        slow_spans = []
        for span, start, end in timings:
            if start is None or end is None:
                continue
            duration = (end - start).total_seconds()
            if duration > 1.0:  # Slower than 1 second
                slow_spans.append({
                    "span_id": span["span_id"],
                    "operation": span["operation"],
                    "duration": duration
                })

        return {
            "status": "failed" if failed_spans else "completed",
            "total_duration": total_duration,
            "failed_spans": len(failed_spans),
            "slow_spans": slow_spans,
            "total_spans": len(trace)
        }
# Usage
distributed_tracer = DistributedTracer("agent-a")
async def debug_distributed_request():
    """Trace a sample request end to end and print what was recorded.

    Opens a root span ("process_request") under a fresh trace id, simulates
    two downstream agent calls as further spans of the same trace, and closes
    the root span as "completed". If anything raises, the error is logged on
    the root span and the root span is closed as "failed" instead. Finally the
    trace analysis and the full trace are printed.
    """
    request_trace_id = str(uuid.uuid4())

    # (operation name, log message, simulated latency in seconds)
    downstream_calls = (
        ("call_agent_b", "Calling agent B", 0.1),
        ("call_agent_c", "Calling agent C", 0.2),
    )

    root = distributed_tracer.start_distributed_trace(request_trace_id, "process_request")

    try:
        distributed_tracer.add_span_log(request_trace_id, root, "Starting request processing")

        for operation, note, latency in downstream_calls:
            child = distributed_tracer.start_distributed_trace(request_trace_id, operation)
            distributed_tracer.add_span_log(request_trace_id, child, note)
            # Stand-in for the real call to the other agent
            await asyncio.sleep(latency)
            distributed_tracer.finish_span(request_trace_id, child, "completed")

        distributed_tracer.finish_span(request_trace_id, root, "completed")

    except Exception as exc:
        distributed_tracer.add_span_log(request_trace_id, root, f"Error: {str(exc)}", "error")
        distributed_tracer.finish_span(request_trace_id, root, "failed")

    # Summary first, then the raw spans
    summary = distributed_tracer.analyze_trace(request_trace_id)
    print("Trace Analysis:", summary)

    recorded = distributed_tracer.get_trace(request_trace_id)
    print("Full Trace:", json.dumps(recorded, indent=2))
2. Health Monitoring
Monitor health across multiple agents:
class MultiAgentHealthMonitor:
    """Track and probe the health of a set of registered agents.

    Each registered agent has a record holding its address, capabilities, the
    last observed status ("unknown", "healthy", "unhealthy" or "unreachable"),
    the ISO timestamp of the last check and a consecutive-failure counter.
    """

    def __init__(self):
        # agent_id -> record described in the class docstring
        self.agents: Dict[str, Dict[str, Any]] = {}
        # Not populated by any method of this class.
        self.health_checks: List[Dict[str, Any]] = []

    def register_agent(self, agent_id: str, address: str, capabilities: List[str]):
        """Register (or re-register) an agent for monitoring.

        The record starts with status "unknown", no last check and zero
        consecutive failures; an existing record for ``agent_id`` is replaced.
        """
        self.agents[agent_id] = {
            "address": address,
            "capabilities": capabilities,
            "status": "unknown",
            "last_check": None,
            "consecutive_failures": 0
        }

    async def check_agent_health(self, agent_id: str) -> Dict[str, Any]:
        """Probe one agent and update its record.

        Args:
            agent_id: Id of a previously registered agent.

        Returns:
            ``{"status": "agent_not_found"}`` for an unregistered id.
            When the agent answers: ``status`` and ``agent_status`` both carry
            the observed state ("healthy" or "unhealthy") plus
            ``response_time`` in seconds.
            When the probe raises: ``status`` is "unhealthy" with ``error``
            (the exception text) and ``response_time``; the record is marked
            "unreachable".
        """
        if agent_id not in self.agents:
            return {"status": "agent_not_found"}

        agent = self.agents[agent_id]
        start_time = time.time()

        try:
            # AgentClient is used as an async context manager exposing health_check()
            async with AgentClient(agent["address"]) as client:
                health = await client.health_check()

                if health["status"] == "healthy":
                    agent["status"] = "healthy"
                    agent["consecutive_failures"] = 0
                else:
                    agent["status"] = "unhealthy"
                    agent["consecutive_failures"] += 1

                agent["last_check"] = datetime.utcnow().isoformat()

                return {
                    # Report the observed state; a reachable agent that says it
                    # is unhealthy must not be counted as healthy by callers.
                    "status": agent["status"],
                    "response_time": time.time() - start_time,
                    "agent_status": agent["status"]
                }

        except Exception as e:
            # Any failure to connect or to read the reply counts as unreachable.
            agent["status"] = "unreachable"
            agent["consecutive_failures"] += 1
            agent["last_check"] = datetime.utcnow().isoformat()

            return {
                "status": "unhealthy",
                "error": str(e),
                "response_time": time.time() - start_time
            }

    async def check_all_agents(self) -> Dict[str, Any]:
        """Probe every registered agent, one after another.

        Returns:
            A dict with ``overall_status`` ("healthy" when every result is
            healthy, else "degraded"), ``healthy_agents``, ``total_agents``
            and the per-agent ``agent_results``.
        """
        results = {}

        for agent_id in self.agents:
            results[agent_id] = await self.check_agent_health(agent_id)

        healthy_agents = sum(1 for result in results.values() if result["status"] == "healthy")
        total_agents = len(results)

        return {
            "overall_status": "healthy" if healthy_agents == total_agents else "degraded",
            "healthy_agents": healthy_agents,
            "total_agents": total_agents,
            "agent_results": results
        }

    def get_agent_status(self, agent_id: str) -> Dict[str, Any]:
        """Return the stored record of one agent without probing it.

        Returns ``{"status": "agent_not_found"}`` for an unregistered id.
        """
        if agent_id not in self.agents:
            return {"status": "agent_not_found"}

        agent = self.agents[agent_id]
        return {
            "agent_id": agent_id,
            "status": agent["status"],
            "last_check": agent["last_check"],
            "consecutive_failures": agent["consecutive_failures"],
            "address": agent["address"],
            "capabilities": agent["capabilities"]
        }

    def get_system_health(self) -> Dict[str, Any]:
        """Aggregate the stored statuses into a system-wide summary.

        No agent is probed; the summary reflects the last recorded statuses.

        Returns:
            Counts per status, ``health_percentage`` (0 when no agent is
            registered) and ``status``: "healthy" when no agent is unhealthy
            or unreachable, "degraded" when fewer than half of the agents are
            failing, otherwise "critical".
        """
        total_agents = len(self.agents)
        statuses = [agent["status"] for agent in self.agents.values()]
        healthy_agents = statuses.count("healthy")
        unhealthy_agents = statuses.count("unhealthy")
        unreachable_agents = statuses.count("unreachable")

        # Unreachable agents are failing too; ignoring them would report a
        # fully unreachable system as "healthy".
        failing_agents = unhealthy_agents + unreachable_agents
        if failing_agents == 0:
            status = "healthy"
        elif failing_agents < total_agents / 2:
            status = "degraded"
        else:
            status = "critical"

        return {
            "total_agents": total_agents,
            "healthy_agents": healthy_agents,
            "unhealthy_agents": unhealthy_agents,
            "unreachable_agents": unreachable_agents,
            "health_percentage": (healthy_agents / total_agents * 100) if total_agents > 0 else 0,
            "status": status
        }
# Usage
# Module-level monitor instance used by debug_system_health() below.
health_monitor = MultiAgentHealthMonitor()
# Register agents
# Arguments are (agent_id, address, capabilities); the localhost addresses
# are example values.
health_monitor.register_agent("agent-a", "localhost:50051", ["data_processing"])
health_monitor.register_agent("agent-b", "localhost:50052", ["analysis"])
health_monitor.register_agent("agent-c", "localhost:50053", ["reporting"])
async def debug_system_health():
    """Probe all registered agents and print three health views.

    Prints, in order: the per-agent probe results, the aggregated system
    health, and the stored status record of "agent-a".
    """
    # Live probe of every agent
    print("Health Results:", await health_monitor.check_all_agents())

    # Aggregate built from the statuses the probe just recorded
    print("System Health:", health_monitor.get_system_health())

    # Stored record of a single agent
    print("Agent A Status:", health_monitor.get_agent_status("agent-a"))
Debugging Tools and Utilities
1. Interactive Debugger
Interactive debugging session:
import pdb
import code
from typing import Dict, Any
class InteractiveDebugger:
    """Interactive debugging helpers for a single agent.

    ``debug_context`` is a plain dict populated by callers. Its entries are
    the names available to conditional breakpoints and it is exposed inside
    interactive sessions.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.debug_context = {}

    def set_breakpoint(self, condition: str = None):
        """Enter pdb, optionally only when ``condition`` evaluates truthy.

        Args:
            condition: Python expression evaluated with the entries of
                ``debug_context`` as its global names. When omitted (or
                empty), pdb is entered unconditionally.

        Raises:
            Any exception raised while evaluating ``condition``, e.g.
            ``NameError`` when it references a name that is not in
            ``debug_context``.
        """
        if condition:
            # SECURITY: eval() runs arbitrary code. Only pass conditions written
            # by the developer, never text taken from requests or other input.
            # eval() inserts "__builtins__" into the globals dict it receives,
            # so evaluate against a copy to keep debug_context clean.
            if eval(condition, dict(self.debug_context)):
                pdb.set_trace()
        else:
            pdb.set_trace()

    def debug_interactive(self, local_vars: Dict[str, Any]):
        """Open an interactive console with agent context and ``local_vars``.

        The console namespace contains ``agent_id``, ``debug_context`` and
        every entry of ``local_vars``; entries of ``local_vars`` win on a
        name clash.
        """
        debug_vars = {
            "agent_id": self.agent_id,
            "debug_context": self.debug_context,
            **local_vars
        }

        code.interact(local=debug_vars)

    def inspect_variables(self, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Describe each variable by type, truncated text form and size.

        Args:
            variables: Mapping of variable name to value.

        Returns:
            For every name a dict with ``type`` (the type's name), ``value``
            (``str(value)`` cut to 100 characters) and ``size``
            (``len(value)``, or "N/A" when the value has no length).
        """
        inspection = {}

        for name, value in variables.items():
            # Report the length of the value itself, not of its text form.
            try:
                size = len(value)
            except TypeError:
                size = "N/A"

            inspection[name] = {
                "type": type(value).__name__,
                "value": str(value)[:100],  # Truncate long values
                "size": size
            }

        return inspection
# Usage
# Module-level debugger instance used by debug_agent_processing() below.
debugger = InteractiveDebugger("my-agent")
async def debug_agent_processing(request: Dict[str, Any]):
    """Process a request with debugging hooks around it.

    The request and the start time are stored in the debugger's context. After
    processing, a conditional breakpoint fires when the result reports
    ``success == False``. If processing raises, an interactive console is
    opened with the request, the exception and the formatted traceback, and
    the exception is then re-raised.

    Args:
        request: Request payload handed to ``process_request``.

    Returns:
        Whatever ``process_request`` returns.
    """
    debugger.debug_context["request"] = request
    debugger.debug_context["start_time"] = datetime.utcnow()

    try:
        result = await process_request(request)

        # The breakpoint condition is evaluated against debug_context, so the
        # result has to be stored there first; otherwise "result" is undefined
        # and every successful request ends in a NameError.
        debugger.debug_context["result"] = result
        debugger.set_breakpoint("result.get('success') == False")

        return result

    except Exception as e:
        # Interactive debugging on error
        debugger.debug_interactive({
            "request": request,
            "error": e,
            "traceback": traceback.format_exc()
        })
        raise
2. Debug Dashboard
Create a debug dashboard for monitoring:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
import json
class DebugDashboard:
    """Small FastAPI application exposing an agent's debug data.

    ``debug_data`` holds the logs, metrics, traces and health information.
    It is served through JSON endpoints and pushed to the browser once per
    second over a WebSocket.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.app = FastAPI(title=f"Debug Dashboard - {agent_id}")
        self.debug_data = {
            "logs": [],
            "metrics": {},
            "traces": [],
            "health": {}
        }
        self.setup_routes()

    def setup_routes(self):
        """Register the HTML page, the JSON endpoints and the WebSocket."""
        # Local import: only this method needs the disconnect exception.
        from fastapi import WebSocketDisconnect

        @self.app.get("/")
        async def dashboard():
            """Debug dashboard HTML"""
            # The page connects back to the host it was loaded from, so it
            # also works when served on another host or port. Log text is
            # written with textContent so that markup in a message is shown
            # literally instead of being interpreted by the browser.
            return HTMLResponse("""
            <!DOCTYPE html>
            <html>
            <head>
                <title>Agent Debug Dashboard</title>
                <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
            </head>
            <body>
                <h1>Agent Debug Dashboard</h1>
                <div id="logs"></div>
                <div id="metrics"></div>
                <div id="traces"></div>
                <script>
                    // WebSocket connection for real-time updates
                    const wsScheme = location.protocol === 'https:' ? 'wss' : 'ws';
                    const ws = new WebSocket(`${wsScheme}://${location.host}/ws`);
                    ws.onmessage = function(event) {
                        const data = JSON.parse(event.data);
                        updateDashboard(data);
                    };
                    
                    function updateDashboard(data) {
                        // Update logs
                        const logsDiv = document.getElementById('logs');
                        logsDiv.replaceChildren(...data.logs.map(log => {
                            const row = document.createElement('div');
                            row.textContent = `${log.timestamp}: ${log.message}`;
                            return row;
                        }));
                        
                        // Update metrics
                        const metricsDiv = document.getElementById('metrics');
                        metricsDiv.textContent = JSON.stringify(data.metrics, null, 2);
                    }
                </script>
            </body>
            </html>
            """)

        @self.app.get("/debug/logs")
        async def get_logs():
            """Get debug logs"""
            return self.debug_data["logs"]

        @self.app.get("/debug/metrics")
        async def get_metrics():
            """Get debug metrics"""
            return self.debug_data["metrics"]

        @self.app.get("/debug/traces")
        async def get_traces():
            """Get debug traces"""
            return self.debug_data["traces"]

        @self.app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket):
            """Push the complete debug data to the client once per second."""
            await websocket.accept()
            try:
                while True:
                    # default=str keeps the stream alive when a log context or
                    # metric holds a value json cannot encode (e.g. datetime).
                    await websocket.send_text(json.dumps(self.debug_data, default=str))
                    await asyncio.sleep(1)
            except WebSocketDisconnect:
                # The client closed the connection; end the handler quietly.
                return

    def add_log(self, message: str, level: str = "info", context: Dict[str, Any] = None):
        """Append a timestamped log entry, keeping only the newest 1000.

        Args:
            message: Log text.
            level: Free-form level label stored with the entry.
            context: Optional extra data; stored as ``{}`` when omitted.
        """
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "message": message,
            "level": level,
            "context": context or {}
        }

        self.debug_data["logs"].append(log_entry)

        # Keep only last 1000 logs
        if len(self.debug_data["logs"]) > 1000:
            self.debug_data["logs"] = self.debug_data["logs"][-1000:]

    def update_metrics(self, metrics: Dict[str, Any]):
        """Merge ``metrics`` into the stored metrics; existing keys are overwritten."""
        self.debug_data["metrics"].update(metrics)

    def add_trace(self, trace: Dict[str, Any]):
        """Append a trace record, keeping only the newest 100."""
        self.debug_data["traces"].append(trace)

        # Keep only last 100 traces
        if len(self.debug_data["traces"]) > 100:
            self.debug_data["traces"] = self.debug_data["traces"][-100:]
# Usage
# Constructing the dashboard also creates its FastAPI app (dashboard.app)
# and registers the routes.
dashboard = DebugDashboard("my-agent")
# Add some debug data
dashboard.add_log("Agent started", "info")
dashboard.update_metrics({"cpu_usage": 45.2, "memory_usage": 123.4})
dashboard.add_trace({"operation": "process_request", "duration": 0.123})
# Start dashboard
# NOTE(review): the FastAPI instance is the `app` attribute of the `dashboard`
# object above; confirm the import string below matches your module layout.
# uvicorn dashboard.app:app --host 0.0.0.0 --port 8000
Best Practices for Debugging
1. Structured Logging
Use structured logging for better debugging:
import structlog
from typing import Dict, Any
# Configure structured logging
# Processors run in the order listed; the renderer must stay last because it
# turns the event dict into the final JSON string.
structlog.configure(
    processors=[
        # Drop events below the level enabled on the stdlib logger
        structlog.stdlib.filter_by_level,
        # Add the logger name and the log level to the event dict
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        # Apply %-style positional arguments to the event text
        structlog.stdlib.PositionalArgumentsFormatter(),
        # Add an ISO-formatted timestamp
        structlog.processors.TimeStamper(fmt="iso"),
        # Render stack info and exception info into the event dict
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        # Decode byte strings to str before rendering
        structlog.processors.UnicodeDecoder(),
        # Serialize the event dict as JSON
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    # Route output through the standard library logging module
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)
class StructuredAgentLogger:
    """Emit agent events as structured key/value log records."""

    def __init__(self, agent_id: str):
        # The agent id is handed to structlog as an initial value of the logger.
        self.logger = structlog.get_logger("agent", agent_id=agent_id)

    def log_request(self, request_id: str, request: Dict[str, Any]):
        """Record a "request_received" event at info level.

        Logged fields: the request id, the request's "type" entry (None when
        absent) and the length of the request's string form.
        """
        fields = {
            "request_id": request_id,
            "request_type": request.get("type"),
            "request_size": len(str(request)),
        }
        self.logger.info("request_received", **fields)

    def log_processing_step(self, request_id: str, step: str, duration: float):
        """Record a "processing_step" event at info level.

        ``duration`` is given in seconds and logged in milliseconds.
        """
        elapsed_ms = duration * 1000
        self.logger.info(
            "processing_step",
            request_id=request_id,
            step=step,
            duration_ms=elapsed_ms,
        )

    def log_error(self, request_id: str, error: Exception, context: Dict[str, Any] = None):
        """Record a "processing_error" event at error level.

        Logged fields: the request id, the error text, the exception class
        name and the context (an empty dict when none is given).
        """
        fields = {
            "request_id": request_id,
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context if context else {},
        }
        self.logger.error("processing_error", **fields)
2. Debug Configuration
Configure debugging based on environment:
import os
from typing import Dict, Any
class DebugConfiguration:
    """Debug settings read from environment variables at construction time."""

    def __init__(self):
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Read every debug setting from the environment.

        Boolean settings are enabled only by the value "true" (any letter
        case). Numeric settings are converted with ``int``/``float``, so a
        malformed value raises ``ValueError``.
        """
        def flag(variable: str) -> bool:
            # Unset variables fall back to "false"
            return os.getenv(variable, "false").lower() == "true"

        settings: Dict[str, Any] = {}
        settings["debug_enabled"] = flag("DEBUG")
        settings["log_level"] = os.getenv("LOG_LEVEL", "INFO")
        settings["trace_enabled"] = flag("TRACE_ENABLED")
        settings["profile_enabled"] = flag("PROFILE_ENABLED")
        settings["debug_port"] = int(os.getenv("DEBUG_PORT", "5678"))
        settings["max_log_size"] = int(os.getenv("MAX_LOG_SIZE", "1000"))
        settings["trace_sampling_rate"] = float(os.getenv("TRACE_SAMPLING_RATE", "0.1"))
        return settings

    def is_debug_enabled(self) -> bool:
        """Return the "debug_enabled" setting."""
        enabled = self.config["debug_enabled"]
        return enabled

    def should_trace(self) -> bool:
        """Return the "trace_enabled" setting."""
        enabled = self.config["trace_enabled"]
        return enabled

    def should_profile(self) -> bool:
        """Return the "profile_enabled" setting."""
        enabled = self.config["profile_enabled"]
        return enabled

    def get_log_level(self) -> str:
        """Return the configured log level name."""
        level = self.config["log_level"]
        return level

    def get_debug_port(self) -> int:
        """Return the configured debug port."""
        port = self.config["debug_port"]
        return port
# Usage
# The configuration is read from the environment once, when this object is built.
debug_config = DebugConfiguration()
if debug_config.is_debug_enabled():
    # Enable debugging
    # The configured log level is passed as AgentDebugger's second argument.
    debugger = AgentDebugger("my-agent", debug_config.get_log_level())
    
if debug_config.should_trace():
    # Enable tracing
    # NOTE(review): RequestTracer is not defined in this part of the guide;
    # presumably it is the tracer class introduced earlier - confirm.
    tracer = RequestTracer("my-agent")
    
if debug_config.should_profile():
    # Enable profiling
    # NOTE(review): PerformanceProfiler is not defined in this part of the
    # guide; presumably it is introduced earlier - confirm.
    profiler = PerformanceProfiler("my-agent")
Next Steps
After setting up debugging:
- UI Integration Guide - Integrate with user interfaces
- Best Practices - Follow development best practices
- Full Deployment Guide - Deploy with proper monitoring
- Agent-to-Agent Communication - Debug multi-agent systems
 
Ready to debug your agents? Check out the UI Integration Guide to learn how to integrate with user interfaces!