Microservices Architecture
Emblema's microservices architecture is designed to maximize scalability, resilience, and maintainability. Each service has specific responsibilities and communicates through well-defined interfaces, allowing independent development, deployment, and scaling.
Architecture Overview
🏗️ Service Mesh Architecture
Emblema uses a hybrid service mesh architecture in which Traefik acts as API Gateway and service proxy, handling routing, load balancing, and SSL termination.
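The gateway role described above can be sketched as a minimal Traefik static configuration. This is an illustrative sketch, not the project's actual file: the entrypoint names, resolver name, and ACME email are assumptions.

```yaml
# traefik.yml - minimal static configuration (illustrative)
entryPoints:
  web:
    address: ":80"
    http:
      redirections:
        entryPoint:
          to: websecure
          scheme: https   # force HTTP -> HTTPS
  websecure:
    address: ":443"       # SSL termination happens here

providers:
  docker:
    exposedByDefault: false   # only route services labeled traefik.enable=true

certificatesResolvers:
  letsencrypt:
    acme:
      email: admin@example.com   # placeholder
      httpChallenge:
        entryPoint: web
```

With this in place, routing and load balancing are driven entirely by per-service Docker labels, as shown later in this page.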
🔧 Service Catalog
Frontend Services
- www-emblema: Main web application (Next.js 14)
- doc-emblema: Documentation (Docusaurus)
Core Services
- background-task: Asynchronous AI processing (Python/FastAPI/Celery)
- spp-retrieval: Semantic search (Python/FastAPI)
- document-render: Document rendering (Python)
- mcp-demo: Model Context Protocol server
- novu-bridge: Notification bridge
Infrastructure Services
- Traefik: Reverse proxy and load balancer
- Keycloak: Identity and Access Management
- LiteLLM: Unified gateway for AI models
- Hasura: GraphQL engine
Inter-Service Communication
🔄 Communication Patterns
1. Synchronous Communication
REST APIs: For real-time, interactive operations
```typescript
// Unified HTTP client example
class ServiceError extends Error {}

class ServiceClient {
  private serviceName: string;
  private baseURL: string;
  private timeout: number = 30000;

  constructor(serviceName: string) {
    this.serviceName = serviceName;
    this.baseURL = `http://${serviceName}`;
  }

  private async getToken(): Promise<string> {
    // Token retrieval (e.g. from Keycloak) omitted for brevity
    return process.env.SERVICE_TOKEN ?? "";
  }

  async post<T>(endpoint: string, data: unknown): Promise<T> {
    const response = await fetch(`${this.baseURL}${endpoint}`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${await this.getToken()}`,
      },
      body: JSON.stringify(data),
      signal: AbortSignal.timeout(this.timeout),
    });
    if (!response.ok) {
      throw new ServiceError(`${this.serviceName} error: ${response.statusText}`);
    }
    return response.json();
  }
}

// Usage
const backgroundTaskClient = new ServiceClient("background-task");
const processingResult = await backgroundTaskClient.post("/tasks", {
  document_id: documentId,
  chunker_config: config,
});
```
GraphQL: For complex queries and real-time subscriptions
```typescript
// Hasura GraphQL client with subscriptions
import { createClient } from "graphql-ws";

const graphqlClient = createClient({
  url: "ws://graphql-engine:8080/v1/graphql",
  connectionParams: {
    headers: {
      "x-hasura-admin-secret": process.env.HASURA_ADMIN_SECRET,
    },
  },
});

// Subscription for real-time updates
const subscription = graphqlClient.iterate({
  query: `
    subscription TaskUpdates($userId: uuid!) {
      task(where: {created_by: {_eq: $userId}}) {
        id
        status
        progress
        error_message
      }
    }
  `,
  variables: { userId },
});

for await (const result of subscription) {
  updateUI(result.data);
}
```
2. Asynchronous Communication
Message queue with Redis: For asynchronous tasks
```python
# background-task/celery_app.py - actual configuration
import os

from celery import Celery
from celery.schedules import crontab

TASK_MONITOR_INTERVAL = 5.0  # seconds

celery_app = Celery(
    __name__,
    backend=os.getenv("CELERY_RESULT_URL", "redis://localhost:6379/6"),
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/6"),
)

# Task scheduling configuration
celery_app.conf.beat_schedule = {
    "monitor-pending-periodically": {
        "task": "app.tasks.monitor_pending_tasks",
        "schedule": TASK_MONITOR_INTERVAL,  # every 5 seconds
    },
    "delete-temp-files-periodically": {
        "task": "app.tasks.delete_temp_files",
        "schedule": crontab(minute=0, hour=3),  # 3:00 AM UTC
    },
}

# Worker pool recycling for ML models
celery_app.conf.worker_max_tasks_per_child = 1  # Frees VRAM after each task
```
Event-driven with WebSockets: For real-time notifications
```typescript
// Real-time event handling
class EventBus {
  private ws: WebSocket;
  private handlers = new Map<string, Function[]>();

  constructor() {
    this.ws = new WebSocket("ws://www-emblema:3000/ws");
    this.setupEventHandlers();
  }

  subscribe(event: string, handler: Function) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event)!.push(handler);
  }

  private setupEventHandlers() {
    this.ws.onmessage = (event) => {
      const { type, data } = JSON.parse(event.data);
      const handlers = this.handlers.get(type) || [];
      handlers.forEach((handler) => handler(data));
    };
  }
}

// Usage
const eventBus = new EventBus();
eventBus.subscribe("document.processing.completed", (data) => {
  showNotification(`Document ${data.document_id} processed successfully`);
});
```
Service Discovery and Routing
🗺️ Service Discovery
Emblema uses DNS-based service discovery through Docker networking, with Traefik providing service mesh capabilities.
Docker Compose Networking:
```yaml
# Actual networking configuration
networks:
  emblema:
    driver: bridge
    name: emblema
  redis-net:
    driver: bridge

services:
  www-emblema:
    networks:
      - emblema
    depends_on:
      - background-task
      - milvus
      - graphql-engine
```
Service discovery via DNS:
```typescript
// Services are discovered automatically by DNS name
const serviceEndpoints = {
  backgroundTask: "http://background-task", // Docker service name
  retrieval: "http://spp-retrieval:8001", // custom port
  graphql: "http://graphql-engine:8080/v1/graphql",
  milvus: "http://milvus:19530/v2/vectordb",
  litellm: "http://litellm:4000/v1",
};
```
🎯 Dynamic Routing with Traefik
Automatic configuration via labels:
```yaml
# Actual Traefik label configuration
www-emblema:
  labels:
    - "traefik.enable=true"
    - "traefik.http.routers.emblema-web.rule=Host(`${EMBLEMA_WEB_HOSTNAME}`)"
    - "traefik.http.routers.emblema-web.entrypoints=websecure"
    - "traefik.http.routers.emblema-web.tls=true"
    - "traefik.http.routers.emblema-web.tls.certresolver=${CERT_RESOLVER:-}"
    - "traefik.http.services.emblema-web.loadbalancer.server.port=3000"

background-task:
  labels:
    - "traefik.enable=true"
    - "traefik.http.routers.background-task.rule=Host(`${BACKGROUND_TASK_HOSTNAME}`)"
    - "traefik.http.routers.background-task.entrypoints=websecure"
    - "traefik.http.services.background-task.loadbalancer.server.port=8000"
```
Path-based routing for API versioning:
```yaml
# API Gateway routing rules
- "traefik.http.routers.api-v1.rule=Host(`api.emblema.local`) && PathPrefix(`/v1`)"
- "traefik.http.routers.api-v2.rule=Host(`api.emblema.local`) && PathPrefix(`/v2`)"
```
Load Balancing and Scalability
⚖️ Load Balancing Strategies
1. Round Robin (Default)
```yaml
# Traefik load balancing configuration
services:
  www-emblema:
    deploy:
      replicas: 3
    labels:
      - "traefik.http.services.www-emblema.loadbalancer.server.port=3000"
      # Round robin is the default
```
2. Weighted Round Robin for A/B Testing
```yaml
# A/B testing with weights
www-emblema-v1:
  labels:
    - "traefik.http.services.www.loadbalancer.server.port=3000"
    - "traefik.http.services.www.loadbalancer.server.weight=80"

www-emblema-v2:
  labels:
    - "traefik.http.services.www.loadbalancer.server.port=3000"
    - "traefik.http.services.www.loadbalancer.server.weight=20"
```
3. Sticky Sessions for Stateful Services
```yaml
# Session affinity
keycloak:
  labels:
    - "traefik.http.services.keycloak.loadbalancer.sticky.cookie.name=keycloak-session"
    - "traefik.http.services.keycloak.loadbalancer.sticky.cookie.secure=true"
```
📈 Horizontal Scaling
Replica-based scaling with Docker Compose:
```yaml
# Scaling configuration
background-task-worker:
  image: emblema/background-task:${EMBLEMA_VERSION:-dev}
  command: /app/.venv/bin/celery -A app.celery_app:celery_app worker --concurrency=${CELERY_MAX_CONCURRENCY:-1}
  deploy:
    replicas: ${BACKGROUND_WORKER_REPLICAS:-2}
    resources:
      reservations:
        devices:
          - driver: nvidia
            device_ids: ["0", "1"] # Multiple GPU support
            capabilities: [gpu]
```
Resource-based scaling:
```python
# Celery autoscaling configuration
from celery import Celery

celery_app = Celery(__name__)

# Celery's built-in autoscaler grows and shrinks the worker pool based on load.
celery_app.conf.worker_autoscaler = "celery.worker.autoscale:Autoscaler"

# The max/min pool size is normally passed on the worker command line, e.g.:
#   celery -A app.celery_app worker \
#       --autoscale=${CELERY_MAX_WORKERS:-4},${CELERY_MIN_WORKERS:-1}
# The autoscaler adds processes when the queue backs up and removes them
# when processes sit idle; the thresholds are managed internally by Celery.
```
🔧 Health Checks and Circuit Breakers
Service Health Monitoring:
```python
# Health check endpoint exposed by every service
import os
from datetime import datetime, timezone
from http import HTTPStatus

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    try:
        # Check dependencies
        await check_database_connection()
        await check_redis_connection()
        await check_external_services()
        return JSONResponse(
            status_code=HTTPStatus.OK,
            content={
                "status": "healthy",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "version": os.getenv("SERVICE_VERSION", "unknown"),
            },
        )
    except Exception as e:
        return JSONResponse(
            status_code=HTTPStatus.SERVICE_UNAVAILABLE,
            content={
                "status": "unhealthy",
                "error": str(e),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        )
```
Circuit Breaker Pattern:
```typescript
// Circuit breaker for service calls
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime: Date | null = null;
  private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";

  constructor(
    private threshold = 5,
    private timeout = 60000,
    private monitoringPeriod = 10000,
  ) {}

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === "OPEN") {
      if (this.shouldAttemptReset()) {
        this.state = "HALF_OPEN";
      } else {
        throw new Error("Circuit breaker is OPEN");
      }
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess() {
    this.failures = 0;
    this.state = "CLOSED";
  }

  private onFailure() {
    this.failures++;
    this.lastFailureTime = new Date();
    if (this.failures >= this.threshold) {
      this.state = "OPEN";
    }
  }

  private shouldAttemptReset(): boolean {
    return (
      this.lastFailureTime !== null &&
      Date.now() - this.lastFailureTime.getTime() >= this.timeout
    );
  }
}

// Usage
const breaker = new CircuitBreaker();
const result = await breaker.call(() => fetch("http://background-task/health"));
```
Data Consistency and Transaction Management
🔄 Eventual Consistency Pattern
Saga pattern for distributed transactions:
```python
# Example: document processing saga
import logging

logger = logging.getLogger(__name__)

class DocumentProcessingSaga:
    def __init__(self, document_id: str):
        self.document_id = document_id
        self.steps = []

    async def execute(self):
        try:
            # Step 1: Upload to MinIO
            storage_key = await self.upload_to_storage()
            self.steps.append(("upload", storage_key))
            # Step 2: Update document status
            await self.update_document_status("processing")
            self.steps.append(("status_update", "processing"))
            # Step 3: Process with AI
            chunks = await self.process_document()
            self.steps.append(("processing", chunks))
            # Step 4: Store embeddings
            await self.store_embeddings(chunks)
            self.steps.append(("embeddings", len(chunks)))
            # Step 5: Final status update
            await self.update_document_status("completed")
        except Exception:
            await self.compensate()
            raise

    async def compensate(self):
        """Roll back completed steps in reverse order."""
        for step_type, data in reversed(self.steps):
            try:
                if step_type == "upload":
                    await self.delete_from_storage(data)
                elif step_type == "embeddings":
                    await self.delete_embeddings(self.document_id)
                elif step_type == "status_update":
                    await self.update_document_status("failed")
            except Exception as e:
                logger.error(f"Compensation failed for {step_type}: {e}")
```
📊 Event Sourcing for the Audit Trail
```python
# Event store for traceability
from datetime import datetime, timezone
from uuid import uuid4

class EventStore:
    def __init__(self, db, redis):
        self.db = db        # PostgreSQL connection
        self.redis = redis  # Redis client for pub/sub

    async def append_event(self, event: Event):
        event.timestamp = datetime.now(timezone.utc)
        event.id = str(uuid4())
        # Store in PostgreSQL
        await self.db.execute(
            "INSERT INTO events (id, aggregate_id, event_type, payload, timestamp)"
            " VALUES (%s, %s, %s, %s, %s)",
            (event.id, event.aggregate_id, event.event_type, event.payload, event.timestamp),
        )
        # Publish to Redis for real-time subscribers
        await self.redis.publish(f"events:{event.aggregate_id}", event.to_json())

# Usage for the document lifecycle
async def handle_document_upload(document_id: str):
    event = DocumentUploadedEvent(
        aggregate_id=document_id,
        payload={"file_size": file_size, "file_type": file_type},
    )
    await event_store.append_event(event)
```
Monitoring and Observability
📈 Distributed Tracing
OpenTelemetry Integration:
```python
# Tracing configuration
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Set up the tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-instrument FastAPI and outgoing requests
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()

# Manual tracing for business logic
@tracer.start_as_current_span("process_document")
async def process_document(document_id: str):
    with tracer.start_as_current_span("download_file") as span:
        span.set_attribute("document.id", document_id)
        file_content = await download_file(document_id)
        span.set_attribute("file.size", len(file_content))
    with tracer.start_as_current_span("extract_text") as span:
        text = await extract_text(file_content)
        span.set_attribute("text.length", len(text))
    return text
```
📊 Service Metrics
Prometheus metrics per service:
```python
import time

from fastapi import Request
from prometheus_client import Counter, Gauge, Histogram

# Service-level metrics
REQUEST_COUNT = Counter("http_requests_total", "Total HTTP requests", ["method", "endpoint", "status"])
REQUEST_DURATION = Histogram("http_request_duration_seconds", "HTTP request duration")
ACTIVE_CONNECTIONS = Gauge("active_connections", "Active connections")

# Business metrics
DOCUMENTS_PROCESSED = Counter("documents_processed_total", "Documents processed", ["status"])
PROCESSING_TIME = Histogram("document_processing_seconds", "Document processing time")
QUEUE_SIZE = Gauge("celery_queue_size", "Celery queue size", ["queue"])

# Middleware for automatic metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code,
    ).inc()
    REQUEST_DURATION.observe(time.time() - start_time)
    return response
```
Fault Tolerance and Resilience
🛡️ Retry Strategies
Exponential backoff with jitter:
```python
import asyncio
import logging
import random
from typing import Any, Callable

logger = logging.getLogger(__name__)

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> Any:
    """Retry with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries:
                raise
            # Calculate delay with exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter to prevent a thundering herd
            if jitter:
                delay *= 0.5 + random.random() * 0.5
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)

# Usage
await retry_with_backoff(
    lambda: call_external_service(),
    max_retries=3,
    base_delay=1.0,
)
```
🔀 Bulkhead Pattern
Resource Isolation:
```python
# Separate thread pools per task type
import asyncio
import concurrent.futures
from functools import partial, wraps

class BulkheadExecutor:
    def __init__(self):
        # Separate pools for different workloads
        self.cpu_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=4, thread_name_prefix="cpu-"
        )
        self.io_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=20, thread_name_prefix="io-"
        )
        self.ml_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=2, thread_name_prefix="ml-"
        )

    def cpu_bound(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            # run_in_executor does not accept kwargs, so bind them with partial
            return await loop.run_in_executor(self.cpu_pool, partial(func, *args, **kwargs))
        return wrapper

    def io_bound(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(self.io_pool, partial(func, *args, **kwargs))
        return wrapper

bulkhead = BulkheadExecutor()

# Usage
@bulkhead.cpu_bound
def heavy_computation(data):
    return complex_algorithm(data)

@bulkhead.io_bound
def file_operation(path):
    return read_large_file(path)
```
Deployment and Versioning
🚀 Blue-Green Deployment
```yaml
# Blue-Green deployment configuration
version: "3.8"
services:
  # Blue environment (current production)
  www-emblema-blue:
    image: emblema/www:v1.2.3
    labels:
      - "traefik.http.routers.www-blue.rule=Host(`emblema.local`)"
      - "traefik.http.routers.www-blue.priority=100"
      - "traefik.http.services.www-blue.loadbalancer.server.port=3000"
    environment:
      - SERVICE_VERSION=v1.2.3
      - ENVIRONMENT=blue

  # Green environment (new version)
  www-emblema-green:
    image: emblema/www:v1.3.0
    labels:
      - "traefik.http.routers.www-green.rule=Host(`emblema.local`) && Headers(`X-Environment`, `green`)"
      - "traefik.http.routers.www-green.priority=200"
      - "traefik.http.services.www-green.loadbalancer.server.port=3000"
    environment:
      - SERVICE_VERSION=v1.3.0
      - ENVIRONMENT=green
```
📈 API Versioning
URL-based versioning:
```python
from fastapi import FastAPI

# DocumentV1/DocumentV2 models and the create_document helpers are defined elsewhere
app_v1 = FastAPI(title="Emblema API", version="1.0")
app_v2 = FastAPI(title="Emblema API", version="2.0")

# V1 endpoints
@app_v1.post("/documents")
async def create_document_v1(document: DocumentV1):
    return await legacy_create_document(document)

# V2 endpoints
@app_v2.post("/documents")
async def create_document_v2(document: DocumentV2):
    return await create_document(document)

# Mount versioned apps
main_app = FastAPI()
main_app.mount("/v1", app_v1)
main_app.mount("/v2", app_v2)
```
Best Practices and Lessons Learned
✅ Best Practices in Place
- Single Responsibility: Each service has a single, clear responsibility
- Database per Service: No database is shared between services
- API Contract First: OpenAPI schemas are defined before implementation
- Graceful Degradation: Fallbacks when external services are unavailable
- Idempotency: Operations can be repeated without side effects
- Circuit Breaker: Prevents cascading failures
- Distributed Tracing: Full visibility into cross-service requests
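Of these, idempotency is the easiest to get wrong. A minimal sketch of idempotent task handling: results are keyed by an idempotency key, so replaying the same operation has no extra effect. The in-memory store stands in for Redis or PostgreSQL, and all names here are illustrative, not taken from the codebase.

```python
# Idempotency sketch: cache results per key so replays are no-ops.
class IdempotentExecutor:
    def __init__(self):
        self._results: dict[str, object] = {}  # stand-in for Redis/PostgreSQL

    def run(self, key: str, fn, *args):
        # If this key was already processed, return the cached result
        # instead of running the side-effecting operation again.
        if key in self._results:
            return self._results[key]
        result = fn(*args)
        self._results[key] = result
        return result

calls = []

def process_document(doc_id: str) -> str:
    calls.append(doc_id)  # side effect we want to run exactly once
    return f"processed:{doc_id}"

executor = IdempotentExecutor()
first = executor.run("task-42", process_document, "doc-1")
second = executor.run("task-42", process_document, "doc-1")  # replay: no-op
```

In a real deployment the key would come from the client (e.g. a task id) and the store would be shared, so retries after a network failure never double-process a document.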
⚠️ Pitfalls to Avoid
- Distributed Monolith: Services that are too tightly coupled
- Chatty Interfaces: Too many fine-grained calls between services
- Shared Database: Shared databases break service isolation
- Synchronous Everything: Making all communication synchronous creates bottlenecks
- Missing Compensation: Distributed transactions without rollback logic
- Lack of Monitoring: Insufficient visibility into the distributed system
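The "chatty interfaces" pitfall is typically fixed by coalescing calls: N per-item lookups become one batched request per group. A minimal sketch, with illustrative function names and batch size (the real services would expose a bulk endpoint instead of `fetch_many`):

```python
# Request batching sketch: turn 120 per-id calls into 3 round trips.
def fetch_documents_batched(ids, fetch_many, batch_size=50):
    """Fetch documents in fixed-size batches instead of one call per id."""
    results = {}
    for i in range(0, len(ids), batch_size):
        batch = ids[i:i + batch_size]
        results.update(fetch_many(batch))  # single round trip per batch
    return results

call_count = 0

def fake_fetch_many(batch):
    # Stand-in for a bulk HTTP/GraphQL call; counts round trips.
    global call_count
    call_count += 1
    return {doc_id: f"doc:{doc_id}" for doc_id in batch}

docs = fetch_documents_batched(list(range(120)), fake_fetch_many, batch_size=50)
```

The same idea applies to GraphQL (one query with a `where: {id: {_in: ...}}` filter instead of N queries) and to message publishing.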
Next Steps
With the microservices architecture covered, explore:
- Flow Diagrams - Detailed sequence diagrams
- Low Level Design - Implementation details
- Core Components - Specific services
- Infrastructure - Deployment patterns
Emblema's microservices architecture balances complexity against benefits, providing a solid foundation for the platform's growth and evolution.