Skip to main content

A2A Protocol

The Agent-to-Agent (A2A) Protocol is the core communication standard that enables direct, secure, and efficient communication between AI agents in the Pixell ecosystem. Built on gRPC and Protocol Buffers, it provides type-safe, high-performance inter-agent communication.

Protocol Overview

The A2A Protocol defines:

  • 📨 Message Formats - Standardized message structures
  • 🔄 Communication Patterns - Request/response, streaming, and event-driven
  • 🛡️ Security Model - Authentication, authorization, and encryption
  • 🎯 Service Discovery - Dynamic agent discovery and routing
  • 📊 Monitoring - Health checks, metrics, and observability

Message Format Specification

Base Message Structure

All A2A messages follow a consistent structure:

syntax = "proto3";

package pixell.a2a;

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

// Base message envelope
// Embedded as the `envelope` field (tag 1) of A2ARequest, A2AResponse,
// A2AEvent and A2AStream.
message A2AMessage {
// Per-message identifier; the sample Python clients fill it from
// generate_message_id().
string message_id = 1;
// Correlation identifier; the sample request client reuses the caller's value
// or generates a new one. Presumably echoed on the matching response --
// TODO confirm against the server implementation.
string correlation_id = 2;
// Sender and intended receiver. NOTE(review): the sample event publisher does
// not set target_agent, so it may be empty on EVENT messages.
string source_agent = 3;
string target_agent = 4;
// Discriminator for the kind of message (see MessageType).
MessageType type = 5;
// The sample clients set this to the current wall-clock time truncated to
// whole seconds.
google.protobuf.Timestamp timestamp = 6;
// Free-form string metadata; the sample request client sends
// {"content-type": "application/json"}.
map<string, string> headers = 7;
// Opaque payload; the sample clients place a JSON document in Any.value.
google.protobuf.Any payload = 8;
// Authentication / integrity metadata (see SecurityContext).
SecurityContext security = 9;
}

// Kind of message carried by an A2AMessage envelope.
// REQUEST is the zero value, so in proto3 an envelope whose `type` was never
// set reads back as REQUEST.
enum MessageType {
REQUEST = 0;
RESPONSE = 1;
EVENT = 2;
HEARTBEAT = 3;
ERROR = 4;
// Stream lifecycle markers; the sample streaming client only sends STREAM_DATA.
STREAM_START = 5;
STREAM_DATA = 6;
STREAM_END = 7;
}

// Security metadata attached to an envelope through A2AMessage.security.
// NOTE(review): the per-field notes below are inferred from the field names
// only -- confirm against the implementation before relying on them.
message SecurityContext {
// Presumably the bearer token identifying the sender.
string auth_token = 1;
// Presumably a signature over the message content.
string signature = 2;
// Presumably an identifier of the encryption key (not the key material).
string encryption_key_id = 3;
// Presumably the permissions associated with the sender.
repeated string permissions = 4;
}

Request/Response Messages

// Request message
// Wraps an A2AMessage envelope together with the method to invoke and its
// arguments.
message A2ARequest {
A2AMessage envelope = 1;
// Name of the operation to invoke; the sample client passes e.g.
// "analyze_data".
string method = 2;
// String-valued parameters. The sample client does not populate this field.
map<string, string> parameters = 3;
// Request body. The sample client sends the same JSON document here and in
// envelope.payload.
google.protobuf.Any data = 4;
}

// Response message
// Reply to an A2ARequest.
message A2AResponse {
A2AMessage envelope = 1;
// The sample client treats 200 as success and any other value as a failure.
int32 status_code = 2;
// Human-readable status; the sample client includes it in the exception text
// when status_code is not 200.
string status_message = 3;
// Response body; the sample client parses Any.value as JSON on success.
google.protobuf.Any data = 4;
// Additional string metadata; not read by the sample client.
map<string, string> metadata = 5;
}

Event Messages

// Event message for pub/sub communication
message A2AEvent {
A2AMessage envelope = 1;
// Application-defined event name; the sample publisher uses "data_processed".
string event_type = 2;
// Topic the event is published on; subscribers in the sample register per topic.
string topic = 3;
// Event body; the sample publisher places a JSON document in Any.value.
// NOTE(review): distinct from envelope.payload, which the sample publisher
// leaves unset.
google.protobuf.Any payload = 4;
// Not set by the sample publisher. Ordering semantics (higher vs lower is
// more urgent) are not shown here -- TODO confirm.
int32 priority = 5;
// Not set by the sample publisher; presumably the time after which the event
// should no longer be delivered -- TODO confirm.
google.protobuf.Timestamp expires_at = 6;
}

Stream Messages

// Streaming message
// One frame of a stream exchanged through the ProcessStream RPC used by the
// sample streaming client.
message A2AStream {
A2AMessage envelope = 1;
// Identifier shared by all frames of one stream.
string stream_id = 2;
// Direction of the stream (see StreamType).
StreamType stream_type = 3;
// Presumably the position of this frame within the stream -- TODO confirm
// whether numbering starts at 0 or 1.
int32 sequence_number = 4;
// The sample client stops reading as soon as it receives a frame with
// is_final set.
bool is_final = 5;
// Frame body; the sample client places a JSON document in Any.value.
google.protobuf.Any data = 6;
}

