Backend Services
Emblema's backend services form the core of its intelligent processing, providing AI processing, semantic search, protocol integration, and data orchestration capabilities. Each service follows the Single Responsibility principle and communicates over REST APIs and message queues.
Background Task Service
🤖 Architecture and Responsibilities
The background-task service is the most critical one in Emblema, responsible for the asynchronous processing of documents, audio, and video through advanced AI pipelines.
# Actual service architecture
import os

from celery import Celery
from fastapi import FastAPI

from app.routers import tasks, artifacts, cache

app = FastAPI()

# Celery configuration for ML workloads
celery_app = Celery(
    __name__,
    backend=os.getenv("CELERY_RESULT_URL", "redis://localhost:6379/6"),
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/6"),
)

# Recycle the worker pool to prevent memory leaks in ML models
celery_app.conf.worker_max_tasks_per_child = 1  # Frees VRAM after each task
📋 Task Management System
Task Lifecycle with Automatic Monitoring:
# Automatic task monitor, runs every 5 seconds
# (the Singleton base from celery-singleton prevents overlapping runs)
@celery_app.task(base=Singleton, name="app.tasks.monitor_pending_tasks", lock_expiry=10)
def monitor_pending_tasks():
    inspector = celery_app.control.inspect()
    active = inspector.active() or {}
    reserved = inspector.reserved() or {}

    # Compute available worker slots
    active_tasks = sum(len(active.get(worker, [])) for worker in active)
    reserved_tasks = sum(len(reserved.get(worker, [])) for worker in reserved)
    if reserved_tasks >= MAX_CURRENCY:
        return

    # Look for tasks stuck in CREATED, or in STARTED for too long
    new_tasks = task_handler.list(
        where={
            "_or": [
                {"status": {"_eq": "CREATED"}},
                {
                    "status": {"_eq": "STARTED"},
                    "startedAt": {
                        "_lt": (datetime.now() - timedelta(seconds=TASK_MAX_EXECUTION_TIME)).isoformat()
                    },
                },
            ]
        },
        limit=MAX_CURRENCY - reserved_tasks - active_tasks,
    )
    for new_task in new_tasks:
        create_task(new_task)
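`create_task` is referenced but not shown above. A minimal sketch of what such a dispatcher might look like, assuming a `task_handler.update` method and a Celery task named `app.tasks.process_document` (both are assumptions, not the service's confirmed API):
def create_task(pending_task):
    """Hypothetical dispatcher: persist the state transition, then enqueue the Celery job."""
    # Mark the row as STARTED first so the monitor does not pick it up again
    task_handler.update(
        id=pending_task["id"],
        data={"status": "STARTED", "startedAt": datetime.now().isoformat()},
    )
    # Hand the work to a Celery worker; the task name is illustrative
    celery_app.send_task(
        "app.tasks.process_document",
        kwargs={"task_id": pending_task["id"]},
    )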
🧠 Intelligent Chunking System
AutoChunker with Automatic Detection:
class AutoChunker(Chunker):
    def __init__(self, document: Document, config: ChunkingConfig):
        super().__init__(document=document, config=config)
        if document.sourceType == "File":
            ext = Path(document.file.name).suffix.lower()
            # Document chunking - PDF, TXT, MD, DOCX
            if ext in DocumentChunker.SUPPORTED_EXTENSIONS:
                logger.info(f"Detected document file: {ext}")
                self._chunker = DocumentChunker(document=document, config=config)
            # Audio chunking - MP3 with WhisperX + speaker diarization
            elif ext in AudioChunker.SUPPORTED_EXTENSIONS:
                logger.info(f"Detected audio file: {ext}")
                self._chunker = AudioChunker(document=document, config=config)
            # Video chunking - MP4 with audio extraction + WhisperX
            elif ext in VideoChunker.SUPPORTED_EXTENSIONS:
                logger.info(f"Detected video file: {ext}")
                self._chunker = VideoChunker(document=document, config=config)
            else:
                all_supported = (
                    DocumentChunker.SUPPORTED_EXTENSIONS
                    | AudioChunker.SUPPORTED_EXTENSIONS
                    | VideoChunker.SUPPORTED_EXTENSIONS
                )
                raise ValueError(f"Unsupported file type: {ext}. Supported: {all_supported}")
📄 Document Processing Pipeline
DocumentChunker with MinerU Integration:
class DocumentChunker(Chunker):
    SUPPORTED_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"}

    async def process_document(self, document_id: str, chunking_config: dict):
        """
        Complete document processing pipeline:
        1. File optimization (DOCX -> PDF conversion when needed)
        2. MinerU processing for layout-aware extraction
        3. Token-based chunking with the BGE-M3 tokenizer
        4. Embedding generation (1024 dimensions)
        5. Milvus storage with metadata enrichment
        """
        try:
            # Phase 1: File optimization
            optimized_file = await self.optimize_file(document_id)

            # Phase 2: MinerU processing for PDFs
            if optimized_file.suffix.lower() == '.pdf':
                extracted_content = await self.mineru_extract(optimized_file)
            else:
                extracted_content = await self.text_extract(optimized_file)

            # Phase 3: Token-based chunking
            chunks = await self.create_chunks(
                content=extracted_content,
                chunk_size=chunking_config.get('chunk_size', 512),  # token count
                overlap=chunking_config.get('chunk_overlap', 75),   # token overlap
                strategy=chunking_config.get('chunking_strategy', 'recursive')
            )

            # Phase 4: BGE-M3 embeddings
            for chunk in chunks:
                embedding = await self.generate_embedding(chunk.content)
                chunk.embedding = embedding

            # Phase 5: Milvus storage
            await self.store_embeddings(document_id, chunks)
            return chunks
        except Exception as e:
            logger.error(f"Document processing failed for {document_id}: {e}")
            raise
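The `create_chunks` step above is only referenced. A minimal sketch of the underlying idea, token-based sliding-window chunking, assuming the BGE-M3 tokenizer is loaded via Hugging Face transformers (the helper name `split_by_tokens` is illustrative):
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-m3")

def split_by_tokens(text: str, chunk_size: int = 512, overlap: int = 75) -> list[str]:
    """Sliding-window chunking measured in tokens rather than characters."""
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(token_ids), step):
        window = token_ids[start:start + chunk_size]
        chunks.append(tokenizer.decode(window))
        if start + chunk_size >= len(token_ids):
            break
    return chunks
Measuring in tokens rather than characters keeps each chunk within the embedding model's context window regardless of language or formatting.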
🎵 Audio/Video Processing with Speaker Diarization
AudioChunker with WhisperX Integration:
class AudioChunker(Chunker):
    SUPPORTED_EXTENSIONS = {".mp3", ".wav", ".m4a"}

    async def process_audio(self, document_id: str, chunking_config: dict):
        """
        Audio processing with speaker identification:
        1. WhisperX transcription with precise timestamps
        2. Automatic speaker diarization
        3. One chunk per speaker segment, or fixed-size
        4. Speaker profile mapping (optional)
        """
        try:
            # WhisperX processing with diarization
            diarization_result = await self.whisperx_process(
                audio_file=audio_file,
                language=chunking_config.get('audio_language', 'it'),
                device='cuda' if torch.cuda.is_available() else 'cpu'
            )

            # Extract segments with speaker info
            segments = []
            for segment in diarization_result.segments:
                segments.append({
                    'start': segment.start,
                    'end': segment.end,
                    'text': segment.text,
                    'speaker': segment.speaker,  # SPEAKER_00, SPEAKER_01, etc.
                })

            # Chunking strategy
            if chunking_config.get('chunking_strategy') == 'diarization':
                # One chunk per speaker segment
                chunks = self.create_speaker_chunks(segments)
            elif chunking_config.get('chunking_strategy') == 'speaker':
                # Group consecutive segments from the same speaker
                chunks = self.group_by_speaker(segments)
            else:
                # Fixed-size token-based chunks
                chunks = self.create_fixed_chunks(segments, chunking_config)

            # Enrich with speaker profiles when available
            if hasattr(document, 'speaker_profiles'):
                chunks = self.enrich_with_profiles(chunks, document.speaker_profiles)
            return chunks
        except Exception as e:
            logger.error(f"Audio processing failed for {document_id}: {e}")
            raise
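`group_by_speaker` is referenced but not shown. A minimal sketch of the merging logic, operating on the segment dicts built above (a sketch of the idea, not the service's confirmed implementation):
def group_by_speaker(segments: list[dict]) -> list[dict]:
    """Merge consecutive segments that share a speaker into a single chunk."""
    grouped: list[dict] = []
    for segment in segments:
        if grouped and grouped[-1]["speaker"] == segment["speaker"]:
            # Same speaker as the previous chunk: extend it
            grouped[-1]["end"] = segment["end"]
            grouped[-1]["text"] += " " + segment["text"]
        else:
            grouped.append(dict(segment))
    return grouped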
🎬 Video Processing Pipeline
VideoChunker with Audio Extraction:
class VideoChunker(Chunker):
    SUPPORTED_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv"}

    async def process_video(self, document_id: str, chunking_config: dict):
        """
        Video processing pipeline:
        1. Audio extraction from the video (FFmpeg)
        2. Conversion to optimized MP3
        3. Transcription with WhisperX + diarization
        4. Chunking based on the audio timeline
        """
        try:
            # Extract the audio track from the video
            audio_file = await self.extract_audio(
                video_file=video_file,
                output_format='mp3',
                sample_rate=16000  # Optimal for WhisperX
            )

            # Process using AudioChunker
            audio_chunker = AudioChunker(document=document, config=config)
            chunks = await audio_chunker.process_audio(document_id, chunking_config)

            # Add video-specific metadata
            for chunk in chunks:
                chunk.metadata.update({
                    'source_type': 'video',
                    'video_file': document.file.name,
                    'audio_extracted': True,
                    'video_duration': await self.get_video_duration(video_file)
                })
            return chunks
        except Exception as e:
            logger.error(f"Video processing failed for {document_id}: {e}")
            raise
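`extract_audio` boils down to a single FFmpeg invocation. A minimal sketch assuming the `ffmpeg` binary is on PATH (the body is an assumption, not the service's actual code):
import subprocess
from pathlib import Path

def extract_audio(video_file: Path, output_format: str = "mp3", sample_rate: int = 16000) -> Path:
    """Strip the audio track from a video, mono at the given sample rate."""
    output_file = video_file.with_suffix(f".{output_format}")
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-i", str(video_file),
            "-vn",                    # drop the video stream
            "-ac", "1",               # mono is enough for transcription
            "-ar", str(sample_rate),  # 16 kHz matches WhisperX expectations
            str(output_file),
        ],
        check=True,
        capture_output=True,
    )
    return output_file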
🔧 Configuration and Performance
Chunking Presets by Use Case:
# Chunking configuration presets
CHUNKING_PRESETS = {
    "default": {
        "chunk_size": 512,  # Tokens - optimal for BGE-M3
        "overlap": 75,      # Token overlap
        "strategy": "recursive",
        "model_name": "bge-m3"
    },
    "meeting": {
        "chunk_size": 400,  # Optimal for conversations
        "overlap": 100,
        "strategy": "diarization",  # One chunk per speaker
        "audio_language": "it"
    },
    "executive": {
        "chunk_size": 300,  # Dense information
        "overlap": 100,
        "strategy": "semantic",
        "embedding_threshold": 0.7
    },
    "technical": {
        "chunk_size": 600,  # Complex technical content
        "overlap": 120,
        "strategy": "recursive",
        "preserve_structure": True
    },
    "qa": {
        "chunk_size": 200,  # Maximum precision for Q&A
        "overlap": 25,
        "strategy": "sentence",
        "split_sentences": True
    }
}
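A preset is typically merged with per-document overrides before chunking starts. A small usage sketch (the merge helper is an assumption about how the presets are consumed):
def resolve_chunking_config(preset: str = "default", **overrides) -> dict:
    """Start from a named preset and apply per-document overrides on top."""
    config = dict(CHUNKING_PRESETS.get(preset, CHUNKING_PRESETS["default"]))
    config.update(overrides)
    return config

# A meeting recording, forcing English transcription instead of the default Italian
config = resolve_chunking_config("meeting", audio_language="en")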
Retrieval Service (spp-retrieval)
🔍 Search Engine Architecture
The spp-retrieval service provides optimized semantic search capabilities on top of the Milvus vector database.
# Retrieval service structure
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.routers import retrieval

app = FastAPI()

# CORS configuration for cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # TODO: restrict in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(retrieval.router)
🎯 Semantic Search Implementation
Vector Similarity Search with Access Control:
class SemanticSearchEngine:
    def __init__(self):
        self.milvus_client = MilvusClient()
        self.embedding_model = BGEEmbedder()
        self.cache = RedisCache()

    async def search(self, query: SearchQuery) -> SearchResults:
        """
        Multi-phase semantic search:
        1. Query embedding generation (BGE-M3)
        2. Cache check for performance
        3. Milvus vector similarity search
        4. Metadata enrichment from PostgreSQL
        5. Access control filtering
        6. Result ranking and caching
        """
        start_time = time.time()  # used below for processing_time

        # Generate query embedding
        query_embedding = await self.embedding_model.encode(
            text=query.text,
            normalize=True  # For COSINE similarity
        )

        # Check cache first
        cache_key = self.generate_cache_key(query)
        cached_results = await self.cache.get(cache_key)
        if cached_results and not query.force_refresh:
            return self.apply_access_control(cached_results, query.user_id)

        # Milvus vector search
        search_params = {
            "metric_type": "COSINE",
            "params": {
                "nprobe": 10,  # Number of clusters to probe
                "ef": 32       # Search expansion factor
            }
        }
        milvus_results = await self.milvus_client.search(
            collection_name="document_chunks",
            query_vectors=[query_embedding],
            search_params=search_params,
            limit=query.limit * 2,  # Over-fetch to leave room for filtering
            expr=self.build_filter_expression(query.filters)
        )

        # Enrich with metadata from PostgreSQL
        enriched_results = []
        for result in milvus_results[0]:  # Results for the first (only) query vector
            chunk_metadata = await self.get_chunk_metadata(result.id)
            document_metadata = await self.get_document_metadata(chunk_metadata.document_id)
            enriched_results.append({
                'chunk_id': result.id,
                'score': result.distance,
                'content': chunk_metadata.content,
                'document': document_metadata,
                'metadata': chunk_metadata.metadata
            })

        # Apply access controls
        filtered_results = self.apply_access_control(enriched_results, query.user_id)

        # Rank by relevance + recency + authority
        ranked_results = self.rank_results(filtered_results, query)

        # Cache results
        await self.cache.set(
            key=cache_key,
            value=ranked_results[:query.limit],
            expire=3600  # 1 hour TTL
        )

        return SearchResults(
            results=ranked_results[:query.limit],
            total=len(filtered_results),
            processing_time=time.time() - start_time,
            cached=False
        )
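`rank_results` is described above only as "relevance + recency + authority". A minimal sketch of blending two of those signals (the weights, field names, and timestamp format are all assumptions):
import time

def rank_results(results: list[dict], half_life_days: float = 30.0) -> list[dict]:
    """Blend vector similarity with an exponential recency decay; weights are illustrative."""
    now = time.time()

    def combined_score(result: dict) -> float:
        relevance = result['score']  # COSINE similarity from Milvus
        created_at = result['metadata'].get('created_at', now)  # assumed unix timestamp
        age_days = (now - created_at) / 86400
        recency = 0.5 ** (age_days / half_life_days)
        return 0.8 * relevance + 0.2 * recency

    return sorted(results, key=combined_score, reverse=True)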
🔐 Access Control Integration
Permission-aware Search:
def apply_access_control(self, results: List[SearchResult], user_id: str) -> List[SearchResult]:
    """
    Filter results based on the user's permissions:
    - Document ownership
    - Knowledge Base membership
    - Shared document access
    - Role-based permissions
    """
    filtered_results = []
    for result in results:
        document = result['document']

        # Owner access - always allowed
        if document.created_by == user_id:
            filtered_results.append(result)
            continue

        # Knowledge Base access
        if document.knowledge_base_id:
            kb_access = self.check_kb_access(
                kb_id=document.knowledge_base_id,
                user_id=user_id
            )
            if kb_access:
                filtered_results.append(result)
                continue

        # Shared document access
        shared_access = self.check_shared_access(
            document_id=document.id,
            user_id=user_id
        )
        if shared_access:
            # May be read-only
            result['access_level'] = shared_access.level
            filtered_results.append(result)
    return filtered_results
def build_filter_expression(self, filters: SearchFilters) -> str:
    """
    Build a Milvus expression for pre-filtering:
    - Document types
    - Date ranges
    - Knowledge base IDs
    - Custom metadata filters
    """
    conditions = []
    if filters.document_types:
        type_conditions = [f'document_type == "{t}"' for t in filters.document_types]
        conditions.append(f"({' or '.join(type_conditions)})")
    if filters.knowledge_base_ids:
        kb_conditions = [f'knowledge_base_id == "{kb_id}"' for kb_id in filters.knowledge_base_ids]
        conditions.append(f"({' or '.join(kb_conditions)})")
    if filters.date_range:
        start_ts = int(filters.date_range.start.timestamp())
        end_ts = int(filters.date_range.end.timestamp())
        conditions.append(f"created_at >= {start_ts} and created_at <= {end_ts}")
    if filters.min_score:
        # Note: applied post-search, not in the Milvus expression
        pass
    return " and ".join(conditions) if conditions else ""
⚡ Performance Optimizations
Multi-level Caching Strategy:
class CacheStrategy:
    def __init__(self):
        self.l1_cache = {}  # In-memory, per worker
        self.l2_cache = RedisClient()  # Distributed cache

    async def get_or_compute(self, key: str, compute_fn, ttl: int = 3600):
        # L1 cache check
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2 cache check
        cached_value = await self.l2_cache.get(key)
        if cached_value:
            # Populate L1 cache
            self.l1_cache[key] = cached_value
            return cached_value

        # Compute and cache
        result = await compute_fn()

        # Store in both levels
        self.l1_cache[key] = result
        await self.l2_cache.setex(key, ttl, result)
        return result
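A typical call wraps the expensive lookup in a zero-argument coroutine factory. A short usage sketch (`fetch_document_metadata` is a hypothetical async fetch, not part of the service above):
cache = CacheStrategy()

async def get_document_metadata(document_id: str) -> dict:
    return await cache.get_or_compute(
        key=f"doc-meta:{document_id}",
        compute_fn=lambda: fetch_document_metadata(document_id),  # hypothetical async fetch
        ttl=600,  # metadata changes more often than chunk content, hence the shorter TTL
    )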
MCP Servers (Model Context Protocol)
🔌 MCP Demo Server Implementation
The mcp-demo service implements the Model Context Protocol for intelligent integrations with external systems.
#!/usr/bin/env python3
"""
MCP Todo List Server Demo
Demonstrates MCP capabilities:
- Resources: full todo list and individual todos
- Tools: add, update, delete todos
- Prompts: templates for common operations
"""
import json
import os
from datetime import datetime
from typing import Dict, Optional

from mcp.server.fastmcp import FastMCP

# Create the MCP server instance
mcp = FastMCP("Todo List Server")

# In-memory storage for todos
todos: Dict[str, dict] = {}
next_id: int = 1

# Persistent storage
TODO_FILE = os.environ.get("TODO_FILE", os.path.expanduser("~/.mcp-todos.json"))

def load_todos():
    """Load todos from persistent storage"""
    global todos, next_id
    if os.path.exists(TODO_FILE):
        try:
            with open(TODO_FILE, "r") as f:
                data = json.load(f)
                todos = data.get("todos", {})
                next_id = data.get("next_id", 1)
        except Exception as e:
            print(f"Error loading todos: {e}")

def save_todos():
    """Save todos to persistent storage"""
    try:
        with open(TODO_FILE, "w") as f:
            json.dump({"todos": todos, "next_id": next_id}, f, indent=2)
    except Exception as e:
        print(f"Error saving todos: {e}")
🛠️ MCP Resources Implementation
Resources for data access:
@mcp.resource("todo://list")
async def list_todos() -> str:
    """Resource providing the complete todo list"""
    if not todos:
        return json.dumps({"todos": [], "message": "No todos found"})
    return json.dumps({
        "todos": list(todos.values()),
        "count": len(todos)
    }, indent=2)

@mcp.resource("todo://todo/{todo_id}")
async def get_todo(todo_id: str) -> str:
    """Resource for a specific todo"""
    if todo_id not in todos:
        return json.dumps({"error": f"Todo {todo_id} not found"})
    return json.dumps(todos[todo_id], indent=2)
⚙️ MCP Tools Implementation
Tools for external actions:
@mcp.tool("add_todo")
async def add_todo(title: str, description: str = "", priority: str = "medium") -> str:
    """Tool to add a new todo"""
    global next_id
    todo_id = str(next_id)
    next_id += 1
    todo = {
        "id": todo_id,
        "title": title,
        "description": description,
        "priority": priority,
        "completed": False,
        "created_at": datetime.now().isoformat(),
        "updated_at": datetime.now().isoformat()
    }
    todos[todo_id] = todo
    save_todos()
    return f"Added todo '{title}' with ID {todo_id}"

@mcp.tool("update_todo")
async def update_todo(
    todo_id: str,
    title: Optional[str] = None,
    description: Optional[str] = None,
    priority: Optional[str] = None,
    completed: Optional[bool] = None
) -> str:
    """Tool to update an existing todo"""
    if todo_id not in todos:
        return f"Todo {todo_id} not found"

    todo = todos[todo_id]
    if title is not None:
        todo["title"] = title
    if description is not None:
        todo["description"] = description
    if priority is not None:
        todo["priority"] = priority
    if completed is not None:
        todo["completed"] = completed
    todo["updated_at"] = datetime.now().isoformat()
    save_todos()
    return f"Updated todo {todo_id}"

@mcp.tool("delete_todo")
async def delete_todo(todo_id: str) -> str:
    """Tool to delete a todo"""
    if todo_id not in todos:
        return f"Todo {todo_id} not found"

    title = todos[todo_id]["title"]
    del todos[todo_id]
    save_todos()
    return f"Deleted todo '{title}' (ID: {todo_id})"
💡 MCP Prompt Templates
Prompts for common operations:
@mcp.prompt("summarize_todos")
async def summarize_todos() -> str:
    """Prompt template summarizing all todos"""
    if not todos:
        return "No todos to summarize."

    completed = sum(1 for todo in todos.values() if todo["completed"])
    pending = len(todos) - completed
    priorities = {}
    for todo in todos.values():
        if not todo["completed"]:
            priorities[todo["priority"]] = priorities.get(todo["priority"], 0) + 1

    summary = f"""Todo Summary:
- Total todos: {len(todos)}
- Completed: {completed}
- Pending: {pending}
Pending by priority:"""
    for priority, count in priorities.items():
        summary += f"\n  - {priority}: {count}"
    return summary

@mcp.prompt("daily_agenda")
async def daily_agenda() -> str:
    """Prompt template for a daily agenda"""
    high_priority = [t for t in todos.values() if not t["completed"] and t["priority"] == "high"]
    medium_priority = [t for t in todos.values() if not t["completed"] and t["priority"] == "medium"]

    agenda = "Today's Agenda:\n\n"
    if high_priority:
        agenda += "🔥 High Priority:\n"
        for todo in high_priority:
            agenda += f"  - {todo['title']}\n"
        agenda += "\n"
    if medium_priority:
        agenda += "📋 Medium Priority:\n"
        for todo in medium_priority[:5]:  # Limit to 5
            agenda += f"  - {todo['title']}\n"
        agenda += "\n"
    return agenda
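To run the demo end to end, the file only needs a standard entrypoint. A minimal sketch using FastMCP's default stdio transport:
if __name__ == "__main__":
    load_todos()  # restore persisted state before serving
    mcp.run()     # stdio transport by default; MCP clients spawn this process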
API Gateway (Traefik)
🌐 Reverse Proxy and Load Balancing
Traefik acts as the unified API Gateway for all Emblema services, providing intelligent routing, SSL termination, and load balancing.
Dynamic Configuration via Labels:
# Actual Traefik configuration for the services
services:
  traefik:
    image: traefik:v3.0
    command:
      - "--api.dashboard=true"
      - "--providers.docker=true"
      - "--providers.docker.network=emblema"
      - "--entrypoints.web.address=:80"
      - "--entrypoints.websecure.address=:443"
      - "--certificatesresolvers.letsencrypt.acme.tlschallenge=true"
      - "--certificatesresolvers.letsencrypt.acme.email=${ACME_EMAIL}"
      - "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json"
    ports:
      - "80:80"
      - "443:443"
      - "8080:8080"  # Dashboard
    networks:
      - emblema

  # Service routing examples
  www-emblema:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.emblema-web.rule=Host(`${EMBLEMA_WEB_HOSTNAME}`)"
      - "traefik.http.routers.emblema-web.entrypoints=websecure"
      - "traefik.http.routers.emblema-web.tls.certresolver=letsencrypt"
      - "traefik.http.services.emblema-web.loadbalancer.server.port=3000"

  background-task:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.background-task.rule=Host(`${BACKGROUND_TASK_HOSTNAME}`)"
      - "traefik.http.routers.background-task.entrypoints=websecure"
      - "traefik.http.services.background-task.loadbalancer.server.port=8000"

  spp-retrieval:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.retrieval.rule=Host(`${RETRIEVAL_HOSTNAME}`)"
      - "traefik.http.routers.retrieval.entrypoints=websecure"
      - "traefik.http.services.retrieval.loadbalancer.server.port=8001"
🔒 Security and SSL Management
Automatic HTTPS with Let's Encrypt:
# Automatic SSL termination
traefik:
  command:
    - "--certificatesresolvers.letsencrypt.acme.tlschallenge=true"
    - "--certificatesresolvers.letsencrypt.acme.email=${ACME_EMAIL}"
    - "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json"
  volumes:
    - "letsencrypt:/letsencrypt"

# Custom certificates for air-gapped environments
traefik-custom:
  volumes:
    - "./config/traefik/certs:/certs:ro"
  command:
    - "--providers.file.directory=/certs"
    - "--providers.file.watch=true"
⚖️ Load Balancing Strategies
Multi-instance Load Balancing:
# Round-robin load balancing
www-emblema-1:
  labels:
    - "traefik.http.services.www-emblema.loadbalancer.server.port=3000"
    - "traefik.http.services.www-emblema.loadbalancer.server.weight=100"

www-emblema-2:
  labels:
    - "traefik.http.services.www-emblema.loadbalancer.server.port=3000"
    - "traefik.http.services.www-emblema.loadbalancer.server.weight=100"

# Weighted routing for A/B testing
www-emblema-canary:
  labels:
    - "traefik.http.services.www-emblema-canary.loadbalancer.server.weight=10"
    - "traefik.http.routers.www-canary.rule=Host(`emblema.local`) && Header(`X-Canary`, `true`)"
📊 Monitoring and Health Checks
Service Health Monitoring:
# Health check endpoints
background-task:
  labels:
    - "traefik.http.services.background-task.loadbalancer.healthcheck.path=/health"
    - "traefik.http.services.background-task.loadbalancer.healthcheck.interval=30s"
    - "traefik.http.services.background-task.loadbalancer.healthcheck.timeout=10s"

# Metrics collection
traefik:
  command:
    - "--metrics.prometheus=true"
    - "--metrics.prometheus.addEntryPointsLabels=true"
    - "--metrics.prometheus.addServicesLabels=true"
  labels:
    - "traefik.http.routers.traefik-metrics.rule=Host(`traefik.emblema.local`) && Path(`/metrics`)"
Inter-Service Communication
🔄 Service Discovery
DNS-based Discovery via Docker Networks:
# Service client with automatic discovery
import httpx

class ServiceClient:
    def __init__(self, service_name: str, port: int = None):
        self.base_url = f"http://{service_name}"
        if port:
            self.base_url += f":{port}"

    async def health_check(self) -> bool:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.base_url}/health", timeout=5.0)
                return response.status_code == 200
        except httpx.HTTPError:
            return False

# Usage examples
background_task_client = ServiceClient("background-task")
retrieval_client = ServiceClient("spp-retrieval", 8001)
mcp_client = ServiceClient("mcp-demo", 4001)
📡 Message Queue Integration
Async Communication via Redis/Celery:
# Event publishing between services
import json
import os
from datetime import datetime
from typing import List

import redis.asyncio as redis  # async client, so publish/listen can be awaited

class EventBus:
    def __init__(self):
        self.redis = redis.Redis.from_url(os.getenv("REDIS_URL"))

    async def publish_event(self, event_type: str, data: dict):
        event = {
            "type": event_type,
            "data": data,
            "timestamp": datetime.now().isoformat(),
            "source_service": os.getenv("SERVICE_NAME")
        }
        await self.redis.publish(f"events:{event_type}", json.dumps(event))

    async def subscribe_events(self, event_types: List[str], handler):
        pubsub = self.redis.pubsub()
        for event_type in event_types:
            await pubsub.subscribe(f"events:{event_type}")
        async for message in pubsub.listen():
            if message['type'] == 'message':
                event = json.loads(message['data'])
                await handler(event)

# Event handlers
async def handle_document_processed(event):
    document_id = event['data']['document_id']
    # Trigger notifications, update the UI, etc.

# Inside an async entrypoint:
event_bus = EventBus()
await event_bus.subscribe_events(['document.processed'], handle_document_processed)
Performance Monitoring
📈 Service Metrics
Prometheus Integration:
from prometheus_client import Counter, Histogram, Gauge
# Service-level metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total requests', ['service', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'Request duration', ['service'])
ACTIVE_TASKS = Gauge('celery_active_tasks', 'Active Celery tasks', ['queue'])
# Business metrics
DOCUMENTS_PROCESSED = Counter('documents_processed_total', 'Documents processed', ['status'])
EMBEDDINGS_GENERATED = Counter('embeddings_generated_total', 'Embeddings generated')
SEARCH_QUERIES = Counter('search_queries_total', 'Search queries', ['user_id'])
# Health check metrics
SERVICE_HEALTH = Gauge('service_health', 'Service health status', ['service'])
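Counters only move when code touches them. A minimal sketch wiring `REQUEST_COUNT` and `REQUEST_DURATION` into a FastAPI middleware (the middleware itself is an assumption, not code from the services above):
import time

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def record_metrics(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    REQUEST_DURATION.labels(service="spp-retrieval").observe(time.time() - start)
    REQUEST_COUNT.labels(
        service="spp-retrieval",
        endpoint=request.url.path,
        status=str(response.status_code),
    ).inc()
    return response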
Next Steps
The backend services are Emblema's technological foundation. For further reading:
- Data Layer - PostgreSQL, Milvus, MinIO, Redis
- Authentication - Keycloak, JWT, permissions
- Docker Architecture - Containerization
- Flow Diagrams - Operational workflows
Emblema's backend services are designed for scalability, reliability, and maintainability, providing a solid base for the intelligent processing of multimedia content.