Passa al contenuto principale

Servizi Backend

I servizi backend di Emblema costituiscono il cuore dell'elaborazione intelligente, fornendo capacità di AI processing, ricerca semantica, integrazione di protocolli e orchestrazione dei dati. Ogni servizio è progettato seguendo il principio Single Responsibility e comunica attraverso API REST e message queues.

Background Task Service

🤖 Architettura e Responsabilità

Il background-task è il servizio più critico di Emblema, responsabile dell'elaborazione asincrona di documenti, audio e video attraverso pipeline AI avanzate.

# Actual service architecture
import os  # fix: os.getenv was used below but os was never imported

from fastapi import FastAPI
from celery import Celery
from app.routers import tasks, artifacts, cache

app = FastAPI()

# Celery configuration for ML workloads; result backend and broker share
# Redis DB 6 unless overridden via environment.
celery_app = Celery(
    __name__,
    backend=os.getenv("CELERY_RESULT_URL", "redis://localhost:6379/6"),
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/6"),
)

# Recycle worker processes to prevent memory leaks from ML models:
# each child process handles a single task, freeing VRAM afterwards.
celery_app.conf.worker_max_tasks_per_child = 1

📋 Task Management System

Task Lifecycle con Monitoring Automatico:

# Automatic task monitor, scheduled every 5 seconds.
@celery_app.task(base=Singleton, name="app.tasks.monitor_pending_tasks", lock_expiry=10)
def monitor_pending_tasks():
    """Re-dispatch tasks stuck in CREATED, or STARTED past the execution timeout.

    Inspects the Celery workers' current load and only schedules as many
    tasks as there are free worker slots.
    """
    inspector = celery_app.control.inspect()
    active = inspector.active() or {}      # currently running tasks, keyed by worker
    reserved = inspector.reserved() or {}  # prefetched-but-not-started tasks

    # Count occupied worker slots.
    active_tasks = sum(len(active.get(worker, [])) for worker in active)
    reserved_tasks = sum(len(reserved.get(worker, [])) for worker in reserved)

    # NOTE(review): MAX_CURRENCY looks like a misspelling of MAX_CONCURRENCY
    # defined elsewhere in the project — confirm before renaming.
    if reserved_tasks >= MAX_CURRENCY:
        return

    # Fetch tasks still in CREATED, or STARTED longer than the allowed
    # execution time (likely orphaned by a dead worker).
    new_tasks = task_handler.list(
        where={
            "_or": [
                {"status": {"_eq": "CREATED"}},
                {
                    "status": {"_eq": "STARTED"},
                    "startedAt": {
                        "_lt": (datetime.now() - timedelta(seconds=TASK_MAX_EXECUTION_TIME)).isoformat()
                    },
                },
            ]
        },
        limit=MAX_CURRENCY - reserved_tasks - active_tasks,
    )

    for new_task in new_tasks:
        create_task(new_task)

🧠 Sistema di Chunking Intelligente

AutoChunker con Detection Automatico:

class AutoChunker(Chunker):
    """Chunker facade that selects a concrete chunker from the file extension."""

    def __init__(self, document: Document, config: ChunkingConfig):
        super().__init__(document=document, config=config)

        if document.sourceType == "File":
            ext = Path(document.file.name).suffix.lower()

            if ext in DocumentChunker.SUPPORTED_EXTENSIONS:
                # Document chunking - PDF, TXT, MD, DOCX
                logger.info(f"Detected document file: {ext}")
                self._chunker = DocumentChunker(document=document, config=config)
            elif ext in AudioChunker.SUPPORTED_EXTENSIONS:
                # Audio chunking - WhisperX + speaker diarization
                logger.info(f"Detected audio file: {ext}")
                self._chunker = AudioChunker(document=document, config=config)
            elif ext in VideoChunker.SUPPORTED_EXTENSIONS:
                # Video chunking - audio extraction + WhisperX
                logger.info(f"Detected video file: {ext}")
                self._chunker = VideoChunker(document=document, config=config)
            else:
                all_supported = (
                    DocumentChunker.SUPPORTED_EXTENSIONS
                    | AudioChunker.SUPPORTED_EXTENSIONS
                    | VideoChunker.SUPPORTED_EXTENSIONS
                )
                raise ValueError(f"Unsupported file type: {ext}. Supported: {all_supported}")

📄 Document Processing Pipeline

