Skip to main content

Debugging Guide

Comprehensive guide for debugging Pixell agents, including tools, techniques, and best practices for identifying and resolving issues.

Note: This documentation was generated by AI from the source code and may contain inaccuracies about the project. If you find an error, please contact engineering@pixell.global

Overview

Debugging agent systems requires understanding the distributed nature of the architecture, the communication patterns between components, and the various failure modes that can occur. This guide covers debugging techniques for both individual agents and multi-agent systems.

Debugging Tools and Techniques

1. Logging and Monitoring

Implement comprehensive logging for debugging:

import logging
import json
import traceback
from datetime import datetime
from typing import Dict, Any, Optional
import sys

class AgentDebugger:
    """Structured (JSON) logging helper for a single agent.

    Each ``log_*`` method emits one JSON document through the
    ``agent.<agent_id>`` logger. The instance also carries a free-form
    ``debug_context`` dict that callers can attach to error reports.
    """

    def __init__(self, agent_id: str, log_level: str = "DEBUG"):
        """Configure logging and start with an empty debug context.

        Args:
            agent_id: Identifier used in the logger name, the log file name
                and every emitted record.
            log_level: Name of a ``logging`` level (case-insensitive).
        """
        self.agent_id = agent_id
        self.setup_logging(log_level)
        self.debug_context = {}

    def setup_logging(self, log_level: str):
        """Setup structured logging for debugging.

        Configures the root logger to write to stdout and to
        ``agent_<agent_id>_debug.log``, then binds ``self.logger``.
        ``logging.basicConfig`` is a no-op when the root logger already has
        handlers, so only the first call in a process takes effect.
        """
        logging.basicConfig(
            level=getattr(logging, log_level.upper()),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout),
                logging.FileHandler(f'agent_{self.agent_id}_debug.log'),
            ],
        )

        self.logger = logging.getLogger(f"agent.{self.agent_id}")

    def _emit(self, level: int, payload: Dict[str, Any]) -> None:
        """Stamp *payload* with the current UTC time and log it as JSON."""
        payload["timestamp"] = datetime.utcnow().isoformat()
        # default=str is deliberate: a debug logger must not raise because a
        # context value (datetime, exception, custom object) is not JSON-native.
        self.logger.log(level, json.dumps(payload, default=str))

    def log_debug(self, message: str, context: Optional[Dict[str, Any]] = None):
        """Log debug information with optional structured context."""
        self._emit(logging.DEBUG, {
            "agent_id": self.agent_id,
            "message": message,
            "context": context or {},
        })

    def log_error(self, message: str, error: Exception,
                  context: Optional[Dict[str, Any]] = None):
        """Log an error together with the traceback of *error*.

        The traceback is taken from the exception object itself, so the
        record is correct even when this is called outside the ``except``
        block that caught the error.
        """
        formatted_tb = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        self._emit(logging.ERROR, {
            "agent_id": self.agent_id,
            "message": message,
            "error": str(error),
            "traceback": formatted_tb,
            "context": context or {},
        })

    def log_request(self, request_id: str, request: Dict[str, Any]):
        """Log an incoming request as a ``request_received`` event."""
        self._emit(logging.INFO, {
            "event": "request_received",
            "request_id": request_id,
            "agent_id": self.agent_id,
            "request": request,
        })

    def log_response(self, request_id: str, response: Dict[str, Any], duration: float):
        """Log an outgoing response as a ``response_sent`` event.

        Args:
            request_id: Identifier of the request being answered.
            response: Response payload to record.
            duration: Processing time in seconds; logged in milliseconds.
        """
        self._emit(logging.INFO, {
            "event": "response_sent",
            "request_id": request_id,
            "agent_id": self.agent_id,
            "response": response,
            "duration_ms": duration * 1000,
        })

    def set_debug_context(self, key: str, value: Any):
        """Store *value* under *key* in the debug context."""
        self.debug_context[key] = value

    def get_debug_context(self) -> Dict[str, Any]:
        """Return a shallow copy of the current debug context."""
        return self.debug_context.copy()

# Usage
debugger = AgentDebugger("my-agent", "DEBUG")


