gRPC Interface
Complete gRPC API reference for Agent-to-Agent (A2A) communication in the Pixell Agent Runtime (PAR). This interface enables direct agent-to-agent communication using gRPC and Protocol Buffers.
Overview
The gRPC interface provides high-performance, type-safe communication between agents using Protocol Buffers for serialization and HTTP/2 for transport.
Key Features
- 🔗 High Performance - Binary protocol with HTTP/2 multiplexing
 - 🛡️ Type Safety - Protocol Buffer schema validation
 - 🌐 Language Agnostic - Works with any gRPC-supported language
 - 🔄 Bidirectional Streaming - Real-time agent communication
 - 🔐 Built-in Security - TLS encryption and authentication
 
Service Definition
Agent Service
syntax = "proto3";
package pixell.runtime;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
// Main agent service for A2A communication
service AgentService {
  // Health check endpoint
  rpc Health(HealthRequest) returns (HealthResponse);
  
  // Describe agent capabilities
  rpc DescribeCapabilities(DescribeCapabilitiesRequest) returns (DescribeCapabilitiesResponse);
  
  // Invoke agent with message
  rpc Invoke(InvokeRequest) returns (InvokeResponse);
  
  // Ping for connectivity testing
  rpc Ping(PingRequest) returns (PingResponse);
  
  // Stream for real-time communication
  rpc Stream(stream StreamRequest) returns (stream StreamResponse);
}
// Health check request
message HealthRequest {
  string agent_id = 1;
  google.protobuf.Timestamp timestamp = 2;
}
// Health check response
message HealthResponse {
  string status = 1;  // "healthy", "unhealthy", "degraded"
  string message = 2;
  google.protobuf.Timestamp timestamp = 3;
  map<string, string> metadata = 4;
}
// Describe capabilities request
message DescribeCapabilitiesRequest {
  string agent_id = 1;
  bool include_details = 2;
}
// Describe capabilities response
message DescribeCapabilitiesResponse {
  repeated string capabilities = 1;
  string description = 2;
  string version = 3;
  map<string, string> metadata = 4;
}
// Invoke request
message InvokeRequest {
  string message = 1;
  map<string, string> context = 2;
  google.protobuf.Any payload = 3;
  string request_id = 4;
  google.protobuf.Timestamp timestamp = 5;
}
// Invoke response
message InvokeResponse {
  string response = 1;
  bool success = 2;
  string error_message = 3;
  map<string, string> metadata = 4;
  google.protobuf.Any payload = 5;
  string request_id = 6;
  google.protobuf.Timestamp timestamp = 7;
}
// Ping request
message PingRequest {
  string message = 1;
  google.protobuf.Timestamp timestamp = 2;
}
// Ping response
message PingResponse {
  string message = 1;
  google.protobuf.Timestamp timestamp = 2;
  int64 latency_ms = 3;
}
// Stream request
message StreamRequest {
  string type = 1;  // "message", "event", "heartbeat"
  string content = 2;
  map<string, string> context = 3;
  google.protobuf.Any payload = 4;
  string stream_id = 5;
  google.protobuf.Timestamp timestamp = 6;
}
// Stream response
message StreamResponse {
  string type = 1;  // "message", "event", "heartbeat"
  string content = 2;
  map<string, string> metadata = 3;
  google.protobuf.Any payload = 4;
  string stream_id = 5;
  google.protobuf.Timestamp timestamp = 6;
}
Connection Details
Endpoint
localhost:8081
Protocol
- Transport: HTTP/2
 - Serialization: Protocol Buffers
 - Authentication: TLS with client certificates (optional)
 