// Direction of an A2AStream.
// BIDIRECTIONAL is the zero value, so in proto3 an unset stream_type reads
// back as BIDIRECTIONAL. The sample client resolves these names from a
// lower-case string via upper-casing (e.g. "bidirectional").
enum StreamType {
BIDIRECTIONAL = 0;
CLIENT_TO_SERVER = 1;
SERVER_TO_CLIENT = 2;
}

Communication Patterns

1. Request/Response Pattern

Standard synchronous communication:

# Client implementation
import grpc
from pixell_a2a.proto import a2a_pb2, a2a_pb2_grpc

class A2AClient:
    """Blocking request/response client bound to one target agent.

    The snippet expects ``grpc``, ``a2a_pb2``, ``a2a_pb2_grpc``, ``json``,
    ``time``, ``Any``, ``Timestamp``, ``generate_message_id``,
    ``generate_correlation_id`` and ``A2AException`` to be in scope.

    Instances own a gRPC channel; call :meth:`close` (or use the instance as
    a context manager) to release it.
    """

    def __init__(self, target_agent, agent_id=None, port=8081, credentials=None):
        """Open a channel to ``target_agent``.

        Args:
            target_agent: Host name / identifier of the agent to call.
            agent_id: Identifier sent as ``source_agent``. Defaults to an
                empty string, because the envelope field is a plain string.
            port: TCP port of the target agent's A2A endpoint.
            credentials: Optional ``grpc.ChannelCredentials``. When given, a
                TLS channel is opened instead of a plaintext one.
        """
        self.target_agent = target_agent
        # send_request() reads self.agent_id, so it must always exist.
        self.agent_id = agent_id if agent_id is not None else ""
        address = f"{target_agent}:{port}"
        if credentials is not None:
            self.channel = grpc.secure_channel(address, credentials)
        else:
            # NOTE(review): plaintext transport; pass `credentials` outside of
            # trusted networks.
            self.channel = grpc.insecure_channel(address)
        self.stub = a2a_pb2_grpc.A2AServiceStub(self.channel)

    def close(self):
        """Close the underlying gRPC channel."""
        self.channel.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def send_request(self, method, data, correlation_id=None):
        """Send a request and wait for response.

        Args:
            method: Name of the remote operation.
            data: JSON-serialisable request body.
            correlation_id: Optional correlation id; a new one is generated
                when omitted.

        Returns:
            The decoded JSON body of a successful response.

        Raises:
            A2AException: If the response status code is not 200.
        """
        # Any.value is a bytes field, so the JSON text must be encoded first.
        body = json.dumps(data).encode("utf-8")
        request = a2a_pb2.A2ARequest(
            envelope=a2a_pb2.A2AMessage(
                message_id=generate_message_id(),
                correlation_id=correlation_id or generate_correlation_id(),
                source_agent=self.agent_id,
                target_agent=self.target_agent,
                type=a2a_pb2.REQUEST,
                timestamp=Timestamp(seconds=int(time.time())),
                headers={"content-type": "application/json"},
                payload=Any(value=body),
            ),
            method=method,
            data=Any(value=body),
        )

        response = self.stub.ProcessRequest(request)
        return self._parse_response(response)

    def _parse_response(self, response):
        """Parse response and extract data.

        Returns the JSON-decoded ``response.data.value`` when the status code
        is 200; raises ``A2AException`` carrying ``status_message`` otherwise.
        """
        if response.status_code != 200:
            raise A2AException(f"Request failed: {response.status_message}")
        return json.loads(response.data.value)

# Usage: ask the "data-analyst-001" agent to run a trend analysis.
analysis_request = {
    "dataset": "sales_data.csv",
    "analysis_type": "trend_analysis",
}
client = A2AClient("data-analyst-001")
result = client.send_request("analyze_data", analysis_request)

2. Streaming Pattern

Real-time bidirectional communication:

class A2AStreamClient:
    """Streaming client that exchanges ``A2AStream`` frames with one agent.

    The snippet expects ``grpc``, ``a2a_pb2``, ``a2a_pb2_grpc``, ``json``,
    ``time``, ``Any``, ``Timestamp`` and ``generate_message_id`` to be in
    scope.
    """

    def __init__(self, target_agent, agent_id=None, stream_id=None,
                 port=8081, credentials=None):
        """Open a channel to ``target_agent``.

        Args:
            target_agent: Host name / identifier of the agent to stream with.
            agent_id: Identifier sent as ``source_agent`` (default: "").
            stream_id: Identifier shared by all frames of the stream; a random
                one is generated when omitted.
            port: TCP port of the target agent's A2A endpoint.
            credentials: Optional ``grpc.ChannelCredentials`` for TLS.
        """
        import uuid

        self.target_agent = target_agent
        # create_stream() reads both attributes, so they must always exist.
        self.agent_id = agent_id if agent_id is not None else ""
        self.stream_id = stream_id if stream_id is not None else uuid.uuid4().hex
        address = f"{target_agent}:{port}"
        if credentials is not None:
            self.channel = grpc.secure_channel(address, credentials)
        else:
            # NOTE(review): plaintext transport; pass `credentials` outside of
            # trusted networks.
            self.channel = grpc.insecure_channel(address)
        self.stub = a2a_pb2_grpc.A2AServiceStub(self.channel)

    def close(self):
        """Close the underlying gRPC channel."""
        self.channel.close()

    def create_stream(self, stream_type="bidirectional"):
        """Create a bidirectional stream.

        Sends one "Hello" frame per second and prints every received frame
        until a frame with ``is_final`` set arrives, then stops sending and
        cancels the call.

        Args:
            stream_type: Case-insensitive name of a ``StreamType`` value.

        Raises:
            AttributeError: If ``stream_type`` does not name an enum value.
        """
        import threading

        # Resolve the enum up front so a bad name fails in the caller rather
        # than inside the thread that consumes the request generator.
        stream_type_value = getattr(a2a_pb2, stream_type.upper())
        stop_sending = threading.Event()

        def request_generator():
            sequence_number = 0
            while not stop_sending.is_set():
                # Yield messages from client
                yield a2a_pb2.A2AStream(
                    envelope=a2a_pb2.A2AMessage(
                        message_id=generate_message_id(),
                        source_agent=self.agent_id,
                        target_agent=self.target_agent,
                        type=a2a_pb2.STREAM_DATA,
                        timestamp=Timestamp(seconds=int(time.time())),
                    ),
                    stream_id=self.stream_id,
                    stream_type=stream_type_value,
                    sequence_number=sequence_number,
                    # Any.value is a bytes field.
                    data=Any(value=json.dumps({"message": "Hello"}).encode("utf-8")),
                )
                sequence_number += 1
                # Interruptible one-second pause between frames.
                stop_sending.wait(1)

        # Start streaming
        responses = self.stub.ProcessStream(request_generator())
        try:
            for response in responses:
                print(f"Received: {response.data.value}")
                if response.is_final:
                    break
        finally:
            # Without this the generator would keep producing frames forever
            # and the RPC would stay open after the reader has left.
            stop_sending.set()
            responses.cancel()

3. Event-Driven Pattern

Pub/sub communication for loose coupling:

class A2AEventClient:
    """Publish/subscribe helper on top of an event bus.

    The bus must provide ``publish(event)``, ``subscribe(topic, callback)``
    (returning a subscription id) and ``unsubscribe(subscription_id)``.
    The snippet expects ``a2a_pb2``, ``json``, ``time``, ``Any``,
    ``Timestamp``, ``generate_message_id`` and ``EventBus`` to be in scope.
    """

    def __init__(self, agent_id=None, event_bus=None):
        """Create the client.

        Args:
            agent_id: Identifier sent as ``source_agent`` (default: "").
            event_bus: Bus to use; a new ``EventBus()`` is created when
                omitted.
        """
        # publish_event() reads self.agent_id, so it must always exist.
        self.agent_id = agent_id if agent_id is not None else ""
        self.event_bus = event_bus if event_bus is not None else EventBus()
        # topic -> list of subscription ids, so several callbacks on the same
        # topic can all be unsubscribed later.
        self.subscriptions = {}

    def publish_event(self, event_type, topic, data):
        """Publish an event to the bus.

        Args:
            event_type: Application-defined event name.
            topic: Topic to publish on.
            data: JSON-serialisable event body.
        """
        event = a2a_pb2.A2AEvent(
            envelope=a2a_pb2.A2AMessage(
                message_id=generate_message_id(),
                source_agent=self.agent_id,
                type=a2a_pb2.EVENT,
                timestamp=Timestamp(seconds=int(time.time())),
            ),
            event_type=event_type,
            topic=topic,
            # Any.value is a bytes field.
            payload=Any(value=json.dumps(data).encode("utf-8")),
        )

        self.event_bus.publish(event)

    def subscribe_to_topic(self, topic, callback):
        """Subscribe to events on a topic.

        Returns:
            The subscription id issued by the bus.
        """
        subscription_id = self.event_bus.subscribe(topic, callback)
        self.subscriptions.setdefault(topic, []).append(subscription_id)
        return subscription_id

    def unsubscribe(self, topic):
        """Unsubscribe every callback registered on ``topic``.

        Unknown topics are ignored.
        """
        for subscription_id in self.subscriptions.pop(topic, []):
            self.event_bus.unsubscribe(subscription_id)

# Usage
event_client = A2AEventClient()

# Publish an event
processed_payload = {
    "dataset_id": "sales_2024",
    "status": "completed",
    "records_processed": 10000,
}
event_client.publish_event("data_processed", "analytics", processed_payload)


# Subscribe to events
def handle_data_events(event):
    """Print the type and raw payload of a received event."""
    summary = f"Received event: {event.event_type} - {event.payload.value}"
    print(summary)


# NOTE(review): the handler is registered after the publish call above, so it
# presumably does not observe that first event -- confirm against the bus.
event_client.subscribe_to_topic("analytics", handle_data_events)