async def process_request_with_debugging(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process a request, logging the request, the response and any failure.

    Args:
        request: Request payload; its ``"id"`` key (default ``"unknown"``)
            is used to correlate log records.

    Returns:
        The result of ``process_request`` on success, otherwise a dict with
        ``success=False``, the error message and the request id. Exceptions
        are logged and converted into that error dict, not re-raised.
    """
    import time  # local import: this snippet's import block does not pull in time

    request_id = request.get("id", "unknown")

    # Log incoming request
    debugger.log_request(request_id, request)
    started = time.perf_counter()

    try:
        # Record when processing began so error reports can include it
        debugger.set_debug_context("processing_start", datetime.utcnow().isoformat())

        result = await process_request(request)

        # Report the measured processing time rather than a fixed placeholder
        debugger.log_response(request_id, result, time.perf_counter() - started)

        return result

    except Exception as e:
        # Log error with the request and the accumulated debug context
        debugger.log_error(f"Request {request_id} failed", e, {
            "request": request,
            "debug_context": debugger.get_debug_context()
        })

        return {
            "success": False,
            "error": str(e),
            "request_id": request_id
        }

2. Request Tracing

Implement request tracing for distributed debugging:

import uuid
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class TraceSpan:
    """A single timed operation recorded while handling a request.

    Attributes:
        span_id: Unique identifier of this span.
        parent_span_id: Identifier of the enclosing span, or ``None`` for a
            root span.
        operation_name: Label of the operation being traced.
        start_time: When the span was opened.
        end_time: When the span was closed; ``None`` while still open.
        tags: Key/value metadata attached to the span.
        logs: Log entries recorded against the span.
        status: Lifecycle state; defaults to ``"started"``.
    """

    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    start_time: datetime
    end_time: Optional[datetime]
    tags: Dict[str, Any]
    logs: List[Dict[str, Any]]
    status: str = "started"

class RequestTracer:
    """In-memory request tracer for a single agent.

    Open spans live in ``active_spans``; finished spans are moved to
    ``trace_history``, from which ``get_trace`` reconstructs a span tree.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.active_spans: Dict[str, "TraceSpan"] = {}
        self.trace_history: List["TraceSpan"] = []

    def start_span(self, operation_name: str, parent_span_id: Optional[str] = None,
                   tags: Optional[Dict[str, Any]] = None) -> str:
        """Open a new span and return its generated id.

        Args:
            operation_name: Label for the traced operation.
            parent_span_id: Id of the enclosing span, if any.
            tags: Initial tags for the span.
        """
        span_id = str(uuid.uuid4())

        self.active_spans[span_id] = TraceSpan(
            span_id=span_id,
            parent_span_id=parent_span_id,
            operation_name=operation_name,
            start_time=datetime.utcnow(),
            end_time=None,
            tags=tags or {},
            logs=[],
        )
        return span_id

    def finish_span(self, span_id: str, status: str = "completed",
                    tags: Optional[Dict[str, Any]] = None):
        """Close an active span and move it to the history.

        Unknown or already-finished span ids are ignored.
        """
        span = self.active_spans.pop(span_id, None)
        if span is None:
            return

        span.end_time = datetime.utcnow()
        span.status = status
        if tags:
            span.tags.update(tags)

        self.trace_history.append(span)

    def add_span_log(self, span_id: str, message: str, level: str = "info",
                     fields: Optional[Dict[str, Any]] = None):
        """Append a log entry to an active span; ignored if the span is not active."""
        span = self.active_spans.get(span_id)
        if span is not None:
            span.logs.append({
                "message": message,
                "level": level,
                "fields": fields or {},
                "timestamp": datetime.utcnow().isoformat()
            })

    def add_span_tag(self, span_id: str, key: str, value: Any):
        """Set a tag on an active span; ignored if the span is not active."""
        span = self.active_spans.get(span_id)
        if span is not None:
            span.tags[key] = value

    def get_trace(self, root_span_id: str) -> List["TraceSpan"]:
        """Return the finished root span and all its finished descendants.

        Only spans in ``trace_history`` are considered; spans that are still
        active are not part of the result.

        Args:
            root_span_id: Id of the span at the top of the trace.

        Returns:
            The spans sorted by ``start_time``, or an empty list when the
            root span is not in the history.
        """
        root = next(
            (span for span in self.trace_history if span.span_id == root_span_id),
            None,
        )
        if root is None:
            return []

        # Index children once so the walk is linear in the history size.
        children: Dict[Any, list] = {}
        for span in self.trace_history:
            children.setdefault(span.parent_span_id, []).append(span)

        trace = []
        seen = set()
        pending = [root]
        while pending:
            span = pending.pop()
            if id(span) in seen:  # guards against a malformed parent cycle
                continue
            seen.add(id(span))
            trace.append(span)
            # Reverse so children are visited in history order (pre-order walk).
            pending.extend(reversed(children.get(span.span_id, [])))

        return sorted(trace, key=lambda s: s.start_time)

# Usage with tracing
tracer = RequestTracer("my-agent")


async def traced_request_processing(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process a request, recording one span per processing stage.

    A root span covers the whole request; validation, processing and
    response sending each get a child span.

    Returns:
        The processing result, or ``{"success": False, "error":
        "validation_failed"}`` when validation rejects the request.

    Raises:
        Exception: Any error from a stage is re-raised after the root span
            and any still-open child span have been closed as ``"failed"``.
    """
    # Start root span
    root_span_id = tracer.start_span(
        "process_request",
        tags={"request_id": request.get("id"), "agent": "my-agent"}
    )

    try:
        tracer.add_span_log(root_span_id, "Starting request processing", "info")

        # Validate request
        validation_span = tracer.start_span("validate_request", root_span_id)
        validation_result = await validate_request(request)
        tracer.finish_span(validation_span, "completed" if validation_result else "failed")

        if not validation_result:
            tracer.add_span_tag(root_span_id, "validation_failed", True)
            tracer.finish_span(root_span_id, "failed", {"error": "validation_failed"})
            return {"success": False, "error": "validation_failed"}

        # Process request
        processing_span = tracer.start_span("process_data", root_span_id)
        result = await process_data(request)
        tracer.finish_span(processing_span, "completed")

        # Send response
        response_span = tracer.start_span("send_response", root_span_id)
        await send_response(result)
        tracer.finish_span(response_span, "completed")

        tracer.finish_span(root_span_id, "completed")
        return result

    except Exception as e:
        failure_tags = {"error": str(e)}

        # Close the child span whose stage raised; otherwise it would stay
        # in active_spans forever and never appear in the trace history.
        for span_id, span in list(tracer.active_spans.items()):
            if span.parent_span_id == root_span_id:
                tracer.finish_span(span_id, "failed", failure_tags)

        tracer.add_span_log(root_span_id, f"Error: {str(e)}", "error")
        tracer.finish_span(root_span_id, "failed", failure_tags)
        raise

3. Performance Profiling

Profile agent performance to identify bottlenecks:

import time
import cProfile
import pstats
from typing import Dict, Any, List
from functools import wraps
import asyncio

class PerformanceProfiler:
    """Collect per-function timings and on-demand cProfile sessions.

    ``function_times`` keeps up to the last 1000 durations (seconds) per
    function name; ``profiles`` holds the CPU profiling sessions that are
    currently running.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.profiles: Dict[str, Dict[str, Any]] = {}
        self.function_times: Dict[str, List[float]] = {}

    def profile_function(self, func_name: str = None):
        """Return a decorator that records how long each call takes.

        Works for both coroutine functions and plain functions. The duration
        is recorded even when the wrapped call raises.

        Args:
            func_name: Name to record timings under; defaults to the
                wrapped function's ``__name__``.
        """
        def decorator(func):
            name = func_name or func.__name__

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # perf_counter is monotonic, so clock adjustments cannot
                # produce negative or inflated durations.
                started = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    self.record_function_time(name, time.perf_counter() - started)

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                started = time.perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    self.record_function_time(name, time.perf_counter() - started)

            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

        return decorator

    def record_function_time(self, func_name: str, duration: float):
        """Record one execution time, keeping only the last 1000 per function."""
        samples = self.function_times.setdefault(func_name, [])
        samples.append(duration)

        if len(samples) > 1000:
            self.function_times[func_name] = samples[-1000:]

    def get_performance_stats(self) -> Dict[str, Any]:
        """Summarize recorded timings per function.

        Returns:
            A dict keyed by function name with count, total, average, min,
            max and ``recent_avg`` (mean of the last 10 samples). Functions
            with no samples are omitted.
        """
        stats = {}

        for func_name, times in self.function_times.items():
            if not times:
                continue
            recent = times[-10:]
            total = sum(times)
            stats[func_name] = {
                "count": len(times),
                "total_time": total,
                "average_time": total / len(times),
                "min_time": min(times),
                "max_time": max(times),
                "recent_avg": sum(recent) / len(recent),
            }

        return stats

    def start_cpu_profiling(self, profile_name: str):
        """Start a cProfile session registered under *profile_name*."""
        session = {
            "profiler": cProfile.Profile(),
            "start_time": time.perf_counter(),
        }
        self.profiles[profile_name] = session
        session["profiler"].enable()

    def stop_cpu_profiling(self, profile_name: str) -> Dict[str, Any]:
        """Stop a CPU profiling session and summarize it.

        The session is removed from ``profiles`` so finished profilers are
        not retained; stopping the same name again returns ``{}``.

        Returns:
            Profile name, duration, call counts, total time and the ten
            functions with the highest cumulative time, or ``{}`` when no
            session with that name is running.
        """
        profile_data = self.profiles.pop(profile_name, None)
        if profile_data is None:
            return {}

        profile_data["profiler"].disable()

        stats = pstats.Stats(profile_data["profiler"])
        stats.sort_stats('cumulative')

        # stats.stats maps (file, line, name) -> (cc, nc, tt, ct, callers)
        functions = [
            {
                "function": f"{func[0]}:{func[1]}({func[2]})",
                "cumulative_time": ct,
                "total_time": tt,
                "calls": cc,
            }
            for func, (cc, nc, tt, ct, callers) in stats.stats.items()
        ]
        functions.sort(key=lambda entry: entry["cumulative_time"], reverse=True)

        return {
            "profile_name": profile_name,
            "duration": time.perf_counter() - profile_data["start_time"],
            "total_calls": stats.total_calls,
            "primitive_calls": stats.prim_calls,
            "total_time": stats.total_tt,
            "top_functions": functions[:10],
        }

# Usage
profiler = PerformanceProfiler("my-agent")


@profiler.profile_function("process_request")
async def process_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process request with profiling"""
    # Your processing logic here
    await asyncio.sleep(0.1)  # Simulate processing
    return {"success": True, "result": "processed"}


async def run_profiling_example(request_count: int = 100) -> None:
    """Profile a batch of simulated requests and print the results.

    ``await`` is only valid inside a coroutine, so the demo loop lives here
    instead of at module level.

    Args:
        request_count: Number of simulated requests to process.
    """
    # Start profiling
    profiler.start_cpu_profiling("request_processing")

    # Process some requests
    for i in range(request_count):
        await process_request({"id": i, "data": f"request_{i}"})

    # Stop profiling and get results
    profile_results = profiler.stop_cpu_profiling("request_processing")
    performance_stats = profiler.get_performance_stats()

    print("Performance Stats:", performance_stats)
    print("CPU Profile:", profile_results)


if __name__ == "__main__":
    asyncio.run(run_profiling_example())

Common Debugging Scenarios

1. Agent Communication Issues

Debug agent-to-agent communication problems:

class AgentCommunicationDebugger:
    """Record agent-to-agent communication events and summarize failures.

    Every event is kept in ``communication_logs``; events with
    ``success=False`` are additionally kept in ``failed_requests``.
    """

    def __init__(self):
        self.communication_logs: List[Dict[str, Any]] = []
        self.failed_requests: List[Dict[str, Any]] = []

    def log_communication(self, event_type: str, source: str, target: str,
                          message: str, success: bool, error: str = None):
        """Record one communication event.

        Args:
            event_type: Kind of event (for example ``"request"``).
            source: Sending agent.
            target: Receiving agent.
            message: Description of the message or operation.
            success: Whether the communication succeeded.
            error: Error label for failed events, if known.
        """
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "source": source,
            "target": target,
            "message": message,
            "success": success,
            "error": error
        }

        self.communication_logs.append(log_entry)

        if not success:
            self.failed_requests.append(log_entry)

    def analyze_communication_failures(self, recent_window_seconds: float = 300) -> Dict[str, Any]:
        """Summarize recorded failures.

        Args:
            recent_window_seconds: Age limit, in seconds, for a failure to
                count as recent.

        Returns:
            ``{"status": "no_failures"}`` when nothing failed; otherwise the
            total and recent failure counts, a count per error label
            (failures without a label are grouped under ``"unknown"``) and
            the most common label.
        """
        if not self.failed_requests:
            return {"status": "no_failures"}

        # Group failures by error label. The "error" key is always present
        # in logged entries (possibly None), so a .get() default alone
        # would never apply.
        failure_types: Dict[str, int] = {}
        for failure in self.failed_requests:
            label = failure.get("error") or "unknown"
            failure_types[label] = failure_types.get(label, 0) + 1

        # total_seconds() covers the whole age; timedelta.seconds drops the
        # days component and would count day-old failures as recent.
        now = datetime.utcnow()
        recent_failures = [
            f for f in self.failed_requests
            if (now - datetime.fromisoformat(f["timestamp"])).total_seconds() < recent_window_seconds
        ]

        return {
            "total_failures": len(self.failed_requests),
            "recent_failures": len(recent_failures),
            "failure_types": failure_types,
            "most_common_error": max(failure_types.items(), key=lambda item: item[1])[0]
        }

    def get_communication_health(self) -> Dict[str, Any]:
        """Compute success rates over all events and over the last 100.

        Returns:
            ``{"status": "no_data"}`` when nothing was logged; otherwise
            counts, overall and recent success rates, and a status of
            ``"healthy"`` (recent rate > 0.9), ``"degraded"`` (> 0.7) or
            ``"unhealthy"``.
        """
        if not self.communication_logs:
            return {"status": "no_data"}

        total_requests = len(self.communication_logs)
        successful_requests = sum(1 for log in self.communication_logs if log["success"])

        # Recent success rate (last 100 requests)
        recent_logs = self.communication_logs[-100:]
        recent_success_rate = sum(1 for log in recent_logs if log["success"]) / len(recent_logs)

        if recent_success_rate > 0.9:
            status = "healthy"
        elif recent_success_rate > 0.7:
            status = "degraded"
        else:
            status = "unhealthy"

        return {
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "success_rate": successful_requests / total_requests,
            "recent_success_rate": recent_success_rate,
            "status": status
        }

# Usage
comm_debugger = AgentCommunicationDebugger()


async def debug_agent_communication():
    """Log a few simulated exchanges, then print failure and health reports."""
    # (target, message, success, error) for each simulated attempt from agent-a
    simulated_attempts = (
        ("agent-b", "process_data", True, None),
        ("agent-c", "analyze_data", False, "timeout"),
        ("agent-d", "generate_report", True, None),
    )
    for target, message, succeeded, error in simulated_attempts:
        comm_debugger.log_communication(
            "request", "agent-a", target, message, succeeded, error
        )

    # Report on what failed
    print("Failure Analysis:", comm_debugger.analyze_communication_failures())

    # Report overall health
    print("Communication Health:", comm_debugger.get_communication_health())

2. Memory and Resource Issues

Debug memory leaks and resource issues:

import psutil
import gc
from typing import Dict, Any, List
import tracemalloc

class ResourceDebugger:
    """Capture process resource snapshots and flag likely memory leaks.

    Snapshots combine psutil process metrics with the top ``tracemalloc``
    allocation sites and are accumulated in ``memory_snapshots``.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.memory_snapshots: List[Dict[str, Any]] = []
        self.resource_usage: List[Dict[str, Any]] = []
        # Tracing may already be on (another debugger, PYTHONTRACEMALLOC).
        if not tracemalloc.is_tracing():
            tracemalloc.start()

    def take_memory_snapshot(self, label: str = None):
        """Record and return a snapshot of the current process.

        Args:
            label: Optional name stored with the snapshot.

        Returns:
            A dict with RSS/VMS in MB, CPU percent, thread and open-file
            counts, and the ten largest allocation sites by line.
        """
        process = psutil.Process()
        memory_info = process.memory_info()

        # Get tracemalloc snapshot
        top_stats = tracemalloc.take_snapshot().statistics('lineno')

        snapshot_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "label": label,
            "rss_mb": memory_info.rss / 1024 / 1024,
            "vms_mb": memory_info.vms / 1024 / 1024,
            "cpu_percent": process.cpu_percent(),
            "num_threads": process.num_threads(),
            "open_files": len(process.open_files()),
            "top_memory_allocations": [
                {
                    # First formatted traceback line: the file/line location
                    "filename": stat.traceback.format()[0],
                    "size_mb": stat.size / 1024 / 1024,
                    "count": stat.count
                }
                for stat in top_stats[:10]
            ]
        }

        self.memory_snapshots.append(snapshot_data)
        return snapshot_data

    @staticmethod
    def _sizes_by_location(snapshot: Dict[str, Any]) -> Dict[str, float]:
        """Sum ``size_mb`` per allocation site recorded in *snapshot*."""
        sizes: Dict[str, float] = {}
        for stat in snapshot["top_memory_allocations"]:
            sizes[stat["filename"]] = sizes.get(stat["filename"], 0) + stat["size_mb"]
        return sizes

    def detect_memory_leaks(self, threshold_mb: float = 100) -> Dict[str, Any]:
        """Compare the first and last snapshots for signs of a leak.

        Args:
            threshold_mb: RSS growth, in MB, above which a leak is reported.

        Returns:
            ``{"status": "insufficient_data"}`` with fewer than two
            snapshots; otherwise RSS/VMS growth, whether the growth exceeds
            the threshold, per-site allocation growth and a status of
            ``"leak_detected"`` or ``"normal"``.
        """
        if len(self.memory_snapshots) < 2:
            return {"status": "insufficient_data"}

        first_snapshot = self.memory_snapshots[0]
        last_snapshot = self.memory_snapshots[-1]

        rss_growth = last_snapshot["rss_mb"] - first_snapshot["rss_mb"]
        vms_growth = last_snapshot["vms_mb"] - first_snapshot["vms_mb"]

        significant_growth = rss_growth > threshold_mb

        # Growth per allocation site = last size minus first size. Sites
        # absent from the first snapshot's top list use a baseline of 0, so
        # their figure is an upper bound on the real growth.
        baseline = self._sizes_by_location(first_snapshot)
        allocation_growth = {
            location: size - baseline.get(location, 0)
            for location, size in self._sizes_by_location(last_snapshot).items()
        }

        return {
            "rss_growth_mb": rss_growth,
            "vms_growth_mb": vms_growth,
            "significant_growth": significant_growth,
            "allocation_growth": allocation_growth,
            "status": "leak_detected" if significant_growth else "normal"
        }

    def get_resource_usage(self) -> Dict[str, Any]:
        """Return current CPU, memory, thread, file and connection figures."""
        process = psutil.Process()

        # psutil 6.0 renamed Process.connections() to net_connections();
        # prefer the new name and fall back for older releases.
        list_connections = getattr(process, "net_connections", None) or process.connections

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "cpu_percent": process.cpu_percent(),
            "memory_percent": process.memory_percent(),
            "memory_mb": process.memory_info().rss / 1024 / 1024,
            "num_threads": process.num_threads(),
            "open_files": len(process.open_files()),
            "connections": len(list_connections())
        }

    def force_garbage_collection(self):
        """Run a full garbage collection and report what ``gc.collect`` returned."""
        collected = gc.collect()
        return {"collected_objects": collected}