Service Methods
Health Check
Check if an agent is healthy and responsive.
rpc Health(HealthRequest) returns (HealthResponse);
Request:
{
  "agent_id": "agent-1",
  "timestamp": "2024-01-15T10:30:00Z"
}
Response:
{
  "status": "healthy",
  "message": "Agent is running normally",
  "timestamp": "2024-01-15T10:30:00Z",
  "metadata": {
    "uptime": "00:05:23",
    "memory_usage": "128MB",
    "request_count": "42"
  }
}
Describe Capabilities
Get information about agent capabilities and features.
rpc DescribeCapabilities(DescribeCapabilitiesRequest) returns (DescribeCapabilitiesResponse);
Request:
{
  "agent_id": "agent-1",
  "include_details": true
}
Response:
{
  "capabilities": [
    "greeting",
    "conversation",
    "personalization"
  ],
  "description": "A friendly greeting agent",
  "version": "0.1.0",
  "metadata": {
    "max_message_length": "1000",
    "supported_languages": "en,es,fr",
    "response_time": "0.123s"
  }
}
Invoke Agent
Send a message to an agent and get a response.
rpc Invoke(InvokeRequest) returns (InvokeResponse);
Request:
{
  "message": "Hello, agent!",
  "context": {
    "user_id": "user123",
    "session_id": "session456",
    "language": "en"
  },
  "request_id": "req_123456789",
  "timestamp": "2024-01-15T10:30:00Z"
}
Response:
{
  "response": "Hello! How can I help you today?",
  "success": true,
  "metadata": {
    "agent": "greeting-agent",
    "version": "0.1.0",
    "processing_time": "0.123s"
  },
  "request_id": "req_123456789",
  "timestamp": "2024-01-15T10:30:00Z"
}
Ping
Test connectivity and measure latency.
rpc Ping(PingRequest) returns (PingResponse);
Request:
{
  "message": "ping",
  "timestamp": "2024-01-15T10:30:00Z"
}
Response:
{
  "message": "pong",
  "timestamp": "2024-01-15T10:30:00Z",
  "latency_ms": 5
}
Stream Communication
Real-time bidirectional communication.
rpc Stream(stream StreamRequest) returns (stream StreamResponse);
Request Stream:
{
  "type": "message",
  "content": "Hello, agent!",
  "context": {
    "user_id": "user123"
  },
  "stream_id": "stream_123456789",
  "timestamp": "2024-01-15T10:30:00Z"
}
Response Stream:
{
  "type": "message",
  "content": "Hello! How can I help you?",
  "metadata": {
    "agent": "greeting-agent",
    "confidence": "0.95"
  },
  "stream_id": "stream_123456789",
  "timestamp": "2024-01-15T10:30:00Z"
}
Client Implementation
Python Client
import grpc
from pixell_runtime.proto import agent_pb2, agent_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp
import time
# Create gRPC channel
channel = grpc.insecure_channel('localhost:8081')
stub = agent_pb2_grpc.AgentServiceStub(channel)
# Health check
def check_health():
    request = agent_pb2.HealthRequest(
        agent_id="agent-1",
        timestamp=Timestamp(seconds=int(time.time()))
    )
    response = stub.Health(request)
    print(f"Agent status: {response.status}")
    return response.status == "healthy"
# Describe capabilities
def get_capabilities():
    request = agent_pb2.DescribeCapabilitiesRequest(
        agent_id="agent-1",
        include_details=True
    )
    response = stub.DescribeCapabilities(request)
    print(f"Capabilities: {response.capabilities}")
    return response
# Invoke agent
def invoke_agent(message, context=None):
    request = agent_pb2.InvokeRequest(
        message=message,
        context=context or {},
        request_id=f"req_{int(time.time())}",
        timestamp=Timestamp(seconds=int(time.time()))
    )
    response = stub.Invoke(request)
    print(f"Agent response: {response.response}")
    return response
# Stream communication
def stream_communication():
    def request_generator():
        for i in range(5):
            yield agent_pb2.StreamRequest(
                type="message",
                content=f"Message {i}",
                context={"user_id": "user123"},
                stream_id=f"stream_{int(time.time())}",
                timestamp=Timestamp(seconds=int(time.time()))
            )
            time.sleep(1)
    
    responses = stub.Stream(request_generator())
    for response in responses:
        print(f"Stream response: {response.content}")
# Usage
if __name__ == "__main__":
    # Check health
    if check_health():
        # Get capabilities
        get_capabilities()
        
        # Invoke agent
        invoke_agent("Hello, agent!", {"user_id": "user123"})
        
        # Stream communication
        stream_communication()
