Best Practices
Essential best practices for developing, deploying, and maintaining Pixell agents in production environments.
Note: This documentation was generated by AI from the source code and may therefore contain inaccuracies about the project. If you find an error, please contact engineering@pixell.global
Development Best Practices
1. Code Organization
Structure your agent code for maintainability:
# src/
├── main.py                  # Entry point
├── services/                # Business logic
│   ├── data_processor.py
│   └── report_generator.py
├── api/                     # REST endpoints
│   ├── routes.py
│   └── middleware.py
├── a2a/                     # Agent-to-agent communication
│   └── service.py
├── config/                  # Configuration
│   └── settings.py
└── tests/                   # Test files
    ├── test_main.py
    └── test_services.py
Key principles:
- Separate concerns (business logic, API, configuration)
- Use dependency injection
- Implement proper error handling
- Write comprehensive tests
2. Error Handling
Implement robust error handling:
import logging
from typing import Dict, Any, Optional
from fastapi import HTTPException
class AgentError(Exception):
    """Base exception for agent errors.

    Args:
        message: Human-readable description; also passed to ``Exception``
            so that ``str(error)`` returns it.
        error_code: Optional machine-readable code; ``None`` when omitted.
    """

    def __init__(self, message: str, error_code: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code
class ValidationError(AgentError):
    """Signals a validation failure; adds no behavior beyond ``AgentError``."""
class ProcessingError(AgentError):
    """Signals a processing failure; adds no behavior beyond ``AgentError``."""
def process_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and process a request, mapping failures to HTTP errors.

    Args:
        request: Incoming request payload.

    Returns:
        ``{"success": True, "result": ...}`` holding whatever
        ``perform_processing`` returned.

    Raises:
        HTTPException: 400 when validation fails; 500 for a
            ``ProcessingError`` or any other unexpected exception.
    """
    # NOTE(review): perform_processing is not defined in this snippet;
    # it is assumed to be provided by the surrounding application.
    try:
        # Validate input
        if not validate_request(request):
            raise ValidationError("Invalid request format")
        # Process request
        result = perform_processing(request)
    except ValidationError as e:
        logging.warning("Validation error: %s", e.message)
        raise HTTPException(status_code=400, detail=e.message) from e
    except ProcessingError as e:
        logging.error("Processing error: %s", e.message)
        # Generic detail: internal failure text is not returned to the client.
        raise HTTPException(status_code=500, detail="Internal processing error") from e
    except Exception as e:
        # logging.exception records the traceback, which logging.error dropped.
        logging.exception("Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
    return {"success": True, "result": result}
def validate_request(request: Dict[str, Any]) -> bool:
    """Return True only when both "type" and "data" are present in *request*."""
    for field_name in ("type", "data"):
        if field_name not in request:
            return False
    return True
3. Logging and Monitoring
Implement comprehensive logging:
import logging
import json
from datetime import datetime
from typing import Dict, Any
# Configure structured logging
# Root logger: INFO and above, one line per record, sent to two handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('agent.log'),  # relative path: resolved against the working directory
        logging.StreamHandler()  # no stream given: uses the handler's default stream
    ]
)
# Module-level logger used by AgentLogger below.
logger = logging.getLogger(__name__)
class AgentLogger:
    """Structured logging for agents.

    Every method writes one JSON object per log record through the
    module-level ``logger``. Timestamps are timezone-aware UTC ISO-8601
    strings (``datetime.utcnow()`` is deprecated and returns naive values).
    """

    @staticmethod
    def _timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string with offset."""
        # Local import: the snippet's import block only brings in `datetime`.
        from datetime import timezone

        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def log_request(request_id: str, request: Dict[str, Any]):
        """Log an incoming request at INFO level.

        Only the request's "type" is recorded, not its payload.
        """
        logger.info(json.dumps({
            "event": "request_received",
            "request_id": request_id,
            "type": request.get("type"),
            "timestamp": AgentLogger._timestamp(),
        }))

    @staticmethod
    def log_response(request_id: str, response: Dict[str, Any], duration: float):
        """Log an outgoing response at INFO level.

        Args:
            request_id: Identifier of the request being answered.
            response: Response dict; a missing "success" is logged as False.
            duration: Elapsed time in seconds; logged in milliseconds.
        """
        logger.info(json.dumps({
            "event": "response_sent",
            "request_id": request_id,
            "success": response.get("success", False),
            "duration_ms": duration * 1000,
            "timestamp": AgentLogger._timestamp(),
        }))

    @staticmethod
    def log_error(request_id: str, error: str, error_type: str):
        """Log an error at ERROR level."""
        logger.error(json.dumps({
            "event": "error_occurred",
            "request_id": request_id,
            "error": error,
            "error_type": error_type,
            "timestamp": AgentLogger._timestamp(),
        }))
Security Best Practices
1. Authentication and Authorization
Implement proper security:
from fastapi import HTTPException, Depends, Header
from typing import Optional
import jwt
import os
import hmac  # constant-time comparison for the API key
from typing import Any, Dict  # used in annotations below; the snippet imports only Optional


class SecurityManager:
    """Handle authentication and authorization.

    Secrets are read from the ``JWT_SECRET`` and ``API_KEY`` environment
    variables when the instance is created.
    """

    def __init__(self):
        self.jwt_secret = os.getenv("JWT_SECRET")
        self.api_key = os.getenv("API_KEY")

    def verify_api_key(self, api_key: Optional[str] = Header(None)) -> bool:
        """Verify the API key supplied in the request header.

        Returns:
            True when the key matches the configured one.

        Raises:
            HTTPException: 401 when the key is missing or does not match,
                including when no key is configured (fails closed).
        """
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")
        expected = self.api_key
        # compare_digest avoids leaking how much of the key matched via timing.
        # Encoding to bytes lets it accept non-ASCII input.
        if not expected or not hmac.compare_digest(
            api_key.encode("utf-8"), expected.encode("utf-8")
        ):
            raise HTTPException(status_code=401, detail="Invalid API key")
        return True

    def verify_jwt_token(self, token: Optional[str] = Header(None)) -> Dict[str, Any]:
        """Verify an HS256-signed JWT and return its decoded payload.

        Raises:
            HTTPException: 401 when the token is missing, expired or invalid.
        """
        if not token:
            raise HTTPException(status_code=401, detail="Token required")
        try:
            # The algorithm list is pinned so a token cannot choose its own.
            return jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token")
# Usage in endpoints
@app.post("/secure-endpoint")
async def secure_endpoint(
    request: Dict[str, Any],
    api_key: str = Depends(security_manager.verify_api_key),
    user: Dict[str, Any] = Depends(security_manager.verify_jwt_token),
):
    """Example endpoint guarded by both the API-key and JWT dependencies.

    Both dependencies are declared in the signature; the handler body is
    a placeholder.
    """
    # Process request
    return None
2. Input Validation
Validate all inputs:
from pydantic import BaseModel, validator
from typing import Dict, Any, List
import re
class ProcessRequest(BaseModel):
    """Validated request model.

    Fields:
        type: One of "analysis", "report", "process".
        data: Arbitrary payload dictionary.
        priority: One of "low", "normal", "high", "critical"
            (default "normal").
    """

    type: str
    data: Dict[str, Any]
    priority: str = "normal"

    @validator('type')
    def validate_type(cls, v):
        """Reject any type outside the allowed set."""
        allowed_types = ["analysis", "report", "process"]
        if v not in allowed_types:
            raise ValueError(f"Type must be one of {allowed_types}")
        return v

    @validator('priority')
    def validate_priority(cls, v):
        """Reject any priority outside the allowed set."""
        allowed_priorities = ["low", "normal", "high", "critical"]
        if v not in allowed_priorities:
            raise ValueError(f"Priority must be one of {allowed_priorities}")
        return v

    @validator('data')
    def validate_data(cls, v):
        """Reject payloads whose top-level string values contain SQL keywords.

        Keywords are matched as whole words, case-insensitively, so
        ordinary text such as "selection" or "updated" is no longer
        rejected. Only top-level string values are inspected. This is a
        coarse heuristic and does not replace parameterized queries.
        """
        if not isinstance(v, dict):
            raise ValueError("Data must be a dictionary")
        # Check for malicious content
        for key, value in v.items():
            if isinstance(value, str):
                # Check for SQL injection patterns
                if re.search(
                    r'\b(union|select|insert|delete|update|drop)\b',
                    value,
                    re.IGNORECASE,
                ):
                    raise ValueError("Potentially malicious input detected")
        return v
3. Rate Limiting
Implement rate limiting:
from fastapi import Request, HTTPException
from collections import defaultdict, deque
import time
from typing import Dict, Deque
class RateLimiter:
    """Simple in-memory sliding-window rate limiter.

    Args:
        max_requests: Maximum requests allowed per client within the window.
        window_seconds: Length of the sliding window in seconds.

    Timestamps come from ``time.monotonic()`` so that system clock
    adjustments cannot shrink or stretch the window.
    """

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # One deque of request timestamps per client, oldest first.
        self.requests: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, client_ip: str) -> bool:
        """Record a request for *client_ip* and report whether it is allowed.

        Returns:
            True when the client is under its limit (the request is then
            recorded); False when the limit is reached (nothing recorded).
        """
        now = time.monotonic()
        cutoff = now - self.window_seconds
        client_requests = self.requests[client_ip]
        # Remove old requests outside window
        while client_requests and client_requests[0] <= cutoff:
            client_requests.popleft()
        # Check if under limit
        if len(client_requests) >= self.max_requests:
            return False
        # Add current request
        client_requests.append(now)
        return True