# Usage
resource_debugger = ResourceDebugger("my-agent")


async def debug_resource_usage():
    """Snapshot memory around a simulated workload and print the findings."""
    # Baseline before any work
    before = resource_debugger.take_memory_snapshot("initial")
    print("Initial Memory:", before["rss_mb"], "MB")

    # Simulated workload: build and transform a throwaway list each round
    for _round in range(1000):
        items = [f"item_{index}" for index in range(1000)]
        uppercased = [entry.upper() for entry in items]
        del items, uppercased  # drop references so memory can be reclaimed

    # Measurement after the work
    after = resource_debugger.take_memory_snapshot("final")
    print("Final Memory:", after["rss_mb"], "MB")

    # Compare first and last snapshots
    print("Leak Analysis:", resource_debugger.detect_memory_leaks())

    # Force a collection and show what it reported
    print("GC Result:", resource_debugger.force_garbage_collection())

3. Configuration and Environment Issues

Debug configuration and environment problems:

import os
import sys
from typing import Dict, Any, List
import json

class ConfigurationDebugger:
    """Check environment, configuration and dependencies of an agent.

    Each check returns a report dict with ``status``, ``issues`` and
    ``total_issues``. The names that are checked default to the class-level
    tuples below and can be overridden per call.
    """

    DEFAULT_REQUIRED_VARS = ("API_KEY", "DATABASE_URL", "REDIS_URL")
    DEFAULT_REQUIRED_KEYS = ("name", "version", "entry_point")
    DEFAULT_CRITICAL_DEPS = ("fastapi", "uvicorn", "grpc", "pydantic")
    DEFAULT_OPTIONAL_DEPS = ("redis", "psutil", "prometheus_client")

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.config_issues: List[Dict[str, Any]] = []

    @staticmethod
    def _report(issues: List[Dict[str, Any]], ok_status: str, bad_status: str) -> Dict[str, Any]:
        """Build the common report shape from a list of issues."""
        return {
            "status": bad_status if issues else ok_status,
            "issues": issues,
            "total_issues": len(issues)
        }

    def validate_environment(self, required_vars: Optional[List[str]] = None) -> Dict[str, Any]:
        """Validate the process environment.

        Checks that the required environment variables are set and
        non-empty, that Python is at least 3.8 and that at least 1 GB of
        memory is available (a warning is recorded if psutil is missing).

        Args:
            required_vars: Variable names to require; defaults to
                ``DEFAULT_REQUIRED_VARS``.

        Returns:
            A report whose status is ``"healthy"`` or ``"issues_found"``.
        """
        if required_vars is None:
            required_vars = self.DEFAULT_REQUIRED_VARS

        issues = [
            {
                "type": "missing_environment_variable",
                "variable": var,
                "severity": "critical"
            }
            for var in required_vars
            if not os.getenv(var)
        ]

        # Check Python version
        python_version = sys.version_info
        if python_version < (3, 8):
            issues.append({
                "type": "python_version",
                "current_version": f"{python_version.major}.{python_version.minor}",
                "required_version": "3.8+",
                "severity": "critical"
            })

        # Check available memory
        try:
            import psutil
        except ImportError:
            issues.append({
                "type": "missing_dependency",
                "dependency": "psutil",
                "severity": "warning"
            })
        else:
            memory = psutil.virtual_memory()
            if memory.available < 1024 * 1024 * 1024:  # 1GB
                issues.append({
                    "type": "low_memory",
                    "available_mb": memory.available / 1024 / 1024,
                    "severity": "warning"
                })

        return self._report(issues, "healthy", "issues_found")

    def validate_configuration(self, config: Dict[str, Any],
                               required_keys: Optional[List[str]] = None) -> Dict[str, Any]:
        """Validate an agent configuration dict.

        Checks required keys, that ``entry_point`` (if given) exists on
        disk, and that ``port`` (if given) is an int in 1024-65535.

        Args:
            config: Configuration to validate.
            required_keys: Keys that must be present; defaults to
                ``DEFAULT_REQUIRED_KEYS``.

        Returns:
            A report whose status is ``"valid"`` or ``"invalid"``.
        """
        if required_keys is None:
            required_keys = self.DEFAULT_REQUIRED_KEYS

        issues = [
            {
                "type": "missing_config_key",
                "key": key,
                "severity": "critical"
            }
            for key in required_keys
            if key not in config
        ]

        # Validate entry point
        if "entry_point" in config:
            entry_point = config["entry_point"]
            if not os.path.exists(entry_point):
                issues.append({
                    "type": "invalid_entry_point",
                    "path": entry_point,
                    "severity": "critical"
                })

        # Check port configuration
        if "port" in config:
            port = config["port"]
            if not isinstance(port, int) or port < 1024 or port > 65535:
                issues.append({
                    "type": "invalid_port",
                    "port": port,
                    "severity": "critical"
                })

        return self._report(issues, "valid", "invalid")

    def check_dependencies(self, critical_deps: Optional[List[str]] = None,
                           optional_deps: Optional[List[str]] = None) -> Dict[str, Any]:
        """Check that Python dependencies can be imported.

        Args:
            critical_deps: Modules whose absence is critical; defaults to
                ``DEFAULT_CRITICAL_DEPS``.
            optional_deps: Modules whose absence is only a warning;
                defaults to ``DEFAULT_OPTIONAL_DEPS``.

        Returns:
            A report whose status is ``"healthy"`` or
            ``"dependencies_missing"``.
        """
        if critical_deps is None:
            critical_deps = self.DEFAULT_CRITICAL_DEPS
        if optional_deps is None:
            optional_deps = self.DEFAULT_OPTIONAL_DEPS

        issues = []
        groups = (
            (critical_deps, "missing_dependency", "critical"),
            (optional_deps, "missing_optional_dependency", "warning"),
        )
        for deps, issue_type, severity in groups:
            for dep in deps:
                try:
                    __import__(dep)
                except ImportError:
                    issues.append({
                        "type": issue_type,
                        "dependency": dep,
                        "severity": severity
                    })

        return self._report(issues, "healthy", "dependencies_missing")

    def get_system_info(self) -> Dict[str, Any]:
        """Collect interpreter and process information for debugging.

        Only environment variables prefixed ``PAK_``, ``PAR_`` or
        ``PIXELL_`` are included.
        """
        return {
            "agent_id": self.agent_id,
            "python_version": sys.version,
            "platform": sys.platform,
            "working_directory": os.getcwd(),
            "environment_variables": {
                key: value for key, value in os.environ.items()
                if key.startswith(("PAK_", "PAR_", "PIXELL_"))
            },
            "python_path": sys.path,
            "loaded_modules": list(sys.modules.keys())
        }