JavaScript/Node.js Client
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
// Load proto file
const packageDefinition = protoLoader.loadSync('agent.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const agentProto = grpc.loadPackageDefinition(packageDefinition).pixell.runtime;
// Create client
const client = new agentProto.AgentService('localhost:8081', 
  grpc.credentials.createInsecure());
// Health check
function checkHealth() {
  const request = {
    agent_id: 'agent-1',
    timestamp: { seconds: Math.floor(Date.now() / 1000) }
  };
  
  client.Health(request, (error, response) => {
    if (error) {
      console.error('Health check failed:', error);
    } else {
      console.log('Agent status:', response.status);
    }
  });
}
// Invoke agent
function invokeAgent(message, context = {}) {
  const request = {
    message: message,
    context: context,
    request_id: `req_${Date.now()}`,
    timestamp: { seconds: Math.floor(Date.now() / 1000) }
  };
  
  client.Invoke(request, (error, response) => {
    if (error) {
      console.error('Invoke failed:', error);
    } else {
      console.log('Agent response:', response.response);
    }
  });
}
// Stream communication
function streamCommunication() {
  const stream = client.Stream();
  
  stream.on('data', (response) => {
    console.log('Stream response:', response.content);
  });
  
  stream.on('end', () => {
    console.log('Stream ended');
  });
  
  // Send messages
  for (let i = 0; i < 5; i++) {
    stream.write({
      type: 'message',
      content: `Message ${i}`,
      context: { user_id: 'user123' },
      stream_id: `stream_${Date.now()}`,
      timestamp: { seconds: Math.floor(Date.now() / 1000) }
    });
  }
  
  stream.end();
}
// Usage
checkHealth();
invokeAgent('Hello, agent!', { user_id: 'user123' });
streamCommunication();
Go Client
package main
import (
    "context"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/types/known/timestamppb"
    
    pb "your-project/pixell_runtime/proto"
)
func main() {
    // Create connection
    conn, err := grpc.Dial("localhost:8081", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()
    
    client := pb.NewAgentServiceClient(conn)
    
    // Health check
    healthResp, err := client.Health(context.Background(), &pb.HealthRequest{
        AgentId:   "agent-1",
        Timestamp: timestamppb.Now(),
    })
    if err != nil {
        log.Fatalf("Health check failed: %v", err)
    }
    log.Printf("Agent status: %s", healthResp.Status)
    
    // Invoke agent
    invokeResp, err := client.Invoke(context.Background(), &pb.InvokeRequest{
        Message:   "Hello, agent!",
        Context:   map[string]string{"user_id": "user123"},
        RequestId: "req_" + string(time.Now().Unix()),
        Timestamp: timestamppb.Now(),
    })
    if err != nil {
        log.Fatalf("Invoke failed: %v", err)
    }
    log.Printf("Agent response: %s", invokeResp.Response)
}
Server Implementation
Python Server
import grpc
from concurrent import futures
from pixell_runtime.proto import agent_pb2, agent_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp
import time
class AgentServiceServicer(agent_pb2_grpc.AgentServiceServicer):
    def __init__(self, agent):
        self.agent = agent
    
    def Health(self, request, context):
        """Health check endpoint"""
        return agent_pb2.HealthResponse(
            status="healthy",
            message="Agent is running normally",
            timestamp=Timestamp(seconds=int(time.time())),
            metadata={
                "uptime": "00:05:23",
                "memory_usage": "128MB",
                "request_count": "42"
            }
        )
    
    def DescribeCapabilities(self, request, context):
        """Describe agent capabilities"""
        return agent_pb2.DescribeCapabilitiesResponse(
            capabilities=["greeting", "conversation", "personalization"],
            description="A friendly greeting agent",
            version="0.1.0",
            metadata={
                "max_message_length": "1000",
                "supported_languages": "en,es,fr",
                "response_time": "0.123s"
            }
        )
    
    def Invoke(self, request, context):
        """Invoke agent with message"""
        try:
            # Process the message
            response = self.agent.process_message(
                request.message, 
                dict(request.context)
            )
            
            return agent_pb2.InvokeResponse(
                response=response,
                success=True,
                metadata={
                    "agent": "greeting-agent",
                    "version": "0.1.0",
                    "processing_time": "0.123s"
                },
                request_id=request.request_id,
                timestamp=Timestamp(seconds=int(time.time()))
            )
        except Exception as e:
            return agent_pb2.InvokeResponse(
                response="",
                success=False,
                error_message=str(e),
                request_id=request.request_id,
                timestamp=Timestamp(seconds=int(time.time()))
            )
    
    def Ping(self, request, context):
        """Ping endpoint"""
        return agent_pb2.PingResponse(
            message="pong",
            timestamp=Timestamp(seconds=int(time.time())),
            latency_ms=5
        )
    
    def Stream(self, request_iterator, context):
        """Stream communication"""
        for request in request_iterator:
            # Process stream request
            response = self.agent.process_stream_message(
                request.content,
                dict(request.context)
            )
            
            yield agent_pb2.StreamResponse(
                type="message",
                content=response,
                metadata={"agent": "greeting-agent"},
                stream_id=request.stream_id,
                timestamp=Timestamp(seconds=int(time.time()))
            )
def serve():
    """Start gRPC server"""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # Add service
    agent_pb2_grpc.add_AgentServiceServicer_to_server(
        AgentServiceServicer(agent), server
    )
    
    # Start server
    server.add_insecure_port('[::]:8081')
    server.start()
    print("gRPC server started on port 8081")
    
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)
if __name__ == "__main__":
    serve()
Error Handling
gRPC Status Codes
| Code | Description | 
|---|---|
OK | Success | 
INVALID_ARGUMENT | Invalid request parameters | 
UNAUTHENTICATED | Authentication required | 
PERMISSION_DENIED | Insufficient permissions | 
NOT_FOUND | Agent not found | 
UNAVAILABLE | Service unavailable | 
INTERNAL | Internal server error | 
Error Examples
Invalid Argument
{
  "code": "INVALID_ARGUMENT",
  "message": "Invalid request parameters",
  "details": "Missing required field: 'message'"
}
Service Unavailable
{
  "code": "UNAVAILABLE",
  "message": "Service temporarily unavailable",
  "details": "Agent is overloaded, try again later"
}
Security
TLS Configuration
# Server with TLS
server_credentials = grpc.ssl_server_credentials([
    (private_key, certificate_chain)
])
server.add_secure_port('[::]:8081', server_credentials)
# Client with TLS
channel_credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel('localhost:8081', channel_credentials)
Authentication
# Server with authentication
def authenticate(context):
    metadata = context.invocation_metadata()
    for key, value in metadata:
        if key == 'authorization':
            # Validate JWT token
            return validate_jwt_token(value)
    return False
# Client with authentication
metadata = [('authorization', 'Bearer your-jwt-token')]
response = stub.Invoke(request, metadata=metadata)
Performance Optimization
Connection Pooling
# Reuse connections
channel = grpc.insecure_channel('localhost:8081')
stub = agent_pb2_grpc.AgentServiceStub(channel)
# Keep connection alive
channel.get_state()
Streaming Optimization
# Batch requests for better performance
def batch_invoke(messages):
    with stub.Stream() as stream:
        for message in messages:
            stream.send(message)
        stream.close()
        return stream.recv_all()
Testing
Health Check Test
# Test with grpcurl
grpcurl -plaintext localhost:8081 pixell.runtime.AgentService/Health
# Test with custom request
grpcurl -plaintext -d '{"agent_id": "agent-1"}' \
  localhost:8081 pixell.runtime.AgentService/Health
Invoke Test
# Test agent invocation
grpcurl -plaintext -d '{"message": "Hello, agent!"}' \
  localhost:8081 pixell.runtime.AgentService/Invoke
Stream Test
# Test streaming
grpcurl -plaintext -d '{"type": "message", "content": "Hello"}' \
  localhost:8081 pixell.runtime.AgentService/Stream
Next Steps
After exploring the gRPC interface:
- Deployment - Deploy to production
 - Configuration - Advanced configuration options
 - REST Endpoints - HTTP API reference
 
Ready to deploy? Check out Deployment to learn about production deployment!