# Usage
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject requests from clients that exceeded the rate limit.

    The 429 is returned as a response rather than raised: an
    ``HTTPException`` raised inside an HTTP middleware is not converted
    into an error response the way one raised in a route handler is.
    """
    # Local import: the snippet's import block does not provide JSONResponse.
    from fastapi.responses import JSONResponse

    # request.client can be None (for example with some test transports).
    client_ip = request.client.host if request.client else "unknown"
    if not rate_limiter.is_allowed(client_ip):
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"},
        )
    return await call_next(request)
Performance Best Practices
1. Async Programming
Use async/await for better performance:
import asyncio
import aiohttp
from typing import List, Dict, Any
import time
class AsyncProcessor:
    """Processes batches of requests concurrently.

    Used as an async context manager, it opens an ``aiohttp.ClientSession``
    on entry and closes it on exit.
    """

    def __init__(self):
        self.session = None

    async def __aenter__(self):
        """Open the HTTP session and return the processor itself."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if not self.session:
            return
        await self.session.close()

    async def process_requests(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Run every request concurrently and return results in input order.

        A request that raised is reported as a failure dict
        (``success`` False, ``error`` text, ``request_id``) in its slot.
        """
        outcomes = await asyncio.gather(
            *[self.process_single_request(item) for item in requests],
            return_exceptions=True,
        )
        # Handle exceptions
        return [
            {
                "success": False,
                "error": str(outcome),
                "request_id": requests[position].get("id"),
            }
            if isinstance(outcome, Exception)
            else outcome
            for position, outcome in enumerate(outcomes)
        ]

    async def process_single_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request; the work is simulated with a 0.1 s sleep."""
        # Simulate processing
        await asyncio.sleep(0.1)
        kind = request.get('type')
        identifier = request.get("id")
        return {
            "success": True,
            "result": f"Processed {kind}",
            "request_id": identifier,
        }
# Usage
async def main():
    """Process three sample requests with AsyncProcessor and print the results."""
    sample_types = ("analysis", "report", "process")
    requests = [
        {"id": str(number), "type": kind}
        for number, kind in enumerate(sample_types, start=1)
    ]
    async with AsyncProcessor() as processor:
        print(await processor.process_requests(requests))
2. Caching
Implement caching for better performance:
import redis
import json
from typing import Any, Optional
import hashlib
import asyncio  # used to run the blocking Redis client off the event loop
import logging  # cache failures are logged instead of printed


class CacheManager:
    """Redis-based JSON cache with best-effort error handling.

    Cache failures never propagate: ``get`` returns None and ``set``
    returns False after logging a warning.

    Args:
        redis_url: Connection URL passed to ``redis.from_url``.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 300  # 5 minutes

    def _generate_key(self, prefix: str, data: Any) -> str:
        """Build ``"<prefix>:<md5 hex>"`` from the JSON form of *data*.

        Keys are sorted before hashing, so equal dicts give equal keys.
        MD5 only serves as a compact fingerprint here, not for security.
        """
        data_str = json.dumps(data, sort_keys=True)
        hash_key = hashlib.md5(data_str.encode()).hexdigest()
        return f"{prefix}:{hash_key}"

    async def _run_blocking(self, func, *args):
        """Run a blocking client call in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def get(self, key: str) -> Optional[Any]:
        """Return the decoded cached value, or None on a miss or any error."""
        try:
            # self.redis_client is a synchronous client; calling it
            # directly here would stall every other coroutine.
            cached = await self._run_blocking(self.redis_client.get, key)
            if cached:
                return json.loads(cached)
        except Exception as e:
            logging.warning("Cache get error: %s", e)
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store *value* as JSON with an expiry.

        Args:
            key: Cache key.
            value: JSON-serializable value.
            ttl: Expiry in seconds; None or 0 falls back to ``default_ttl``.

        Returns:
            True on success, False if serialization or Redis failed.
        """
        try:
            ttl = ttl or self.default_ttl
            payload = json.dumps(value)
            await self._run_blocking(self.redis_client.setex, key, ttl, payload)
            return True
        except Exception as e:
            logging.warning("Cache set error: %s", e)
            return False

    async def get_or_set(self, key: str, factory_func, ttl: Optional[int] = None) -> Any:
        """Return the cached value, computing and caching it on a miss.

        Args:
            key: Cache key.
            factory_func: Zero-argument callable whose result is awaited
                to produce the value on a miss.
            ttl: Expiry in seconds for a newly stored value.

        A cached JSON ``null`` cannot be told apart from a miss, so such
        values are recomputed on every call.
        """
        cached = await self.get(key)
        if cached is not None:
            return cached
        value = await factory_func()
        await self.set(key, value, ttl)
        return value
# Usage
from typing import Dict  # used in the annotation below; missing from the snippet's imports

cache = CacheManager()


async def get_expensive_data(query: str) -> Dict[str, Any]:
    """Return the result of an expensive query, cached for ten minutes.

    Args:
        query: Query text; it is hashed into the cache key.
    """
    # NOTE(review): perform_expensive_query is not defined in this snippet;
    # get_or_set awaits its result, so it presumably must be a coroutine
    # function. Confirm against the application.
    cache_key = cache._generate_key("query", query)
    return await cache.get_or_set(
        cache_key,
        lambda: perform_expensive_query(query),
        ttl=600,  # 10 minutes
    )
3. Connection Pooling
Use connection pooling for external services:
import aiohttp
import asyncio
from typing import Dict, Any
class ConnectionPool:
    """Connection pool for external services.

    Must be used as an async context manager. The session and connector
    exist only inside the ``async with`` block.

    Args:
        max_connections: Total connection limit given to the connector.
    """

    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self.session = None
        self.connector = None

    async def __aenter__(self):
        """Create the connector and session, then return the pool."""
        self.connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=30,
            keepalive_timeout=30,
            enable_cleanup_closed=True,
        )
        self.session = aiohttp.ClientSession(connector=self.connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session and connector and drop the references."""
        session, connector = self.session, self.connector
        # Clear first so a closed session can never be reused by mistake.
        self.session = None
        self.connector = None
        if session is not None:
            await session.close()
        if connector is not None:
            await connector.close()

    async def make_request(self, url: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
        """Send an HTTP request through the pool and return the JSON body.

        Args:
            url: Target URL.
            method: HTTP method name.
            **kwargs: Passed through to ``session.request``.

        Raises:
            RuntimeError: If called outside ``async with ConnectionPool()``.
        """
        if self.session is None:
            raise RuntimeError(
                "ConnectionPool is not open; use 'async with ConnectionPool() as pool'"
            )
        async with self.session.request(method, url, **kwargs) as response:
            return await response.json()
# Usage
async def fetch_data():
    """Fetch 100 example URLs through one pool capped at 50 connections."""
    async with ConnectionPool(max_connections=50) as pool:
        pending = []
        for i in range(100):
            pending.append(pool.make_request(f"https://api.example.com/data/{i}"))
        # gather returns results in request order; the first failure propagates.
        return await asyncio.gather(*pending)
Deployment Best Practices
1. Configuration Management
Use environment-based configuration:
import os
from typing import Dict, Any, Optional
from pydantic import BaseSettings
class Settings(BaseSettings):
    """Application settings.

    ``database_url``, ``api_key`` and ``jwt_secret`` have no default and
    must be supplied. Values may come from a ``.env`` file (see ``Config``).
    """
    # NOTE(review): in Pydantic v2, BaseSettings lives in the separate
    # pydantic-settings package. Confirm which Pydantic version is installed.

    # Environment
    environment: str = "development"
    debug: bool = False

    # Server
    host: str = "0.0.0.0"
    port: int = 8080

    # Database
    database_url: str
    database_pool_size: int = 10

    # Redis
    redis_url: str = "redis://localhost:6379"

    # Security
    api_key: str
    jwt_secret: str

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"

    class Config:
        env_file = ".env"
        case_sensitive = False
# Environment-specific settings
class DevelopmentSettings(Settings):
    """Settings for development: debug enabled, DEBUG log level."""

    debug: bool = True
    log_level: str = "DEBUG"
class ProductionSettings(Settings):
    """Settings for production: debug disabled, INFO log level."""

    debug: bool = False
    log_level: str = "INFO"
def get_settings() -> Settings:
    """Return the settings object for the current environment.

    Reads ``ENVIRONMENT`` (default "development"). The value is compared
    after trimming whitespace and lower-casing, so "Production" or
    "PRODUCTION " no longer falls through to the debug-enabled
    development settings.

    Returns:
        ``ProductionSettings`` for "production", otherwise
        ``DevelopmentSettings``.
    """
    env = os.getenv("ENVIRONMENT", "development").strip().lower()
    if env == "production":
        return ProductionSettings()
    return DevelopmentSettings()
2. Health Checks
Implement comprehensive health checks:
from fastapi import APIRouter, HTTPException
from typing import Dict, Any
import asyncio
import time
class HealthChecker:
    """Registry and runner for named health checks.

    ``start_time`` is the wall-clock creation time (``time.time()``).
    """

    def __init__(self):
        self.start_time = time.time()
        self.checks = {}

    def register_check(self, name: str, check_func):
        """Register *check_func* under *name*, replacing any previous entry.

        *check_func* is called with no arguments and its result awaited.
        """
        self.checks[name] = check_func

    async def run_checks(self) -> Dict[str, Any]:
        """Run all checks one after another, in registration order.

        Returns:
            A dict keyed by check name. A passing check gives
            ``{"status": "healthy", "duration": <seconds>, "details": <result>}``;
            one that raised gives ``{"status": "unhealthy", "error": <message>}``.
        """
        results = {}
        for name, check_func in self.checks.items():
            # perf_counter is monotonic, so a clock change during a check
            # cannot produce a negative or inflated duration.
            started = time.perf_counter()
            try:
                details = await check_func()
            except Exception as e:
                results[name] = {
                    "status": "unhealthy",
                    "error": str(e),
                }
            else:
                results[name] = {
                    "status": "healthy",
                    "duration": time.perf_counter() - started,
                    "details": details,
                }
        return results
# Health check functions
async def check_database():
    """Database connectivity check (placeholder: always reports connected)."""
    # Implement database check
    status = {"connected": True}
    return status
async def check_redis():
    """Redis connectivity check (placeholder: always reports connected)."""
    # Implement Redis check
    status = {"connected": True}
    return status
async def check_external_api():
    """External API connectivity check (placeholder: always reports connected)."""
    # Implement external API check
    status = {"connected": True}
    return status
# Register checks
# One shared HealthChecker; each name below becomes a key in the dict
# returned by run_checks().
health_checker = HealthChecker()
health_checker.register_check("database", check_database)
health_checker.register_check("redis", check_redis)
health_checker.register_check("external_api", check_external_api)
# Health check endpoints
@app.get("/health")
async def health_check():
    """Liveness probe: always reports healthy, with the current epoch time."""
    payload = {"status": "healthy"}
    payload["timestamp"] = time.time()
    return payload
@app.get("/health/detailed")
async def detailed_health_check():
    """Run every registered check and report the aggregate status.

    Returns:
        A JSON response with ``status``, the per-check ``checks`` and the
        ``uptime`` in seconds. The HTTP status is 200 when all checks are
        healthy and 503 otherwise. Previously the computed status code
        was never applied, so the endpoint always answered 200.
    """
    # Local imports: the snippet's import block provides neither name.
    from fastapi.encoders import jsonable_encoder
    from fastapi.responses import JSONResponse

    checks = await health_checker.run_checks()
    all_healthy = all(
        check["status"] == "healthy"
        for check in checks.values()
    )
    body = {
        "status": "healthy" if all_healthy else "unhealthy",
        "checks": checks,
        "uptime": time.time() - health_checker.start_time,
    }
    # jsonable_encoder keeps the conversion FastAPI applied to the plain
    # dict return (check details may hold non-JSON types).
    return JSONResponse(
        status_code=200 if all_healthy else 503,
        content=jsonable_encoder(body),
    )
3. Graceful Shutdown
Implement graceful shutdown:
import signal
import asyncio
from typing import List, Callable
import logging
class GracefulShutdown:
    """Runs registered async cleanup handlers once, in registration order."""

    def __init__(self):
        self.is_shutting_down = False
        self.shutdown_handlers: List[Callable] = []

    def register_handler(self, handler: Callable):
        """Add *handler* to the end of the shutdown sequence."""
        self.shutdown_handlers.append(handler)

    async def shutdown(self):
        """Await every handler; a second call is a no-op.

        A failing handler is logged and does not stop the remaining ones.
        """
        if not self.is_shutting_down:
            self.is_shutting_down = True
            logging.info("Starting graceful shutdown...")
            for cleanup in self.shutdown_handlers:
                try:
                    await cleanup()
                except Exception as e:
                    logging.error(f"Error in shutdown handler: {e}")
            logging.info("Graceful shutdown completed")
# Global shutdown manager
# Single module-level instance that the handlers and the signal handler
# below are wired to.
shutdown_manager = GracefulShutdown()
# Shutdown handlers
async def close_database():
    """Shutdown step for database connections (placeholder: only logs)."""
    logging.info("Closing database connections...")
    # Implement database cleanup
    return None
async def close_redis():
    """Shutdown step for Redis connections (placeholder: only logs)."""
    logging.info("Closing Redis connections...")
    # Implement Redis cleanup
    return None
async def finish_requests():
    """Shutdown step for pending requests (placeholder: only logs)."""
    logging.info("Finishing pending requests...")
    # Implement request cleanup
    return None
# Register handlers
# shutdown() iterates the handler list in order, so these run in the
# order they are registered here.
shutdown_manager.register_handler(close_database)
shutdown_manager.register_handler(close_redis)
shutdown_manager.register_handler(finish_requests)
# Signal handlers
def signal_handler(signum, frame):
    """Start a graceful shutdown when a termination signal arrives.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame (unused).

    With a running event loop, the shutdown task is scheduled through
    ``call_soon_threadsafe``. Unlike calling ``asyncio.create_task``
    directly from a handler, this wakes a loop that is blocked waiting
    for I/O. Without a running loop, the shutdown coroutine is run to
    completion instead of raising ``RuntimeError``.
    """
    logging.info(f"Received signal {signum}")
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(shutdown_manager.shutdown())
        return

    def _start_shutdown():
        # Keep a strong reference: the loop itself only holds tasks weakly.
        signal_handler.shutdown_task = loop.create_task(shutdown_manager.shutdown())

    loop.call_soon_threadsafe(_start_shutdown)


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
Monitoring Best Practices
1. Metrics Collection
Collect relevant metrics:
import time
from typing import Dict, Any
from collections import defaultdict, deque
import threading
class MetricsCollector:
    """Thread-safe in-memory store of timestamped metric samples.

    Counters are stored under ``"<name>_total"`` and histograms under
    ``"<name>_histogram"``. Each sample is a
    ``{"timestamp": <epoch seconds>, "value": <number>}`` dict.

    NOTE(review): samples are never evicted, so memory grows with
    traffic. Consider bounding or aggregating for long-running processes.
    """

    def __init__(self):
        self.metrics = defaultdict(list)
        self.lock = threading.Lock()

    def _append(self, key: str, value) -> None:
        """Append one timestamped sample under *key* while holding the lock."""
        with self.lock:
            self.metrics[key].append({
                "timestamp": time.time(),
                "value": value,
            })

    def increment_counter(self, name: str, value: int = 1):
        """Record an increment of *value* for counter *name*.

        The sample is stored under ``f"{name}_total"``, so pass the bare
        name (for example ``"requests"``) without a ``_total`` suffix.
        """
        self._append(f"{name}_total", value)

    def record_histogram(self, name: str, value: float):
        """Record *value* for histogram *name* (stored as ``f"{name}_histogram"``)."""
        self._append(f"{name}_histogram", value)

    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of all metrics.

        Each sample list is copied under the lock, so the caller can
        iterate or modify the result while other threads keep recording.
        """
        with self.lock:
            return {key: list(samples) for key, samples in self.metrics.items()}
# Global metrics collector
# Module-level instance that the track_metrics decorator below records into.
metrics = MetricsCollector()
# Decorator for automatic metrics
def track_metrics(func):
    """Decorate an async function so each call is recorded in ``metrics``.

    Recorded keys (the collector adds the suffixes):
        ``requests_total``: every call, successful or not.
        ``errors_total`` and ``errors_<ExceptionName>_total``: failed calls.
        ``request_duration_histogram``: duration in seconds of successful calls.

    Counter names are passed without ``_total`` because the collector
    appends it. Passing ``"requests_total"`` stored the data under
    ``"requests_total_total"``, a key the alert rules never read.
    """
    # Local import: the snippet's import block does not provide functools.
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and signature
    async def wrapper(*args, **kwargs):
        started = time.perf_counter()
        # Count the request up front so failures are part of the total.
        metrics.increment_counter("requests")
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            # Record error metrics
            metrics.increment_counter("errors")
            metrics.increment_counter(f"errors_{type(e).__name__}")
            raise
        # Record success metrics
        metrics.record_histogram("request_duration", time.perf_counter() - started)
        return result

    return wrapper
# Usage
@track_metrics
async def process_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Example handler wrapped by track_metrics (placeholder: always succeeds)."""
    # Process request
    outcome = {"success": True}
    return outcome
2. Alerting
Set up alerting for critical issues:
import asyncio
import aiohttp
from typing import Dict, Any, List
import json
import logging  # rule failures are logged instead of printed
import time  # send_alert timestamps alerts; missing from the snippet's imports
from typing import Callable, Optional


class AlertManager:
    """Manage alerts and notifications.

    Args:
        webhook_url: URL that alerts are POSTed to as JSON. When None,
            alerts are evaluated but nothing is sent.
    """

    def __init__(self, webhook_url: Optional[str] = None):
        self.webhook_url = webhook_url
        self.alert_rules = []

    def add_rule(
        self,
        name: str,
        condition: Callable[[Dict[str, Any]], bool],
        severity: str = "warning",
    ):
        """Register a rule.

        Args:
            name: Alert name used in notifications.
            condition: Called with the metrics dict; a truthy result
                fires the alert.
            severity: Free-form severity label (default "warning").
        """
        self.alert_rules.append({
            "name": name,
            "condition": condition,
            "severity": severity,
        })

    async def check_alerts(self, metrics: Dict[str, Any]):
        """Evaluate every rule against *metrics* and send the ones that fire.

        A failure in one rule, or in sending its alert, is logged and
        does not stop the remaining rules.
        """
        for rule in self.alert_rules:
            try:
                if rule["condition"](metrics):
                    await self.send_alert(rule["name"], rule["severity"], metrics)
            except Exception as e:
                logging.error("Error checking alert %s: %s", rule["name"], e)

    async def send_alert(self, name: str, severity: str, metrics: Dict[str, Any]):
        """POST an alert to the webhook; a no-op when no webhook is set."""
        if not self.webhook_url:
            return
        alert_data = {
            "alert": name,
            "severity": severity,
            "timestamp": time.time(),
            "metrics": metrics,
        }
        async with aiohttp.ClientSession() as session:
            # Entering the response context releases the connection
            # once the request completes.
            async with session.post(self.webhook_url, json=alert_data):
                pass
# Alert rules
def high_error_rate(metrics: Dict[str, Any]) -> bool:
    """Return True when errors exceed 10% of requests.

    Reads the ``errors_total`` and ``requests_total`` sample lists and
    sums each sample's ``"value"``. Counting list entries instead would
    treat one increment of 100 as a single request.

    Returns:
        False when there are no requests; otherwise whether
        errors / requests is greater than 0.1.
    """
    total_requests = sum(s["value"] for s in metrics.get("requests_total", []))
    if not total_requests:
        return False
    total_errors = sum(s["value"] for s in metrics.get("errors_total", []))
    return total_errors / total_requests > 0.1  # 10% error rate
def high_response_time(metrics: Dict[str, Any]) -> bool:
    """Return True when the mean recorded request duration exceeds 5 seconds.

    Reads the ``request_duration_histogram`` sample list; no samples
    means no alert.
    """
    samples = metrics.get("request_duration_histogram", [])
    if samples:
        total = sum(sample["value"] for sample in samples)
        mean_duration = total / len(samples)
        return mean_duration > 5.0  # 5 seconds
    return False
# Setup alerts
# The webhook URL below looks like a placeholder; replace it with a real endpoint.
alert_manager = AlertManager(webhook_url="https://hooks.slack.com/your-webhook")
# Each rule pairs a name with one of the predicate functions defined
# above and a severity label.
alert_manager.add_rule("high_error_rate", high_error_rate, "critical")
alert_manager.add_rule("high_response_time", high_response_time, "warning")
Testing Best Practices
1. Unit Testing
Write comprehensive unit tests:
import pytest
import asyncio
from unittest.mock import Mock, patch
from your_agent import AgentProcessor, ValidationError
class TestAgentProcessor:
    """Unit tests for AgentProcessor validation and request processing."""

    @pytest.fixture
    def processor(self):
        """Create a fresh processor instance for each test."""
        return AgentProcessor()

    @pytest.fixture
    def valid_request(self):
        """A request with an accepted type and a non-empty payload."""
        return {
            "type": "analysis",
            "data": {"dataset": "test_data"},
        }

    @pytest.fixture
    def invalid_request(self):
        """A request whose type is not accepted."""
        return {
            "type": "invalid_type",
            "data": {},
        }

    def test_validate_request_valid(self, processor, valid_request):
        """A valid request passes validation."""
        # `is True` instead of `== True`: also rejects merely truthy values.
        assert processor.validate_request(valid_request) is True

    def test_validate_request_invalid(self, processor, invalid_request):
        """An invalid request raises ValidationError."""
        with pytest.raises(ValidationError):
            processor.validate_request(invalid_request)

    @pytest.mark.asyncio
    async def test_process_request_success(self, processor, valid_request):
        """Processing a valid request reports success and carries a result."""
        result = await processor.process_request(valid_request)
        assert result["success"] is True
        assert "result" in result

    @pytest.mark.asyncio
    async def test_process_request_error(self, processor, invalid_request):
        """Processing an invalid request raises ValidationError."""
        with pytest.raises(ValidationError):
            await processor.process_request(invalid_request)

    @patch('your_agent.external_api_call')
    @pytest.mark.asyncio
    async def test_external_api_integration(self, mock_api, processor, valid_request):
        """The external API is called exactly once during processing."""
        mock_api.return_value = {"status": "success"}
        result = await processor.process_request(valid_request)
        assert result["success"] is True
        mock_api.assert_called_once()
2. Integration Testing
Test complete workflows:
import pytest
import asyncio
from fastapi.testclient import TestClient
from your_agent import app
class TestIntegration:
    """Integration tests that exercise the app through its HTTP interface."""

    @pytest.fixture
    def client(self):
        """Test client bound to the application."""
        return TestClient(app)

    def test_health_endpoint(self, client):
        """GET /health answers 200 with a healthy status."""
        response = client.get("/health")
        assert response.status_code == 200
        assert response.json()["status"] == "healthy"

    def test_process_endpoint_success(self, client):
        """POST /process with a valid body succeeds and returns a result."""
        request_data = {
            "type": "analysis",
            "data": {"dataset": "test"},
        }
        response = client.post("/process", json=request_data)
        assert response.status_code == 200
        data = response.json()
        # `is True` instead of `== True`: also rejects merely truthy values.
        assert data["success"] is True
        assert "result" in data

    def test_process_endpoint_validation_error(self, client):
        """POST /process with an unknown type is rejected with 400."""
        request_data = {
            "type": "invalid_type",
            "data": {},
        }
        response = client.post("/process", json=request_data)
        assert response.status_code == 400

    def test_rate_limiting(self, client):
        """Sending more requests than the limit eventually yields 429."""
        # Make many requests quickly
        for _ in range(150):  # Exceed rate limit
            response = client.post("/process", json={"type": "test", "data": {}})
            if response.status_code == 429:
                break
        assert response.status_code == 429
3. Load Testing
Test performance under load:
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
class LoadTester:
    """Load testing utility that POSTs to ``<base_url>/process``.

    Args:
        base_url: Scheme, host and port of the service under test,
            without a trailing slash.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url

    async def run_load_test(self, num_requests: int, concurrent: int) -> Dict[str, Any]:
        """Send *num_requests* requests, at most *concurrent* at a time.

        Returns:
            A dict with ``total_requests``, ``successful``, ``failed``,
            ``duration`` (seconds), ``requests_per_second`` and
            ``success_rate`` (0.0 to 1.0). With zero or fewer requests,
            all figures are zero instead of raising ZeroDivisionError.

        Raises:
            ValueError: If *concurrent* is less than 1. A zero-sized
                semaphore would make every request wait forever.
        """
        if concurrent < 1:
            raise ValueError("concurrent must be at least 1")
        if num_requests <= 0:
            return {
                "total_requests": 0,
                "successful": 0,
                "failed": 0,
                "duration": 0.0,
                "requests_per_second": 0.0,
                "success_rate": 0.0,
            }

        # Create semaphore to limit concurrency
        semaphore = asyncio.Semaphore(concurrent)
        start_time = time.perf_counter()
        # One shared session so connections are pooled across requests.
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(self._make_request(semaphore, i, session) for i in range(num_requests)),
                return_exceptions=True,
            )
        duration = time.perf_counter() - start_time

        # Analyze results
        successful = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
        return {
            "total_requests": num_requests,
            "successful": successful,
            "failed": len(results) - successful,
            "duration": duration,
            "requests_per_second": num_requests / duration if duration > 0 else 0.0,
            "success_rate": successful / num_requests,
        }

    async def _make_request(
        self, semaphore: asyncio.Semaphore, request_id: int, session=None
    ) -> Dict[str, Any]:
        """Make a single request once the semaphore admits it.

        Args:
            semaphore: Concurrency limiter shared by all requests.
            request_id: Value sent as ``data.id`` in the request body.
            session: Optional shared ``aiohttp.ClientSession``. When
                omitted, a temporary session is created for this request.

        Returns:
            A dict with ``success`` and ``duration``, plus ``status_code``
            when a response arrived or ``error`` when an exception occurred.
        """
        async with semaphore:
            if session is not None:
                return await self._post(session, request_id)
            async with aiohttp.ClientSession() as own_session:
                return await self._post(own_session, request_id)

    async def _post(self, session, request_id: int) -> Dict[str, Any]:
        """POST one test payload and classify the outcome."""
        start = time.perf_counter()
        try:
            async with session.post(
                f"{self.base_url}/process",
                json={"type": "test", "data": {"id": request_id}},
            ) as response:
                duration = time.perf_counter() - start
                if response.status == 200:
                    # Parse the body so a malformed reply counts as a failure.
                    await response.json()
                return {
                    "success": response.status == 200,
                    "duration": duration,
                    "status_code": response.status,
                }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "duration": time.perf_counter() - start,
            }
# Usage
async def run_load_test():
    """Run a 1000-request, 50-concurrent load test against localhost:8080 and print a summary."""
    tester = LoadTester("http://localhost:8080")
    # Test with 1000 requests, 50 concurrent
    summary = await tester.run_load_test(1000, 50)
    report_lines = (
        "Load test results:",
        f" Total requests: {summary['total_requests']}",
        f" Successful: {summary['successful']}",
        f" Failed: {summary['failed']}",
        f" Duration: {summary['duration']:.2f}s",
        f" RPS: {summary['requests_per_second']:.2f}",
        f" Success rate: {summary['success_rate']:.2%}",
    )
    for line in report_lines:
        print(line)
Next Steps
After implementing these best practices:
- Agent-to-Agent Communication - Learn about multi-agent collaboration
- Debugging Guide - Debug your agents effectively
- UI Integration Guide - Integrate with user interfaces
- Full Deployment Guide - Deploy to production
Ready to implement these practices? Start with Agent-to-Agent Communication to learn about building collaborative agent systems!