# Usage
config_debugger = ConfigurationDebugger("my-agent")


def debug_agent_configuration():
    """Run every configuration check and print each report."""
    # Environment variables, Python version, memory
    print("Environment Validation:", config_debugger.validate_environment())

    # Importable dependencies
    print("Dependencies Check:", config_debugger.check_dependencies())

    # Interpreter and process details
    print("System Info:", json.dumps(config_debugger.get_system_info(), indent=2))

    # A sample agent configuration to validate
    sample_config = dict(
        name="my-agent",
        version="1.0.0",
        entry_point="src/main.py",
        port=8080,
    )
    print("Configuration Validation:", config_debugger.validate_configuration(sample_config))

Debugging Multi-Agent Systems

1. Distributed Tracing

Trace requests across multiple agents:

class DistributedTracer:
    """Record spans, grouped by trace id, for requests that cross agents.

    Spans are plain dicts stored in ``traces[trace_id]`` in creation order.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.traces: Dict[str, List[Dict[str, Any]]] = {}

    def start_distributed_trace(self, trace_id: str, operation: str,
                                parent_span_id: Optional[str] = None) -> str:
        """Open a span in the given trace and return its id.

        Args:
            trace_id: Identifier shared by every span of one request.
            operation: Label of the operation being traced.
            parent_span_id: Id of the span this one is nested under; leave
                as ``None`` for a root span.
        """
        span_id = str(uuid.uuid4())

        span = {
            "trace_id": trace_id,
            "span_id": span_id,
            "parent_span_id": parent_span_id,
            "agent_id": self.agent_id,
            "operation": operation,
            "start_time": datetime.utcnow().isoformat(),
            "end_time": None,
            "status": "started",
            "tags": {},
            "logs": []
        }

        self.traces.setdefault(trace_id, []).append(span)
        return span_id

    def _find_span(self, trace_id: str, span_id: str) -> Optional[Dict[str, Any]]:
        """Return the first span with *span_id* in the trace, or ``None``."""
        for span in self.traces.get(trace_id, []):
            if span["span_id"] == span_id:
                return span
        return None

    def finish_span(self, trace_id: str, span_id: str, status: str = "completed"):
        """Set the end time and status of a span; unknown spans are ignored."""
        span = self._find_span(trace_id, span_id)
        if span is not None:
            span["end_time"] = datetime.utcnow().isoformat()
            span["status"] = status

    def add_span_log(self, trace_id: str, span_id: str, message: str, level: str = "info"):
        """Append a log entry to a span; unknown spans are ignored."""
        span = self._find_span(trace_id, span_id)
        if span is not None:
            span["logs"].append({
                "message": message,
                "level": level,
                "timestamp": datetime.utcnow().isoformat()
            })

    def get_trace(self, trace_id: str) -> List[Dict[str, Any]]:
        """Return the spans of a trace, or an empty list if it is unknown."""
        return self.traces.get(trace_id, [])

    def analyze_trace(self, trace_id: str) -> Dict[str, Any]:
        """Summarize a trace: failures, total duration and slow spans.

        Returns:
            ``{"status": "trace_not_found"}`` for an unknown or empty
            trace; otherwise overall status (``"failed"`` if any span
            failed), total duration in seconds from the earliest start to
            the latest end, the number of failed spans, spans that took
            longer than one second, and the span count.
        """
        trace = self.get_trace(trace_id)
        if not trace:
            return {"status": "trace_not_found"}

        failed_count = sum(1 for span in trace if span["status"] == "failed")

        starts = []
        ends = []
        slow_spans = []
        for span in trace:
            started = datetime.fromisoformat(span["start_time"]) if span["start_time"] else None
            ended = datetime.fromisoformat(span["end_time"]) if span["end_time"] else None
            if started is not None:
                starts.append(started)
            if ended is not None:
                ends.append(ended)
            if started is not None and ended is not None:
                duration = (ended - started).total_seconds()
                if duration > 1.0:  # Slower than 1 second
                    slow_spans.append({
                        "span_id": span["span_id"],
                        "operation": span["operation"],
                        "duration": duration
                    })

        total_duration = 0
        if starts and ends:
            total_duration = (max(ends) - min(starts)).total_seconds()

        return {
            "status": "failed" if failed_count else "completed",
            "total_duration": total_duration,
            "failed_spans": failed_count,
            "slow_spans": slow_spans,
            "total_spans": len(trace)
        }

# Usage
distributed_tracer = DistributedTracer("agent-a")


async def debug_distributed_request():
    """Trace a simulated request that calls two other agents, then report."""
    trace_id = str(uuid.uuid4())

    # Open the root span for the whole request
    root_span = distributed_tracer.start_distributed_trace(trace_id, "process_request")

    # (operation, log message, simulated latency in seconds) per downstream call
    downstream_calls = (
        ("call_agent_b", "Calling agent B", 0.1),
        ("call_agent_c", "Calling agent C", 0.2),
    )

    try:
        distributed_tracer.add_span_log(trace_id, root_span, "Starting request processing")

        for operation, log_message, latency in downstream_calls:
            call_span = distributed_tracer.start_distributed_trace(trace_id, operation)
            distributed_tracer.add_span_log(trace_id, call_span, log_message)
            await asyncio.sleep(latency)  # stand-in for the remote call
            distributed_tracer.finish_span(trace_id, call_span, "completed")

        # Close the root span once every call has returned
        distributed_tracer.finish_span(trace_id, root_span, "completed")

    except Exception as e:
        # Record the failure on the root span; the error is not re-raised
        distributed_tracer.add_span_log(trace_id, root_span, f"Error: {str(e)}", "error")
        distributed_tracer.finish_span(trace_id, root_span, "failed")

    # Summary of the trace
    print("Trace Analysis:", distributed_tracer.analyze_trace(trace_id))

    # Every span in full
    print("Full Trace:", json.dumps(distributed_tracer.get_trace(trace_id), indent=2))

2. Health Monitoring

Monitor health across multiple agents:

class MultiAgentHealthMonitor:
    """Track and probe the health of a set of registered agents.

    Per-agent state (address, capabilities, status, last check time and
    consecutive failure count) is kept in ``agents``.
    """

    def __init__(self):
        self.agents: Dict[str, Dict[str, Any]] = {}
        self.health_checks: List[Dict[str, Any]] = []

    def register_agent(self, agent_id: str, address: str, capabilities: List[str]):
        """Register an agent for monitoring with status ``"unknown"``."""
        self.agents[agent_id] = {
            "address": address,
            "capabilities": capabilities,
            "status": "unknown",
            "last_check": None,
            "consecutive_failures": 0
        }

    async def check_agent_health(self, agent_id: str) -> Dict[str, Any]:
        """Probe one agent and update its recorded status.

        Returns:
            ``{"status": "agent_not_found"}`` for an unregistered id. When
            the agent answers, ``status`` mirrors what it reported
            (``"healthy"`` or ``"unhealthy"``), so callers that count
            healthy results are not misled by an agent that is reachable
            but unhealthy. When the probe raises, ``status`` is
            ``"unhealthy"`` and the error text is included.
        """
        if agent_id not in self.agents:
            return {"status": "agent_not_found"}

        agent = self.agents[agent_id]
        started = time.perf_counter()

        try:
            # Try to connect to agent
            async with AgentClient(agent["address"]) as client:
                health = await client.health_check()

            if health["status"] == "healthy":
                agent["status"] = "healthy"
                agent["consecutive_failures"] = 0
            else:
                agent["status"] = "unhealthy"
                agent["consecutive_failures"] += 1

            agent["last_check"] = datetime.utcnow().isoformat()

            return {
                "status": agent["status"],
                "response_time": time.perf_counter() - started,
                "agent_status": agent["status"]
            }

        except Exception as e:
            # Any failure to reach or query the agent marks it unreachable
            agent["status"] = "unreachable"
            agent["consecutive_failures"] += 1
            agent["last_check"] = datetime.utcnow().isoformat()

            return {
                "status": "unhealthy",
                "error": str(e),
                "response_time": time.perf_counter() - started
            }

    async def check_all_agents(self) -> Dict[str, Any]:
        """Probe every registered agent in turn and summarize the results."""
        results = {}

        # Iterate a copy so a registration during an await cannot break the loop
        for agent_id in list(self.agents):
            results[agent_id] = await self.check_agent_health(agent_id)

        healthy_agents = sum(1 for result in results.values() if result["status"] == "healthy")
        total_agents = len(results)

        return {
            "overall_status": "healthy" if healthy_agents == total_agents else "degraded",
            "healthy_agents": healthy_agents,
            "total_agents": total_agents,
            "agent_results": results
        }

    def get_agent_status(self, agent_id: str) -> Dict[str, Any]:
        """Return the last recorded state of one agent (no probing)."""
        if agent_id not in self.agents:
            return {"status": "agent_not_found"}

        agent = self.agents[agent_id]
        return {
            "agent_id": agent_id,
            "status": agent["status"],
            "last_check": agent["last_check"],
            "consecutive_failures": agent["consecutive_failures"],
            "address": agent["address"],
            "capabilities": agent["capabilities"]
        }

    def get_system_health(self) -> Dict[str, Any]:
        """Summarize recorded agent states (no probing).

        The overall status is ``"healthy"`` when no agent is unhealthy or
        unreachable, ``"degraded"`` when fewer than half are, and
        ``"critical"`` otherwise. Agents still in state ``"unknown"`` do
        not count as failing.
        """
        total_agents = len(self.agents)
        statuses = [agent["status"] for agent in self.agents.values()]
        healthy_agents = statuses.count("healthy")
        unhealthy_agents = statuses.count("unhealthy")
        unreachable_agents = statuses.count("unreachable")

        # An unreachable agent is at least as bad as an unhealthy one.
        failing_agents = unhealthy_agents + unreachable_agents
        if failing_agents == 0:
            status = "healthy"
        elif failing_agents < total_agents / 2:
            status = "degraded"
        else:
            status = "critical"

        return {
            "total_agents": total_agents,
            "healthy_agents": healthy_agents,
            "unhealthy_agents": unhealthy_agents,
            "unreachable_agents": unreachable_agents,
            "health_percentage": (healthy_agents / total_agents * 100) if total_agents > 0 else 0,
            "status": status
        }

# Usage
health_monitor = MultiAgentHealthMonitor()

# Register agents
health_monitor.register_agent(
    agent_id="agent-a", address="localhost:50051", capabilities=["data_processing"]
)
health_monitor.register_agent(
    agent_id="agent-b", address="localhost:50052", capabilities=["analysis"]
)
health_monitor.register_agent(
    agent_id="agent-c", address="localhost:50053", capabilities=["reporting"]
)


async def debug_system_health():
    """Probe every registered agent, then print system and per-agent status."""
    # Probe all agents
    probe_results = await health_monitor.check_all_agents()
    print("Health Results:", probe_results)

    # Aggregate view of the recorded states
    overall = health_monitor.get_system_health()
    print("System Health:", overall)

    # Recorded state of one agent
    agent_a = health_monitor.get_agent_status("agent-a")
    print("Agent A Status:", agent_a)

Debugging Tools and Utilities

1. Interactive Debugger

Start an interactive debugging session:

import pdb
import code
from typing import Dict, Any

class InteractiveDebugger:
    """Helpers for dropping into pdb or an interactive console.

    ``debug_context`` is a free-form dict of values that breakpoint
    conditions can reference and that interactive sessions can inspect.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.debug_context = {}

    def set_breakpoint(self, condition: str = None):
        """Enter pdb in the caller's frame, optionally only if a condition holds.

        Args:
            condition: Python expression evaluated with the entries of
                ``debug_context`` as its namespace. The debugger starts only
                when it is truthy. With no condition it always starts.
        """
        import sys  # local import: needed only to locate the caller's frame

        caller_frame = sys._getframe(1)

        if condition:
            # NOTE(security): eval() executes arbitrary code. Conditions
            # must come from the developer, never from request data or any
            # other external input.
            # Evaluate against a copy: eval() inserts __builtins__ into the
            # globals dict it is given, which would pollute debug_context.
            if not eval(condition, dict(self.debug_context)):
                return

        # Stop in the caller rather than inside this helper.
        pdb.Pdb().set_trace(caller_frame)

    def debug_interactive(self, local_vars: Dict[str, Any]):
        """Open an interactive console seeded with agent context.

        Args:
            local_vars: Extra names to expose; they override ``agent_id``
                and ``debug_context`` on a name clash.
        """
        debug_vars = {
            "agent_id": self.agent_id,
            "debug_context": self.debug_context,
            **local_vars
        }

        code.interact(local=debug_vars)

    def inspect_variables(self, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Describe each variable by type, truncated string value and size.

        Returns:
            A dict keyed by variable name with ``type`` (class name),
            ``value`` (``str(value)`` cut to 100 characters) and ``size``
            (``len(value)`` for sized objects, otherwise ``"N/A"``).
        """
        inspection = {}

        for name, value in variables.items():
            inspection[name] = {
                "type": type(value).__name__,
                "value": str(value)[:100],  # Truncate long values
                # The element count, not the length of the string rendering
                "size": len(value) if hasattr(value, "__len__") else "N/A"
            }

        return inspection

# Usage
debugger = InteractiveDebugger("my-agent")


async def debug_agent_processing(request: Dict[str, Any]):
    """Process a request, breaking into a debugger on failure.

    A conditional breakpoint fires when the result reports
    ``success == False``; an exception opens an interactive console and is
    then re-raised.

    Returns:
        The result of ``process_request``.
    """
    # Set debug context
    debugger.debug_context["request"] = request
    debugger.debug_context["start_time"] = datetime.utcnow()

    try:
        # Process request
        result = await process_request(request)

        # The breakpoint condition is evaluated against debug_context, so
        # the result must be stored there; otherwise the condition raises
        # NameError and the success path falls into the error handler.
        debugger.debug_context["result"] = result

        # Set breakpoint for inspection
        debugger.set_breakpoint("result.get('success') == False")

        return result

    except Exception as e:
        # Interactive debugging on error
        debugger.debug_interactive({
            "request": request,
            "error": e,
            "traceback": traceback.format_exc()
        })
        raise

2. Debug Dashboard

Create a debug dashboard for monitoring:

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
import json

class DebugDashboard:
"""Debug dashboard for agent monitoring"""

def __init__(self, agent_id: str):
    """Create the FastAPI app, the empty debug store and the routes.

    Args:
        agent_id: Agent this dashboard reports on; used in the app title.
    """
    self.agent_id = agent_id
    self.app = FastAPI(title=f"Debug Dashboard - {agent_id}")

    # In-memory state served by the dashboard endpoints
    initial_state = {}
    initial_state["logs"] = []
    initial_state["metrics"] = {}
    initial_state["traces"] = []
    initial_state["health"] = {}
    self.debug_data = initial_state

    self.setup_routes()

def setup_routes(self):
    """Register the dashboard page, the JSON endpoints and the WebSocket feed.

    Routes:
        ``GET /``: HTML page that renders logs and metrics pushed over the
            WebSocket.
        ``GET /debug/logs``, ``/debug/metrics``, ``/debug/traces``: the
            corresponding entries of ``self.debug_data``.
        ``WS /ws``: sends the whole ``debug_data`` as JSON once per second.
    """
    # Local import: the file-level fastapi import does not bring this name in.
    from fastapi import WebSocketDisconnect

    @self.app.get("/")
    async def dashboard():
        """Debug dashboard HTML"""
        # The page derives the WebSocket URL from its own location so it
        # works on any host/port, and writes log text via textContent so
        # log messages cannot inject markup into the page.
        return HTMLResponse("""
        <!DOCTYPE html>
        <html>
        <head>
        <title>Agent Debug Dashboard</title>
        <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
        </head>
        <body>
        <h1>Agent Debug Dashboard</h1>
        <div id="logs"></div>
        <div id="metrics"></div>
        <div id="traces"></div>
        <script>
        // WebSocket connection for real-time updates
        const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
        const ws = new WebSocket(`${scheme}://${window.location.host}/ws`);
        ws.onmessage = function(event) {
        const data = JSON.parse(event.data);
        updateDashboard(data);
        };

        function updateDashboard(data) {
        // Update logs
        const logsDiv = document.getElementById('logs');
        logsDiv.replaceChildren(...data.logs.map(log => {
        const row = document.createElement('div');
        row.textContent = `${log.timestamp}: ${log.message}`;
        return row;
        }));

        // Update metrics
        const metricsDiv = document.getElementById('metrics');
        metricsDiv.textContent = JSON.stringify(data.metrics, null, 2);
        }
        </script>
        </body>
        </html>
        """)

    @self.app.get("/debug/logs")
    async def get_logs():
        """Get debug logs"""
        return self.debug_data["logs"]

    @self.app.get("/debug/metrics")
    async def get_metrics():
        """Get debug metrics"""
        return self.debug_data["metrics"]

    @self.app.get("/debug/traces")
    async def get_traces():
        """Get debug traces"""
        return self.debug_data["traces"]

    @self.app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        """WebSocket for real-time updates"""
        await websocket.accept()
        try:
            while True:
                # default=str keeps the feed alive when a log context or
                # metric holds a value that is not JSON-native.
                await websocket.send_text(json.dumps(self.debug_data, default=str))
                await asyncio.sleep(1)
        except WebSocketDisconnect:
            # NOTE(review): assumes the installed Starlette raises
            # WebSocketDisconnect from send_text once the client has gone;
            # confirm for the pinned version.
            return

def add_log(self, message: str, level: str = "info", context: Dict[str, Any] = None):
"""Add log to dashboard"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"message": message,
"level": level,
"context": context or {}
}