DocumentChunker con MinerU Integration:

class DocumentChunker(Chunker):
    """Chunker for text documents (PDF, TXT, MD, DOCX) with MinerU extraction."""

    SUPPORTED_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"}

    async def process_document(self, document_id: str, chunking_config: dict):
        """
        Full document-processing pipeline:
        1. File optimization (DOCX -> PDF conversion when needed)
        2. MinerU processing for layout-aware extraction
        3. Token-based chunking with the BGE-M3 tokenizer
        4. Embedding generation (1024 dimensions)
        5. Milvus storage with metadata enrichment
        """
        try:
            # Phase 1: file optimization
            optimized_file = await self.optimize_file(document_id)

            # Phase 2: MinerU for PDFs, plain text extraction otherwise
            if optimized_file.suffix.lower() == '.pdf':
                extracted_content = await self.mineru_extract(optimized_file)
            else:
                extracted_content = await self.text_extract(optimized_file)

            # Phase 3: token-based chunking
            chunks = await self.create_chunks(
                content=extracted_content,
                chunk_size=chunking_config.get('chunk_size', 512),  # token count
                overlap=chunking_config.get('chunk_overlap', 75),   # token overlap
                strategy=chunking_config.get('chunking_strategy', 'recursive'),
            )

            # Phase 4: BGE-M3 embedding, one per chunk
            for chunk in chunks:
                embedding = await self.generate_embedding(chunk.content)
                chunk.embedding = embedding

            # Phase 5: persist to Milvus
            await self.store_embeddings(document_id, chunks)

            return chunks

        except Exception as e:
            # Broad catch is deliberate: log with context, then re-raise.
            logger.error(f"Document processing failed for {document_id}: {e}")
            raise

🎵 Audio/Video Processing con Speaker Diarization

AudioChunker con WhisperX Integration:

class AudioChunker(Chunker):
    """Chunker for audio files using WhisperX transcription + diarization."""

    SUPPORTED_EXTENSIONS = {".mp3", ".wav", ".m4a"}

    async def process_audio(self, document_id: str, chunking_config: dict):
        """
        Audio processing with speaker identification:
        1. WhisperX transcription with precise timestamps
        2. Automatic speaker diarization
        3. Chunking per speaker segment or fixed-size
        4. Speaker profile mapping (optional)
        """
        try:
            # NOTE(review): `audio_file` and `document` are not defined in this
            # excerpt — presumably instance state resolved earlier; confirm.
            diarization_result = await self.whisperx_process(
                audio_file=audio_file,
                language=chunking_config.get('audio_language', 'it'),
                device='cuda' if torch.cuda.is_available() else 'cpu',
            )

            # Flatten segments, keeping speaker labels (SPEAKER_00, SPEAKER_01, ...)
            segments = []
            for segment in diarization_result.segments:
                segments.append({
                    'start': segment.start,
                    'end': segment.end,
                    'text': segment.text,
                    'speaker': segment.speaker,
                })

            # Select the chunking strategy from the config.
            if chunking_config.get('chunking_strategy') == 'diarization':
                # One chunk per speaker segment
                chunks = self.create_speaker_chunks(segments)
            elif chunking_config.get('chunking_strategy') == 'speaker':
                # Merge consecutive segments from the same speaker
                chunks = self.group_by_speaker(segments)
            else:
                # Fixed-size token-based chunks
                chunks = self.create_fixed_chunks(segments, chunking_config)

            # Enrich with speaker profiles when the document provides them.
            if hasattr(document, 'speaker_profiles'):
                chunks = self.enrich_with_profiles(chunks, document.speaker_profiles)

            return chunks

        except Exception as e:
            logger.error(f"Audio processing failed for {document_id}: {e}")
            raise

🎬 Video Processing Pipeline

VideoChunker con Audio Extraction:

class VideoChunker(Chunker):
    """Chunker for video files: extracts the audio track and delegates to AudioChunker."""

    SUPPORTED_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv"}

    async def process_video(self, document_id: str, chunking_config: dict):
        """
        Video processing pipeline:
        1. Audio extraction from the video (FFmpeg)
        2. Conversion to optimized MP3
        3. Transcription with WhisperX + diarization
        4. Chunking based on the audio timeline
        """
        try:
            # NOTE(review): `video_file`, `document` and `config` are not defined
            # in this excerpt — presumably instance state; confirm against source.
            audio_file = await self.extract_audio(
                video_file=video_file,
                output_format='mp3',
                sample_rate=16000,  # optimal for WhisperX
            )

            # Delegate transcription/chunking to the audio pipeline.
            audio_chunker = AudioChunker(document=document, config=config)
            chunks = await audio_chunker.process_audio(document_id, chunking_config)

            # Tag chunks with video-specific metadata.
            for chunk in chunks:
                chunk.metadata.update({
                    'source_type': 'video',
                    'video_file': document.file.name,
                    'audio_extracted': True,
                    'video_duration': await self.get_video_duration(video_file),
                })

            return chunks

        except Exception as e:
            logger.error(f"Video processing failed for {document_id}: {e}")
            raise

🔧 Configuration e Performance

Chunking Presets basati su Use Case:

# Chunking configuration presets, keyed by use case.
CHUNKING_PRESETS = {
    # General-purpose default tuned for the BGE-M3 tokenizer.
    "default": {
        "chunk_size": 512,
        "overlap": 75,
        "strategy": "recursive",
        "model_name": "bge-m3",
    },
    # Meeting transcripts: one chunk per speaker turn.
    "meeting": {
        "chunk_size": 400,
        "overlap": 100,
        "strategy": "diarization",
        "audio_language": "it",
    },
    # Executive summaries: short, information-dense chunks.
    "executive": {
        "chunk_size": 300,
        "overlap": 100,
        "strategy": "semantic",
        "embedding_threshold": 0.7,
    },
    # Complex technical content: larger chunks, structure preserved.
    "technical": {
        "chunk_size": 600,
        "overlap": 120,
        "strategy": "recursive",
        "preserve_structure": True,
    },
    # Q&A: sentence-level chunks for maximum retrieval precision.
    "qa": {
        "chunk_size": 200,
        "overlap": 25,
        "strategy": "sentence",
        "split_sentences": True,
    },
}

Retrieval Service (spp-retrieval)

🔍 Architettura Search Engine

Il servizio spp-retrieval fornisce capacità di ricerca semantica ottimizzata attraverso il vector database Milvus.

# Retrieval service skeleton
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware  # fix: used below but never imported
from app.routers import retrieval

app = FastAPI()

# CORS configuration for cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # TODO: enhance in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(retrieval.router)

🎯 Semantic Search Implementation

Vector Similarity Search con Access Control:

class SemanticSearchEngine:
    """Semantic search over Milvus with caching, enrichment and access control."""

    def __init__(self):
        self.milvus_client = MilvusClient()
        self.embedding_model = BGEEmbedder()
        self.cache = RedisCache()

    async def search(self, query: SearchQuery) -> SearchResults:
        """
        Multi-phase semantic search:
        1. Query embedding generation (BGE-M3)
        2. Cache check for performance
        3. Milvus vector similarity search
        4. Metadata enrichment from PostgreSQL
        5. Access control filtering
        6. Result ranking and caching
        """
        # Fix: start_time was referenced in the return value but never set.
        start_time = time.time()

        # Generate the query embedding, normalized for COSINE similarity.
        query_embedding = await self.embedding_model.encode(
            text=query.text,
            normalize=True,
        )

        # Serve from cache when possible (still subject to access control).
        cache_key = self.generate_cache_key(query)
        cached_results = await self.cache.get(cache_key)
        if cached_results and not query.force_refresh:
            return self.apply_access_control(cached_results, query.user_id)

        # Milvus vector search parameters.
        search_params = {
            "metric_type": "COSINE",
            "params": {
                "nprobe": 10,  # number of clusters to probe
                "ef": 32,      # search expansion factor
            },
        }

        milvus_results = await self.milvus_client.search(
            collection_name="document_chunks",
            query_vectors=[query_embedding],
            search_params=search_params,
            limit=query.limit * 2,  # over-fetch to compensate for filtering
            expr=self.build_filter_expression(query.filters),
        )

        # Enrich hits with chunk/document metadata from PostgreSQL.
        enriched_results = []
        for result in milvus_results[0]:  # results for the first (only) query vector
            chunk_metadata = await self.get_chunk_metadata(result.id)
            document_metadata = await self.get_document_metadata(chunk_metadata.document_id)

            enriched_results.append({
                'chunk_id': result.id,
                'score': result.distance,
                'content': chunk_metadata.content,
                'document': document_metadata,
                'metadata': chunk_metadata.metadata,
            })

        # Drop results the user may not see.
        filtered_results = self.apply_access_control(enriched_results, query.user_id)

        # Rank by relevance + recency + authority.
        ranked_results = self.rank_results(filtered_results, query)

        # Cache the top results with a 1-hour TTL.
        await self.cache.set(
            key=cache_key,
            value=ranked_results[:query.limit],
            expire=3600,
        )

        return SearchResults(
            results=ranked_results[:query.limit],
            total=len(filtered_results),
            processing_time=time.time() - start_time,
            cached=False,
        )

