Skip to main content

gRPC Interface

Complete gRPC API reference for Agent-to-Agent (A2A) communication in the Pixell Agent Runtime (PAR). This interface enables direct agent-to-agent communication using gRPC and Protocol Buffers.

Overview

The gRPC interface provides high-performance, type-safe communication between agents using Protocol Buffers for serialization and HTTP/2 for transport.

Key Features

  • 🔗 High Performance - Binary protocol with HTTP/2 multiplexing
  • 🛡️ Type Safety - Protocol Buffer schema validation
  • 🌐 Language Agnostic - Works with any gRPC-supported language
  • 🔄 Bidirectional Streaming - Real-time agent communication
  • 🔐 Built-in Security - TLS encryption and authentication

Service Definition

Agent Service

syntax = "proto3";

package pixell.runtime;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

// Main agent service for A2A communication
service AgentService {
// Health check endpoint
rpc Health(HealthRequest) returns (HealthResponse);

// Describe agent capabilities
rpc DescribeCapabilities(DescribeCapabilitiesRequest) returns (DescribeCapabilitiesResponse);

// Invoke agent with message
rpc Invoke(InvokeRequest) returns (InvokeResponse);

// Ping for connectivity testing
rpc Ping(PingRequest) returns (PingResponse);

// Stream for real-time communication
rpc Stream(stream StreamRequest) returns (stream StreamResponse);
}

// Health check request
message HealthRequest {
string agent_id = 1;
google.protobuf.Timestamp timestamp = 2;
}

// Health check response
message HealthResponse {
string status = 1; // "healthy", "unhealthy", "degraded"
string message = 2;
google.protobuf.Timestamp timestamp = 3;
map<string, string> metadata = 4;
}

// Describe capabilities request
message DescribeCapabilitiesRequest {
string agent_id = 1;
bool include_details = 2;
}

// Describe capabilities response
message DescribeCapabilitiesResponse {
repeated string capabilities = 1;
string description = 2;
string version = 3;
map<string, string> metadata = 4;
}

// Invoke request
message InvokeRequest {
string message = 1;
map<string, string> context = 2;
google.protobuf.Any payload = 3;
string request_id = 4;
google.protobuf.Timestamp timestamp = 5;
}

// Invoke response
message InvokeResponse {
string response = 1;
bool success = 2;
string error_message = 3;
map<string, string> metadata = 4;
google.protobuf.Any payload = 5;
string request_id = 6;
google.protobuf.Timestamp timestamp = 7;
}

// Ping request
message PingRequest {
string message = 1;
google.protobuf.Timestamp timestamp = 2;
}

// Ping response
message PingResponse {
string message = 1;
google.protobuf.Timestamp timestamp = 2;
int64 latency_ms = 3;
}

// Stream request
message StreamRequest {
string type = 1; // "message", "event", "heartbeat"
string content = 2;
map<string, string> context = 3;
google.protobuf.Any payload = 4;
string stream_id = 5;
google.protobuf.Timestamp timestamp = 6;
}

// Stream response
message StreamResponse {
string type = 1; // "message", "event", "heartbeat"
string content = 2;
map<string, string> metadata = 3;
google.protobuf.Any payload = 4;
string stream_id = 5;
google.protobuf.Timestamp timestamp = 6;
}

Connection Details

Endpoint

localhost:8081

Protocol

  • Transport: HTTP/2
  • Serialization: Protocol Buffers
  • Authentication: TLS with client certificates (optional)

Service Methods

Health Check

Check if an agent is healthy and responsive.

rpc Health(HealthRequest) returns (HealthResponse);

Request:

{
"agent_id": "agent-1",
"timestamp": "2024-01-15T10:30:00Z"
}

Response:

{
"status": "healthy",
"message": "Agent is running normally",
"timestamp": "2024-01-15T10:30:00Z",
"metadata": {
"uptime": "00:05:23",
"memory_usage": "128MB",
"request_count": "42"
}
}