self.debug_data["logs"].append(log_entry)

# Keep only last 1000 logs
if len(self.debug_data["logs"]) > 1000:
self.debug_data["logs"] = self.debug_data["logs"][-1000:]

def update_metrics(self, metrics: Dict[str, Any]):
"""Update debug metrics"""
self.debug_data["metrics"].update(metrics)

def add_trace(self, trace: Dict[str, Any]):
"""Add trace to dashboard"""
self.debug_data["traces"].append(trace)

# Keep only last 100 traces
if len(self.debug_data["traces"]) > 100:
self.debug_data["traces"] = self.debug_data["traces"][-100:]

# Usage
dashboard = DebugDashboard("my-agent")

# Seed the dashboard with one log entry, a metrics snapshot and one trace
dashboard.add_log("Agent started", "info")
dashboard.update_metrics({"cpu_usage": 45.2, "memory_usage": 123.4})
dashboard.add_trace({"operation": "process_request", "duration": 0.123})

# Start dashboard: the FastAPI app is the `app` attribute of the `dashboard`
# instance above, so uvicorn needs "<module>:dashboard.app", e.g.
# uvicorn my_module:dashboard.app --host 0.0.0.0 --port 8000
# NOTE(review): `my_module` is a placeholder for the file that defines `dashboard`.