Service Discovery

Agent Registry

Agents register themselves with the discovery service:

class AgentRegistry:
    """In-memory registry of agents and the capabilities they advertise.

    Attributes:
        agents: ``agent_id`` -> info dict with the keys ``agent_id``,
            ``capabilities``, ``endpoints``, ``status``, ``last_heartbeat``
            and ``load_factor``.
        capabilities_index: capability -> list of agent ids offering it.
    """

    def __init__(self):
        self.agents = {}
        self.capabilities_index = {}

    def register_agent(self, agent_id, capabilities, endpoints):
        """Register an agent with its capabilities.

        Registering an id that already exists replaces the previous record:
        the agent is removed from capabilities it no longer advertises and is
        never listed twice under the same capability.

        Args:
            agent_id: Unique identifier of the agent.
            capabilities: Iterable of capability names.
            endpoints: Endpoint description, stored as given.
        """
        capabilities = list(capabilities)

        previous = self.agents.get(agent_id)
        if previous is not None:
            self._remove_from_index(agent_id, previous["capabilities"])

        self.agents[agent_id] = {
            "agent_id": agent_id,
            "capabilities": capabilities,
            "endpoints": endpoints,
            "status": "online",
            "last_heartbeat": time.time(),
            "load_factor": 0.0,
        }

        # Update capability index
        for capability in capabilities:
            indexed = self.capabilities_index.setdefault(capability, [])
            if agent_id not in indexed:
                indexed.append(agent_id)

    def _remove_from_index(self, agent_id, capabilities):
        """Drop ``agent_id`` from the index entries of ``capabilities``."""
        for capability in capabilities:
            indexed = self.capabilities_index.get(capability)
            if not indexed or agent_id not in indexed:
                continue
            indexed.remove(agent_id)
            if not indexed:
                del self.capabilities_index[capability]

    def find_agents_by_capability(self, capability):
        """Find agents with specific capability.

        Returns:
            A new list of agent ids (empty when none match); mutating it does
            not affect the registry.
        """
        return list(self.capabilities_index.get(capability, []))

    def get_agent_info(self, agent_id):
        """Get information about a specific agent (``None`` if unknown)."""
        return self.agents.get(agent_id)

    def update_agent_status(self, agent_id, status, load_factor=None):
        """Update agent status and load.

        Also refreshes ``last_heartbeat``. Unknown agent ids are ignored.
        ``load_factor`` is left unchanged when ``None``.
        """
        agent_info = self.agents.get(agent_id)
        if agent_info is None:
            return
        agent_info["status"] = status
        agent_info["last_heartbeat"] = time.time()
        if load_factor is not None:
            agent_info["load_factor"] = load_factor

Dynamic Discovery

class DynamicDiscovery:
    """Pick the best-scoring agent for a capability, with a short-lived cache.

    ``registry`` must provide ``find_agents_by_capability(capability)`` and
    ``get_agent_info(agent_id)`` returning a dict with at least ``status``
    and ``load_factor`` (or ``None`` for unknown agents).
    """

    def __init__(self, registry):
        self.registry = registry
        # cache_key -> ((agent_id, score), time cached)
        self.discovery_cache = {}
        self.cache_ttl = 300  # 5 minutes

    def discover_agent(self, capability, context=None):
        """Discover the best agent for a capability.

        Args:
            capability: Capability name to look up.
            context: Optional context passed to the scoring hook; also part
                of the cache key.

        Returns:
            ``(agent_id, score)`` for the highest-scoring candidate, or
            ``None`` when no agent offers the capability.
        """
        cache_key = f"{capability}:{hash(str(context))}"
        now = time.time()

        # Check cache first
        cached = self.discovery_cache.get(cache_key)
        if cached is not None:
            cached_result, cached_at = cached
            # A cached winner is only reused while it is fresh AND still
            # registered and online; otherwise a dead agent would be handed
            # out for up to cache_ttl seconds.
            if now - cached_at < self.cache_ttl and self._is_online(cached_result[0]):
                return cached_result
            del self.discovery_cache[cache_key]

        # Find agents with capability
        candidates = self.registry.find_agents_by_capability(capability)
        if not candidates:
            return None

        # Score candidates and keep the best (first one wins ties)
        best_agent = max(
            ((agent_id, self._calculate_agent_score(agent_id, context))
             for agent_id in candidates),
            key=lambda scored: scored[1],
        )

        # Cache result
        self.discovery_cache[cache_key] = (best_agent, now)
        return best_agent

    def _is_online(self, agent_id):
        """Return True if the registry knows ``agent_id`` and it is online."""
        agent_info = self.registry.get_agent_info(agent_id)
        return bool(agent_info) and agent_info["status"] == "online"

    def _calculate_agent_score(self, agent_id, context):
        """Calculate agent suitability score.

        Weights: 0.5 for being online, up to 0.3 for spare capacity
        (``1 - load_factor``), up to 0.2 for the context score. Unknown
        agents score 0.
        """
        agent_info = self.registry.get_agent_info(agent_id)
        if not agent_info:
            return 0

        score = 0

        # Availability score
        if agent_info["status"] == "online":
            score += 0.5

        # Load factor score (lower is better); clamp so an out-of-range
        # report cannot push the score negative or above its weight.
        load_factor = min(max(agent_info["load_factor"], 0.0), 1.0)
        score += (1.0 - load_factor) * 0.3

        # Context matching score
        if context:
            score += self._calculate_context_score(agent_id, context) * 0.2

        return score

    def _calculate_context_score(self, agent_id, context):
        """Hook: score in [0, 1] for how well an agent fits ``context``.

        The default is neutral (0.0) so that discovery with a context works
        out of the box; override it to add context-aware ranking.
        """
        return 0.0