Describe Capabilities

Get information about agent capabilities and features.

rpc DescribeCapabilities(DescribeCapabilitiesRequest) returns (DescribeCapabilitiesResponse);

Request:

{
"agent_id": "agent-1",
"include_details": true
}

Response:

{
"capabilities": [
"greeting",
"conversation",
"personalization"
],
"description": "A friendly greeting agent",
"version": "0.1.0",
"metadata": {
"max_message_length": "1000",
"supported_languages": "en,es,fr",
"response_time": "0.123s"
}
}

Invoke Agent

Send a message to an agent and get a response.

rpc Invoke(InvokeRequest) returns (InvokeResponse);

Request:

{
"message": "Hello, agent!",
"context": {
"user_id": "user123",
"session_id": "session456",
"language": "en"
},
"request_id": "req_123456789",
"timestamp": "2024-01-15T10:30:00Z"
}

Response:

{
"response": "Hello! How can I help you today?",
"success": true,
"metadata": {
"agent": "greeting-agent",
"version": "0.1.0",
"processing_time": "0.123s"
},
"request_id": "req_123456789",
"timestamp": "2024-01-15T10:30:00Z"
}

Ping

Test connectivity and measure latency.

rpc Ping(PingRequest) returns (PingResponse);

Request:

{
"message": "ping",
"timestamp": "2024-01-15T10:30:00Z"
}

Response:

{
"message": "pong",
"timestamp": "2024-01-15T10:30:00Z",
"latency_ms": 5
}

Stream Communication

Real-time bidirectional communication.

rpc Stream(stream StreamRequest) returns (stream StreamResponse);

Request Stream:

{
"type": "message",
"content": "Hello, agent!",
"context": {
"user_id": "user123"
},
"stream_id": "stream_123456789",
"timestamp": "2024-01-15T10:30:00Z"
}

Response Stream:

{
"type": "message",
"content": "Hello! How can I help you?",
"metadata": {
"agent": "greeting-agent",
"confidence": "0.95"
},
"stream_id": "stream_123456789",
"timestamp": "2024-01-15T10:30:00Z"
}

Client Implementation

Python Client

import grpc
from pixell_runtime.proto import agent_pb2, agent_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp
import time

# Create gRPC channel
channel = grpc.insecure_channel('localhost:8081')
stub = agent_pb2_grpc.AgentServiceStub(channel)

# Health check
def check_health():
request = agent_pb2.HealthRequest(
agent_id="agent-1",
timestamp=Timestamp(seconds=int(time.time()))
)
response = stub.Health(request)
print(f"Agent status: {response.status}")
return response.status == "healthy"

# Describe capabilities
def get_capabilities():
request = agent_pb2.DescribeCapabilitiesRequest(
agent_id="agent-1",
include_details=True
)
response = stub.DescribeCapabilities(request)
print(f"Capabilities: {response.capabilities}")
return response

# Invoke agent
def invoke_agent(message, context=None):
request = agent_pb2.InvokeRequest(
message=message,
context=context or {},
request_id=f"req_{int(time.time())}",
timestamp=Timestamp(seconds=int(time.time()))
)
response = stub.Invoke(request)
print(f"Agent response: {response.response}")
return response

# Stream communication
def stream_communication():
def request_generator():
for i in range(5):
yield agent_pb2.StreamRequest(
type="message",
content=f"Message {i}",
context={"user_id": "user123"},
stream_id=f"stream_{int(time.time())}",
timestamp=Timestamp(seconds=int(time.time()))
)
time.sleep(1)

responses = stub.Stream(request_generator())
for response in responses:
print(f"Stream response: {response.content}")

# Usage
if __name__ == "__main__":
# Check health
if check_health():
# Get capabilities
get_capabilities()

# Invoke agent
invoke_agent("Hello, agent!", {"user_id": "user123"})

# Stream communication
stream_communication()

