Microservices Architecture

Emblema's microservices architecture is designed to maximize scalability, resilience, and maintainability. Each service has a specific set of responsibilities and communicates through well-defined interfaces, allowing independent development, deployment, and scaling.

Architecture Overview

🏗️ Service Mesh Architecture

Emblema uses a hybrid service mesh architecture in which Traefik acts as the API Gateway and service proxy, handling routing, load balancing, and SSL termination.


🔧 Service Catalog

Frontend Services

  • www-emblema: Main web application (Next.js 14)
  • doc-emblema: Documentation (Docusaurus)

Core Services

  • background-task: Asynchronous AI processing (Python/FastAPI/Celery)
  • spp-retrieval: Semantic search (Python/FastAPI)
  • document-render: Document rendering (Python)
  • mcp-demo: Model Context Protocol server
  • novu-bridge: Notification bridge

Infrastructure Services

  • Traefik: Reverse proxy and load balancer
  • Keycloak: Identity and Access Management
  • LiteLLM: Unified gateway for AI models
  • Hasura: GraphQL engine

Inter-Service Communication

🔄 Communication Patterns

1. Synchronous Communication

REST APIs: for real-time, interactive operations

// Example of a unified HTTP client
class ServiceClient {
  private serviceName: string;
  private baseURL: string;
  private timeout: number = 30000;

  constructor(serviceName: string) {
    this.serviceName = serviceName;
    this.baseURL = `http://${serviceName}`;
  }

  async post<T>(endpoint: string, data: any): Promise<T> {
    const response = await fetch(`${this.baseURL}${endpoint}`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // getToken() resolves the service credentials (implemented elsewhere)
        Authorization: `Bearer ${await this.getToken()}`,
      },
      body: JSON.stringify(data),
      signal: AbortSignal.timeout(this.timeout),
    });

    if (!response.ok) {
      throw new ServiceError(`${this.serviceName} error: ${response.statusText}`);
    }

    return response.json();
  }
}

// Usage
const backgroundTaskClient = new ServiceClient("background-task");
const processingResult = await backgroundTaskClient.post("/tasks", {
  document_id: documentId,
  chunker_config: config,
});

GraphQL: for complex queries and real-time subscriptions

// Hasura GraphQL client with subscriptions
import { createClient } from "graphql-ws";

const graphqlClient = createClient({
  url: "ws://graphql-engine:8080/v1/graphql",
  connectionParams: {
    headers: {
      "x-hasura-admin-secret": process.env.HASURA_ADMIN_SECRET,
    },
  },
});

// Subscription for real-time updates
const subscription = graphqlClient.iterate({
  query: `
    subscription TaskUpdates($userId: uuid!) {
      task(where: {created_by: {_eq: $userId}}) {
        id
        status
        progress
        error_message
      }
    }
  `,
  variables: { userId },
});

for await (const result of subscription) {
  updateUI(result.data);
}

2. Asynchronous Communication

Message queue with Redis: for asynchronous tasks

# background-task/celery_app.py - actual configuration
import os

from celery import Celery
from celery.schedules import crontab

TASK_MONITOR_INTERVAL = 5  # seconds

celery_app = Celery(
    __name__,
    backend=os.getenv("CELERY_RESULT_URL", "redis://localhost:6379/6"),
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/6"),
)

# Task scheduling configuration
celery_app.conf.beat_schedule = {
    "monitor-pending-periodically": {
        "task": "app.tasks.monitor_pending_tasks",
        "schedule": TASK_MONITOR_INTERVAL,  # every 5 seconds
    },
    "delete-temp-files-periodically": {
        "task": "app.tasks.delete_temp_files",
        "schedule": crontab(minute=0, hour=3),  # 3:00 AM UTC
    },
}

# Worker pool recycling for ML models
celery_app.conf.worker_max_tasks_per_child = 1  # Frees VRAM after every task

Event-driven with WebSockets: for real-time notifications

// Real-time event handling
class EventBus {
  private ws: WebSocket;
  private handlers = new Map<string, Function[]>();

  constructor() {
    this.ws = new WebSocket("ws://www-emblema:3000/ws");
    this.setupEventHandlers();
  }

  subscribe(event: string, handler: Function) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event)!.push(handler);
  }

  private setupEventHandlers() {
    this.ws.onmessage = (event) => {
      const { type, data } = JSON.parse(event.data);
      const handlers = this.handlers.get(type) || [];
      handlers.forEach((handler) => handler(data));
    };
  }
}

// Usage
const eventBus = new EventBus();
eventBus.subscribe("document.processing.completed", (data) => {
  showNotification(`Document ${data.document_id} processed successfully`);
});

Service Discovery and Routing

🗺️ Service Discovery

Emblema uses DNS-based service discovery through Docker networking, with Traefik providing service mesh capabilities.

Docker Compose Networking:

# Actual networking configuration
networks:
  emblema:
    driver: bridge
    name: emblema
  redis-net:
    driver: bridge

services:
  www-emblema:
    networks:
      - emblema
    depends_on:
      - background-task
      - milvus
      - graphql-engine

Service discovery via DNS:

// Services find each other automatically via DNS names
const serviceEndpoints = {
  backgroundTask: "http://background-task", // Docker service name
  retrieval: "http://spp-retrieval:8001", // with a custom port
  graphql: "http://graphql-engine:8080/v1/graphql",
  milvus: "http://milvus:19530/v2/vectordb",
  litellm: "http://litellm:4000/v1",
};

🎯 Dynamic Routing with Traefik

Automatic configuration via labels:

# Actual Traefik label configuration
www-emblema:
  labels:
    - "traefik.enable=true"
    - "traefik.http.routers.emblema-web.rule=Host(`${EMBLEMA_WEB_HOSTNAME}`)"
    - "traefik.http.routers.emblema-web.entrypoints=websecure"
    - "traefik.http.routers.emblema-web.tls=true"
    - "traefik.http.routers.emblema-web.tls.certresolver=${CERT_RESOLVER:-}"
    - "traefik.http.services.emblema-web.loadbalancer.server.port=3000"

background-task:
  labels:
    - "traefik.enable=true"
    - "traefik.http.routers.background-task.rule=Host(`${BACKGROUND_TASK_HOSTNAME}`)"
    - "traefik.http.routers.background-task.entrypoints=websecure"
    - "traefik.http.services.background-task.loadbalancer.server.port=8000"

Path-based routing for API versioning:

# API Gateway routing rules
- "traefik.http.routers.api-v1.rule=Host(`api.emblema.local`) && PathPrefix(`/v1`)"
- "traefik.http.routers.api-v2.rule=Host(`api.emblema.local`) && PathPrefix(`/v2`)"

Load Balancing and Scalability

⚖️ Load Balancing Strategies

1. Round Robin (Default)

# Traefik load balancing configuration
services:
  www-emblema:
    deploy:
      replicas: 3
    labels:
      - "traefik.http.services.www-emblema.loadbalancer.server.port=3000"
      # Round-robin is the default

2. Weighted Round Robin for A/B Testing

# A/B testing with weights
# Note: in Traefik v2, weighted load balancing across two services is defined
# through a weighted TraefikService (e.g. via the file provider); the labels
# below sketch the intended 80/20 split.
www-emblema-v1:
  labels:
    - "traefik.http.services.www.loadbalancer.server.port=3000"
    - "traefik.http.services.www.loadbalancer.server.weight=80"

www-emblema-v2:
  labels:
    - "traefik.http.services.www.loadbalancer.server.port=3000"
    - "traefik.http.services.www.loadbalancer.server.weight=20"

3. Sticky Sessions for Stateful Services

# Session affinity
keycloak:
  labels:
    - "traefik.http.services.keycloak.loadbalancer.sticky.cookie.name=keycloak-session"
    - "traefik.http.services.keycloak.loadbalancer.sticky.cookie.secure=true"

📈 Horizontal Scaling

Scaling out with Docker Compose:

# Scaling configuration
background-task-worker:
  image: emblema/background-task:${EMBLEMA_VERSION:-dev}
  command: /app/.venv/bin/celery -A app.celery_app:celery_app worker --concurrency=${CELERY_MAX_CONCURRENCY:-1}
  deploy:
    replicas: ${BACKGROUND_WORKER_REPLICAS:-2}
    resources:
      reservations:
        devices:
          - driver: nvidia
            device_ids: ["0", "1"] # Multiple GPU support
            capabilities: [gpu]

Resource-based scaling:

# Celery autoscaling configuration
from celery import Celery

celery_app = Celery(__name__)

# Use Celery's built-in autoscaler implementation
celery_app.conf.worker_autoscaler = "celery.worker.autoscale:Autoscaler"

# The pool bounds are passed on the worker command line; Celery scales the
# pool up under load and back down when processes sit idle, e.g.:
#   celery -A app.celery_app:celery_app worker \
#     --autoscale=${CELERY_MAX_WORKERS:-4},${CELERY_MIN_WORKERS:-1}

🔧 Health Checks and Circuit Breakers

Service Health Monitoring:

# Health check endpoint in every service
import os
from datetime import datetime
from http import HTTPStatus

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    try:
        # Check dependencies
        await check_database_connection()
        await check_redis_connection()
        await check_external_services()

        return JSONResponse(
            status_code=HTTPStatus.OK,
            content={
                "status": "healthy",
                "timestamp": datetime.utcnow().isoformat(),
                "version": os.getenv("SERVICE_VERSION", "unknown"),
            },
        )
    except Exception as e:
        return JSONResponse(
            status_code=HTTPStatus.SERVICE_UNAVAILABLE,
            content={
                "status": "unhealthy",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat(),
            },
        )

Circuit Breaker Pattern:

// Circuit breaker for service calls
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime: Date | null = null;
  private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";

  constructor(
    private threshold = 5,
    private timeout = 60000,
  ) {}

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === "OPEN") {
      if (this.shouldAttemptReset()) {
        this.state = "HALF_OPEN";
      } else {
        throw new Error("Circuit breaker is OPEN");
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess() {
    this.failures = 0;
    this.state = "CLOSED";
  }

  private onFailure() {
    this.failures++;
    this.lastFailureTime = new Date();

    if (this.failures >= this.threshold) {
      this.state = "OPEN";
    }
  }

  private shouldAttemptReset(): boolean {
    return (
      this.lastFailureTime !== null &&
      Date.now() - this.lastFailureTime.getTime() >= this.timeout
    );
  }
}

// Usage
const breaker = new CircuitBreaker();
const result = await breaker.call(() => fetch("http://background-task/health"));

Data Consistency and Transaction Management

🔄 Eventual Consistency Pattern

Saga pattern for distributed transactions:

# Example: document processing saga
import logging

logger = logging.getLogger(__name__)

class DocumentProcessingSaga:
    def __init__(self, document_id: str):
        self.document_id = document_id
        self.steps = []

    async def execute(self):
        try:
            # Step 1: Upload to MinIO
            storage_key = await self.upload_to_storage()
            self.steps.append(('upload', storage_key))

            # Step 2: Update document status
            await self.update_document_status('processing')
            self.steps.append(('status_update', 'processing'))

            # Step 3: Process with AI
            chunks = await self.process_document()
            self.steps.append(('processing', chunks))

            # Step 4: Store embeddings
            await self.store_embeddings(chunks)
            self.steps.append(('embeddings', len(chunks)))

            # Step 5: Final status update
            await self.update_document_status('completed')

        except Exception:
            await self.compensate()
            raise

    async def compensate(self):
        """Roll back in reverse order"""
        for step_type, data in reversed(self.steps):
            try:
                if step_type == 'upload':
                    await self.delete_from_storage(data)
                elif step_type == 'embeddings':
                    await self.delete_embeddings(self.document_id)
                elif step_type == 'status_update':
                    await self.update_document_status('failed')
            except Exception as e:
                logger.error(f"Compensation failed for {step_type}: {e}")

📊 Event Sourcing for the Audit Trail

# Event store for traceability
from datetime import datetime
from uuid import uuid4

class EventStore:
    def __init__(self, db, redis):
        # Database and Redis connections are injected
        self.db = db
        self.redis = redis

    async def append_event(self, event: Event):
        event.timestamp = datetime.utcnow()
        event.id = str(uuid4())

        # Store in PostgreSQL
        await self.db.execute(
            "INSERT INTO events (id, aggregate_id, event_type, payload, timestamp) VALUES (%s, %s, %s, %s, %s)",
            (event.id, event.aggregate_id, event.event_type, event.payload, event.timestamp)
        )

        # Publish to Redis for real-time subscribers
        await self.redis.publish(f"events:{event.aggregate_id}", event.to_json())

# Usage in the document lifecycle
async def handle_document_upload(document_id: str):
    event = DocumentUploadedEvent(
        aggregate_id=document_id,
        payload={'file_size': file_size, 'file_type': file_type}
    )
    await event_store.append_event(event)

Monitoring and Observability

📈 Distributed Tracing

OpenTelemetry Integration:

# Tracing configuration
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Set up the tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-instrument FastAPI and requests
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()

# Manual tracing for business logic
@tracer.start_as_current_span("process_document")
async def process_document(document_id: str):
    with tracer.start_as_current_span("download_file") as download_span:
        download_span.set_attribute("document.id", document_id)
        file_content = await download_file(document_id)
        download_span.set_attribute("file.size", len(file_content))

    with tracer.start_as_current_span("extract_text") as extract_span:
        text = await extract_text(file_content)
        extract_span.set_attribute("text.length", len(text))

    return text

📊 Service Metrics

Prometheus metrics per service:

import time

from fastapi import Request
from prometheus_client import Counter, Histogram, Gauge

# Service-level metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Active connections')

# Business metrics
DOCUMENTS_PROCESSED = Counter('documents_processed_total', 'Documents processed', ['status'])
PROCESSING_TIME = Histogram('document_processing_seconds', 'Document processing time')
QUEUE_SIZE = Gauge('celery_queue_size', 'Celery queue size', ['queue'])

# Middleware for automatic metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_DURATION.observe(time.time() - start_time)

    return response

Fault Tolerance and Resilience

🛡️ Retry Strategies

Exponential backoff with jitter:

import asyncio
import logging
import random
from typing import Any, Callable

logger = logging.getLogger(__name__)

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> Any:
    """Retry with exponential backoff and jitter"""

    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries:
                raise

            # Calculate the delay with exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)

            # Add jitter to prevent a thundering herd
            if jitter:
                delay *= (0.5 + random.random() * 0.5)

            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)

# Usage
await retry_with_backoff(
    lambda: call_external_service(),
    max_retries=3,
    base_delay=1.0
)

🔀 Bulkhead Pattern

Resource Isolation:

# Separate thread pools per task type
import asyncio
import concurrent.futures
from functools import partial, wraps

class BulkheadExecutor:
    def __init__(self):
        # Separate pools for different workloads
        self.cpu_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=4, thread_name_prefix="cpu-"
        )
        self.io_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=20, thread_name_prefix="io-"
        )
        self.ml_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=2, thread_name_prefix="ml-"
        )

    def cpu_bound(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # run_in_executor does not accept kwargs, so bind them first
            return await loop.run_in_executor(self.cpu_pool, partial(func, *args, **kwargs))
        return wrapper

    def io_bound(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self.io_pool, partial(func, *args, **kwargs))
        return wrapper

bulkhead = BulkheadExecutor()

# Usage
@bulkhead.cpu_bound
def heavy_computation(data):
    return complex_algorithm(data)

@bulkhead.io_bound
def file_operation(path):
    return read_large_file(path)

Deployment and Versioning

🚀 Blue-Green Deployment

# Blue-Green deployment configuration
version: "3.8"

services:
  # Blue environment (current production)
  www-emblema-blue:
    image: emblema/www:v1.2.3
    labels:
      - "traefik.http.routers.www-blue.rule=Host(`emblema.local`)"
      - "traefik.http.routers.www-blue.priority=100"
      - "traefik.http.services.www-blue.loadbalancer.server.port=3000"
    environment:
      - SERVICE_VERSION=v1.2.3
      - ENVIRONMENT=blue

  # Green environment (new version)
  www-emblema-green:
    image: emblema/www:v1.3.0
    labels:
      - "traefik.http.routers.www-green.rule=Host(`emblema.local`) && Headers(`X-Environment`, `green`)"
      - "traefik.http.routers.www-green.priority=200"
      - "traefik.http.services.www-green.loadbalancer.server.port=3000"
    environment:
      - SERVICE_VERSION=v1.3.0
      - ENVIRONMENT=green

📈 API Versioning

URL-based versioning:

from fastapi import FastAPI

app_v1 = FastAPI(title="Emblema API", version="1.0")
app_v2 = FastAPI(title="Emblema API", version="2.0")

# V1 endpoints
@app_v1.post("/documents")
async def create_document_v1(document: DocumentV1):
    return await legacy_create_document(document)

# V2 endpoints
@app_v2.post("/documents")
async def create_document_v2(document: DocumentV2):
    return await create_document(document)

# Mount the versioned apps
main_app = FastAPI()
main_app.mount("/v1", app_v1)
main_app.mount("/v2", app_v2)

Best Practices and Lessons Learned

✅ Implemented Best Practices

  1. Single Responsibility: Each service has one clear responsibility
  2. Database per Service: No database is shared between services
  3. API Contract First: OpenAPI schemas are defined before implementation
  4. Graceful Degradation: Fallbacks when external services are unavailable
  5. Idempotency: Operations can be repeated without side effects
  6. Circuit Breaker: Prevents cascading failures
  7. Distributed Tracing: Full visibility into cross-service requests

⚠️ Pitfalls to Avoid

  1. Distributed Monolith: Services that are too tightly coupled
  2. Chatty Interfaces: Too many fine-grained calls
  3. Shared Database: Shared databases break service isolation
  4. Everything Synchronous: All-sync communication creates bottlenecks
  5. Missing Compensation: Distributed transactions without rollback
  6. Lack of Monitoring: Insufficient visibility into a distributed system
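The "chatty interfaces" pitfall is usually cured by batching: one coarse-grained call replaces N fine-grained ones. A minimal sketch with a fake client that counts round-trips; the endpoints are hypothetical, not part of the Emblema API:

```python
class FakeClient:
    """Counts round-trips so the two access patterns can be compared."""
    def __init__(self) -> None:
        self.calls = 0

    def get(self, path: str) -> dict:
        self.calls += 1
        return {"path": path}

    def post(self, path: str, body: dict) -> list:
        self.calls += 1
        return [{"id": i} for i in body["ids"]]

def fetch_documents_chatty(client, ids):
    # N round-trips: one HTTP call per document
    return [client.get(f"/documents/{i}") for i in ids]

def fetch_documents_batched(client, ids):
    # 1 round-trip: a single coarse-grained batch call
    return client.post("/documents/batch", {"ids": ids})
```

Over a network with even a few milliseconds of latency per call, the batched variant's single round-trip dominates the chatty variant as the number of items grows.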

Next Steps

With the microservices architecture covered, explore:

  1. Flow Diagrams - Detailed sequence diagrams
  2. Low Level Design - Implementation details
  3. Core Components - Individual services
  4. Infrastructure - Deployment patterns

Emblema's microservices architecture balances complexity against benefit, providing a solid foundation for the platform's growth and evolution.