Security Model

Authentication

class A2AAuthentication:
    """Issue and validate HS256-signed JWTs for agents.

    The snippet expects ``jwt`` (PyJWT) and ``time`` to be in scope. Expiry
    is carried in the custom ``expires_at`` claim and checked by
    :meth:`validate_token`.
    """

    def __init__(self, secret_key):
        self.secret_key = secret_key
        # agent_id -> most recently issued token
        self.token_cache = {}

    def generate_auth_token(self, agent_id, permissions=None, ttl_seconds=3600):
        """Generate authentication token for agent.

        Args:
            agent_id: Agent the token is issued to.
            permissions: List of permission names (default: none).
            ttl_seconds: Lifetime of the token in seconds (default: 1 hour).

        Returns:
            The encoded token; it is also stored in ``token_cache``.
        """
        # Read the clock once so issued_at and expires_at are exactly
        # ttl_seconds apart.
        issued_at = time.time()
        payload = {
            "agent_id": agent_id,
            "permissions": permissions or [],
            "expires_at": issued_at + ttl_seconds,
            "issued_at": issued_at,
        }

        token = jwt.encode(payload, self.secret_key, algorithm="HS256")
        self.token_cache[agent_id] = token

        return token

    def validate_token(self, token):
        """Validate authentication token.

        Returns:
            The decoded payload, or ``None`` if the token is malformed, has a
            bad signature, lacks a numeric ``expires_at`` claim, or is
            expired.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return None

        # A correctly signed token without a usable expiry must be rejected,
        # not crash with KeyError/TypeError.
        expires_at = payload.get("expires_at")
        if not isinstance(expires_at, (int, float)) or expires_at < time.time():
            return None

        return payload

    def check_permissions(self, token, required_permission):
        """Check if token has required permission.

        Returns ``False`` for invalid tokens and for tokens whose
        ``permissions`` claim is not a list.
        """
        payload = self.validate_token(token)
        if not payload:
            return False

        granted = payload.get("permissions", [])
        # Guard against a string claim, where `in` would do substring matching.
        if not isinstance(granted, list):
            return False
        return required_permission in granted

Message Encryption

class A2AEncryption:
    """Encrypt, decrypt, sign and verify JSON-serialisable message data.

    The snippet expects ``Fernet``, ``padding`` and ``hashes`` (from the
    ``cryptography`` package), ``json``, ``base64`` and
    ``A2ADecryptionError`` to be in scope.

    NOTE(review): encryption uses Fernet, whereas the sample configuration
    names "AES-256-GCM" -- confirm which one is authoritative.
    """

    def __init__(self, encryption_key):
        """Create the cipher from a Fernet key."""
        self.encryption_key = encryption_key
        self.cipher = Fernet(encryption_key)

    @staticmethod
    def _canonical_bytes(message_data):
        """Serialise with sorted keys so signer and verifier hash equal bytes."""
        return json.dumps(message_data, sort_keys=True).encode()

    @staticmethod
    def _pss_padding():
        """Single definition of the RSA-PSS parameters used to sign and verify."""
        return padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )

    def encrypt_message(self, message_data):
        """Encrypt message data.

        Returns:
            Base64 text wrapping the Fernet token of the JSON-encoded data.
        """
        json_data = json.dumps(message_data)
        encrypted_data = self.cipher.encrypt(json_data.encode())
        # The extra base64 layer is kept so existing peers can still decode.
        return base64.b64encode(encrypted_data).decode()

    def decrypt_message(self, encrypted_data):
        """Decrypt message data.

        Raises:
            A2ADecryptionError: If decoding, decryption or JSON parsing
                fails; the original exception is attached as ``__cause__``.
        """
        try:
            decoded_data = base64.b64decode(encrypted_data.encode())
            decrypted_data = self.cipher.decrypt(decoded_data)
            return json.loads(decrypted_data.decode())
        except Exception as e:
            raise A2ADecryptionError(f"Failed to decrypt message: {e}") from e

    def sign_message(self, message_data, private_key):
        """Sign message for integrity verification.

        Args:
            message_data: JSON-serialisable data to sign.
            private_key: Key object exposing ``sign(data, padding, algorithm)``.

        Returns:
            Base64 text of the RSA-PSS / SHA-256 signature.
        """
        signature = private_key.sign(
            self._canonical_bytes(message_data),
            self._pss_padding(),
            hashes.SHA256(),
        )
        return base64.b64encode(signature).decode()

    def verify_signature(self, message_data, signature, public_key):
        """Verify message signature.

        Returns:
            ``True`` if the signature matches, ``False`` on any failure.
        """
        try:
            signature_bytes = base64.b64decode(signature.encode())
            public_key.verify(
                signature_bytes,
                self._canonical_bytes(message_data),
                self._pss_padding(),
                hashes.SHA256(),
            )
            return True
        except Exception:
            # Deliberately fail closed: anything that prevents a positive
            # verification is treated as "not verified".
            return False

Message Routing

Routing Strategies

class A2ARouter:
    """Select a target agent for a message using one of several strategies.

    ``message`` is a mapping that may contain ``required_capabilities`` (a
    list; its first element is the primary capability) and ``context``.
    ``registry`` must provide ``find_agents_by_capability`` and
    ``get_agent_info``.

    Every strategy returns ``(agent_id, score)`` or ``None`` when no agent
    qualifies.
    """

    def __init__(self, discovery, registry):
        self.discovery = discovery
        self.registry = registry
        self.routing_cache = {}

    def route_message(self, message, routing_strategy="capability_based"):
        """Route message to appropriate agent.

        Args:
            message: Mapping describing the message to route.
            routing_strategy: ``"capability_based"``, ``"load_based"`` or
                ``"context_aware"``.

        Raises:
            ValueError: If ``routing_strategy`` is not one of the above.
        """
        strategies = {
            "capability_based": self._route_by_capability,
            "load_based": self._route_by_load,
            "context_aware": self._route_by_context,
        }
        try:
            route = strategies[routing_strategy]
        except KeyError:
            raise ValueError(f"Unknown routing strategy: {routing_strategy}") from None
        return route(message)

    def _candidates_for(self, message):
        """Return ``(required_capabilities, candidate_ids)`` for a message.

        Candidates are the agents offering the first required capability;
        both values are empty when the message names no capability.
        """
        required_capabilities = message.get("required_capabilities") or []
        if not required_capabilities:
            return [], []
        candidates = self.registry.find_agents_by_capability(required_capabilities[0])
        return required_capabilities, candidates

    def _route_by_capability(self, message):
        """Route based on required capabilities.

        Picks the candidate with the highest positive capability score (the
        first one wins ties); returns ``None`` if no candidate scores above 0.
        """
        required_capabilities, candidates = self._candidates_for(message)
        if not candidates:
            return None

        best_agent = None
        best_score = 0
        for agent_id in candidates:
            score = self._calculate_capability_score(agent_id, required_capabilities)
            if score > best_score:
                best_score = score
                best_agent = agent_id

        # Never hand back a (None, 0) pair: "no route" is always None.
        if best_agent is None:
            return None
        return best_agent, best_score

    def _route_by_load(self, message):
        """Route based on agent load.

        Picks the online candidate with the lowest ``load_factor``; the
        returned score is always 1.0.
        """
        _, candidates = self._candidates_for(message)

        # Filter by availability, skipping ids the registry no longer knows.
        load_by_agent = {}
        for agent_id in candidates:
            agent_info = self.registry.get_agent_info(agent_id)
            if agent_info and agent_info["status"] == "online":
                load_by_agent[agent_id] = agent_info["load_factor"]

        if not load_by_agent:
            return None

        # Choose agent with lowest load
        best_agent = min(load_by_agent, key=load_by_agent.get)
        return best_agent, 1.0

    def _route_by_context(self, message):
        """Route based on context and history.

        Picks the candidate with the highest context score (the first one
        wins ties).
        """
        _, candidates = self._candidates_for(message)
        if not candidates:
            return None

        context = message.get("context", {})
        return max(
            ((agent_id, self._calculate_context_score(agent_id, context))
             for agent_id in candidates),
            key=lambda scored: scored[1],
        )

    def _calculate_capability_score(self, agent_id, required_capabilities):
        """Hook: fraction of ``required_capabilities`` the agent advertises.

        Reads the ``capabilities`` entry of the registry record; unknown
        agents score 0.0. Override to apply a different weighting.
        """
        agent_info = self.registry.get_agent_info(agent_id)
        if not agent_info or not required_capabilities:
            return 0.0
        advertised = set(agent_info.get("capabilities", ()))
        matched = sum(1 for capability in required_capabilities if capability in advertised)
        return matched / len(required_capabilities)

    def _calculate_context_score(self, agent_id, context):
        """Hook: score for how well an agent fits ``context``.

        The default is neutral (0.0), which makes context-aware routing fall
        back to the first candidate; override to add real ranking.
        """
        return 0.0

Error Handling

Error Types

// Error message structure
message A2AError {
// Declared as int32 rather than ErrorCode; the sample Python handler compares
// it against ErrorCode values (e.g. TIMEOUT), so it presumably carries one of
// those numbers -- TODO confirm.
int32 error_code = 1;
// Human-readable description of the failure.
string error_message = 2;
// Free-form category string; the sample Python handler uses it as the key
// for looking up a custom handler.
string error_type = 3;
// Additional string-valued details about the failure.
map<string, string> error_details = 4;
// Presumably the time the error occurred -- TODO confirm.
google.protobuf.Timestamp timestamp = 5;
}

// Error categories.
// UNKNOWN_ERROR is the zero value, so in proto3 an unset code reads back as
// UNKNOWN_ERROR. The sample Python handler has dedicated branches for
// TIMEOUT, AGENT_NOT_FOUND and AUTHENTICATION_FAILED; every other value takes
// its generic path.
enum ErrorCode {
UNKNOWN_ERROR = 0;
INVALID_MESSAGE = 1;
AUTHENTICATION_FAILED = 2;
AUTHORIZATION_FAILED = 3;
AGENT_NOT_FOUND = 4;
CAPABILITY_NOT_AVAILABLE = 5;
TIMEOUT = 6;
NETWORK_ERROR = 7;
INTERNAL_ERROR = 8;
}

Error Handling Implementation

class A2AErrorHandler:
    """Turn A2A errors into recovery decisions.

    Every handler returns a dict with an ``"action"`` key (``"retry"``,
    ``"fail"``, ``"discover_alternative"`` or ``"reauthenticate"``) plus
    action-specific fields. The snippet expects ``ErrorCode`` to be in scope.

    Attributes:
        error_handlers: ``error_type`` -> callable ``(error, context)`` that
            takes precedence over the built-in handling.
        retry_policies: Reserved for per-type retry settings; not read by
            this class.
    """

    def __init__(self):
        self.error_handlers = {}
        self.retry_policies = {}

    def handle_error(self, error, context=None):
        """Handle A2A communication errors.

        Args:
            error: Object exposing ``error_type``, ``error_code`` and
                ``error_message``.
            context: Optional dict; ``retry_count`` and ``max_retries`` are
                read for timeouts.

        Returns:
            The result of the custom handler registered for
            ``error.error_type`` if there is one, else the default decision.
        """
        handler = self.error_handlers.get(error.error_type)
        if handler:
            return handler(error, context)

        # Default error handling
        return self._default_error_handler(error, context)

    def _default_error_handler(self, error, context):
        """Default error handling strategy, dispatched on ``error.error_code``."""
        if error.error_code == ErrorCode.TIMEOUT:
            return self._handle_timeout(error, context)
        elif error.error_code == ErrorCode.AGENT_NOT_FOUND:
            return self._handle_agent_not_found(error, context)
        elif error.error_code == ErrorCode.AUTHENTICATION_FAILED:
            return self._handle_auth_failure(error, context)
        else:
            return self._handle_generic_error(error, context)

    def _handle_timeout(self, error, context):
        """Handle timeout errors with retry.

        Retries with exponential backoff (1, 2, 4, ... seconds, capped at 30)
        until ``retry_count`` reaches ``max_retries`` (default 3).
        """
        # handle_error() defaults context to None.
        context = context or {}
        retry_count = context.get("retry_count", 0)
        max_retries = context.get("max_retries", 3)

        if retry_count >= max_retries:
            return {
                "action": "fail",
                "error": "Max retries exceeded",
            }
        return {
            "action": "retry",
            "delay": min(2 ** retry_count, 30),  # Exponential backoff
            "retry_count": retry_count + 1,
        }

    def _handle_agent_not_found(self, error, context):
        """Handle agent not found errors by asking for an alternative agent."""
        return {
            "action": "discover_alternative",
            "fallback_strategy": "find_similar_capability",
        }

    def _handle_auth_failure(self, error, context):
        """Handle authentication failures by requesting re-authentication."""
        return {
            "action": "reauthenticate",
        }

    def _handle_generic_error(self, error, context):
        """Handle any other error by failing with the error's own message."""
        return {
            "action": "fail",
            "error": error.error_message,
        }