JavaScript/Node.js Client

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Load proto file
const packageDefinition = protoLoader.loadSync('agent.proto', {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});

const agentProto = grpc.loadPackageDefinition(packageDefinition).pixell.runtime;

// Create client
const client = new agentProto.AgentService('localhost:8081',
grpc.credentials.createInsecure());

// Health check
function checkHealth() {
const request = {
agent_id: 'agent-1',
timestamp: { seconds: Math.floor(Date.now() / 1000) }
};

client.Health(request, (error, response) => {
if (error) {
console.error('Health check failed:', error);
} else {
console.log('Agent status:', response.status);
}
});
}

// Invoke agent
function invokeAgent(message, context = {}) {
const request = {
message: message,
context: context,
request_id: `req_${Date.now()}`,
timestamp: { seconds: Math.floor(Date.now() / 1000) }
};

client.Invoke(request, (error, response) => {
if (error) {
console.error('Invoke failed:', error);
} else {
console.log('Agent response:', response.response);
}
});
}

// Stream communication
function streamCommunication() {
const stream = client.Stream();

stream.on('data', (response) => {
console.log('Stream response:', response.content);
});

stream.on('end', () => {
console.log('Stream ended');
});

// Send messages
for (let i = 0; i < 5; i++) {
stream.write({
type: 'message',
content: `Message ${i}`,
context: { user_id: 'user123' },
stream_id: `stream_${Date.now()}`,
timestamp: { seconds: Math.floor(Date.now() / 1000) }
});
}

stream.end();
}

// Usage
checkHealth();
invokeAgent('Hello, agent!', { user_id: 'user123' });
streamCommunication();

Go Client

package main

import (
"context"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/timestamppb"

pb "your-project/pixell_runtime/proto"
)