Best Practices for Debugging

1. Structured Logging

Use structured logging for better debugging:

import structlog
from typing import Dict, Any

# Configure structured logging once, at import time.
# Processors are applied to every event in the order listed, so the
# JSONRenderer must stay last: it turns the finished event into the output
# string.
# NOTE(review): events are routed through the standard `logging` module
# (stdlib LoggerFactory), so stdlib logging presumably still needs its own
# handler/level setup for output to appear; confirm against the app's
# logging configuration.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

class StructuredAgentLogger:
    """Emit request, step and error events for one agent via structlog."""

    def __init__(self, agent_id: str):
        """Obtain the ``"agent"`` logger with ``agent_id`` passed as an initial value."""
        self.logger = structlog.get_logger("agent", agent_id=agent_id)

    def log_request(self, request_id: str, request: Dict[str, Any]):
        """Emit a ``request_received`` event with the request's type and size.

        The size is the length of ``str(request)``.
        """
        fields = {
            "request_id": request_id,
            "request_type": request.get("type"),
            "request_size": len(str(request)),
        }
        self.logger.info("request_received", **fields)

    def log_processing_step(self, request_id: str, step: str, duration: float):
        """Emit a ``processing_step`` event; ``duration`` is reported multiplied by 1000."""
        elapsed_ms = duration * 1000
        self.logger.info(
            "processing_step",
            request_id=request_id,
            step=step,
            duration_ms=elapsed_ms,
        )

    def log_error(self, request_id: str, error: Exception, context: Dict[str, Any] = None):
        """Emit a ``processing_error`` event with the error text, its type name and context."""
        fields = {
            "request_id": request_id,
            "error": str(error),
            "error_type": type(error).__name__,
            # A missing (or empty) context is logged as an empty dict.
            "context": context or {},
        }
        self.logger.error("processing_error", **fields)

