Best Practices

Essential best practices for developing, deploying, and maintaining Pixell agents in production environments.

Note: This documentation is generated by AI from the source code, so it may contain inaccuracies. If you find one, please contact engineering@pixell.global

Development Best Practices

1. Code Organization

Structure your agent code for maintainability:

src/
├── main.py                  # Entry point
├── services/                # Business logic
│   ├── data_processor.py
│   └── report_generator.py
├── api/                     # REST endpoints
│   ├── routes.py
│   └── middleware.py
├── a2a/                     # Agent-to-agent communication
│   └── service.py
├── config/                  # Configuration
│   └── settings.py
└── tests/                   # Test files
    ├── test_main.py
    └── test_services.py

Key principles:

  • Separate concerns (business logic, API, configuration)
  • Use dependency injection (see the sketch after this list)
  • Implement proper error handling
  • Write comprehensive tests
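
For example, dependency injection in this layout might look like the sketch below. The class names and constructor parameters are illustrative assumptions, not part of the template:

# src/main.py -- a hypothetical sketch of dependency injection at the entry point
from fastapi import FastAPI

from config.settings import get_settings
from services.data_processor import DataProcessor      # assumed class name
from services.report_generator import ReportGenerator  # assumed class name

settings = get_settings()
app = FastAPI()

# Build dependencies once at the entry point and pass them in, rather than
# having each module construct its own; this keeps services testable.
processor = DataProcessor(database_url=settings.database_url)
reporter = ReportGenerator(processor=processor)

@app.post("/report")
async def create_report(request: dict):
    return await reporter.generate(request)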

2. Error Handling

Implement robust error handling:

import logging
from typing import Dict, Any, Optional

from fastapi import HTTPException

class AgentError(Exception):
    """Base exception for agent errors"""
    def __init__(self, message: str, error_code: Optional[str] = None):
        self.message = message
        self.error_code = error_code
        super().__init__(self.message)

class ValidationError(AgentError):
    """Validation error"""
    pass

class ProcessingError(AgentError):
    """Processing error"""
    pass

def process_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process request with proper error handling"""
    try:
        # Validate input
        if not validate_request(request):
            raise ValidationError("Invalid request format")

        # Process request (perform_processing is your own business logic)
        result = perform_processing(request)
        return {"success": True, "result": result}

    except ValidationError as e:
        logging.warning(f"Validation error: {e.message}")
        raise HTTPException(status_code=400, detail=e.message)
    except ProcessingError as e:
        logging.error(f"Processing error: {e.message}")
        raise HTTPException(status_code=500, detail="Internal processing error")
    except Exception as e:
        logging.error(f"Unexpected error: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")

def validate_request(request: Dict[str, Any]) -> bool:
    """Validate request structure"""
    required_fields = ["type", "data"]
    return all(field in request for field in required_fields)

3. Logging and Monitoring

Implement comprehensive logging:

import json
import logging
from datetime import datetime
from typing import Dict, Any

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('agent.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class AgentLogger:
    """Structured logging for agents"""

    @staticmethod
    def log_request(request_id: str, request: Dict[str, Any]):
        """Log incoming request"""
        logger.info(json.dumps({
            "event": "request_received",
            "request_id": request_id,
            "type": request.get("type"),
            "timestamp": datetime.utcnow().isoformat()
        }))

    @staticmethod
    def log_response(request_id: str, response: Dict[str, Any], duration: float):
        """Log response"""
        logger.info(json.dumps({
            "event": "response_sent",
            "request_id": request_id,
            "success": response.get("success", False),
            "duration_ms": duration * 1000,
            "timestamp": datetime.utcnow().isoformat()
        }))

    @staticmethod
    def log_error(request_id: str, error: str, error_type: str):
        """Log error"""
        logger.error(json.dumps({
            "event": "error_occurred",
            "request_id": request_id,
            "error": error,
            "error_type": error_type,
            "timestamp": datetime.utcnow().isoformat()
        }))
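
A quick usage sketch (the request ID generation and stand-in processing are illustrative):

import time
import uuid

request_id = str(uuid.uuid4())
request = {"type": "analysis", "data": {"dataset": "sales"}}

AgentLogger.log_request(request_id, request)
start = time.time()
try:
    response = {"success": True}  # stand-in for real processing
    AgentLogger.log_response(request_id, response, time.time() - start)
except Exception as e:
    AgentLogger.log_error(request_id, str(e), type(e).__name__)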

Security Best Practices

1. Authentication and Authorization

Implement proper security:

import os
from typing import Dict, Any, Optional

import jwt  # PyJWT
from fastapi import HTTPException, Depends, Header

class SecurityManager:
    """Handle authentication and authorization"""

    def __init__(self):
        self.jwt_secret = os.getenv("JWT_SECRET")
        self.api_key = os.getenv("API_KEY")

    def verify_api_key(self, api_key: Optional[str] = Header(None)) -> bool:
        """Verify API key"""
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")

        if api_key != self.api_key:
            raise HTTPException(status_code=401, detail="Invalid API key")

        return True

    def verify_jwt_token(self, token: Optional[str] = Header(None)) -> Dict[str, Any]:
        """Verify JWT token"""
        if not token:
            raise HTTPException(status_code=401, detail="Token required")

        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token")

security_manager = SecurityManager()

# Usage in endpoints (app is your FastAPI instance)
@app.post("/secure-endpoint")
async def secure_endpoint(
    request: Dict[str, Any],
    api_key: bool = Depends(security_manager.verify_api_key),
    user: Dict[str, Any] = Depends(security_manager.verify_jwt_token)
):
    """Secure endpoint with authentication"""
    # Process request
    pass
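
For reference, a token that verify_jwt_token will accept can be minted with PyJWT like this (the claims are illustrative):

import datetime
import os

import jwt

token = jwt.encode(
    {
        "sub": "user-123",  # illustrative subject claim
        "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
    },
    os.getenv("JWT_SECRET"),
    algorithm="HS256",
)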

2. Input Validation

Validate all inputs:

import re
from typing import Dict, Any

# pydantic v1 style; with pydantic v2, use field_validator instead of validator
from pydantic import BaseModel, validator

class ProcessRequest(BaseModel):
    """Validated request model"""
    type: str
    data: Dict[str, Any]
    priority: str = "normal"

    @validator('type')
    def validate_type(cls, v):
        allowed_types = ["analysis", "report", "process"]
        if v not in allowed_types:
            raise ValueError(f"Type must be one of {allowed_types}")
        return v

    @validator('priority')
    def validate_priority(cls, v):
        allowed_priorities = ["low", "normal", "high", "critical"]
        if v not in allowed_priorities:
            raise ValueError(f"Priority must be one of {allowed_priorities}")
        return v

    @validator('data')
    def validate_data(cls, v):
        if not isinstance(v, dict):
            raise ValueError("Data must be a dictionary")

        # Naive screen for SQL keywords; parameterized queries remain the real defense
        for key, value in v.items():
            if isinstance(value, str):
                if re.search(r'(union|select|insert|delete|update|drop)', value.lower()):
                    raise ValueError("Potentially malicious input detected")

        return v
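
Wired into FastAPI, the model is enforced before your handler runs (the endpoint name below is illustrative):

from fastapi import FastAPI

app = FastAPI()

@app.post("/process")
async def process(request: ProcessRequest):
    # FastAPI runs the validators on the incoming JSON body and rejects
    # invalid payloads with a 422 response before this code executes.
    return {"success": True, "type": request.type, "priority": request.priority}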

3. Rate Limiting

Implement rate limiting:

import time
from collections import defaultdict, deque
from typing import Dict, Deque

from fastapi import Request
from fastapi.responses import JSONResponse

class RateLimiter:
    """Simple rate limiter"""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, client_ip: str) -> bool:
        """Check if request is allowed"""
        now = time.time()
        client_requests = self.requests[client_ip]

        # Remove old requests outside the window
        while client_requests and client_requests[0] <= now - self.window_seconds:
            client_requests.popleft()

        # Check if under limit
        if len(client_requests) >= self.max_requests:
            return False

        # Add current request
        client_requests.append(now)
        return True

# Usage (app is your FastAPI instance)
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Rate limiting middleware"""
    client_ip = request.client.host

    # Exceptions raised in middleware bypass FastAPI's exception handlers,
    # so return the 429 response directly instead of raising HTTPException.
    if not rate_limiter.is_allowed(client_ip):
        return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})

    return await call_next(request)

Performance Best Practices

1. Async Programming

Use async/await for better performance:

import asyncio
from typing import List, Dict, Any

import aiohttp

class AsyncProcessor:
    """Async request processor"""

    def __init__(self):
        self.session = None

    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()

    async def process_requests(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process multiple requests concurrently"""
        tasks = [self.process_single_request(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert exceptions into error responses
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "success": False,
                    "error": str(result),
                    "request_id": requests[i].get("id")
                })
            else:
                processed_results.append(result)

        return processed_results

    async def process_single_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single request (self.session is available here for real I/O)"""
        # Simulate processing
        await asyncio.sleep(0.1)

        return {
            "success": True,
            "result": f"Processed {request.get('type')}",
            "request_id": request.get("id")
        }

# Usage
async def main():
    requests = [
        {"id": "1", "type": "analysis"},
        {"id": "2", "type": "report"},
        {"id": "3", "type": "process"}
    ]

    async with AsyncProcessor() as processor:
        results = await processor.process_requests(requests)
        print(results)

if __name__ == "__main__":
    asyncio.run(main())

2. Caching

Implement caching for better performance:

import hashlib
import json
from typing import Dict, Any, Optional

import redis.asyncio as redis  # async client, so cache calls don't block the event loop

class CacheManager:
    """Redis-based caching"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 300  # 5 minutes

    def _generate_key(self, prefix: str, data: Any) -> str:
        """Generate cache key"""
        data_str = json.dumps(data, sort_keys=True)
        hash_key = hashlib.md5(data_str.encode()).hexdigest()
        return f"{prefix}:{hash_key}"

    async def get(self, key: str) -> Optional[Any]:
        """Get cached value"""
        try:
            cached = await self.redis_client.get(key)
            if cached:
                return json.loads(cached)
        except Exception as e:
            print(f"Cache get error: {e}")
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set cached value"""
        try:
            ttl = ttl or self.default_ttl
            await self.redis_client.setex(key, ttl, json.dumps(value))
            return True
        except Exception as e:
            print(f"Cache set error: {e}")
            return False

    async def get_or_set(self, key: str, factory_func, ttl: Optional[int] = None) -> Any:
        """Get from cache or set using factory function"""
        cached = await self.get(key)
        if cached is not None:
            return cached

        value = await factory_func()
        await self.set(key, value, ttl)
        return value

# Usage
cache = CacheManager()

async def get_expensive_data(query: str) -> Dict[str, Any]:
    """Get expensive data with caching (perform_expensive_query is your own async function)"""
    cache_key = cache._generate_key("query", query)

    return await cache.get_or_set(
        cache_key,
        lambda: perform_expensive_query(query),
        ttl=600  # 10 minutes
    )

3. Connection Pooling

Use connection pooling for external services:

import asyncio
from typing import Dict, Any

import aiohttp

class ConnectionPool:
    """Connection pool for external services"""

    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self.session = None
        self.connector = None

    async def __aenter__(self):
        """Async context manager entry"""
        self.connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=30,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        self.session = aiohttp.ClientSession(connector=self.connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
        if self.connector:
            await self.connector.close()

    async def make_request(self, url: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
        """Make HTTP request using the pooled session"""
        async with self.session.request(method, url, **kwargs) as response:
            return await response.json()

# Usage
async def fetch_data():
    """Fetch data using connection pool"""
    async with ConnectionPool(max_connections=50) as pool:
        tasks = [
            pool.make_request(f"https://api.example.com/data/{i}")
            for i in range(100)
        ]
        results = await asyncio.gather(*tasks)
        return results

Deployment Best Practices

1. Configuration Management

Use environment-based configuration:

import os

# pydantic v1; with pydantic v2, use `from pydantic_settings import BaseSettings`
from pydantic import BaseSettings

class Settings(BaseSettings):
    """Application settings"""

    # Environment
    environment: str = "development"
    debug: bool = False

    # Server
    host: str = "0.0.0.0"
    port: int = 8080

    # Database
    database_url: str
    database_pool_size: int = 10

    # Redis
    redis_url: str = "redis://localhost:6379"

    # Security
    api_key: str
    jwt_secret: str

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"

    class Config:
        env_file = ".env"
        case_sensitive = False

# Environment-specific settings
class DevelopmentSettings(Settings):
    debug: bool = True
    log_level: str = "DEBUG"

class ProductionSettings(Settings):
    debug: bool = False
    log_level: str = "INFO"

def get_settings() -> Settings:
    """Get settings based on environment"""
    env = os.getenv("ENVIRONMENT", "development")

    if env == "production":
        return ProductionSettings()
    return DevelopmentSettings()
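
A matching .env file might look like this (all values are placeholders; database_url, api_key, and jwt_secret have no defaults, so they must be set):

ENVIRONMENT=production
DATABASE_URL=postgresql://user:password@db-host:5432/agent
DATABASE_POOL_SIZE=20
REDIS_URL=redis://redis-host:6379
API_KEY=replace-me
JWT_SECRET=replace-me
LOG_LEVEL=INFO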

2. Health Checks

Implement comprehensive health checks:

import time
from typing import Dict, Any

from fastapi.responses import JSONResponse

class HealthChecker:
    """Health check manager"""

    def __init__(self):
        self.start_time = time.time()
        self.checks = {}

    def register_check(self, name: str, check_func):
        """Register a health check"""
        self.checks[name] = check_func

    async def run_checks(self) -> Dict[str, Any]:
        """Run all health checks"""
        results = {}

        for name, check_func in self.checks.items():
            try:
                start = time.time()
                result = await check_func()
                duration = time.time() - start

                results[name] = {
                    "status": "healthy",
                    "duration": duration,
                    "details": result
                }
            except Exception as e:
                results[name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }

        return results

# Health check functions
async def check_database():
    """Check database connectivity"""
    # Implement database check
    return {"connected": True}

async def check_redis():
    """Check Redis connectivity"""
    # Implement Redis check
    return {"connected": True}

async def check_external_api():
    """Check external API connectivity"""
    # Implement external API check
    return {"connected": True}

# Register checks
health_checker = HealthChecker()
health_checker.register_check("database", check_database)
health_checker.register_check("redis", check_redis)
health_checker.register_check("external_api", check_external_api)

# Health check endpoints (app is your FastAPI instance)
@app.get("/health")
async def health_check():
    """Basic health check"""
    return {"status": "healthy", "timestamp": time.time()}

@app.get("/health/detailed")
async def detailed_health_check():
    """Detailed health check"""
    checks = await health_checker.run_checks()

    all_healthy = all(
        check["status"] == "healthy"
        for check in checks.values()
    )

    # Return 503 when any check fails so load balancers can react
    return JSONResponse(
        status_code=200 if all_healthy else 503,
        content={
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": checks,
            "uptime": time.time() - health_checker.start_time
        }
    )

3. Graceful Shutdown

Implement graceful shutdown:

import asyncio
import logging
import signal
from typing import List, Callable

class GracefulShutdown:
    """Handle graceful shutdown"""

    def __init__(self):
        self.shutdown_handlers: List[Callable] = []
        self.is_shutting_down = False

    def register_handler(self, handler: Callable):
        """Register shutdown handler"""
        self.shutdown_handlers.append(handler)

    async def shutdown(self):
        """Execute shutdown handlers"""
        if self.is_shutting_down:
            return

        self.is_shutting_down = True
        logging.info("Starting graceful shutdown...")

        for handler in self.shutdown_handlers:
            try:
                await handler()
            except Exception as e:
                logging.error(f"Error in shutdown handler: {e}")

        logging.info("Graceful shutdown completed")

# Global shutdown manager
shutdown_manager = GracefulShutdown()

# Shutdown handlers
async def close_database():
    """Close database connections"""
    logging.info("Closing database connections...")
    # Implement database cleanup

async def close_redis():
    """Close Redis connections"""
    logging.info("Closing Redis connections...")
    # Implement Redis cleanup

async def finish_requests():
    """Finish pending requests"""
    logging.info("Finishing pending requests...")
    # Implement request cleanup

# Register handlers
shutdown_manager.register_handler(close_database)
shutdown_manager.register_handler(close_redis)
shutdown_manager.register_handler(finish_requests)

# Signal handling: a plain signal.signal handler cannot reliably schedule
# coroutines, so register through the running event loop instead (Unix only)
def install_signal_handlers(loop: asyncio.AbstractEventLoop):
    """Schedule graceful shutdown on SIGTERM/SIGINT"""
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig, lambda: asyncio.ensure_future(shutdown_manager.shutdown())
        )
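
One way to wire this into a FastAPI app, sketched under the assumption that app is your FastAPI instance (newer FastAPI versions prefer the lifespan parameter, but on_event still works):

@app.on_event("startup")
async def on_startup():
    # Install signal handlers once the event loop is running
    install_signal_handlers(asyncio.get_running_loop())

@app.on_event("shutdown")
async def on_shutdown():
    # Also run the handlers on a normal server shutdown
    await shutdown_manager.shutdown()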

Monitoring Best Practices

1. Metrics Collection

Collect relevant metrics:

import functools
import threading
import time
from collections import defaultdict
from typing import Dict, Any

class MetricsCollector:
    """Collect and store metrics"""

    def __init__(self):
        # Note: these lists grow without bound; in production, export and
        # reset them periodically, or use a library such as prometheus-client.
        self.metrics = defaultdict(list)
        self.lock = threading.Lock()

    def increment_counter(self, name: str, value: int = 1):
        """Increment a counter metric (stored under "<name>_total")"""
        with self.lock:
            self.metrics[f"{name}_total"].append({
                "timestamp": time.time(),
                "value": value
            })

    def record_histogram(self, name: str, value: float):
        """Record a histogram value (stored under "<name>_histogram")"""
        with self.lock:
            self.metrics[f"{name}_histogram"].append({
                "timestamp": time.time(),
                "value": value
            })

    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics"""
        with self.lock:
            return dict(self.metrics)

# Global metrics collector
metrics = MetricsCollector()

# Decorator for automatic metrics
def track_metrics(func):
    """Decorator to track function metrics"""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = await func(*args, **kwargs)
            duration = time.time() - start_time

            # Record success metrics (keys: requests_total, request_duration_histogram)
            metrics.increment_counter("requests")
            metrics.record_histogram("request_duration", duration)

            return result
        except Exception as e:
            # Record error metrics (keys: errors_total, errors_<Type>_total)
            metrics.increment_counter("errors")
            metrics.increment_counter(f"errors_{type(e).__name__}")
            raise

    return wrapper

# Usage
@track_metrics
async def process_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process request with metrics tracking"""
    # Process request
    return {"success": True}

2. Alerting

Set up alerting for critical issues:

import time
from typing import Dict, Any, List, Callable, Optional

import aiohttp

class AlertManager:
    """Manage alerts and notifications"""

    def __init__(self, webhook_url: Optional[str] = None):
        self.webhook_url = webhook_url
        self.alert_rules: List[Dict[str, Any]] = []

    def add_rule(self, name: str, condition: Callable, severity: str = "warning"):
        """Add alert rule"""
        self.alert_rules.append({
            "name": name,
            "condition": condition,
            "severity": severity
        })

    async def check_alerts(self, metrics: Dict[str, Any]):
        """Check all alert rules"""
        for rule in self.alert_rules:
            try:
                if rule["condition"](metrics):
                    await self.send_alert(rule["name"], rule["severity"], metrics)
            except Exception as e:
                print(f"Error checking alert {rule['name']}: {e}")

    async def send_alert(self, name: str, severity: str, metrics: Dict[str, Any]):
        """Send alert notification"""
        alert_data = {
            "alert": name,
            "severity": severity,
            "timestamp": time.time(),
            "metrics": metrics
        }

        if self.webhook_url:
            async with aiohttp.ClientSession() as session:
                await session.post(self.webhook_url, json=alert_data)

# Alert rules
def high_error_rate(metrics: Dict[str, Any]) -> bool:
    """Check for high error rate"""
    errors = metrics.get("errors_total", [])
    total = metrics.get("requests_total", [])

    if not total:
        return False

    error_rate = len(errors) / len(total)
    return error_rate > 0.1  # 10% error rate

def high_response_time(metrics: Dict[str, Any]) -> bool:
    """Check for high response time"""
    durations = metrics.get("request_duration_histogram", [])
    if not durations:
        return False

    avg_duration = sum(d["value"] for d in durations) / len(durations)
    return avg_duration > 5.0  # 5 seconds

# Setup alerts
alert_manager = AlertManager(webhook_url="https://hooks.slack.com/your-webhook")
alert_manager.add_rule("high_error_rate", high_error_rate, "critical")
alert_manager.add_rule("high_response_time", high_response_time, "warning")

Testing Best Practices

1. Unit Testing

Write comprehensive unit tests:

from unittest.mock import patch

import pytest

# your_agent is a placeholder for your own module
from your_agent import AgentProcessor, ValidationError

class TestAgentProcessor:
    """Test agent processor"""

    @pytest.fixture
    def processor(self):
        """Create processor instance"""
        return AgentProcessor()

    @pytest.fixture
    def valid_request(self):
        """Valid request fixture"""
        return {
            "type": "analysis",
            "data": {"dataset": "test_data"}
        }

    @pytest.fixture
    def invalid_request(self):
        """Invalid request fixture"""
        return {
            "type": "invalid_type",
            "data": {}
        }

    def test_validate_request_valid(self, processor, valid_request):
        """Test valid request validation"""
        assert processor.validate_request(valid_request) is True

    def test_validate_request_invalid(self, processor, invalid_request):
        """Test invalid request validation"""
        with pytest.raises(ValidationError):
            processor.validate_request(invalid_request)

    @pytest.mark.asyncio
    async def test_process_request_success(self, processor, valid_request):
        """Test successful request processing"""
        result = await processor.process_request(valid_request)

        assert result["success"] is True
        assert "result" in result

    @pytest.mark.asyncio
    async def test_process_request_error(self, processor, invalid_request):
        """Test request processing error"""
        with pytest.raises(ValidationError):
            await processor.process_request(invalid_request)

    @pytest.mark.asyncio
    @patch('your_agent.external_api_call')
    async def test_external_api_integration(self, mock_api, processor, valid_request):
        """Test external API integration"""
        mock_api.return_value = {"status": "success"}

        result = await processor.process_request(valid_request)

        assert result["success"] is True
        mock_api.assert_called_once()

2. Integration Testing

Test complete workflows:

import pytest
from fastapi.testclient import TestClient

# your_agent is a placeholder for your own module
from your_agent import app

class TestIntegration:
    """Integration tests"""

    @pytest.fixture
    def client(self):
        """Test client fixture"""
        return TestClient(app)

    def test_health_endpoint(self, client):
        """Test health endpoint"""
        response = client.get("/health")
        assert response.status_code == 200
        assert response.json()["status"] == "healthy"

    def test_process_endpoint_success(self, client):
        """Test process endpoint success"""
        request_data = {
            "type": "analysis",
            "data": {"dataset": "test"}
        }

        response = client.post("/process", json=request_data)
        assert response.status_code == 200

        data = response.json()
        assert data["success"] is True
        assert "result" in data

    def test_process_endpoint_validation_error(self, client):
        """Test process endpoint validation error"""
        request_data = {
            "type": "invalid_type",
            "data": {}
        }

        response = client.post("/process", json=request_data)
        assert response.status_code == 400

    def test_rate_limiting(self, client):
        """Test rate limiting"""
        # Make enough requests to exceed the rate limit
        response = None
        for _ in range(150):
            response = client.post("/process", json={"type": "test", "data": {}})
            if response.status_code == 429:
                break

        assert response.status_code == 429

3. Load Testing

Test performance under load:

import asyncio
import time
from typing import Dict, Any

import aiohttp

class LoadTester:
    """Load testing utility"""

    def __init__(self, base_url: str):
        self.base_url = base_url

    async def run_load_test(self, num_requests: int, concurrent: int) -> Dict[str, Any]:
        """Run load test"""
        start_time = time.time()

        # Semaphore caps how many requests are in flight at once
        semaphore = asyncio.Semaphore(concurrent)

        # Create tasks
        tasks = [
            self._make_request(semaphore, i)
            for i in range(num_requests)
        ]

        # Run all requests
        results = await asyncio.gather(*tasks, return_exceptions=True)

        duration = time.time() - start_time

        # Analyze results
        successful = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
        failed = len(results) - successful

        return {
            "total_requests": num_requests,
            "successful": successful,
            "failed": failed,
            "duration": duration,
            "requests_per_second": num_requests / duration,
            "success_rate": successful / num_requests
        }

    async def _make_request(self, semaphore: asyncio.Semaphore, request_id: int) -> Dict[str, Any]:
        """Make a single request"""
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                start = time.time()

                try:
                    async with session.post(
                        f"{self.base_url}/process",
                        json={"type": "test", "data": {"id": request_id}}
                    ) as response:
                        return {
                            "success": response.status == 200,
                            "duration": time.time() - start,
                            "status_code": response.status
                        }
                except Exception as e:
                    return {
                        "success": False,
                        "error": str(e),
                        "duration": time.time() - start
                    }

# Usage
async def run_load_test():
    """Run load test"""
    tester = LoadTester("http://localhost:8080")

    # Test with 1000 requests, 50 concurrent
    results = await tester.run_load_test(1000, 50)

    print("Load test results:")
    print(f"  Total requests: {results['total_requests']}")
    print(f"  Successful: {results['successful']}")
    print(f"  Failed: {results['failed']}")
    print(f"  Duration: {results['duration']:.2f}s")
    print(f"  RPS: {results['requests_per_second']:.2f}")
    print(f"  Success rate: {results['success_rate']:.2%}")

if __name__ == "__main__":
    asyncio.run(run_load_test())

Next Steps

After implementing these best practices:

  1. Agent-to-Agent Communication - Learn about multi-agent collaboration
  2. Debugging Guide - Debug your agents effectively
  3. UI Integration Guide - Integrate with user interfaces
  4. Full Deployment Guide - Deploy to production

Ready to implement these practices? Start with Agent-to-Agent Communication to learn about building collaborative agent systems!