Message Routing
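Message Routing is the system that decides how messages flow between agents in the Pixell ecosystem. It aims for reliable delivery, load balancing, failover handling, and efficient resource utilization across the agent network. The code samples on this page are illustrative sketches. They assume the standard-library modules `time`, `random`, `json`, and `hashlib` are available, and that the surrounding application supplies the registry, monitor, and helper objects.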
Routing Overview
The message routing system provides:
- 🎯 Intelligent Routing - Route messages to the most appropriate agent
- ⚖️ Load Balancing - Distribute load across available agents
- 🔄 Failover Handling - Automatic failover when agents are unavailable
- 📊 Performance Optimization - Route based on performance metrics
- 🛡️ Security Routing - Route based on security policies
Routing Strategies
1. Capability-Based Routing
Route each message to the agent whose registered capabilities best satisfy the message's required capabilities:
class CapabilityRouter:
    """Route a message to the agent that best matches its required capabilities.

    Routing decisions are cached per list of required capabilities for
    ``cache_ttl`` seconds (300 by default).
    """

    def __init__(self, capability_registry, cache_ttl=300):
        """Create the router.

        Args:
            capability_registry: object exposing
                ``find_agents_by_capability(capability)`` and
                ``get_agent_capabilities(agent_id)``.
            cache_ttl: seconds a routing decision stays cached (default 300).
        """
        self.capability_registry = capability_registry
        self.cache_ttl = cache_ttl
        # Maps a requirements key to a (result, timestamp) pair.
        self.routing_cache = {}

    def route_message(self, message, required_capabilities):
        """Route ``message`` to the agent with the best capability match.

        Args:
            message: the message being routed (not inspected here).
            required_capabilities: sequence of requirement dicts, each with a
                ``"name"`` and an optional ``"min_confidence"`` (default 0.5).

        Returns:
            ``(agent_id, score)`` for the best-scoring candidate. ``score`` is
            the fraction of requirements the agent satisfies. Returns ``None``
            when no agent offers any of the required capabilities.
        """
        import time

        # Key on the text itself. Reducing it to a bare hash first would let
        # two different requirement lists collide on one cache entry.
        cache_key = repr(required_capabilities)
        cached = self.routing_cache.get(cache_key)
        if cached is not None:
            cached_result, timestamp = cached
            if time.time() - timestamp < self.cache_ttl:
                return cached_result
            # Drop the stale entry so the cache does not grow without bound.
            del self.routing_cache[cache_key]

        candidates = []
        for capability in required_capabilities:
            candidates.extend(
                self.capability_registry.find_agents_by_capability(capability)
            )
        if not candidates:
            return None

        # dict.fromkeys de-duplicates while keeping first-seen order, so ties
        # are broken deterministically (set iteration order is not stable).
        scored_candidates = [
            (agent_id, self._calculate_capability_score(agent_id, required_capabilities))
            for agent_id in dict.fromkeys(candidates)
        ]
        best_agent = max(scored_candidates, key=lambda pair: pair[1])

        self.routing_cache[cache_key] = (best_agent, time.time())
        return best_agent

    def _calculate_capability_score(self, agent_id, required_capabilities):
        """Return the fraction of requirements ``agent_id`` satisfies (0..1).

        A requirement is satisfied when the agent has a capability with the
        same ``"name"`` whose ``"confidence"`` is at least the requirement's
        ``"min_confidence"`` (0.5 if unspecified). Returns 0 when there are
        no requirements.
        """
        agent_capabilities = self.capability_registry.get_agent_capabilities(agent_id)
        total_required = len(required_capabilities)
        if total_required == 0:
            return 0

        matches = sum(
            1
            for required in required_capabilities
            if any(
                agent_cap["name"] == required["name"]
                and agent_cap["confidence"] >= required.get("min_confidence", 0.5)
                for agent_cap in agent_capabilities
            )
        )
        return matches / total_required
2. Load-Based Routing
Route messages based on agent load and availability:
class LoadBasedRouter:
    """Route each message to the least-loaded available agent."""

    def __init__(self, agent_registry, load_monitor=None):
        """Create the router.

        Args:
            agent_registry: object exposing ``find_agents_by_capability``,
                ``is_agent_available`` and ``get_agent_info``.
            load_monitor: optional object exposing ``get_agent_load(agent_id)``.
                When omitted, load is read from the registry's agent info.
                (The previous default, ``LoadMonitor()``, is not defined here.)
        """
        self.agent_registry = agent_registry
        self.load_monitor = load_monitor

    def route_message(self, message, required_capabilities):
        """Return the id of the least-loaded available agent.

        Args:
            message: the message being routed (not inspected here).
            required_capabilities: capabilities passed one at a time to
                ``find_agents_by_capability``. An agent offering any of them
                is a candidate.

        Returns:
            The agent id with the lowest load factor. Ties go to the agent
            seen first. Returns ``None`` when no candidate is available.
        """
        candidates = []
        for capability in required_capabilities:
            candidates.extend(self.agent_registry.find_agents_by_capability(capability))

        # dict.fromkeys de-duplicates in first-seen order (deterministic ties).
        available_agents = [
            agent_id
            for agent_id in dict.fromkeys(candidates)
            if self.agent_registry.is_agent_available(agent_id)
        ]
        if not available_agents:
            return None

        return min(available_agents, key=self.get_agent_load)

    def get_agent_load(self, agent_id):
        """Return the current load factor for ``agent_id``.

        Uses ``load_monitor`` when one was supplied. Otherwise reads
        ``"load_factor"`` from the registry's agent info, defaulting to 0.0.
        """
        if self.load_monitor is not None:
            return self.load_monitor.get_agent_load(agent_id)
        agent_info = self.agent_registry.get_agent_info(agent_id) or {}
        return agent_info.get("load_factor", 0.0)
3. Context-Aware Routing
Route messages based on context and historical performance:
class ContextAwareRouter:
    """Score candidates by availability, past performance and context fit.

    Score = 0.3 if available + 0.4 * performance score + 0.3 * context score.
    Unavailable agents are not filtered out; they simply lose the 0.3
    availability term.
    """

    def __init__(self, agent_registry, context_analyzer):
        """Create the router.

        Args:
            agent_registry: object exposing ``find_agents_by_capability`` and
                ``is_agent_available``.
            context_analyzer: object exposing
                ``analyze_agent_context(agent_id, context)``, or ``None``.
                With ``None`` the context term contributes nothing
                (``AdaptiveRouter`` constructs this router that way).
        """
        self.agent_registry = agent_registry
        self.context_analyzer = context_analyzer
        # agent_id -> list of {"response_time": seconds, "success": bool/0-1}
        self.performance_history = {}

    def record_performance(self, agent_id, response_time, success):
        """Append one observed outcome to ``agent_id``'s performance history."""
        self.performance_history.setdefault(agent_id, []).append(
            {"response_time": response_time, "success": success}
        )

    def route_message(self, message, required_capabilities, context=None):
        """Return ``(agent_id, score)`` for the best-scoring candidate.

        Returns ``None`` when no agent offers any required capability.
        """
        candidates = []
        for capability in required_capabilities:
            candidates.extend(self.agent_registry.find_agents_by_capability(capability))
        if not candidates:
            return None

        # First-seen order keeps tie-breaking deterministic.
        scored_candidates = [
            (agent_id, self._calculate_context_score(agent_id, context, message))
            for agent_id in dict.fromkeys(candidates)
        ]
        return max(scored_candidates, key=lambda pair: pair[1])

    def _calculate_context_score(self, agent_id, context, message):
        """Combine availability, performance and context scores for an agent."""
        score = 0
        if self.agent_registry.is_agent_available(agent_id):
            score += 0.3

        score += self._get_performance_score(agent_id, context) * 0.4

        if self.context_analyzer is not None:
            context_score = self.context_analyzer.analyze_agent_context(agent_id, context)
            score += context_score * 0.3

        return score

    def _get_performance_score(self, agent_id, context):
        """Return a 0..1 score from the agent's recorded history.

        Agents with no history get a neutral 0.5. Otherwise the score is the
        mean of a response-time score (1 at 0 s, falling linearly to 0 at
        5 s or slower) and the average success rate.
        """
        history = self.performance_history.get(agent_id)
        if not history:
            return 0.5

        count = len(history)
        avg_response_time = sum(h["response_time"] for h in history) / count
        avg_success_rate = sum(h["success"] for h in history) / count

        response_score = max(0, 1 - (avg_response_time / 5.0))
        return (response_score + avg_success_rate) / 2
4. Geographic Routing
Route messages based on geographic proximity:
class GeographicRouter:
    """Send a message to the candidate agent nearest to the sender."""

    def __init__(self, agent_registry, geo_resolver):
        """Store the agent registry and the distance resolver."""
        self.agent_registry = agent_registry
        self.geo_resolver = geo_resolver

    def route_message(self, message, required_capabilities, source_location=None):
        """Return the id of the closest capable agent.

        Only agents with a known location are measured, and only when
        ``source_location`` is given. If nothing could be measured, the first
        candidate returned by the registry is used. Returns ``None`` when no
        agent offers any required capability.
        """
        candidates = [
            agent_id
            for capability in required_capabilities
            for agent_id in self.agent_registry.find_agents_by_capability(capability)
        ]
        if not candidates:
            return None

        # agent_id -> distance, in the same order the agents were examined.
        distance_by_agent = {}
        for agent_id in set(candidates):
            location = self.agent_registry.get_agent_location(agent_id)
            if not (location and source_location):
                continue
            distance_by_agent[agent_id] = self.geo_resolver.calculate_distance(
                source_location, location
            )

        if not distance_by_agent:
            # Nothing measurable: fall back to the first candidate.
            return candidates[0]

        return min(distance_by_agent, key=distance_by_agent.get)
Load Balancing
Round Robin Load Balancing
class RoundRobinBalancer:
    """Hand out the agents for each capability in rotating order."""

    def __init__(self, agent_registry):
        """Store the registry; rotation positions start at 0 per capability."""
        self.agent_registry = agent_registry
        # capability -> index of the next agent to hand out
        self.current_index = {}

    def get_next_agent(self, capability):
        """Return the next agent for ``capability``, or ``None`` if there are none.

        The stored position is wrapped against the current list length. The
        registry may return fewer agents than on the previous call, and an
        unwrapped index would then raise IndexError.
        """
        agents = self.agent_registry.find_agents_by_capability(capability)
        if not agents:
            return None

        position = self.current_index.get(capability, 0) % len(agents)
        self.current_index[capability] = (position + 1) % len(agents)
        return agents[position]
Weighted Load Balancing
class WeightedBalancer:
    """Pick agents at random in proportion to configured weights."""

    def __init__(self, agent_registry):
        """Store the registry. Agents without an explicit weight count as 1."""
        self.agent_registry = agent_registry
        self.agent_weights = {}

    def set_agent_weight(self, agent_id, weight):
        """Set weight for agent (higher weight = more requests; <= 0 = never)."""
        self.agent_weights[agent_id] = weight

    def get_weighted_agent(self, capability):
        """Return an agent for ``capability`` chosen with probability ~ weight.

        Weights at or below zero are treated as zero, so those agents are
        never chosen. If no agent has a positive weight, one is picked
        uniformly at random. Returns ``None`` when no agent offers the
        capability.
        """
        import random

        agents = self.agent_registry.find_agents_by_capability(capability)
        if not agents:
            return None

        # Clamp: a negative weight would shrink the total and skew everyone
        # else's odds.
        weights = [max(self.agent_weights.get(agent_id, 1), 0) for agent_id in agents]
        if sum(weights) <= 0:
            return random.choice(agents)

        # random.choices never selects a zero-weight entry. The hand-rolled
        # `<=` scan could pick one on a 0.0 draw.
        return random.choices(agents, weights=weights, k=1)[0]
Failover Handling
Primary/Secondary Failover
class FailoverRouter:
    """Route to healthy primary agents first, then to secondaries."""

    def __init__(self, agent_registry, health_monitor):
        """Create the router.

        Args:
            agent_registry: the agent registry (stored, not used below).
            health_monitor: object exposing ``is_agent_healthy(agent_id)``.
        """
        self.agent_registry = agent_registry
        self.health_monitor = health_monitor
        self.failover_configs = {}

    def configure_failover(self, capability, primary_agents, secondary_agents):
        """Configure the ordered primary and secondary agents for a capability.

        The lists are copied. ``handle_agent_failure`` edits the stored lists
        in place, and those edits must not leak into the caller's lists.
        """
        self.failover_configs[capability] = {
            "primary": list(primary_agents),
            "secondary": list(secondary_agents),
            "current_primary": 0,
        }

    def route_with_failover(self, message, capability):
        """Return the first healthy agent for ``capability``.

        Primaries are tried before secondaries. Returns ``None`` when the
        capability is not configured or no agent is healthy.
        """
        config = self.failover_configs.get(capability)
        if config is None:
            return None

        for agent_id in (*config["primary"], *config["secondary"]):
            if self.health_monitor.is_agent_healthy(agent_id):
                return agent_id
        return None

    def handle_agent_failure(self, agent_id):
        """Remove a failed agent everywhere; promote a secondary if it was primary."""
        for config in self.failover_configs.values():
            # A failed secondary must not be promoted later.
            config["secondary"] = [a for a in config["secondary"] if a != agent_id]

            if agent_id in config["primary"]:
                config["primary"] = [a for a in config["primary"] if a != agent_id]
                if config["secondary"]:
                    config["primary"].append(config["secondary"].pop(0))
Circuit Breaker Pattern
class CircuitBreakerRouter:
    """Skip agents whose recent failures have tripped a per-agent breaker."""

    def __init__(self, agent_registry):
        """Store the registry and initialise per-agent failure bookkeeping."""
        self.agent_registry = agent_registry
        self.circuit_breakers = {}
        # agent_id -> failures since the last success or breaker reset
        self.failure_counts = {}
        # agent_id -> time.time() of the most recent failure
        self.last_failure_times = {}

    def route_with_circuit_breaker(self, message, capability, max_failures=5, timeout=60):
        """Deliver ``message`` to the first capable agent whose breaker is closed.

        Args:
            message: the message to deliver.
            capability: capability used to look up candidate agents.
            max_failures: failures after which an agent's breaker opens.
            timeout: seconds an open breaker stays open after the last failure.

        Returns:
            Whatever ``_route_to_agent`` returns for the first success.

        Raises:
            RoutingError: when every agent failed or had an open breaker.
                (RoutingError and ``_route_to_agent`` are defined elsewhere.)
        """
        agents = self.agent_registry.find_agents_by_capability(capability)

        for agent_id in agents:
            if self._is_circuit_open(agent_id, timeout, max_failures):
                continue
            try:
                result = self._route_to_agent(agent_id, message)
            except Exception:
                # Any delivery failure counts toward this agent's breaker;
                # move on to the next candidate.
                self._record_failure(agent_id)
                continue
            self.failure_counts[agent_id] = 0
            return result

        raise RoutingError("All agents failed or circuit breakers open")

    def _is_circuit_open(self, agent_id, timeout, max_failures=5):
        """Return True while ``agent_id``'s breaker is open.

        The breaker opens once failures reach ``max_failures``. It resets
        (closes) once more than ``timeout`` seconds have passed since the
        last failure.
        """
        import time

        failure_count = self.failure_counts.get(agent_id, 0)
        if failure_count < max_failures:
            return False

        last_failure = self.last_failure_times.get(agent_id, 0)
        if time.time() - last_failure > timeout:
            self.failure_counts[agent_id] = 0
            return False
        return True

    def _record_failure(self, agent_id):
        """Increment ``agent_id``'s failure count and stamp the failure time."""
        import time

        self.failure_counts[agent_id] = self.failure_counts.get(agent_id, 0) + 1
        self.last_failure_times[agent_id] = time.time()
Message Queuing
Priority Queue Routing
class PriorityQueueRouter:
    """Hold messages per capability, highest priority first, and drain them.

    Draining relies on ``_route_message`` and ``_send_to_agent``, which are
    not defined in this snippet.
    """

    def __init__(self, agent_registry):
        """Create empty per-capability queues."""
        self.agent_registry = agent_registry
        self.message_queues = {}
        self.priority_handlers = {}

    def enqueue_message(self, message, capability, priority=0):
        """Add ``message`` to ``capability``'s queue, keeping it priority-sorted.

        The stable descending sort keeps equal-priority messages in arrival order.
        """
        pending = self.message_queues.setdefault(capability, [])
        pending.append({
            "message": message,
            "priority": priority,
            "timestamp": time.time(),
        })
        pending.sort(key=lambda entry: entry["priority"], reverse=True)

    def process_queue(self, capability):
        """Send queued messages in order until the queue empties or routing fails.

        A message that cannot be routed goes back to the head of the queue,
        and draining stops.
        """
        pending = self.message_queues.get(capability)
        if pending is None:
            return

        while pending:
            entry = pending.pop(0)
            target = self._route_message(entry["message"], capability)
            if not target:
                pending.insert(0, entry)
                break
            self._send_to_agent(target, entry["message"])
Dead Letter Queue
class DeadLetterQueue:
    """Retry failed messages a bounded number of times, then park them.

    With ``max_retries=N``, a message is retried after each of its first
    N - 1 failures. On the Nth failure it is appended to ``dlq``.
    """

    def __init__(self, max_retries=3):
        """Create an empty dead-letter queue with the given failure limit."""
        self.max_retries = max_retries
        self.dlq = []
        # tracking key -> failures seen so far for that message
        self.retry_counts = {}

    def handle_failed_message(self, message, error, agent_id):
        """Record a failure and either retry or dead-letter ``message``.

        Args:
            message: dict-like message; ``"message_id"`` identifies it.
            error: the failure; stored as ``str(error)`` when dead-lettered.
            agent_id: the agent the delivery failed on.

        Returns:
            The result of ``_retry_message`` when retrying, else ``False``.
        """
        import time

        tracking_key = self._tracking_key(message)
        failures = self.retry_counts.get(tracking_key, 0) + 1
        self.retry_counts[tracking_key] = failures

        if failures < self.max_retries:
            return self._retry_message(message, agent_id)

        self.dlq.append({
            "message": message,
            "error": str(error),
            "agent_id": agent_id,
            "timestamp": time.time(),
            "retry_count": failures,
        })
        # Forget the counter so retry_counts does not grow forever, and a
        # later resubmission with the same ID starts from zero.
        del self.retry_counts[tracking_key]
        return False

    @staticmethod
    def _tracking_key(message):
        """Return the key used to count failures for ``message``.

        Messages without a ``"message_id"`` are keyed by object identity.
        Keying them all under ``None`` would merge their retry counts.
        NOTE(review): identity only works if retries pass the same object.
        """
        message_id = message.get("message_id")
        if message_id is not None:
            return message_id
        return ("__anonymous__", id(message))

    def _retry_message(self, message, agent_id):
        """Retry failed message (placeholder: always reports success)."""
        # Implement retry logic
        return True
Performance Optimization
Caching and Memoization
class CachedRouter:
    """Memoize another router's decisions per (message, capability)."""

    def __init__(self, base_router, cache_ttl=300, max_size=None):
        """Wrap ``base_router``.

        Args:
            base_router: object exposing ``route_message(message, capability)``.
            cache_ttl: seconds a cached decision stays valid (default 300).
            max_size: optional cap on cached entries. The oldest insertion is
                evicted first. ``None`` means unbounded; ``<= 0`` disables
                caching.
        """
        self.base_router = base_router
        self.cache_ttl = cache_ttl
        self.max_size = max_size
        # cache key -> (result, timestamp); dict order is insertion order.
        self.routing_cache = {}

    def route_message(self, message, capability):
        """Return the base router's decision, served from cache while fresh."""
        import time

        cache_key = self._generate_cache_key(message, capability)
        cached = self.routing_cache.get(cache_key)
        if cached is not None:
            cached_result, timestamp = cached
            if time.time() - timestamp < self.cache_ttl:
                return cached_result
            # Expired: drop it so stale entries do not accumulate.
            del self.routing_cache[cache_key]

        result = self.base_router.route_message(message, capability)
        self._store(cache_key, result, time.time())
        return result

    def _store(self, cache_key, result, now):
        """Insert a cache entry, evicting the oldest ones if ``max_size`` is hit."""
        if self.max_size is not None:
            if self.max_size <= 0:
                return
            while len(self.routing_cache) >= self.max_size:
                del self.routing_cache[next(iter(self.routing_cache))]
        self.routing_cache[cache_key] = (result, now)

    def _generate_cache_key(self, message, capability):
        """Return a digest of the JSON-encoded message plus the capability.

        Raises TypeError if ``message`` is not JSON-serializable. SHA-256 is
        used because MD5 can be unavailable on FIPS-restricted builds.
        """
        import hashlib
        import json

        content = json.dumps(message, sort_keys=True)
        return hashlib.sha256(f"{content}:{capability}".encode("utf-8")).hexdigest()
Adaptive Routing
class AdaptiveRouter:
    """Delegate to one of several routers, switching when metrics degrade."""

    def __init__(self, agent_registry, performance_monitor, context_analyzer=None):
        """Build the strategy routers.

        Args:
            agent_registry: registry shared by all strategy routers.
            performance_monitor: object exposing
                ``get_strategy_performance(name)``, which returns a dict with
                ``"success_rate"`` and ``"avg_response_time"`` (or a falsy
                value when there is no data).
            context_analyzer: optional analyzer for the context-aware
                strategy. Previously this was always ``None``.
        """
        self.agent_registry = agent_registry
        self.performance_monitor = performance_monitor
        self.routing_strategies = {
            "capability_based": CapabilityRouter(agent_registry),
            "load_based": LoadBasedRouter(agent_registry),
            "context_aware": ContextAwareRouter(agent_registry, context_analyzer),
        }
        self.current_strategy = "capability_based"
        # Last snapshot gathered by _switch_strategy (name -> metrics).
        self.strategy_performance = {}

    def route_message(self, message, capability):
        """Route via the current strategy, switching first if it underperforms."""
        current_performance = self.performance_monitor.get_strategy_performance(
            self.current_strategy
        )
        if self._should_switch_strategy(current_performance):
            self._switch_strategy()

        router = self.routing_strategies[self.current_strategy]
        return router.route_message(message, capability)

    def _should_switch_strategy(self, current_performance):
        """Return True if success rate < 0.8 or average response time > 5.0.

        Missing metrics (falsy value or absent keys) never trigger a switch.
        """
        if not current_performance:
            return False
        if current_performance.get("success_rate", 1.0) < 0.8:
            return True
        if current_performance.get("avg_response_time", 0.0) > 5.0:
            return True
        return False

    def _switch_strategy(self):
        """Switch to the strategy with the highest reported success rate.

        Strategies without metrics are ignored. If none report anything, the
        current strategy is kept.
        """
        snapshot = {}
        for strategy_name in self.routing_strategies:
            performance = self.performance_monitor.get_strategy_performance(strategy_name)
            if performance:
                snapshot[strategy_name] = performance
        self.strategy_performance = snapshot

        if snapshot:
            self.current_strategy = max(
                snapshot, key=lambda name: snapshot[name].get("success_rate", 0.0)
            )
Monitoring and Analytics
Routing Metrics
class RoutingMetrics:
    """Accumulate overall and per-strategy routing statistics.

    Average response times are running means over *successful* routes only.
    """

    def __init__(self):
        """Start all counters at zero."""
        self.metrics = {
            "total_routes": 0,
            "successful_routes": 0,
            "failed_routes": 0,
            "average_response_time": 0,
            "routing_errors": 0,
            "strategy_performance": {},
        }

    def record_route(self, strategy, success, response_time, error=None):
        """Record one routing outcome.

        Args:
            strategy: name of the strategy that made the decision.
            success: whether the route succeeded.
            response_time: elapsed time. It only feeds the averages when
                ``success`` is true.
            error: optional error. A truthy value on a failed route also
                increments ``routing_errors``.
        """
        self.metrics["total_routes"] += 1
        if success:
            self.metrics["successful_routes"] += 1
            # Only update the mean when a successful sample was added.
            # Recomputing on failures (as before) folded failed-route times
            # into the average.
            self.metrics["average_response_time"] = self._running_mean(
                self.metrics["average_response_time"],
                self.metrics["successful_routes"],
                response_time,
            )
        else:
            self.metrics["failed_routes"] += 1
            if error:
                self.metrics["routing_errors"] += 1

        strategy_metrics = self.metrics["strategy_performance"].setdefault(
            strategy, {"total": 0, "successful": 0, "avg_response_time": 0}
        )
        strategy_metrics["total"] += 1
        if success:
            strategy_metrics["successful"] += 1
            strategy_metrics["avg_response_time"] = self._running_mean(
                strategy_metrics["avg_response_time"],
                strategy_metrics["successful"],
                response_time,
            )

    @staticmethod
    def _running_mean(current_avg, count, new_value):
        """Return the mean after adding ``new_value`` as the ``count``-th sample."""
        return (current_avg * (count - 1) + new_value) / count

    def get_metrics(self):
        """Return a deep copy of the metrics so callers cannot mutate state."""
        import copy

        return copy.deepcopy(self.metrics)
Configuration
Routing Configuration
# routing-config.yaml
routing:
  strategies:
    primary: "capability_based"
    fallback: "load_based"
    adaptive: true

  load_balancing:
    algorithm: "round_robin"
    # Per-agent weights (higher = more traffic). Presumably only consulted
    # by a weighted algorithm; confirm against the loader.
    weights:
      agent-1: 1.0
      agent-2: 0.8
      agent-3: 1.2

  failover:
    enabled: true
    max_retries: 3
    timeout: 30

  circuit_breaker:
    enabled: true
    failure_threshold: 5
    recovery_timeout: 60

  caching:
    enabled: true
    ttl: 300
    max_size: 1000

  monitoring:
    metrics_enabled: true
    performance_tracking: true
    alerting:
      success_rate_threshold: 0.8
      response_time_threshold: 5.0
Next Steps
Now that you understand message routing, explore these topics next:
- Security - Explore security considerations
- LangGraph Integration - Advanced workflow integration
- A2A Protocol - Learn about the communication protocol
- Intent System - Intent recognition and routing
Ready to learn about security? Check out Security to understand security considerations for agent communication!