2. Debug Configuration

Configure debugging based on environment:

import os
from typing import Dict, Any

class DebugConfiguration:
    """Debug configuration loaded once from environment variables.

    Variables read: ``DEBUG``, ``LOG_LEVEL``, ``TRACE_ENABLED``,
    ``PROFILE_ENABLED``, ``DEBUG_PORT``, ``MAX_LOG_SIZE`` and
    ``TRACE_SAMPLING_RATE``. The parsed values are kept in ``self.config``.
    """

    # Spellings (case-insensitive, surrounding whitespace ignored) that turn
    # a boolean flag on; anything else, including unset, means off.
    _TRUTHY = frozenset({"1", "true", "yes", "on"})

    def __init__(self):
        """Read the environment and cache the parsed configuration.

        Raises:
            ValueError: If a numeric variable holds a non-numeric value.
        """
        self.config = self._load_config()

    @classmethod
    def _env_flag(cls, name: str) -> bool:
        """Return True when environment variable ``name`` holds a truthy spelling."""
        return os.getenv(name, "false").strip().lower() in cls._TRUTHY

    @staticmethod
    def _env_number(name: str, default: str, convert):
        """Parse environment variable ``name`` with ``convert`` (``int`` or ``float``).

        Raises:
            ValueError: Naming the variable and the offending raw value.
        """
        raw = os.getenv(name, default)
        try:
            return convert(raw)
        except ValueError as err:
            raise ValueError(
                f"Invalid value for environment variable {name}: {raw!r}"
            ) from err

    def _load_config(self) -> Dict[str, Any]:
        """Load debug configuration"""
        return {
            "debug_enabled": self._env_flag("DEBUG"),
            "log_level": os.getenv("LOG_LEVEL", "INFO"),
            "trace_enabled": self._env_flag("TRACE_ENABLED"),
            "profile_enabled": self._env_flag("PROFILE_ENABLED"),
            "debug_port": self._env_number("DEBUG_PORT", "5678", int),
            "max_log_size": self._env_number("MAX_LOG_SIZE", "1000", int),
            "trace_sampling_rate": self._env_number("TRACE_SAMPLING_RATE", "0.1", float),
        }

    def is_debug_enabled(self) -> bool:
        """Check if debugging is enabled"""
        return self.config["debug_enabled"]

    def should_trace(self) -> bool:
        """Check if tracing should be enabled"""
        return self.config["trace_enabled"]

    def should_profile(self) -> bool:
        """Check if profiling should be enabled"""
        return self.config["profile_enabled"]

    def get_log_level(self) -> str:
        """Get log level"""
        return self.config["log_level"]

    def get_debug_port(self) -> int:
        """Get debug port"""
        return self.config["debug_port"]

# Usage
debug_config = DebugConfiguration()

# Bind every helper name unconditionally (None when the feature is off) so
# later code can test `if debugger is not None:` instead of hitting a
# NameError for a name that was never assigned.

# Enable debugging
debugger = (
    AgentDebugger("my-agent", debug_config.get_log_level())
    if debug_config.is_debug_enabled()
    else None
)

# Enable tracing
tracer = RequestTracer("my-agent") if debug_config.should_trace() else None

# Enable profiling
profiler = PerformanceProfiler("my-agent") if debug_config.should_profile() else None

Next Steps

After setting up debugging:

  1. UI Integration Guide - Integrate with user interfaces
  2. Best Practices - Follow development best practices
  3. Full Deployment Guide - Deploy with proper monitoring
  4. Agent-to-Agent Communication - Debug multi-agent systems

Ready to debug your agents? Check out the UI Integration Guide to learn how to integrate with user interfaces!