🔐 Access Control Integration

Permission-aware Search:

def apply_access_control(self, results: List[SearchResult], user_id: str) -> List[SearchResult]:
    """
    Filter results by user permissions:
    - Document ownership
    - Knowledge Base membership
    - Shared document access
    - Role-based permissions
    """
    filtered_results = []

    for result in results:
        document = result['document']

        # Owners always see their own documents.
        if document.created_by == user_id:
            filtered_results.append(result)
            continue

        # Knowledge Base membership grants access.
        if document.knowledge_base_id:
            kb_access = self.check_kb_access(
                kb_id=document.knowledge_base_id,
                user_id=user_id,
            )
            if kb_access:
                filtered_results.append(result)
                continue

        # Explicit shares grant (possibly read-only) access.
        shared_access = self.check_shared_access(
            document_id=document.id,
            user_id=user_id,
        )
        if shared_access:
            result['access_level'] = shared_access.level
            filtered_results.append(result)

    return filtered_results

def build_filter_expression(self, filters: SearchFilters) -> str:
    """
    Build a Milvus boolean expression for pre-filtering:
    - Document types
    - Date ranges
    - Knowledge base IDs
    - Custom metadata filters
    """
    conditions = []

    if filters.document_types:
        # NOTE(review): values are interpolated unescaped — a double quote in a
        # type/ID would break the expression; sanitize upstream or escape here.
        type_conditions = [f'document_type == "{t}"' for t in filters.document_types]
        conditions.append(f"({' or '.join(type_conditions)})")

    if filters.knowledge_base_ids:
        kb_conditions = [f'knowledge_base_id == "{kb_id}"' for kb_id in filters.knowledge_base_ids]
        conditions.append(f"({' or '.join(kb_conditions)})")

    if filters.date_range:
        start_ts = int(filters.date_range.start.timestamp())
        end_ts = int(filters.date_range.end.timestamp())
        conditions.append(f"created_at >= {start_ts} and created_at <= {end_ts}")

    if filters.min_score:
        # min_score is applied post-search, not in the Milvus expression.
        pass

    return " and ".join(conditions) if conditions else ""

⚡ Performance Optimizations

Multi-level Caching Strategy:

class CacheStrategy:
    """Two-level cache: per-worker in-memory L1 backed by a distributed Redis L2."""

    def __init__(self):
        self.l1_cache = {}             # in-memory, per worker process
        self.l2_cache = RedisClient()  # distributed cache

    async def get_or_compute(self, key: str, compute_fn, ttl: int = 3600):
        """Return the cached value for `key`, computing and caching it on miss."""
        # L1: cheapest lookup, no network round-trip.
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2: distributed cache; promote hits into L1.
        cached_value = await self.l2_cache.get(key)
        if cached_value:
            self.l1_cache[key] = cached_value
            return cached_value

        # Miss on both levels: compute once, then store in both.
        result = await compute_fn()
        self.l1_cache[key] = result
        await self.l2_cache.setex(key, ttl, result)

        return result

MCP Servers (Model Context Protocol)

🔌 MCP Demo Server Implementation

Il mcp-demo service implementa il Model Context Protocol per integrazioni intelligenti con sistemi esterni.

#!/usr/bin/env python3
"""
MCP Todo List Server Demo

Demonstrates MCP capabilities:
- Resources: todo list and individual todos
- Tools: add, update, delete todos
- Prompts: templates for common operations
"""

import json
import os
from datetime import datetime  # fix: used by the tools below but never imported
from typing import Dict, Optional  # fix: annotations below used these unimported

from mcp.server.fastmcp import FastMCP

# Create the MCP server instance.
mcp = FastMCP("Todo List Server")

# In-memory todo storage, keyed by string ID.
todos: Dict[str, dict] = {}
next_id: int = 1