Monitoring and Observability

Health Checks

class A2AHealthMonitor:
    """Track per-agent health and summarise it for the whole system.

    The snippet expects ``a2a_pb2``, ``Timestamp`` and ``time`` to be in
    scope.

    Attributes:
        health_status: ``agent_id`` -> dict with ``status`` and
            ``last_check``, plus ``response_time`` (successful check) or
            ``error`` (failed check).
        metrics: Not read or written by this class.
    """

    def __init__(self):
        self.health_status = {}
        self.metrics = {}

    def check_agent_health(self, agent_id):
        """Check health of specific agent.

        Records the outcome in ``health_status``. Any exception raised while
        checking marks the agent "unhealthy" instead of propagating.

        Returns:
            ``True`` only if the agent answered with status ``"healthy"``.
        """
        try:
            # Send health check request
            health_request = a2a_pb2.HealthRequest(
                agent_id=agent_id,
                timestamp=Timestamp(seconds=int(time.time())),
            )
            response = self._send_health_check(agent_id, health_request)

            self.health_status[agent_id] = {
                "status": response.status,
                "last_check": time.time(),
                "response_time": response.response_time,
            }
            return response.status == "healthy"
        except Exception as e:
            # Best effort by design: an unreachable or failing agent is
            # recorded as unhealthy together with the reason.
            self.health_status[agent_id] = {
                "status": "unhealthy",
                "last_check": time.time(),
                "error": str(e),
            }
            return False

    def _send_health_check(self, agent_id, health_request):
        """Transport hook: send ``health_request`` to ``agent_id``.

        Must return an object exposing ``status`` and ``response_time``.
        Override with the actual gRPC call; until then every check fails and
        the agent is recorded as unhealthy.
        """
        raise NotImplementedError("health check transport is not implemented")

    def get_system_health(self):
        """Get overall system health.

        Returns:
            Dict with ``overall_status`` ("healthy" when every checked agent
            is healthy, "degraded" otherwise, "unknown" when no agent has
            been checked yet), agent counts, and ``health_percentage``.
        """
        total_agents = len(self.health_status)
        healthy_agents = sum(
            1 for status in self.health_status.values()
            if status["status"] == "healthy"
        )

        if total_agents == 0:
            # Nothing checked yet: reporting "healthy" next to 0% would be
            # contradictory.
            overall_status = "unknown"
            health_percentage = 0
        else:
            overall_status = "healthy" if healthy_agents == total_agents else "degraded"
            health_percentage = (healthy_agents / total_agents) * 100

        return {
            "overall_status": overall_status,
            "total_agents": total_agents,
            "healthy_agents": healthy_agents,
            "unhealthy_agents": total_agents - healthy_agents,
            "health_percentage": health_percentage,
        }