func main() {
// Create connection
conn, err := grpc.Dial("localhost:8081", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()

client := pb.NewAgentServiceClient(conn)

// Health check
healthResp, err := client.Health(context.Background(), &pb.HealthRequest{
AgentId: "agent-1",
Timestamp: timestamppb.Now(),
})
if err != nil {
log.Fatalf("Health check failed: %v", err)
}
log.Printf("Agent status: %s", healthResp.Status)

// Invoke agent
invokeResp, err := client.Invoke(context.Background(), &pb.InvokeRequest{
Message: "Hello, agent!",
Context: map[string]string{"user_id": "user123"},
RequestId: "req_" + string(time.Now().Unix()),
Timestamp: timestamppb.Now(),
})
if err != nil {
log.Fatalf("Invoke failed: %v", err)
}
log.Printf("Agent response: %s", invokeResp.Response)
}

Server Implementation

Python Server

import grpc
from concurrent import futures
from pixell_runtime.proto import agent_pb2, agent_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp
import time

class AgentServiceServicer(agent_pb2_grpc.AgentServiceServicer):
def __init__(self, agent):
self.agent = agent

def Health(self, request, context):
"""Health check endpoint"""
return agent_pb2.HealthResponse(
status="healthy",
message="Agent is running normally",
timestamp=Timestamp(seconds=int(time.time())),
metadata={
"uptime": "00:05:23",
"memory_usage": "128MB",
"request_count": "42"
}
)

def DescribeCapabilities(self, request, context):
"""Describe agent capabilities"""
return agent_pb2.DescribeCapabilitiesResponse(
capabilities=["greeting", "conversation", "personalization"],
description="A friendly greeting agent",
version="0.1.0",
metadata={
"max_message_length": "1000",
"supported_languages": "en,es,fr",
"response_time": "0.123s"
}
)

def Invoke(self, request, context):
"""Invoke agent with message"""
try:
# Process the message
response = self.agent.process_message(
request.message,
dict(request.context)
)

return agent_pb2.InvokeResponse(
response=response,
success=True,
metadata={
"agent": "greeting-agent",
"version": "0.1.0",
"processing_time": "0.123s"
},
request_id=request.request_id,
timestamp=Timestamp(seconds=int(time.time()))
)
except Exception as e:
return agent_pb2.InvokeResponse(
response="",
success=False,
error_message=str(e),
request_id=request.request_id,
timestamp=Timestamp(seconds=int(time.time()))
)

def Ping(self, request, context):
"""Ping endpoint"""
return agent_pb2.PingResponse(
message="pong",
timestamp=Timestamp(seconds=int(time.time())),
latency_ms=5
)

def Stream(self, request_iterator, context):
"""Stream communication"""
for request in request_iterator:
# Process stream request
response = self.agent.process_stream_message(
request.content,
dict(request.context)
)

yield agent_pb2.StreamResponse(
type="message",
content=response,
metadata={"agent": "greeting-agent"},
stream_id=request.stream_id,
timestamp=Timestamp(seconds=int(time.time()))
)

def serve():
"""Start gRPC server"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# Add service
agent_pb2_grpc.add_AgentServiceServicer_to_server(
AgentServiceServicer(agent), server
)

# Start server
server.add_insecure_port('[::]:8081')
server.start()
print("gRPC server started on port 8081")

try:
server.wait_for_termination()
except KeyboardInterrupt:
server.stop(0)

if __name__ == "__main__":
serve()

Error Handling

gRPC Status Codes

CodeDescription
OKSuccess
INVALID_ARGUMENTInvalid request parameters
UNAUTHENTICATEDAuthentication required
PERMISSION_DENIEDInsufficient permissions
NOT_FOUNDAgent not found
UNAVAILABLEService unavailable
INTERNALInternal server error

Error Examples

Invalid Argument

{
"code": "INVALID_ARGUMENT",
"message": "Invalid request parameters",
"details": "Missing required field: 'message'"
}

Service Unavailable

{
"code": "UNAVAILABLE",
"message": "Service temporarily unavailable",
"details": "Agent is overloaded, try again later"
}

Security

TLS Configuration

# Server with TLS
server_credentials = grpc.ssl_server_credentials([
(private_key, certificate_chain)
])
server.add_secure_port('[::]:8081', server_credentials)

# Client with TLS
channel_credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel('localhost:8081', channel_credentials)

Authentication

# Server with authentication
def authenticate(context):
metadata = context.invocation_metadata()
for key, value in metadata:
if key == 'authorization':
# Validate JWT token
return validate_jwt_token(value)
return False

# Client with authentication
metadata = [('authorization', 'Bearer your-jwt-token')]
response = stub.Invoke(request, metadata=metadata)

Performance Optimization

Connection Pooling

# Reuse connections
channel = grpc.insecure_channel('localhost:8081')
stub = agent_pb2_grpc.AgentServiceStub(channel)

# Keep connection alive
channel.get_state()

Streaming Optimization

# Batch requests for better performance
def batch_invoke(messages):
with stub.Stream() as stream:
for message in messages:
stream.send(message)
stream.close()
return stream.recv_all()

Testing

Health Check Test

# Test with grpcurl
grpcurl -plaintext localhost:8081 pixell.runtime.AgentService/Health

# Test with custom request
grpcurl -plaintext -d '{"agent_id": "agent-1"}' \
localhost:8081 pixell.runtime.AgentService/Health

Invoke Test

# Test agent invocation
grpcurl -plaintext -d '{"message": "Hello, agent!"}' \
localhost:8081 pixell.runtime.AgentService/Invoke

Stream Test

# Test streaming
grpcurl -plaintext -d '{"type": "message", "content": "Hello"}' \
localhost:8081 pixell.runtime.AgentService/Stream

Next Steps

After exploring the gRPC interface:

  1. Deployment - Deploy to production
  2. Configuration - Advanced configuration options
  3. REST Endpoints - HTTP API reference

Ready to deploy? Check out Deployment to learn about production deployment!