# Persistent storage location (overridable via the TODO_FILE env var).
TODO_FILE = os.environ.get("TODO_FILE", os.path.expanduser("~/.mcp-todos.json"))

def load_todos():
    """Load todos and the ID counter from persistent storage, if present."""
    global todos, next_id
    if os.path.exists(TODO_FILE):
        try:
            with open(TODO_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            todos = data.get("todos", {})
            next_id = data.get("next_id", 1)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError; a corrupt or unreadable
            # file leaves the current in-memory state untouched.
            print(f"Error loading todos: {e}")

def save_todos():
    """Persist todos and the ID counter to TODO_FILE as pretty-printed JSON."""
    try:
        with open(TODO_FILE, "w", encoding="utf-8") as f:
            json.dump({"todos": todos, "next_id": next_id}, f, indent=2)
    except (OSError, TypeError) as e:
        # TypeError: a non-JSON-serializable value made it into the todos dict.
        print(f"Error saving todos: {e}")

🛠️ MCP Resources Implementation

Resources per accesso dati:

@mcp.resource("todo://list")
async def list_todos() -> str:
    """Resource returning the full todo list as JSON."""
    if not todos:
        return json.dumps({"todos": [], "message": "No todos found"})

    return json.dumps({
        "todos": list(todos.values()),
        "count": len(todos)
    }, indent=2)

@mcp.resource("todo://todo/{todo_id}")
async def get_todo(todo_id: str) -> str:
    """Resource returning a single todo by ID, or a JSON error object."""
    if todo_id not in todos:
        return json.dumps({"error": f"Todo {todo_id} not found"})

    return json.dumps(todos[todo_id], indent=2)

⚙️ MCP Tools Implementation

Tools per azioni esterne:

@mcp.tool("add_todo")
async def add_todo(title: str, description: str = "", priority: str = "medium") -> str:
    """Tool that creates a new todo, assigns it the next ID and persists it."""
    global next_id

    todo_id = str(next_id)
    next_id += 1

    todo = {
        "id": todo_id,
        "title": title,
        "description": description,
        "priority": priority,
        "completed": False,
        "created_at": datetime.now().isoformat(),
        "updated_at": datetime.now().isoformat()
    }

    todos[todo_id] = todo
    save_todos()

    return f"Added todo '{title}' with ID {todo_id}"

@mcp.tool("update_todo")
async def update_todo(
    todo_id: str,
    title: Optional[str] = None,
    description: Optional[str] = None,
    priority: Optional[str] = None,
    completed: Optional[bool] = None
) -> str:
    """Tool that patches fields of an existing todo; None means 'leave unchanged'."""
    if todo_id not in todos:
        return f"Todo {todo_id} not found"

    todo = todos[todo_id]

    if title is not None:
        todo["title"] = title
    if description is not None:
        todo["description"] = description
    if priority is not None:
        todo["priority"] = priority
    if completed is not None:
        todo["completed"] = completed

    todo["updated_at"] = datetime.now().isoformat()
    save_todos()

    return f"Updated todo {todo_id}"

@mcp.tool("delete_todo")
async def delete_todo(todo_id: str) -> str:
    """Tool that removes a todo by ID and persists the change."""
    if todo_id not in todos:
        return f"Todo {todo_id} not found"

    title = todos[todo_id]["title"]
    del todos[todo_id]
    save_todos()

    return f"Deleted todo '{title}' (ID: {todo_id})"

💡 MCP Prompts Templates

Prompts per operazioni comuni:

@mcp.prompt("summarize_todos")
async def summarize_todos() -> str:
    """Prompt template summarizing counts and pending todos by priority."""
    if not todos:
        return "No todos to summarize."

    completed = sum(1 for todo in todos.values() if todo["completed"])
    pending = len(todos) - completed

    # Count pending todos per priority level.
    priorities = {}
    for todo in todos.values():
        if not todo["completed"]:
            priorities[todo["priority"]] = priorities.get(todo["priority"], 0) + 1

    # Literal lines stay at column 0 so the rendered text is unindented.
    summary = f"""Todo Summary:
- Total todos: {len(todos)}
- Completed: {completed}
- Pending: {pending}

Pending by priority:"""

    for priority, count in priorities.items():
        summary += f"\n - {priority}: {count}"

    return summary

@mcp.prompt("daily_agenda")
async def daily_agenda() -> str:
    """Prompt template building a daily agenda from pending todos."""
    high_priority = [t for t in todos.values() if not t["completed"] and t["priority"] == "high"]
    medium_priority = [t for t in todos.values() if not t["completed"] and t["priority"] == "medium"]

    agenda = "Today's Agenda:\n\n"

    if high_priority:
        agenda += "🔥 High Priority:\n"
        for todo in high_priority:
            agenda += f" - {todo['title']}\n"
        agenda += "\n"

    if medium_priority:
        agenda += "📋 Medium Priority:\n"
        for todo in medium_priority[:5]:  # cap medium-priority items at 5
            agenda += f" - {todo['title']}\n"
        agenda += "\n"

    return agenda

API Gateway (Traefik)

🌐 Reverse Proxy e Load Balancing

Traefik funge da API Gateway unificato per tutti i servizi Emblema, fornendo routing intelligente, SSL termination e load balancing.

Configurazione Dinamica con Labels:

# Traefik configuration for the Emblema services
services:
  traefik:
    image: traefik:v3.0
    command:
      - "--api.dashboard=true"
      - "--providers.docker=true"
      - "--providers.docker.network=emblema"
      - "--entrypoints.web.address=:80"
      - "--entrypoints.websecure.address=:443"
      - "--certificatesresolvers.letsencrypt.acme.tlschallenge=true"
      - "--certificatesresolvers.letsencrypt.acme.email=${ACME_EMAIL}"
      - "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json"
    ports:
      - "80:80"
      - "443:443"
      - "8080:8080"  # Dashboard
    networks:
      - emblema

  # Service routing examples
  www-emblema:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.emblema-web.rule=Host(`${EMBLEMA_WEB_HOSTNAME}`)"
      - "traefik.http.routers.emblema-web.entrypoints=websecure"
      - "traefik.http.routers.emblema-web.tls.certresolver=letsencrypt"
      - "traefik.http.services.emblema-web.loadbalancer.server.port=3000"

  background-task:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.background-task.rule=Host(`${BACKGROUND_TASK_HOSTNAME}`)"
      - "traefik.http.routers.background-task.entrypoints=websecure"
      - "traefik.http.services.background-task.loadbalancer.server.port=8000"

  spp-retrieval:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.retrieval.rule=Host(`${RETRIEVAL_HOSTNAME}`)"
      - "traefik.http.routers.retrieval.entrypoints=websecure"
      - "traefik.http.services.retrieval.loadbalancer.server.port=8001"

🔒 Security e SSL Management

Automatic HTTPS con Let's Encrypt:

# Automatic SSL termination (Let's Encrypt, TLS-ALPN challenge)
traefik:
  command:
    - "--certificatesresolvers.letsencrypt.acme.tlschallenge=true"
    - "--certificatesresolvers.letsencrypt.acme.email=${ACME_EMAIL}"
    - "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json"
  volumes:
    - "letsencrypt:/letsencrypt"

# Custom certificates for air-gapped environments
traefik-custom:
  volumes:
    - "./config/traefik/certs:/certs:ro"
  command:
    - "--providers.file.directory=/certs"
    - "--providers.file.watch=true"

⚖️ Load Balancing Strategies

Multi-instance Load Balancing:

# Round-robin load balancing across identical replicas
www-emblema-1:
  labels:
    - "traefik.http.services.www-emblema.loadbalancer.server.port=3000"
    - "traefik.http.services.www-emblema.loadbalancer.server.weight=100"

www-emblema-2:
  labels:
    - "traefik.http.services.www-emblema.loadbalancer.server.port=3000"
    - "traefik.http.services.www-emblema.loadbalancer.server.weight=100"

# Weighted routing for A/B testing (canary gets weight 10 vs 100)
www-emblema-canary:
  labels:
    - "traefik.http.services.www-emblema-canary.loadbalancer.server.weight=10"
    - "traefik.http.routers.www-canary.rule=Host(`emblema.local`) && Headers(`X-Canary`, `true`)"

📊 Monitoring e Health Checks

Service Health Monitoring:

# Health check endpoints polled by Traefik's load balancer
background-task:
  labels:
    - "traefik.http.services.background-task.loadbalancer.healthcheck.path=/health"
    - "traefik.http.services.background-task.loadbalancer.healthcheck.interval=30s"
    - "traefik.http.services.background-task.loadbalancer.healthcheck.timeout=10s"

# Prometheus metrics collection
traefik:
  command:
    - "--metrics.prometheus=true"
    - "--metrics.prometheus.addEntryPointsLabels=true"
    - "--metrics.prometheus.addServicesLabels=true"
  labels:
    - "traefik.http.routers.traefik-metrics.rule=Host(`traefik.emblema.local`) && Path(`/metrics`)"

Inter-Service Communication

🔄 Service Discovery

DNS-based Discovery tramite Docker Networks:

# Service client with automatic DNS-based discovery via Docker networks.
class ServiceClient:
    """Thin HTTP client addressing a service by its Docker DNS name."""

    def __init__(self, service_name: str, port: int = None):
        self.base_url = f"http://{service_name}"
        if port:
            self.base_url += f":{port}"

    async def health_check(self) -> bool:
        """Return True when the service's /health endpoint answers HTTP 200."""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.base_url}/health", timeout=5.0)
                return response.status_code == 200
        except Exception:
            # Fix: was a bare `except:` which also swallowed KeyboardInterrupt
            # and SystemExit; any request failure simply reports unhealthy.
            return False