Metrics Collection

class A2AMetricsCollector:
    """Accumulate simple counters and response times for A2A traffic.

    ``metrics`` holds ``messages_sent``, ``messages_received``, ``errors``,
    ``response_times`` (the most recent measurements only) and
    ``throughput`` (messages sent per second, see ``_update_throughput``).
    """

    # Number of response-time samples kept for the average.
    _MAX_RESPONSE_TIMES = 1000

    def __init__(self):
        import time

        self.metrics = {
            "messages_sent": 0,
            "messages_received": 0,
            "errors": 0,
            "response_times": [],
            "throughput": 0,
        }
        # Reference point for the throughput calculation.
        self._started_at = time.monotonic()

    def record_message_sent(self, message_type, target_agent):
        """Record message sent and refresh the throughput figure."""
        self.metrics["messages_sent"] += 1
        self._update_throughput()

    def _update_throughput(self):
        """Recompute ``throughput`` as messages sent per second.

        The rate is averaged over the collector's lifetime; the lifetime is
        floored at one second so the first few messages do not produce an
        inflated rate.
        """
        import time

        elapsed = max(time.monotonic() - self._started_at, 1.0)
        self.metrics["throughput"] = self.metrics["messages_sent"] / elapsed

    def record_message_received(self, message_type, source_agent):
        """Record message received."""
        self.metrics["messages_received"] += 1

    def record_error(self, error_type, error_code):
        """Record error occurrence."""
        self.metrics["errors"] += 1

    def record_response_time(self, response_time):
        """Record response time, keeping only the most recent measurements."""
        response_times = self.metrics["response_times"]
        response_times.append(response_time)

        # Trim in place instead of rebuilding the list on every call.
        overflow = len(response_times) - self._MAX_RESPONSE_TIMES
        if overflow > 0:
            del response_times[:overflow]

    def get_metrics(self):
        """Get current metrics.

        Returns:
            Dict with the counters, ``average_response_time`` (0 when no
            samples exist), ``throughput`` and ``error_rate`` (errors per
            message sent; the divisor is at least 1).
        """
        response_times = self.metrics["response_times"]
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0

        return {
            "messages_sent": self.metrics["messages_sent"],
            "messages_received": self.metrics["messages_received"],
            "errors": self.metrics["errors"],
            "average_response_time": avg_response_time,
            "throughput": self.metrics["throughput"],
            "error_rate": self.metrics["errors"] / max(self.metrics["messages_sent"], 1),
        }

