Agent-to-Agent Communication
Complete guide for implementing multi-agent collaboration and communication patterns in Pixell systems.
Note: This documentation was generated by AI from the source code, so it may contain inaccuracies about the project. If you find one, please contact engineering@pixell.global.
Overview
Agent-to-Agent (A2A) communication enables multiple agents to collaborate, share data, and coordinate tasks. This guide covers the implementation patterns, protocols, and best practices for building collaborative agent systems.
Architecture
Communication Patterns
1. Request-Response Pattern
Direct request-response communication between agents, where the caller awaits each reply (the client itself is asynchronous, built on `grpc.aio`):
from pixell_runtime.proto import agent_pb2, agent_pb2_grpc
import grpc
from typing import Dict, Any, Optional
import asyncio
class AgentClient:
    """Async gRPC client for calling another agent's AgentService.

    Use it as an async context manager::

        async with AgentClient("localhost:50051") as client:
            result = await client.invoke_agent("hello")

    The channel is opened on entry and closed on exit. Calling an RPC method
    outside the ``async with`` block raises ``RuntimeError``.
    """

    def __init__(self, agent_address: str, timeout: Optional[float] = None):
        """Create a client for ``agent_address``.

        Args:
            agent_address: gRPC target, e.g. ``"localhost:50051"``.
            timeout: Optional per-RPC deadline in seconds. ``None`` (the
                default) means no deadline. An expired deadline surfaces as a
                ``grpc.RpcError`` and is reported like any other gRPC error.
        """
        self.agent_address = agent_address
        self.timeout = timeout
        self.channel = None
        self.stub = None

    async def __aenter__(self):
        """Open an insecure channel and create the AgentService stub."""
        self.channel = grpc.aio.insecure_channel(self.agent_address)
        self.stub = agent_pb2_grpc.AgentServiceStub(self.channel)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the channel and drop the stub.

        Later RPC calls then fail fast instead of using a closed channel.
        """
        channel = self.channel
        self.channel = None
        self.stub = None
        if channel is not None:
            await channel.close()

    def _require_stub(self):
        """Return the stub, raising RuntimeError if the client is not open."""
        if self.stub is None:
            raise RuntimeError(
                f"AgentClient({self.agent_address!r}) is not open; "
                "use it inside 'async with AgentClient(...) as client:'"
            )
        return self.stub

    async def invoke_agent(self, message: str,
                           context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call the remote agent's ``Invoke`` RPC.

        Args:
            message: Text sent as ``InvokeRequest.message``.
            context: Optional mapping sent as ``InvokeRequest.context``.

        Returns:
            ``{"success", "response", "metadata"}`` when the RPC completes, or
            ``{"success": False, "error": ...}`` when it raises ``grpc.RpcError``.

        Raises:
            RuntimeError: If called outside ``async with``.
        """
        stub = self._require_stub()
        request = agent_pb2.InvokeRequest(message=message, context=context or {})
        try:
            response = await stub.Invoke(request, timeout=self.timeout)
        except grpc.RpcError as e:
            return {
                "success": False,
                "error": f"gRPC error: {e.code()} - {e.details()}"
            }
        return {
            "success": response.success,
            "response": response.response,
            "metadata": dict(response.metadata)
        }

    async def get_capabilities(self) -> Dict[str, Any]:
        """Call ``DescribeCapabilities``.

        Returns:
            ``{"success": True, "capabilities", "description", "version"}`` on
            success. The ``success`` key mirrors the error result. Returns
            ``{"success": False, "error": ...}`` on ``grpc.RpcError``.

        Raises:
            RuntimeError: If called outside ``async with``.
        """
        stub = self._require_stub()
        try:
            response = await stub.DescribeCapabilities(
                agent_pb2.CapabilitiesRequest(), timeout=self.timeout
            )
        except grpc.RpcError as e:
            return {
                "success": False,
                "error": f"gRPC error: {e.code()} - {e.details()}"
            }
        return {
            "success": True,
            "capabilities": list(response.capabilities),
            "description": response.description,
            "version": response.version
        }

    async def health_check(self) -> Dict[str, Any]:
        """Call the ``Health`` RPC.

        Returns:
            ``{"status", "message", "timestamp"}`` from the response, or
            ``{"status": "unhealthy", "error": ...}`` on ``grpc.RpcError``.

        Raises:
            RuntimeError: If called outside ``async with``.
        """
        stub = self._require_stub()
        try:
            response = await stub.Health(agent_pb2.HealthRequest(), timeout=self.timeout)
        except grpc.RpcError as e:
            return {
                "status": "unhealthy",
                "error": f"gRPC error: {e.code()} - {e.details()}"
            }
        return {
            "status": response.status,
            "message": response.message,
            "timestamp": response.timestamp
        }
# Usage example
async def collaborate_with_agent():
    """Walk through a health check, a capability query and one invocation.

    The agent at localhost:50051 is only invoked if it reports "healthy".
    """
    async with AgentClient("localhost:50051") as peer:
        status_report = await peer.health_check()
        if status_report["status"] != "healthy":
            print(f"Agent is unhealthy: {status_report}")
            return

        offered = await peer.get_capabilities()
        print(f"Agent capabilities: {offered}")

        outcome = await peer.invoke_agent(
            message="Process this data",
            context={"priority": "high", "type": "analysis"},
        )
        summary = (
            f"Agent response: {outcome['response']}"
            if outcome["success"]
            else f"Agent error: {outcome['error']}"
        )
        print(summary)
2. Event-Driven Pattern
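Asynchronous, decoupled communication through an event bus. The example below uses a simple in-process `EventBus`; the same publish/subscribe pattern can be backed by an external message broker: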
import asyncio
import json
from typing import Dict, Any, Callable, List
from dataclasses import dataclass
from datetime import datetime
import uuid
@dataclass
class AgentEvent:
    """Agent event structure.

    One message published on an EventBus. Subscribers are selected by
    ``event_type``; agent-side handlers then filter on ``target_agent``.
    """
    event_id: str  # EventBus.publish_event fills this with a uuid4 string
    event_type: str  # key used to look up subscribers on the bus
    source_agent: str  # id of the sending agent
    target_agent: str  # id of the intended receiver; "*" is accepted as broadcast by the handlers
    payload: Dict[str, Any]  # free-form event data
    timestamp: datetime  # publish_event sets this from datetime.utcnow() (naive UTC)
    correlation_id: str = None  # optional id linking related events; may be None
class EventBus:
    """In-process publish/subscribe bus for agent events.

    Handlers are async callables taking the event. They are subscribed per
    event type and run concurrently on publish. Every published event is
    appended to ``event_history``.
    """

    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.event_history: List["AgentEvent"] = []

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe the async ``handler(event)`` to ``event_type``."""
        self.subscribers.setdefault(event_type, []).append(handler)

    async def publish(self, event: "AgentEvent"):
        """Record ``event`` and deliver it to every handler of its type.

        Handlers run concurrently. A handler that raises does not stop the
        others. Its exception is reported rather than silently discarded.
        """
        self.event_history.append(event)

        # Snapshot so results can be paired with handlers even if a handler
        # subscribes new ones while this delivery is in progress.
        handlers = list(self.subscribers.get(event.event_type, ()))
        if not handlers:
            return

        outcomes = await asyncio.gather(
            *(handler(event) for handler in handlers), return_exceptions=True
        )
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                print(f"Error in handler for event {event.event_id}: {outcome!r}")

    async def publish_event(self, event_type: str, source_agent: str,
                            target_agent: str, payload: Dict[str, Any],
                            correlation_id: str = None) -> str:
        """Build an AgentEvent with a fresh uuid4 id and UTC timestamp, then publish it.

        Returns:
            The generated ``event_id``.
        """
        event = AgentEvent(
            event_id=str(uuid.uuid4()),
            event_type=event_type,
            source_agent=source_agent,
            target_agent=target_agent,
            payload=payload,
            timestamp=datetime.utcnow(),
            correlation_id=correlation_id
        )
        await self.publish(event)
        return event.event_id
class EventDrivenAgent:
    """Agent that communicates with other agents through an EventBus."""

    def __init__(self, agent_id: str, event_bus: "EventBus"):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.event_handlers: Dict[str, Callable] = {}

    def register_handler(self, event_type: str, handler: Callable):
        """Register, or replace, the async handler for ``event_type``.

        The agent's dispatcher is subscribed to the bus only on the first
        registration of a type. Subscribing again would make every event of
        that type run the handler once per registration.
        """
        first_registration = event_type not in self.event_handlers
        self.event_handlers[event_type] = handler
        if first_registration:
            self.event_bus.subscribe(event_type, self._handle_event)

    async def _handle_event(self, event: "AgentEvent"):
        """Run the registered handler if ``event`` targets this agent or "*".

        Handler exceptions are printed, not propagated.
        """
        if event.target_agent not in (self.agent_id, "*"):
            return  # Not for this agent
        handler = self.event_handlers.get(event.event_type)
        if handler is None:
            return
        try:
            await handler(event)
        except Exception as e:
            print(f"Error handling event {event.event_id}: {e}")

    async def send_event(self, event_type: str, target_agent: str,
                         payload: Dict[str, Any], correlation_id: str = None) -> str:
        """Publish an event from this agent to ``target_agent``.

        Returns:
            The generated event id.
        """
        return await self.event_bus.publish_event(
            event_type=event_type,
            source_agent=self.agent_id,
            target_agent=target_agent,
            payload=payload,
            correlation_id=correlation_id
        )
# Usage example
async def event_driven_collaboration():
    """Example of event-driven collaboration.

    Pipeline:
        data-processor --data_processed--> report-generator
        report-generator --report_generated--> notification-sender
        notification-sender --notification_sent--> data-processor

    Each agent subscribes to the event emitted by the previous stage. Each
    event targets the next agent, so publishing the first event drives the
    whole chain.
    """
    event_bus = EventBus()

    # Create agents
    data_processor = EventDrivenAgent("data-processor", event_bus)
    report_generator = EventDrivenAgent("report-generator", event_bus)
    notification_sender = EventDrivenAgent("notification-sender", event_bus)

    async def handle_data_processed(event: AgentEvent):
        """report-generator: build a report once data has been processed."""
        print(f"Data processed: {event.payload}")
        data_id = event.payload["data_id"]
        await report_generator.send_event(
            "report_generated",
            "notification-sender",
            {"report_id": f"report-{data_id}", "data_id": data_id},
            event.correlation_id
        )

    async def handle_report_generated(event: AgentEvent):
        """notification-sender: announce the generated report."""
        print(f"Report generated: {event.payload}")
        await notification_sender.send_event(
            "notification_sent",
            "data-processor",
            {"report_id": event.payload["report_id"]},
            event.correlation_id
        )

    async def handle_notification_sent(event: AgentEvent):
        """data-processor: final acknowledgement that the workflow finished."""
        print(f"Notification sent: {event.payload}")

    # Each agent listens for the event produced by the previous stage.
    report_generator.register_handler("data_processed", handle_data_processed)
    notification_sender.register_handler("report_generated", handle_report_generated)
    data_processor.register_handler("notification_sent", handle_notification_sent)

    # Start the workflow
    await data_processor.send_event(
        "data_processed",
        "report-generator",
        {"data_id": "123", "status": "completed"},
        correlation_id="workflow-001"
    )
3. Workflow Orchestration Pattern
Coordinate complex multi-agent workflows:
from enum import Enum
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
import asyncio
class TaskStatus(Enum):
    """Lifecycle state of a WorkflowTask."""
    PENDING = "pending"  # default for a new WorkflowTask; not yet dispatched
    RUNNING = "running"  # dispatched to an agent, result not yet received
    COMPLETED = "completed"  # agent reported success
    FAILED = "failed"  # agent reported an error, or dispatch raised
    CANCELLED = "cancelled"  # task was cancelled and will not run
@dataclass
class WorkflowTask:
    """Workflow task definition.

    One unit of work in a workflow run by WorkflowOrchestrator. The
    orchestrator mutates ``status``, ``result`` and ``error`` in place.
    """
    task_id: str  # unique within a workflow; referenced by other tasks' dependencies
    agent_id: str  # id of the agent intended to handle this task
    task_type: str  # kind of work requested
    parameters: Dict[str, Any]  # task-specific arguments
    dependencies: Optional[List[str]] = None  # task_ids that must complete first; None = no dependencies
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Dict[str, Any]] = None  # set when the task completes
    error: Optional[str] = None  # set when the task fails
class WorkflowOrchestrator:
    """Orchestrate multi-agent workflows.

    Tasks run in dependency order. Each round concurrently dispatches every
    PENDING task whose dependencies have all completed. When no further task
    can start (a dependency failed, is unknown, or the graph has a cycle),
    the remaining PENDING tasks are marked CANCELLED and execution stops
    instead of spinning forever.
    """

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.workflows: Dict[str, List[WorkflowTask]] = {}
        # Tasks currently being executed, keyed by task_id.
        self.active_tasks: Dict[str, WorkflowTask] = {}

    async def create_workflow(self, workflow_id: str, tasks: List[WorkflowTask]) -> str:
        """Register ``tasks`` under ``workflow_id`` and run them.

        Note: this awaits the whole workflow before returning.

        Returns:
            ``workflow_id``.
        """
        self.workflows[workflow_id] = tasks
        await self._execute_workflow(workflow_id)
        return workflow_id

    async def _execute_workflow(self, workflow_id: str):
        """Run the tasks of a registered workflow in dependency order."""
        tasks = self.workflows[workflow_id]
        task_graph = self._build_dependency_graph(tasks)
        await self._execute_tasks(workflow_id, tasks, task_graph)

    def _build_dependency_graph(self, tasks: List[WorkflowTask]) -> Dict[str, List[str]]:
        """Map each task_id to the task_ids it depends on."""
        return {task.task_id: task.dependencies or [] for task in tasks}

    async def _execute_tasks(self, workflow_id: str, tasks: List[WorkflowTask],
                             task_graph: Dict[str, List[str]]):
        """Execute tasks round by round, respecting ``task_graph``."""
        # Tasks already completed (e.g. on a re-run) satisfy their dependents.
        completed_tasks = {t.task_id for t in tasks if t.status == TaskStatus.COMPLETED}
        while len(completed_tasks) < len(tasks):
            ready_tasks = [
                task for task in tasks
                if task.status == TaskStatus.PENDING
                and all(dep in completed_tasks for dep in task_graph.get(task.task_id, ()))
            ]
            if not ready_tasks:
                # No progress possible; stop rather than loop forever.
                self._cancel_blocked_tasks(workflow_id, tasks)
                break

            await self._execute_task_batch(workflow_id, ready_tasks)
            completed_tasks.update(
                task.task_id for task in ready_tasks if task.status == TaskStatus.COMPLETED
            )

    def _cancel_blocked_tasks(self, workflow_id: str, tasks: List[WorkflowTask]):
        """Report why the workflow cannot progress and cancel still-pending tasks."""
        if any(t.status == TaskStatus.FAILED for t in tasks):
            print(f"Workflow {workflow_id} failed due to task failures")
        else:
            print(f"Workflow {workflow_id} stalled: unresolvable or cyclic dependencies")
        for task in tasks:
            if task.status == TaskStatus.PENDING:
                task.status = TaskStatus.CANCELLED

    async def _execute_task_batch(self, workflow_id: str, tasks: List[WorkflowTask]):
        """Execute a batch of independent tasks concurrently."""

        async def execute_single_task(task: WorkflowTask):
            """Dispatch one task and record its outcome on the task object."""
            task.status = TaskStatus.RUNNING
            self.active_tasks[task.task_id] = task
            try:
                result = await self._send_task_to_agent(task)
                if result["success"]:
                    task.status = TaskStatus.COMPLETED
                    task.result = result["result"]
                else:
                    task.status = TaskStatus.FAILED
                    task.error = result.get("error", "unknown error")
            except Exception as e:
                task.status = TaskStatus.FAILED
                task.error = str(e)
            finally:
                self.active_tasks.pop(task.task_id, None)

        await asyncio.gather(*(execute_single_task(task) for task in tasks))

    async def _send_task_to_agent(self, task: WorkflowTask) -> Dict[str, Any]:
        """Send a task to its agent (simulated here).

        Replace this with a real call through your agent communication layer.
        """
        await asyncio.sleep(0.1)  # Simulate processing time
        return {
            "success": True,
            "result": {
                "task_id": task.task_id,
                "output": f"Processed {task.task_type}",
                "timestamp": datetime.utcnow().isoformat()
            }
        }
# Usage example
async def orchestrate_workflow():
    """Run a four-stage ingest -> process -> report -> notify workflow."""
    orchestrator = WorkflowOrchestrator(EventBus())

    # (task_id, agent_id, task_type, parameters, dependencies)
    stages = [
        ("data_ingestion", "data-agent", "ingest_data",
         {"source": "database", "table": "sales"}, None),
        ("data_processing", "processing-agent", "process_data",
         {"algorithm": "ml_classification"}, ["data_ingestion"]),
        ("report_generation", "report-agent", "generate_report",
         {"format": "pdf", "template": "standard"}, ["data_processing"]),
        ("notification", "notification-agent", "send_notification",
         {"channel": "email", "recipients": ["admin@company.com"]}, ["report_generation"]),
    ]
    workflow_tasks = [
        WorkflowTask(
            task_id=task_id,
            agent_id=agent_id,
            task_type=task_type,
            parameters=parameters,
            dependencies=dependencies,
        )
        for task_id, agent_id, task_type, parameters, dependencies in stages
    ]

    created_id = await orchestrator.create_workflow("data-analysis-workflow", workflow_tasks)
    print(f"Created workflow: {created_id}")
Service Implementation
1. gRPC Service Implementation
Implement the agent service interface:
from pixell_runtime.proto import agent_pb2, agent_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp
import time
import json
from typing import Dict, Any
class AgentService(agent_pb2_grpc.AgentServiceServicer):
    """gRPC servicer exposing Health, DescribeCapabilities and Invoke for one agent."""

    def __init__(self, agent_id: str, capabilities: List[str], description: str):
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.description = description
        self.start_time = time.time()

    def Health(self, request, context):
        """Always report "healthy", stamped with the current time in whole seconds."""
        status_text = f"Agent {self.agent_id} is running"
        return agent_pb2.HealthResponse(
            status="healthy",
            message=status_text,
            timestamp=Timestamp(seconds=int(time.time())),
        )

    def DescribeCapabilities(self, request, context):
        """Return the configured capabilities and description (version "1.0.0")."""
        return agent_pb2.CapabilitiesResponse(
            capabilities=self.capabilities,
            description=self.description,
            version="1.0.0",
        )

    def Invoke(self, request, context):
        """Run ``_process_request`` and return its result JSON-encoded.

        Any exception becomes a ``success=False`` response carrying the error text.
        """
        try:
            text = request.message
            incoming_context = dict(request.context) if request.context else {}
            outcome = self._process_request(text, incoming_context)
            return agent_pb2.InvokeResponse(
                response=json.dumps(outcome),
                success=True,
                metadata={"agent": self.agent_id, "version": "1.0.0"},
            )
        except Exception as exc:
            return agent_pb2.InvokeResponse(
                response=f"Error: {str(exc)}",
                success=False,
                metadata={"error": str(exc)},
            )

    def _process_request(self, message: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Echo the request back; replace with the agent's real processing logic."""
        return dict(
            message=f"Processed: {message}",
            context=context,
            agent=self.agent_id,
            timestamp=time.time(),
        )
# Server setup
async def start_agent_server(agent_id: str, port: int = 50051):
    """Start a gRPC server for ``agent_id`` and serve until it terminates.

    The server is stopped in a ``finally`` block, so shutdown also happens
    when the serving task is cancelled. On Python 3.11+, Ctrl+C under
    ``asyncio.run`` cancels the main task rather than raising
    KeyboardInterrupt inside this coroutine.

    Args:
        agent_id: Identifier reported by the service.
        port: Port to listen on. ``0`` lets the OS pick one; the bound port
            is printed.
    """
    server = grpc.aio.server()

    # Create service instance
    service = AgentService(
        agent_id=agent_id,
        capabilities=["data_processing", "analysis", "reporting"],
        description="Multi-purpose data processing agent"
    )
    agent_pb2_grpc.add_AgentServiceServicer_to_server(service, server)

    # add_insecure_port returns the port actually bound (relevant when port=0).
    bound_port = server.add_insecure_port(f'[::]:{port}')
    await server.start()
    print(f"Agent {agent_id} server started on port {bound_port}")

    try:
        await server.wait_for_termination()
    finally:
        await server.stop(0)
2. Event Handler Implementation
Handle events from other agents:
class EventHandler:
    """Dispatch EventBus events addressed to one agent to per-type handlers."""

    def __init__(self, agent_id: str, event_bus: "EventBus"):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.handlers: Dict[str, Callable] = {}

    def register_handler(self, event_type: str, handler: Callable):
        """Register, or replace, the async handler for ``event_type``.

        The bus subscription is made only on the first registration of a
        type. Subscribing again would make every event of that type run the
        handler once per registration.
        """
        already_subscribed = event_type in self.handlers
        self.handlers[event_type] = handler
        if not already_subscribed:
            self.event_bus.subscribe(event_type, self._handle_event)

    async def _handle_event(self, event: "AgentEvent"):
        """Run the registered handler if ``event`` targets this agent or "*".

        Handler exceptions are printed, not propagated.
        """
        if event.target_agent not in (self.agent_id, "*"):
            return
        handler = self.handlers.get(event.event_type)
        if handler is None:
            return
        try:
            await handler(event)
        except Exception as e:
            print(f"Error handling event {event.event_id}: {e}")

    async def send_response(self, original_event: "AgentEvent", response_data: Dict[str, Any]):
        """Publish ``<event_type>_response`` back to the original sender.

        The original ``correlation_id`` is preserved.
        """
        await self.event_bus.publish_event(
            event_type=f"{original_event.event_type}_response",
            source_agent=self.agent_id,
            target_agent=original_event.source_agent,
            payload=response_data,
            correlation_id=original_event.correlation_id
        )
# Usage
async def setup_event_handlers(event_bus: Optional[EventBus] = None):
    """Create an EventHandler for "my-agent" with data and analysis request handlers.

    Args:
        event_bus: Bus to attach to. Defaults to a new private EventBus; pass
            a shared bus so other agents can reach this one.

    Returns:
        The configured EventHandler. Its bus is available as ``handler.event_bus``.
    """
    if event_bus is None:
        event_bus = EventBus()
    handler = EventHandler("my-agent", event_bus)

    # Placeholder business logic so the example runs as-is. Replace these two
    # coroutines with your agent's real processing.
    async def process_data_request(payload: Dict[str, Any]) -> Dict[str, Any]:
        return {"request": payload}

    async def perform_analysis(payload: Dict[str, Any]) -> Dict[str, Any]:
        return {"input": payload}

    async def handle_data_request(event: AgentEvent):
        """Handle data request event and reply to the sender."""
        print(f"Received data request: {event.payload}")
        result = await process_data_request(event.payload)
        await handler.send_response(event, {
            "status": "completed",
            "result": result,
            "timestamp": datetime.utcnow().isoformat()
        })

    async def handle_analysis_request(event: AgentEvent):
        """Handle analysis request event and reply to the sender."""
        print(f"Received analysis request: {event.payload}")
        result = await perform_analysis(event.payload)
        await handler.send_response(event, {
            "status": "completed",
            "analysis": result,
            "timestamp": datetime.utcnow().isoformat()
        })

    handler.register_handler("data_request", handle_data_request)
    handler.register_handler("analysis_request", handle_analysis_request)
    return handler
Advanced Patterns
1. Agent Discovery
Discover and register agents dynamically:
class AgentRegistry:
    """Registry for agent discovery with background health checking.

    Each registered agent gets an asyncio task that periodically calls its
    ``Health`` RPC and updates ``status`` ("unknown" / "healthy" /
    "unhealthy") and ``last_seen`` in its record.
    """

    def __init__(self, health_check_interval: float = 30):
        """
        Args:
            health_check_interval: Seconds between health checks of each agent.
        """
        self.agents: Dict[str, Dict[str, Any]] = {}
        self.health_checkers: Dict[str, asyncio.Task] = {}
        self.health_check_interval = health_check_interval

    def register_agent(self, agent_id: str, address: str, capabilities: List[str],
                       description: str = ""):
        """Register, or re-register, an agent and start health checking it.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``. Re-registering an ``agent_id`` cancels the
        previous health-check task instead of leaking it.
        """
        self._stop_health_check(agent_id)
        self.agents[agent_id] = {
            "agent_id": agent_id,
            "address": address,
            "capabilities": capabilities,
            "description": description,
            "status": "unknown",
            "last_seen": time.time()
        }
        self.health_checkers[agent_id] = asyncio.create_task(
            self._health_check_agent(agent_id, address)
        )

    def unregister_agent(self, agent_id: str) -> None:
        """Remove an agent and cancel its health-check task (no-op if unknown)."""
        self._stop_health_check(agent_id)
        self.agents.pop(agent_id, None)

    async def close(self) -> None:
        """Cancel every health-check task and wait for them to finish."""
        checkers = list(self.health_checkers.values())
        self.health_checkers.clear()
        for checker in checkers:
            checker.cancel()
        await asyncio.gather(*checkers, return_exceptions=True)

    def _stop_health_check(self, agent_id: str) -> None:
        """Cancel and forget the health-check task for ``agent_id``, if any."""
        checker = self.health_checkers.pop(agent_id, None)
        if checker is not None:
            checker.cancel()

    async def _health_check_agent(self, agent_id: str, address: str):
        """Health check an agent every ``health_check_interval`` seconds until cancelled."""
        while True:
            record = self.agents.get(agent_id)
            if record is None:
                return  # agent was unregistered
            try:
                async with AgentClient(address) as client:
                    health = await client.health_check()
                if health["status"] == "healthy":
                    record["status"] = "healthy"
                    record["last_seen"] = time.time()
                else:
                    record["status"] = "unhealthy"
            except Exception as e:
                record["status"] = "unhealthy"
                print(f"Health check failed for {agent_id}: {e}")
            await asyncio.sleep(self.health_check_interval)

    def get_agents_by_capability(self, capability: str) -> List[Dict[str, Any]]:
        """Return the records of healthy agents that list ``capability``."""
        return [
            agent for agent in self.agents.values()
            if capability in agent["capabilities"] and agent["status"] == "healthy"
        ]

    def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Return the record for ``agent_id``, or None if not registered."""
        return self.agents.get(agent_id)

    async def discover_agents(self, discovery_service: str):
        """Discover agents from an external discovery service.

        Not implemented: integrate with Consul, etcd or another discovery
        service here.
        """
        pass
# Usage
async def setup_agent_discovery():
    """Build a registry pre-populated with the two locally known agents."""
    registry = AgentRegistry()

    # (agent_id, address, capabilities, description)
    known_agents = (
        ("data-processor", "localhost:50051",
         ["data_processing", "analysis"], "Data processing agent"),
        ("report-generator", "localhost:50052",
         ["report_generation", "formatting"], "Report generation agent"),
    )
    for agent_id, address, capabilities, description in known_agents:
        registry.register_agent(agent_id, address, capabilities, description)

    return registry
2. Load Balancing
Distribute requests across multiple agent instances:
class AgentLoadBalancer:
    """Spread requests for a capability across healthy registered agents."""

    def __init__(self, registry: "AgentRegistry"):
        self.registry = registry
        # Shared by all capabilities; only its value modulo the pool size matters.
        self.round_robin_index = 0

    async def invoke_agent(self, capability: str, message: str,
                           context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Invoke one agent offering ``capability``, chosen round-robin.

        Returns:
            The agent's result dict, or an error dict if no healthy agent
            offers the capability.
        """
        agents = self.registry.get_agents_by_capability(capability)
        if not agents:
            return {
                "success": False,
                "error": f"No agents available with capability: {capability}"
            }

        selected_agent = self._select_agent(agents)
        async with AgentClient(selected_agent["address"]) as client:
            return await client.invoke_agent(message, context)

    def _select_agent(self, agents: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Pick the next agent round-robin; None for an empty list."""
        if not agents:
            return None
        selected = agents[self.round_robin_index % len(agents)]
        self.round_robin_index += 1
        return selected

    async def invoke_with_failover(self, capability: str, message: str,
                                   context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Try each agent offering ``capability`` in order until one succeeds.

        Returns:
            The first successful result. If every agent fails, returns
            ``{"success": False, "error": "All agents failed", "errors": [...]}``,
            where ``errors`` holds one ``"<address>: <reason>"`` entry per agent
            tried.
        """
        agents = self.registry.get_agents_by_capability(capability)
        if not agents:
            return {
                "success": False,
                "error": f"No agents available with capability: {capability}"
            }

        errors: List[str] = []
        for agent in agents:
            address = agent["address"]
            try:
                async with AgentClient(address) as client:
                    result = await client.invoke_agent(message, context)
            except Exception as e:
                print(f"Agent {address} failed: {e}")
                errors.append(f"{address}: {e}")
                continue
            if result.get("success"):
                return result
            errors.append(f"{address}: {result.get('error')}")

        return {
            "success": False,
            "error": "All agents failed",
            "errors": errors
        }
3. Circuit Breaker Pattern
Implement circuit breaker for agent communication:
from enum import Enum
import time
class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Circuit is open, failing fast
    HALF_OPEN = "half_open"  # Testing if service is back


class CircuitBreaker:
    """Circuit breaker for agent communication.

    CLOSED: requests are allowed. ``failure_threshold`` failures with no
        success in between open the circuit.
    OPEN: requests are rejected until more than ``retry_timeout`` seconds
        have passed since the last failure. The next check then moves the
        breaker to HALF_OPEN.
    HALF_OPEN: requests are allowed. A success closes the circuit; a failure
        re-opens it.

    ``last_failure_time`` is a ``time.monotonic()`` reading, not a wall-clock
    timestamp, so clock adjustments cannot stretch or skip the retry window.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60,
                 retry_timeout: int = 30):
        self.failure_threshold = failure_threshold
        # NOTE: ``timeout`` is kept for API compatibility; the breaker itself
        # does not use it. Per-call deadlines belong to the client.
        self.timeout = timeout
        self.retry_timeout = retry_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def can_execute(self) -> bool:
        """Return True if a request may be attempted now.

        May transition OPEN -> HALF_OPEN.
        """
        if self.state == CircuitState.OPEN:
            retry_due = (
                self.last_failure_time is None
                or time.monotonic() - self.last_failure_time > self.retry_timeout
            )
            if retry_due:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        return self.state in (CircuitState.CLOSED, CircuitState.HALF_OPEN)

    def record_success(self):
        """Record a successful call: reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Record a failed call; open the circuit on threshold or a failed half-open probe."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN
class CircuitBreakerAgentClient:
    """Wrap AgentClient calls so each one is gated and recorded by a CircuitBreaker."""

    def __init__(self, agent_address: str, circuit_breaker: CircuitBreaker):
        self.agent_address = agent_address
        self.circuit_breaker = circuit_breaker

    async def invoke_agent(self, message: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Invoke the agent unless the breaker rejects the call.

        A ``success=False`` result or any exception counts as a failure.
        Exceptions are converted to an error dict.
        """
        breaker = self.circuit_breaker
        if not breaker.can_execute():
            return {"success": False, "error": "Circuit breaker is open"}

        try:
            async with AgentClient(self.agent_address) as client:
                outcome = await client.invoke_agent(message, context)
                recorder = breaker.record_success if outcome["success"] else breaker.record_failure
                recorder()
                return outcome
        except Exception as exc:
            breaker.record_failure()
            return {"success": False, "error": f"Circuit breaker error: {str(exc)}"}
Best Practices
1. Error Handling
Implement robust error handling for agent communication:
class AgentCommunicationError(Exception):
    """Root of all errors raised while talking to another agent."""


class AgentUnavailableError(AgentCommunicationError):
    """The target agent could not be reached."""


class AgentTimeoutError(AgentCommunicationError):
    """The target agent did not answer in time."""


class AgentValidationError(AgentCommunicationError):
    """The request to the target agent failed validation."""
async def safe_agent_invoke(client: AgentClient, message: str,
                            context: Dict[str, Any] = None,
                            timeout: int = 30) -> Dict[str, Any]:
    """Invoke an agent and translate every failure into an AgentCommunicationError.

    Args:
        client: An entered AgentClient, or any object with a compatible
            ``invoke_agent`` coroutine.
        message: Message to send.
        context: Optional request context.
        timeout: Seconds to wait for the reply.

    Returns:
        The result dict when the agent reports ``success``.

    Raises:
        AgentTimeoutError: No reply within ``timeout`` seconds.
        AgentUnavailableError: gRPC reported UNAVAILABLE.
        AgentCommunicationError: The agent returned ``success=False``, or any
            other error occurred.
    """
    try:
        result = await asyncio.wait_for(
            client.invoke_agent(message, context),
            timeout=timeout
        )
    except asyncio.TimeoutError as e:
        raise AgentTimeoutError(f"Agent request timed out after {timeout} seconds") from e
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.UNAVAILABLE:
            raise AgentUnavailableError(f"Agent is unavailable: {e.details()}") from e
        raise AgentCommunicationError(f"gRPC error: {e.code()} - {e.details()}") from e
    except AgentCommunicationError:
        raise  # already classified; do not re-wrap
    except Exception as e:
        raise AgentCommunicationError(f"Unexpected error: {str(e)}") from e

    # Checked outside the try so this error is not re-wrapped as "Unexpected error".
    if not result["success"]:
        raise AgentCommunicationError(f"Agent returned error: {result.get('error')}")
    return result
2. Monitoring and Observability
Monitor agent communication:
class AgentCommunicationMonitor:
    """In-memory counters for agent communication.

    Tracks request totals and successes/failures, per-agent request counts,
    error-message counts, the latest agent health, and a sliding window of
    recent request durations.
    """

    def __init__(self, max_durations: int = 1000):
        """
        Args:
            max_durations: How many of the most recent durations are kept for
                ``average_duration``. Must be >= 0.

        Raises:
            ValueError: If ``max_durations`` is negative.
        """
        if max_durations < 0:
            raise ValueError("max_durations must be >= 0")
        self.max_durations = max_durations
        self.metrics = {
            "requests_total": 0,
            "requests_successful": 0,
            "requests_failed": 0,
            "request_duration": [],
            "agent_health": {},
            "error_counts": {},
            "requests_by_agent": {}
        }

    def record_request(self, agent_id: str, success: bool, duration: float, error: str = None):
        """Record one request's outcome and duration (in seconds) for ``agent_id``."""
        metrics = self.metrics
        metrics["requests_total"] += 1
        per_agent = metrics["requests_by_agent"]
        per_agent[agent_id] = per_agent.get(agent_id, 0) + 1

        if success:
            metrics["requests_successful"] += 1
        else:
            metrics["requests_failed"] += 1
            if error:
                counts = metrics["error_counts"]
                counts[error] = counts.get(error, 0) + 1

        durations = metrics["request_duration"]
        durations.append(duration)
        # Keep only the most recent max_durations entries (trimmed in place).
        excess = len(durations) - self.max_durations
        if excess > 0:
            del durations[:excess]

    def record_agent_health(self, agent_id: str, status: str):
        """Store the latest health ``status`` for ``agent_id`` with the current time."""
        self.metrics["agent_health"][agent_id] = {
            "status": status,
            "timestamp": time.time()
        }

    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of the metrics.

        Nested dicts are copies, so callers cannot mutate the monitor's state.
        ``success_rate`` and ``average_duration`` are 0 when nothing has been
        recorded.
        """
        metrics = self.metrics
        durations = metrics["request_duration"]
        avg_duration = sum(durations) / len(durations) if durations else 0
        return {
            "requests_total": metrics["requests_total"],
            "requests_successful": metrics["requests_successful"],
            "requests_failed": metrics["requests_failed"],
            "success_rate": metrics["requests_successful"] / max(metrics["requests_total"], 1),
            "average_duration": avg_duration,
            "agent_health": {agent: dict(info) for agent, info in metrics["agent_health"].items()},
            "error_counts": dict(metrics["error_counts"]),
            "requests_by_agent": dict(metrics["requests_by_agent"])
        }
# Usage with monitoring
async def monitored_agent_invoke(client: AgentClient, message: str,
                                 context: Dict[str, Any] = None,
                                 monitor: AgentCommunicationMonitor = None) -> Dict[str, Any]:
    """Invoke an agent and, if ``monitor`` is given, record the outcome.

    Durations are measured with ``time.perf_counter()``, which is monotonic
    and high resolution, so clock adjustments cannot produce negative or
    skewed durations. The client's address is used as the agent id.
    Exceptions from the client are recorded as failures and re-raised.
    """
    start = time.perf_counter()
    try:
        result = await client.invoke_agent(message, context)
    except Exception as e:
        if monitor is not None:
            monitor.record_request(
                agent_id=client.agent_address,
                success=False,
                duration=time.perf_counter() - start,
                error=str(e)
            )
        raise

    # Recorded outside the try so a failure inside the monitor is not
    # mistaken for a failed request and recorded twice.
    if monitor is not None:
        monitor.record_request(
            agent_id=client.agent_address,
            success=result["success"],
            duration=time.perf_counter() - start,
            error=result.get("error")
        )
    return result
3. Security
Implement security for agent communication:
import jwt
import hashlib
import hmac
from typing import Optional
class AgentSecurityManager:
    """Issue/verify HS256 JWTs and HMAC-SHA256-sign messages with one shared secret."""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def generate_token(self, agent_id: str, capabilities: List[str],
                       expires_in: int = 3600) -> str:
        """Return an HS256 JWT carrying ``agent_id`` and ``capabilities``.

        ``iat`` is the current epoch time. ``exp`` is ``iat + expires_in`` seconds.
        """
        now = time.time()
        payload = {
            "agent_id": agent_id,
            "capabilities": capabilities,
            "exp": now + expires_in,
            "iat": now
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Decode and verify ``token``; return its payload.

        Raises:
            ValueError: If the token is expired or otherwise invalid. The
                original jwt exception is chained as ``__cause__``.
        """
        try:
            return jwt.decode(token, self.secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as e:
            raise ValueError("Token has expired") from e
        except jwt.InvalidTokenError as e:
            raise ValueError("Invalid token") from e

    def generate_signature(self, message: str, timestamp: str) -> str:
        """Return the hex HMAC-SHA256 of ``"<message>:<timestamp>"``.

        Only ``message`` and ``timestamp`` are signed; nothing else in a
        request is covered.
        """
        data = f"{message}:{timestamp}"
        return hmac.new(
            self.secret_key.encode(),
            data.encode(),
            hashlib.sha256
        ).hexdigest()

    def verify_signature(self, message: str, timestamp: str, signature: str,
                         max_age: Optional[float] = None) -> bool:
        """Check ``signature`` in constant time; optionally reject stale timestamps.

        Args:
            message: Signed message.
            timestamp: Timestamp string that was signed (integer epoch seconds
                when ``max_age`` is used).
            signature: Hex signature to check. Anything that is not a string
                simply fails verification.
            max_age: If given, also reject timestamps that do not parse as an
                int or that differ from the current time by more than
                ``max_age`` seconds. This limits replay of captured requests.

        Returns:
            True only if the signature matches (and, when requested, the
            timestamp is fresh).
        """
        if not isinstance(signature, str):
            return False
        expected_signature = self.generate_signature(message, timestamp)
        # Compare bytes: compare_digest raises TypeError on non-ASCII str input.
        if not hmac.compare_digest(signature.encode(), expected_signature.encode()):
            return False
        if max_age is not None:
            try:
                issued_at = int(timestamp)
            except (TypeError, ValueError):
                return False
            if abs(time.time() - issued_at) > max_age:
                return False
        return True
# Secure agent client
class SecureAgentClient(AgentClient):
    """AgentClient that attaches a JWT, a timestamp and an HMAC signature to each call."""

    def __init__(self, agent_address: str, security_manager: AgentSecurityManager):
        super().__init__(agent_address)
        self.security_manager = security_manager
        self.agent_token = None

    async def authenticate(self, agent_id: str, capabilities: List[str]):
        """Mint a token for ``agent_id`` locally using the shared security manager."""
        self.agent_token = self.security_manager.generate_token(agent_id, capabilities)

    async def invoke_agent(self, message: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Invoke the agent with ``token``, ``timestamp`` and ``signature`` added to the context.

        The caller's ``context`` dict is copied, never modified. Existing
        ``token``/``timestamp``/``signature`` keys in it are overridden in the
        copy.

        Raises:
            ValueError: If ``authenticate`` has not been called.
        """
        if not self.agent_token:
            raise ValueError("Agent not authenticated")

        # Copy so credentials are not injected into the caller's dict.
        secure_context = dict(context) if context else {}
        timestamp = str(int(time.time()))
        secure_context["token"] = self.agent_token
        secure_context["timestamp"] = timestamp
        secure_context["signature"] = self.security_manager.generate_signature(
            message, timestamp
        )
        return await super().invoke_agent(message, secure_context)
Next Steps
After implementing agent-to-agent communication:
- Debugging Guide - Debug your multi-agent systems
- UI Integration Guide - Integrate with user interfaces
- Best Practices - Follow development best practices
- Full Deployment Guide - Deploy collaborative agents
Ready to build collaborative agents? Check out the Debugging Guide to learn how to debug your multi-agent systems!