# Usage examples
background_task_client = ServiceClient("background-task")
retrieval_client = ServiceClient("spp-retrieval", 8001)
mcp_client = ServiceClient("mcp-demo", 4001)

📡 Message Queue Integration

Async Communication via Redis/Celery:

# Event publishing between services over Redis pub/sub.
class EventBus:
    """Publish/subscribe helper for inter-service events on Redis channels."""

    def __init__(self):
        # NOTE(review): redis.Redis is the synchronous client; awaiting its
        # methods below requires redis.asyncio.Redis — confirm the import.
        self.redis = redis.Redis.from_url(os.getenv("REDIS_URL"))

    async def publish_event(self, event_type: str, data: dict):
        """Publish `data` on the `events:<event_type>` channel with a standard envelope."""
        event = {
            "type": event_type,
            "data": data,
            "timestamp": datetime.now().isoformat(),
            "source_service": os.getenv("SERVICE_NAME"),
        }

        await self.redis.publish(f"events:{event_type}", json.dumps(event))

    async def subscribe_events(self, event_types: List[str], handler):
        """Subscribe to the given event types and invoke `handler` for each message."""
        pubsub = self.redis.pubsub()
        for event_type in event_types:
            await pubsub.subscribe(f"events:{event_type}")

        async for message in pubsub.listen():
            if message['type'] == 'message':
                event = json.loads(message['data'])
                await handler(event)