Configuration

A2A Protocol Configuration

# a2a-protocol-config.yaml
# Settings for the A2A protocol, grouped by concern.
a2a_protocol:
  message_format:
    version: "1.0"
    encoding: "protobuf"
    compression: "gzip"

  security:
    authentication:
      enabled: true
      method: "jwt"
      token_expiry: 3600  # matches the 1-hour (3600 s) lifetime used by the sample token generator
    encryption:
      enabled: true
      # NOTE(review): the sample Python encryption helper uses Fernet rather
      # than AES-256-GCM -- confirm which one is authoritative.
      algorithm: "AES-256-GCM"
    signing:
      enabled: true
      algorithm: "RSA-PSS"

  routing:
    strategy: "capability_based"  # the sample router also accepts "load_based" and "context_aware"
    fallback_enabled: true
    timeout: 30  # presumably seconds -- TODO confirm
    retry_attempts: 3

  monitoring:
    health_check_interval: 30  # presumably seconds -- TODO confirm
    metrics_enabled: true
    tracing_enabled: true

Next Steps

After understanding the A2A Protocol:

  1. Message Routing - Learn about message routing mechanisms
  2. Security - Explore security considerations
  3. LangGraph Integration - Advanced workflow integration
  4. Intent System - Intent recognition and routing

Ready to learn about message routing? Check out Message Routing to understand how messages are routed between agents!