# Event handlers
async def handle_document_processed(event):
    document_id = event['data']['document_id']
    # Trigger notifications, update UI, etc.

async def main():
    # Fix: `await` is only valid inside an async function; the original example
    # awaited at module level, which is a SyntaxError.
    event_bus = EventBus()
    await event_bus.subscribe_events(['document.processed'], handle_document_processed)

Performance Monitoring

📈 Service Metrics

Prometheus Integration:

from prometheus_client import Counter, Histogram, Gauge

# Service-level metrics (bounded label sets: service/endpoint/status)
REQUEST_COUNT = Counter('http_requests_total', 'Total requests', ['service', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'Request duration', ['service'])
ACTIVE_TASKS = Gauge('celery_active_tasks', 'Active Celery tasks', ['queue'])

# Business metrics
DOCUMENTS_PROCESSED = Counter('documents_processed_total', 'Documents processed', ['status'])
EMBEDDINGS_GENERATED = Counter('embeddings_generated_total', 'Embeddings generated')
# NOTE(review): a per-user label creates one time series per user (unbounded
# cardinality) — consider dropping or bucketing the user_id label.
SEARCH_QUERIES = Counter('search_queries_total', 'Search queries', ['user_id'])

# Health check metrics
SERVICE_HEALTH = Gauge('service_health', 'Service health status', ['service'])

Prossimi Passi

I servizi backend costituiscono il foundation tecnologico di Emblema. Per approfondimenti:

  1. Layer Dati - PostgreSQL, Milvus, MinIO, Redis
  2. Autenticazione - Keycloak, JWT, permissions
  3. Architettura Docker - Containerizzazione
  4. Diagrammi di Flusso - Workflow operativi

I servizi backend di Emblema sono progettati per scalabilità, affidabilità e manutenibilità, fornendo una base solida per l'elaborazione intelligente di contenuti multimediali.

Questa pagina ti è stata utile?