
Low Level Design

1. Introduction

1.1 Document Purpose

This document describes the detailed low-level design of the Emblema AI Platform, specifying the technical architecture, software components, interfaces, and implementations required for a complete enterprise AI platform.

1.2 System Objectives

  • Provide an integrated AI platform for document management, intelligent chat, and knowledge bases
  • Support asynchronous processing of multimedia documents with advanced chunking
  • Implement high-performance semantic search on the Milvus vector database
  • Guarantee horizontal scalability through a containerized microservices architecture
  • Ensure enterprise-grade security with Keycloak authentication and granular access controls

1.3 References

2. System Architecture

2.1 Architectural Overview

Emblema implements a containerized microservices architecture based on Next.js 14 for the frontend and specialized Python/Node.js services for the backend. The system uses PostgreSQL for relational data, Milvus for vector search, Redis for caching, and MinIO for file storage.

2.2 Architecture Diagram


2.3 Technology Stack

  • Frontend: Next.js 14, React 18, TypeScript, Tailwind CSS, Shadcn UI
  • Backend: Python FastAPI, Node.js, Celery, Redis
  • Database: PostgreSQL 15, Milvus 2.4, Redis 7
  • Storage: MinIO S3-compatible object storage
  • Authentication: Keycloak OpenID Connect
  • AI/ML: LiteLLM, BGE-M3 embeddings, MinerU, WhisperX
  • Infrastructure: Docker Compose, Traefik, Grafana/Prometheus
  • Message Queue: Celery with Redis broker
  • Monitoring: Grafana, Prometheus, Loki
  • Notifications: Novu for multi-channel notifications

3. System Components

3.1 www-emblema Component (Frontend Application)

3.1.1 Responsibilities

The main Next.js 14 application, which provides the user interface for document management, AI chat, knowledge bases, and system administration. It implements SSR/SSG for optimal performance and SEO.

3.1.2 Interfaces

Input: HTTP requests, form submissions, file uploads
Output: server-side rendered pages, API responses, real-time notifications
Exposed APIs:

  • /api/v1/chat - Chat AI endpoint
  • /api/v1/file/upload - File upload with chunking
  • /api/v1/[entityType] - Generic CRUD operations
  • /api/auth/[...nextauth] - Authentication flows

3.1.3 Class Structure

// Modular Next.js App Router architecture
interface APIEndpoint<TRequest, TResponse> {
  method: 'GET' | 'POST' | 'PUT' | 'DELETE';
  path: string;
  handler: (req: TRequest) => Promise<TResponse>;
  middleware: Middleware[];
  validation: ValidationSchema;
}

// Chat API implementation
export const POST = withErrorHandling(async (req: Request) => {
  const {
    messages,
    selectedChatModel,
    knowledgeBaseIds,
    agentIds,
    contextItemIds,
    parentId,
    id,
  } = await req.json();

  const session = await auth();
  if (!session) {
    throw new RouteHandlerError("Unauthorized", "UNAUTHORIZED", 401);
  }

  // Context optimization with model-specific limits
  const modelLimits = getModelContextLimits(selectedChatModel);
  const contextOptimizer = createContextOptimizer(modelLimits);

  // Stream AI response with MCP integration
  // (optimizedMessages, systemPrompt and mcpTools are built from the request context)
  return streamText({
    model: aiModelProvider(selectedChatModel),
    messages: optimizedMessages,
    system: systemPrompt,
    tools: mcpTools,
    maxTokens: modelLimits.maxOutputTokens,
  });
});

// Form system with ref-based pattern
interface FormRef<T> {
  submit: () => Promise<void>;
  reset: (values?: T) => void;
  getValues: () => T;
  isDirty: () => boolean;
  isValid: () => boolean;
}

export const DocumentForm = forwardRef<FormRef<DocumentSchema>, DocumentFormProps>(
  ({ defaultValues, onSubmit, variant = 'page' }, ref) => {
    const form = useForm({
      resolver: zodResolver(createDocumentSchema(t)),
      defaultValues,
    });

    useImperativeHandle(ref, () => ({
      submit: async () => await form.handleSubmit(onSubmit)(),
      reset: (values) => form.reset(values || defaultValues),
      getValues: () => form.getValues(),
      isDirty: () => form.formState.isDirty,
      isValid: () => form.formState.isValid,
    }));

    return <Form {...form}>...</Form>;
  }
);

3.1.4 Sequence Diagram


3.2 background-task Component (Processing Service)

3.2.1 Responsibilities

A Python FastAPI service that handles asynchronous processing of documents, audio, and video through Celery workers. It implements intelligent chunking, embedding generation, and notifications.

3.2.2 Interfaces

Input: file uploads, processing configurations, task parameters
Output: processed chunks, embeddings, notifications, progress updates
Exposed APIs:

  • /tasks/ - Task management endpoints
  • /artifacts/ - AI artifact generation
  • /cache/ - Cache management utilities

3.2.3 Class Structure

# Pipeline pattern for document processing
class DocumentProcessingPipeline:
    def __init__(self):
        self.stages = [
            FileValidationStage(),
            FormatOptimizationStage(),
            ContentExtractionStage(),
            ChunkingStage(),
            EmbeddingGenerationStage(),
            StorageStage(),
            NotificationStage()
        ]

    async def process(self, document_id: str) -> ProcessingResult:
        context = ProcessingContext(document_id)

        for stage in self.stages:
            try:
                context = await stage.execute(context)
                await self.update_progress(document_id, stage.name)
            except StageError as e:
                await self.handle_stage_error(document_id, stage.name, e)
                raise

        return context.result

# Factory pattern for specialized processors
class ProcessorFactory:
    processors = {
        'pdf': DocumentProcessor,   # MinerU-based PDF extraction
        'mp3': AudioProcessor,      # WhisperX transcription + diarization
        'mp4': VideoProcessor       # Audio extraction + transcription
    }

    @classmethod
    def get_processor(cls, file_type: str) -> BaseProcessor:
        processor_class = cls.processors.get(file_type)
        if not processor_class:
            raise UnsupportedFileTypeError(f"No processor for {file_type}")
        return processor_class()

# Celery task configuration
import asyncio

from celery import shared_task

@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3, 'countdown': 60},
    time_limit=3600,        # 1 hour
    soft_time_limit=3000    # 50 minutes
)
def process_document_task(self, document_id: str, config: dict):
    try:
        pipeline = DocumentProcessingPipeline()
        # The pipeline is async; run it to completion inside the worker
        result = asyncio.run(pipeline.process(document_id))
        return result
    except Exception as exc:
        logger.error(f"Document processing failed: {exc}")
        raise self.retry(exc=exc)
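
The task is enqueued from API handlers or webhooks and picked up by a Celery worker. A minimal usage sketch (the config dict shown is illustrative):

# Enqueue asynchronously; a worker consuming the queue runs the pipeline
process_document_task.delay(document_id, {"chunker_name": "recursive"})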

4. Database Design

4.1 Logical Schema

The PostgreSQL database uses a normalized design that supports multi-tenancy through user-based ownership. Every entity carries standard audit fields (created_by, created_at, updated_at) and an entity type to support GraphQL introspection.

4.2 Main Tables

4.2.1 users Table

CREATE TABLE users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    entity_type VARCHAR(50) NOT NULL DEFAULT 'User',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Field Descriptions:

  • id: UUID primary key (synchronized with Keycloak)
  • email: unique user email address
  • first_name, last_name: user profile data
  • entity_type: entity type for the GraphQL schema
  • created_at, updated_at: audit timestamps

4.2.2 document Table

CREATE TABLE document (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'pending',
    created_by UUID REFERENCES users(id) ON DELETE CASCADE,
    knowledge_base_id UUID REFERENCES knowledge_base(id) ON DELETE SET NULL,
    file_id UUID REFERENCES file(id) ON DELETE SET NULL,
    chunk_count INTEGER DEFAULT 0,
    chunker_name VARCHAR(100),
    chunker_config JSONB DEFAULT '{}',
    summary TEXT,
    entity_type VARCHAR(50) NOT NULL DEFAULT 'Document',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Field Descriptions:

  • id: UUID primary key
  • name: user-defined document name
  • type: document type (pdf, audio, video, etc.)
  • status: processing status (pending, processing, completed, failed)
  • created_by: FK to the owning user
  • knowledge_base_id: FK to the associated knowledge base (optional)
  • file_id: FK to the associated stored file
  • chunk_count: number of generated chunks
  • chunker_name: name of the chunker used
  • chunker_config: chunking configuration (JSONB)
  • summary: AI-generated summary of the document

4.2.3 Performance Indexes

-- Indexes for frequent queries
CREATE INDEX idx_document_created_by ON document(created_by);
CREATE INDEX idx_document_kb ON document(knowledge_base_id);
CREATE INDEX idx_document_status ON document(status);
CREATE INDEX idx_document_type ON document(type);
CREATE INDEX idx_document_created_at ON document(created_at DESC);

-- JSONB GIN index for configuration searches
CREATE INDEX idx_document_chunker_config_gin ON document USING GIN(chunker_config);

-- Composite index for efficient pagination
CREATE INDEX idx_document_owner_status_created ON document(created_by, status, created_at DESC);

4.3 Milvus Schema (Vector Database)

# Collection schema for document chunks with BGE-M3 embeddings
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

chunk_fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=36, is_primary=True),
    FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=36),
    FieldSchema(name="chunk_id", dtype=DataType.VARCHAR, max_length=36),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),  # BGE-M3
    FieldSchema(name="token_count", dtype=DataType.INT32),
    FieldSchema(name="position", dtype=DataType.INT32),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="metadata", dtype=DataType.JSON),
    FieldSchema(name="created_by", dtype=DataType.VARCHAR, max_length=36),
    FieldSchema(name="created_at", dtype=DataType.INT64)
]

chunk_schema = CollectionSchema(
    fields=chunk_fields,
    description="Document chunks with BGE-M3 embeddings for semantic search"
)

# Index tuning for performance
index_params = {
    "metric_type": "COSINE",    # Cosine similarity
    "index_type": "IVF_FLAT",   # Index type for balanced performance
    "params": {"nlist": 1024}   # Number of cluster centers
}

collection = Collection(
    name="document_chunks",
    schema=chunk_schema,
    using='default'
)

collection.create_index(
    field_name="embedding",
    index_params=index_params
)

# Load the collection into memory so it can serve searches
collection.load()

Semantic Search with Milvus

Document retrieval based on similarity search is handled directly by Milvus, with no intermediary services. Semantic search is implemented in the components that need it:

# Example of direct search in Milvus (used in background-task and API endpoints)
import os
from typing import Dict, List, Optional

from pymilvus import MilvusClient

class MilvusSearchService:
    def __init__(self):
        self.client = MilvusClient(
            uri="http://milvus:19530",
            user=os.getenv("MILVUS_USER", "root"),
            password=os.getenv("MILVUS_PASSWORD", "Milvus")
        )

    async def semantic_search(
        self,
        query_embedding: List[float],
        collection_name: str = "document_chunks",
        limit: int = 10,
        filters: Optional[str] = None
    ) -> List[Dict]:
        """
        Runs a semantic search directly against Milvus.

        Args:
            query_embedding: query embedding (1024-dimensional BGE-M3)
            collection_name: Milvus collection name
            limit: maximum number of results
            filters: Milvus filter expression (e.g. "document_id in ['doc1', 'doc2']")

        Returns:
            List of chunks with similarity scores
        """
        search_params = {
            "metric_type": "COSINE",
            "params": {"nprobe": 10}
        }

        results = self.client.search(
            collection_name=collection_name,
            data=[query_embedding],
            anns_field="embedding",
            search_params=search_params,
            limit=limit,
            filter=filters or "",
            output_fields=["id", "content", "metadata", "document_id", "position"]
        )

        # Format results with scores; MilvusClient hits expose the requested
        # fields under "entity" and the metric value under "distance"
        formatted_results = []
        for hits in results:
            for hit in hits:
                entity = hit.get("entity", {})
                formatted_results.append({
                    "id": hit.get("id"),
                    "content": entity.get("content"),
                    "metadata": entity.get("metadata"),
                    "document_id": entity.get("document_id"),
                    "position": entity.get("position"),
                    "score": hit.get("distance")  # cosine similarity
                })

        return formatted_results

Component Integration:

  1. API Endpoints (/api/v1/search): use MilvusSearchService for direct queries
  2. Background Tasks: insert embeddings during processing
  3. Chat API: retrieves the relevant context for RAG (Retrieval Augmented Generation), as sketched below
  4. Knowledge Base Search: filters results by specific KBs
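
As an illustration of point 3, the following minimal sketch builds RAG context through the service above; embed_query is a hypothetical stand-in for the BGE-M3 embedding call, not an actual platform function:

# Hypothetical sketch: retrieve chat context via semantic search (RAG)
async def build_rag_context(question: str, document_ids: List[str]) -> str:
    service = MilvusSearchService()
    query_embedding = embed_query(question)  # illustrative: 1024-dim BGE-M3 vector
    id_list = ", ".join(f"'{doc_id}'" for doc_id in document_ids)
    hits = await service.semantic_search(
        query_embedding=query_embedding,
        limit=5,
        filters=f"document_id in [{id_list}]",
    )
    # The top-scoring chunks become the context block injected into the system prompt
    return "\n\n".join(hit["content"] for hit in hits)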

4.4 Hasura GraphQL Engine

Hasura is the GraphQL layer that provides automatic real-time APIs on top of the PostgreSQL database, with advanced authorization and federation capabilities (a sample query follows the feature list below).

Key Features

  • Automatic GraphQL APIs: instant generation of queries, mutations, and subscriptions from the database schema
  • Multi-Database Support: PostgreSQL, MySQL, SQL Server, BigQuery, and others
  • Remote Schema Stitching: integration of external GraphQL endpoints into a single unified schema
  • Granular Permissions: row- and column-level access control based on roles and conditions
  • Real-time Subscriptions: live updates over WebSocket on database changes
  • Event Triggers: automatic webhooks on database operations
  • Actions & Remote Joins: schema extension with custom business logic
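
As a minimal sketch of the generated API (the endpoint matches the docker-compose service; the admin-secret header is for illustration only, real clients authenticate with user JWTs):

import requests

# Query the auto-generated GraphQL API for a user's completed documents
query = """
query DocumentsByStatus($status: String!) {
  document(where: {status: {_eq: $status}}, order_by: {created_at: desc}) {
    id
    name
    chunk_count
    knowledge_base { name }
  }
}
"""

response = requests.post(
    "http://graphql-engine:8080/v1/graphql",
    json={"query": query, "variables": {"status": "completed"}},
    headers={"x-hasura-admin-secret": "<HASURA_ADMIN_SECRET>"},
)
response.raise_for_status()
print(response.json()["data"]["document"])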

Metadata Configuration

# hasura/metadata/databases/default/tables/public_document.yaml
table:
  name: document
  schema: public

object_relationships:
  - name: creator
    using:
      foreign_key_constraint_on: created_by
  - name: knowledge_base
    using:
      foreign_key_constraint_on: knowledge_base_id

array_relationships:
  - name: chunks
    using:
      foreign_key_constraint_on:
        column: document_id
        table:
          name: chunk
          schema: public

insert_permissions:
  - role: user
    permission:
      check:
        created_by:
          _eq: X-Hasura-User-Id
      columns:
        - name
        - type
        - knowledge_base_id
        - chunker_config

select_permissions:
  - role: user
    permission:
      columns: "*"
      filter:
        _or:
          - created_by:
              _eq: X-Hasura-User-Id
          - knowledge_base:
              permissions:
                user_id:
                  _eq: X-Hasura-User-Id
                action:
                  _eq: read

update_permissions:
  - role: user
    permission:
      columns:
        - name
        - summary
        - chunker_config
      filter:
        created_by:
          _eq: X-Hasura-User-Id
      check: null

Remote Schema Stitching

// Integration with an external search service
const searchSchema = makeRemoteExecutableSchema({
  schema: await introspectSchema(searchLink),
  link: searchLink,
});

// Hasura schema extension (GraphQL SDL)
extend type Query {
  searchDocuments(
    query: String!
    knowledgeBaseIds: [uuid!]
    limit: Int = 10
  ): DocumentSearchResult @remote
}

type DocumentSearchResult {
  results: [DocumentSearchHit!]!
  total: Int!
  processingTime: Float!
}

Event-Driven Architecture

# Event trigger for automatic processing
- table:
    name: document
    schema: public
  event_triggers:
    - name: process_new_document
      definition:
        enable_manual: true
        insert:
          columns: "*"
      retry_conf:
        num_retries: 3
        interval_sec: 10
        timeout_sec: 60
      webhook: http://background-task:8000/webhooks/process-document
      headers:
        - name: X-Webhook-Secret
          value_from_env: WEBHOOK_SECRET
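
On the receiving side, the background-task service exposes a matching webhook endpoint. A minimal sketch, assuming FastAPI and the Celery task from section 3.2.3 (payload access follows Hasura's standard event format; the handler shown here is illustrative, not the actual implementation):

from fastapi import FastAPI, Header, HTTPException
import os

app = FastAPI()

@app.post("/webhooks/process-document")
async def process_document_webhook(
    payload: dict,
    x_webhook_secret: str = Header(None),
):
    # Reject calls that do not carry the shared secret configured in Hasura
    if x_webhook_secret != os.getenv("WEBHOOK_SECRET"):
        raise HTTPException(status_code=401, detail="Invalid webhook secret")

    # In Hasura's event payload, event.data.new holds the inserted row
    new_row = payload["event"]["data"]["new"]
    process_document_task.delay(new_row["id"], new_row.get("chunker_config") or {})
    return {"status": "queued"}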

5. API Design

5.1 Hierarchical API Architecture

The Emblema API uses a hierarchical architecture that mirrors the relationships between entities, optimizing security checks while keeping flexibility for specific operations.

Core Principles

  1. Resource Hierarchy: entities are organized in parent-child relationships
  2. Root-Level Permission Check: authorization is checked only on the root element to optimize performance
  3. Standardized CRUD: uniform create, read, update, delete operations for all entities
  4. Custom Actions: dedicated endpoints for entity-specific operations
  5. Specialized APIs: endpoints not bound to the hierarchy for complex functionality

Hierarchical URL Scheme

/api/v1/{Entity1}/{id1}/{Entity2}/{id2}/{Entity3}/{id3}

Concrete example:

/api/v1/Meeting/123/Document/456/Chunk/789

This structure means:

  • Meeting with ID 123 (root entity; this is where permissions are checked)
  • Document with ID 456 (child of Meeting)
  • Chunk with ID 789 (child of Document)

5.2 Hierarchical REST Endpoints

5.2.1 CRUD Operations

// Generic CRUD handler for hierarchical entities
interface HierarchicalRequest {
  userId: string; // injected by the auth middleware
  entityPath: EntityPathSegment[];
  operation: "GET" | "POST" | "PUT" | "DELETE";
  body?: any;
}

interface EntityPathSegment {
  entityType: string;
  entityId?: string;
}

// Example: GET /api/v1/Meeting/123/Document
const handler = async (req: HierarchicalRequest) => {
  // 1. Parse entity path
  const rootEntity = req.entityPath[0]; // Meeting
  const childEntity = req.entityPath[1]; // Document

  // 2. Check permissions ONLY on root entity
  const hasAccess = await checkPermission(
    req.userId,
    rootEntity.entityType,
    "read",
    { entityId: rootEntity.entityId },
  );

  if (!hasAccess) {
    throw new ForbiddenError("Access denied to Meeting");
  }

  // 3. Execute operation on child entity
  switch (req.operation) {
    case "GET":
      return await getDocumentsForMeeting(rootEntity.entityId);
    case "POST":
      return await createDocumentInMeeting(rootEntity.entityId, req.body);
    // ... other methods
  }
};

5.2.2 Hierarchical Endpoint Examples

GET /api/v1/KnowledgeBase/kb-123/Document

  • Lists all documents in knowledge base kb-123
  • Permission check on KnowledgeBase

POST /api/v1/Meeting/meet-456/Document

  • Creates a new document in meeting meet-456
  • Permission check on Meeting

GET /api/v1/Document/doc-789/Chunk?page=1&size=20

  • Lists the chunks of document doc-789 with pagination
  • Permission check on Document

PUT /api/v1/Task/task-111/Artifact/art-222

  • Updates artifact art-222 of task task-111
  • Permission check on Task

5.2.3 Custom Actions

For operations that go beyond standard CRUD, action endpoints are used:

/api/v1/{Entity1}/{id1}/{Entity2}/{id2}/action/{actionName}

Examples:

// POST /api/v1/Document/doc-123/Chunk/chunk-456/action/regenerate
// Regenerates the embeddings for a specific chunk
export const regenerateChunkEmbeddings = async (
  documentId: string,
  chunkId: string,
  config?: RegenerateConfig,
) => {
  // Check permission on document (root)
  await checkDocumentAccess(documentId, "update");

  // Execute specialized action
  const chunk = await getChunk(chunkId);
  const newEmbeddings = await generateEmbeddings(chunk.content, config);

  await updateChunkEmbeddings(chunkId, newEmbeddings);

  return { success: true, chunkId, embeddingDimensions: newEmbeddings.length };
};

// POST /api/v1/KnowledgeBase/kb-123/action/reindex
// Reindexes all documents in a knowledge base
export const reindexKnowledgeBase = async (
  knowledgeBaseId: string,
  options?: ReindexOptions,
) => {
  await checkKnowledgeBaseAccess(knowledgeBaseId, "admin");

  const taskId = await scheduleReindexTask(knowledgeBaseId, options);

  return { taskId, status: "scheduled" };
};

// POST /api/v1/Meeting/meet-123/Document/doc-456/action/export
// Exports a document in a specific format
export const exportDocument = async (
  meetingId: string,
  documentId: string,
  format: "pdf" | "docx" | "markdown",
) => {
  await checkMeetingAccess(meetingId, "read");

  const exportUrl = await generateExport(documentId, format);

  return { exportUrl, expiresAt: new Date(Date.now() + 3600000) };
};

5.3 Specialized Non-Hierarchical APIs

Some functionality requires dedicated endpoints that do not follow the hierarchical structure:

5.3.1 Chat API

POST /api/v1/chat

A complex endpoint for AI conversations that integrates multiple resources:

interface ChatRequest {
  id: string;
  messages: Message[];
  selectedChatModel: string;
  knowledgeBaseIds?: string[]; // Multiple KB access
  agentIds?: string[]; // Multiple agents
  contextItemIds?: string[]; // Documents, chunks, etc.
  parentId?: string; // Thread management
}

// Implementation with multiple permission checks
export const chatHandler = async (req: ChatRequest) => {
  // 1. Check access to all referenced resources (optional arrays default to empty)
  const accessChecks = await Promise.all([
    ...(req.knowledgeBaseIds ?? []).map((id) => checkKBAccess(id, "read")),
    ...(req.agentIds ?? []).map((id) => checkAgentAccess(id, "use")),
    ...(req.contextItemIds ?? []).map((id) => checkContextAccess(id, "read")),
  ]);

  if (!accessChecks.every((check) => check === true)) {
    throw new ForbiddenError("Access denied to some resources");
  }

  // 2. Build context from all sources
  const context = await buildChatContext(req);

  // 3. Stream AI response
  return streamAIResponse(context, req.selectedChatModel);
};

5.3.2 File Upload API

POST /api/v1/file/upload/init
PUT /api/v1/file/upload/chunk
POST /api/v1/file/upload/complete

A multi-part upload system for large files:

// Initialize an upload session
export const initUpload = async (req: UploadInitRequest) => {
  const session = await createUploadSession({
    fileName: req.fileName,
    fileSize: req.fileSize,
    chunks: Math.ceil(req.fileSize / req.chunkSize),
    metadata: req.metadata,
  });

  return {
    uploadId: session.id,
    chunkUrls: generateSignedUrls(session),
    expiresAt: session.expiresAt,
  };
};

// Upload a single chunk
export const uploadChunk = async (
  uploadId: string,
  chunkIndex: number,
  data: Buffer,
  userId: string, // authenticated caller
) => {
  const session = await getUploadSession(uploadId);

  if (session.userId !== userId) {
    throw new ForbiddenError("Not your upload session");
  }

  await storeChunk(uploadId, chunkIndex, data);
  await updateProgress(uploadId, chunkIndex);

  return {
    chunkIndex,
    received: data.length,
    progress: calculateProgress(session),
  };
};

5.3.3 Search API

POST /api/v1/search

Cross-entity semantic search with complex filters:

interface SearchRequest {
  userId: string; // injected by the auth middleware
  query: string;
  filters: {
    entityTypes?: string[];
    knowledgeBaseIds?: string[];
    dateRange?: { from: Date; to: Date };
    tags?: string[];
  };
  options: {
    limit?: number;
    offset?: number;
    includeHighlights?: boolean;
    minScore?: number;
  };
}

export const searchHandler = async (req: SearchRequest) => {
  // Build permission-aware search scope
  const accessibleResources = await getUserAccessibleResources(req.userId);

  const searchScope = intersectFilters(req.filters, accessibleResources);

  // Execute vector search with Milvus
  const results = await vectorSearch({
    query: req.query,
    scope: searchScope,
    options: req.options,
  });

  // Enrich with metadata and highlights
  return enrichSearchResults(results, req.options);
};

5.4 Optimized Access Control

// Permission cache to optimize performance
class PermissionCache {
  private cache = new LRUCache<string, boolean>({
    max: 10000,
    ttl: 300000, // 5 minutes
  });

  async checkAccess(
    userId: string,
    entityType: string,
    entityId: string,
    action: string,
  ): Promise<boolean> {
    const cacheKey = `${userId}:${entityType}:${entityId}:${action}`;

    // Check cache first
    const cached = this.cache.get(cacheKey);
    if (cached !== undefined) return cached;

    // Load from database
    const hasAccess = await this.loadPermission(
      userId,
      entityType,
      entityId,
      action,
    );

    this.cache.set(cacheKey, hasAccess);
    return hasAccess;
  }

  private async loadPermission(
    userId: string,
    entityType: string,
    entityId: string,
    action: string,
  ): Promise<boolean> {
    // 1. Check ownership
    const entity = await getEntity(entityType, entityId);
    if (entity.createdBy === userId) return true;

    // 2. Check group permissions
    const userGroups = await getUserGroups(userId);
    const groupPerms = await getGroupPermissions(userGroups, entityType);

    // 3. Check specific permissions
    const specificPerms = await getEntityPermissions(
      userId,
      entityType,
      entityId,
    );

    return evaluatePermissions(action, groupPerms, specificPerms);
  }
}

5.5 API Error Handling

// Error-handling middleware for API routes
import { NextResponse } from "next/server";

interface APIError {
  code: string;
  message: string;
  details?: any;
  statusCode: number;
}

class RouteHandlerError extends Error {
  constructor(
    public message: string,
    public code: string,
    public statusCode: number = 500,
    public details?: any,
  ) {
    super(message);
    this.name = "RouteHandlerError";
  }
}

export const withErrorHandling = (handler: Function) => {
  return async (req: Request, context?: any) => {
    try {
      return await handler(req, context);
    } catch (error) {
      if (error instanceof RouteHandlerError) {
        return NextResponse.json(
          {
            error: {
              code: error.code,
              message: error.message,
              details: error.details,
            },
          },
          { status: error.statusCode },
        );
      }

      // Log unexpected errors
      console.error("Unexpected API error:", error);

      return NextResponse.json(
        {
          error: {
            code: "INTERNAL_SERVER_ERROR",
            message: "An unexpected error occurred",
          },
        },
        { status: 500 },
      );
    }
  };
};

6. Security

6.1 Authentication

  • Type: OpenID Connect via Keycloak
  • JWT Access Token: 15-minute validity
  • Refresh Token: 7-day validity
  • Session Management: NextAuth.js with secure cookies (service-side token validation is sketched after the configuration below)

// NextAuth configuration for Keycloak
export const authConfig: NextAuthConfig = {
  providers: [
    Keycloak({
      clientId: process.env.AUTH_KEYCLOAK_ID!,
      clientSecret: process.env.AUTH_KEYCLOAK_SECRET!,
      issuer: process.env.AUTH_KEYCLOAK_ISSUER!,
      authorization: {
        params: {
          scope: "openid email profile",
          response_type: "code",
          grant_type: "authorization_code",
        },
      },
    }),
  ],
  callbacks: {
    jwt({ token, account, profile }) {
      if (account) {
        token.accessToken = account.access_token;
        token.refreshToken = account.refresh_token;
        token.expiresAt = account.expires_at;
      }
      return token;
    },
    session({ session, token }) {
      session.accessToken = token.accessToken;
      session.user.id = token.sub;
      return session;
    },
  },
  session: {
    strategy: "jwt",
    maxAge: 7 * 24 * 60 * 60, // 7 days
  },
};
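
Backend services validate Keycloak-issued access tokens independently against the realm's JWKS endpoint. A minimal sketch, assuming PyJWT with its JWKS client (illustrative; audience verification should mirror the Keycloak client configuration):

import os
import jwt  # PyJWT
from jwt import PyJWKClient

ISSUER = os.getenv("AUTH_KEYCLOAK_ISSUER")  # the realm URL
jwks_client = PyJWKClient(f"{ISSUER}/protocol/openid-connect/certs")

def verify_access_token(token: str) -> dict:
    """Verify the signature and issuer of a Keycloak access token."""
    signing_key = jwks_client.get_signing_key_from_jwt(token)
    return jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],
        issuer=ISSUER,
        options={"verify_aud": False},  # enable and set the audience in production
    )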

6.2 Authorization

  • Model: RBAC (Role-Based Access Control) + ABAC (Attribute-Based)
  • Roles: Admin, Manager, User, Guest
  • Permissions: resource-level with granular conditions

// Permission system with conditions
interface Permission {
  resource: string; // 'document', 'knowledge_base', etc.
  action: string; // 'create', 'read', 'update', 'delete'
  conditions?: AccessCondition[];
}

interface AccessCondition {
  field: string; // 'created_by', 'status', 'knowledge_base_id'
  operator: "eq" | "ne" | "in" | "contains" | "gt" | "lt";
  value: any;
}

// Example: a user can only read their own completed documents
const userDocumentReadPermission: Permission = {
  resource: "document",
  action: "read",
  conditions: [
    { field: "created_by", operator: "eq", value: "{{user.id}}" },
    { field: "status", operator: "eq", value: "completed" },
  ],
};

class PermissionService {
  async checkPermission(
    userId: string,
    resource: string,
    action: string,
    context?: Record<string, any>,
  ): Promise<boolean> {
    const userPermissions = await this.getUserPermissions(userId);

    for (const permission of userPermissions) {
      if (
        this.matchesResource(permission.resource, resource) &&
        this.matchesAction(permission.action, action)
      ) {
        if (!permission.conditions) return true;

        const conditionsMet = await this.evaluateConditions(
          permission.conditions,
          { ...context, user: { id: userId } },
        );

        if (conditionsMet) return true;
      }
    }

    return false;
  }
}

6.3 Encryption

  • Sensitive data: AES-256-GCM
  • Passwords: bcrypt (12 rounds)
  • Transport: TLS 1.3 (custom certificates or Let's Encrypt)
  • Key rotation: support for multiple encryption keys

# Encryption service with key rotation
from cryptography.fernet import Fernet, MultiFernet
import os
from typing import List

class DecryptionError(Exception):
    """Raised when no available key can decrypt the payload"""

class DataEncryptionService:
    def __init__(self):
        # Current key for new encryptions
        self.current_key = os.getenv('ENCRYPTION_KEY_CURRENT')

        # Previous keys for decryption (key rotation)
        previous_keys_str = os.getenv('ENCRYPTION_KEYS_PREVIOUS', '')
        self.previous_keys = [k.strip() for k in previous_keys_str.split(',') if k.strip()]

        # MultiFernet for automatic key rotation
        all_keys = [self.current_key] + self.previous_keys
        self.cipher = MultiFernet([Fernet(key.encode()) for key in all_keys])

    def encrypt(self, data: str) -> str:
        """Encrypt with the current key"""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt with any available key (automatic rotation)"""
        try:
            return self.cipher.decrypt(encrypted_data.encode()).decode()
        except Exception as e:
            raise DecryptionError(f"Unable to decrypt data: {e}")

    def rotate_encryption(self, encrypted_data: str) -> str:
        """Re-encrypt data with the current key"""
        decrypted = self.decrypt(encrypted_data)
        return self.encrypt(decrypted)

6.4 Input Validation

  • Schema-based validation: Zod schemas with i18n
  • SQL injection: prepared statements and ORM escaping
  • XSS protection: Content Security Policy and sanitization
  • File upload: MIME type validation, virus scanning, size limits

// Schema validation with internationalization
export const createDocumentSchema = (t: TFunction) => {
  const v = createSchemaFactory("document")(t);

  return z.object({
    name: z
      .string({
        required_error: v.string("name").required(),
        invalid_type_error: v.string("name").required(),
      })
      .min(1, v.string("name").required())
      .max(255, v.string("name").maxLength(255)),

    knowledgeBaseId: z
      .string()
      .uuid(v.string("knowledgeBaseId").uuid())
      .optional(),

    tags: z
      .array(z.string().min(1).max(50))
      .max(10, v.string("tags").maxItems(10))
      .optional(),

    chunkingConfig: z
      .object({
        strategy: z.enum([
          "recursive",
          "semantic",
          "agentic",
          "sentence",
          "fixed",
        ]),
        chunkSize: z.number().min(100).max(2048).default(512),
        chunkOverlap: z.number().min(0).max(500).default(75),
      })
      .optional(),
  });
};

7. Performance and Scalability

7.1 Performance Requirements

  • Response Time: < 200 ms for 95% of API requests
  • Throughput: 1000 requests/second on the main endpoints
  • Search Latency: < 100 ms for semantic searches (Milvus)
  • File Processing: < 2 minutes for standard PDF documents
  • Availability: 99.9% uptime with graceful degradation

7.2 Caching Strategies

// Multi-layer caching system
import { LRUCache } from "lru-cache";
import Redis from "ioredis";

class CacheService {
  private l1Cache = new LRUCache<string, any>({ max: 1000, ttl: 300000 }); // 5-minute in-memory L1
  private l2Cache: Redis; // distributed L2

  constructor() {
    this.l2Cache = new Redis({
      host: "redis",
      port: 6379,
      retryDelayOnFailover: 100,
      maxRetriesPerRequest: 3,
    });
  }

  async get<T>(key: string): Promise<T | null> {
    // L1 cache (in-memory)
    const l1Value = this.l1Cache.get(key);
    if (l1Value !== undefined) {
      this.recordCacheHit("L1", key);
      return l1Value;
    }

    // L2 cache (Redis)
    try {
      const l2Value = await this.l2Cache.get(key);
      if (l2Value) {
        const parsed = JSON.parse(l2Value);
        this.l1Cache.set(key, parsed); // Warm L1
        this.recordCacheHit("L2", key);
        return parsed;
      }
    } catch (error) {
      console.warn("L2 cache error:", error);
    }

    this.recordCacheMiss(key);
    return null;
  }

  async set<T>(key: string, value: T, ttl: number = 3600): Promise<void> {
    // Store in both levels
    this.l1Cache.set(key, value, { ttl: Math.min(ttl, 300) * 1000 });

    try {
      await this.l2Cache.setex(key, ttl, JSON.stringify(value));
    } catch (error) {
      console.warn("L2 cache write error:", error);
    }
  }

  // Cache invalidation patterns
  async invalidatePattern(pattern: string): Promise<void> {
    // L1 wildcard invalidation
    const l1Keys = Array.from(this.l1Cache.keys());
    const matchingL1Keys = l1Keys.filter((key) => key.includes(pattern));
    matchingL1Keys.forEach((key) => this.l1Cache.delete(key));

    // L2 pattern-based invalidation
    const stream = this.l2Cache.scanStream({ match: pattern });
    const pipeline = this.l2Cache.pipeline();

    stream.on("data", (keys: string[]) => {
      keys.forEach((key) => pipeline.del(key));
    });

    stream.on("end", () => {
      pipeline.exec();
    });
  }
}

7.3 Database Optimization

-- Connection pooling configuration
-- postgresql.conf optimizations
max_connections = 200
shared_buffers = 256MB
effective_cache_size = 1GB
work_mem = 4MB
maintenance_work_mem = 64MB
wal_buffers = 16MB
checkpoint_completion_target = 0.9
random_page_cost = 1.1 -- For SSD storage

-- Query optimization example
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT
    d.id,
    d.name,
    d.status,
    d.chunk_count,
    kb.name AS knowledge_base_name,
    f.size AS file_size
FROM document d
LEFT JOIN knowledge_base kb ON d.knowledge_base_id = kb.id
LEFT JOIN file f ON d.file_id = f.id
WHERE d.created_by = $1
  AND d.status = ANY($2)
  AND d.created_at >= $3
ORDER BY d.created_at DESC
LIMIT $4 OFFSET $5;

-- Materialized view for analytics
CREATE MATERIALIZED VIEW document_analytics AS
SELECT
    DATE_TRUNC('day', created_at) AS date,
    type,
    status,
    COUNT(*) AS document_count,
    AVG(chunk_count) AS avg_chunks,
    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed_count
FROM document
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE_TRUNC('day', created_at), type, status;

-- REFRESH ... CONCURRENTLY requires a unique index on the view
CREATE UNIQUE INDEX idx_document_analytics_key ON document_analytics(date, type, status);

-- Automatic refresh job
CREATE OR REPLACE FUNCTION refresh_analytics()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY document_analytics;
    ANALYZE document_analytics;
END;
$$ LANGUAGE plpgsql;

-- Scheduled refresh (via pg_cron extension)
SELECT cron.schedule('refresh-analytics', '*/15 * * * *', 'SELECT refresh_analytics();');

8. Monitoring and Logging

8.1 Structured Logging

  • Format: JSON structured logging
  • Levels: ERROR, WARN, INFO, DEBUG
  • Retention: 30 days with automatic rotation
  • Aggregation: Loki for log centralization

// Structured logging service
import winston from "winston";

interface LogContext {
  userId?: string;
  requestId: string;
  action: string;
  resource?: string;
  duration?: number;
  metadata?: Record<string, any>;
}

class Logger {
  private winston: winston.Logger;

  constructor() {
    this.winston = winston.createLogger({
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json(),
      ),
      transports: [
        new winston.transports.Console(),
        new winston.transports.File({
          filename: "app.log",
          maxsize: 100 * 1024 * 1024, // 100MB
          maxFiles: 5,
        }),
      ],
    });
  }

  info(message: string, context: LogContext) {
    this.winston.info(message, {
      ...context,
      timestamp: new Date().toISOString(),
      level: "info",
    });
  }

  error(message: string, error: Error, context: LogContext) {
    this.winston.error(message, {
      ...context,
      error: {
        name: error.name,
        message: error.message,
        stack: error.stack,
      },
      timestamp: new Date().toISOString(),
      level: "error",
    });
  }

  // Performance logging
  performance(action: string, duration: number, context: Partial<LogContext>) {
    this.info(`Performance: ${action}`, {
      ...context,
      action: `perf:${action}`,
      duration,
      requestId: context.requestId || "unknown",
    });
  }
}

8.2 Metrics (Prometheus)

// Prometheus metrics configuration
import prometheus from "prom-client";

// Application metrics
const httpRequestDuration = new prometheus.Histogram({
  name: "http_request_duration_seconds",
  help: "HTTP request duration in seconds",
  labelNames: ["method", "route", "status_code", "user_id"],
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10],
});

const documentProcessingDuration = new prometheus.Histogram({
  name: "document_processing_duration_seconds",
  help: "Document processing duration in seconds",
  labelNames: ["document_type", "chunker_name", "status"],
  buckets: [1, 5, 10, 30, 60, 300, 600, 1800, 3600],
});

const searchLatency = new prometheus.Histogram({
  name: "search_latency_seconds",
  help: "Search query latency in seconds",
  labelNames: ["search_type", "collection", "cache_hit"],
  buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
});

const activeUsers = new prometheus.Gauge({
  name: "active_users_total",
  help: "Number of active users in the last 5 minutes",
});

// Database connection pool metrics
const dbConnections = new prometheus.Gauge({
  name: "database_connections_active",
  help: "Number of active database connections",
  labelNames: ["database", "state"],
});

// Custom middleware for HTTP metrics
export const metricsMiddleware = (
  req: Request,
  res: Response,
  next: Function,
) => {
  const start = Date.now();

  res.on("finish", () => {
    const duration = (Date.now() - start) / 1000;

    httpRequestDuration
      .labels(
        req.method,
        req.route?.path || req.path,
        res.statusCode.toString(),
        req.user?.id || "anonymous",
      )
      .observe(duration);
  });

  next();
};

8.3 Alerting (Grafana)

# Grafana alert rules configuration
apiVersion: 1
groups:
  - name: emblema-alerts
    folder: Emblema
    interval: 1m
    rules:
      - uid: high-error-rate
        title: High Error Rate
        condition: C
        data:
          - refId: A
            queryType: ""
            model:
              expr: 'rate(http_requests_total{status_code=~"5.."}[5m]) > 0.1'
              interval: ""
              refId: A
        noDataState: NoData
        execErrState: Alerting
        for: 5m
        annotations:
          summary: "High error rate detected: {{ $value }} errors/sec"
          description: "Error rate is above 10% for the last 5 minutes"
        labels:
          severity: critical
          team: platform

      - uid: document-processing-failures
        title: Document Processing Failures
        condition: C
        data:
          - refId: A
            queryType: ""
            model:
              expr: "increase(document_processing_failures_total[10m]) > 5"
              interval: ""
              refId: A
        for: 5m
        annotations:
          summary: "Multiple document processing failures"
          description: "{{ $value }} documents failed processing in the last 10 minutes"
        labels:
          severity: warning
          team: ai-services

      - uid: search-latency-high
        title: Search Latency High
        condition: C
        data:
          - refId: A
            queryType: ""
            model:
              expr: "histogram_quantile(0.95, rate(search_latency_seconds_bucket[5m])) > 0.5"
              interval: ""
              refId: A
        for: 3m
        annotations:
          summary: "Search latency is high: {{ $value }}s"
          description: "95th percentile search latency is above 500ms"
        labels:
          severity: warning
          team: search

9. Deployment and Configuration

9.1 Deployment Environment

  • Containerization: Docker multi-stage builds
  • Orchestration: Docker Compose for development, Kubernetes for production
  • CI/CD: GitHub Actions with security scanning
  • Reverse Proxy: Traefik with automatic SSL/TLS

# Multi-stage Dockerfile for www-emblema
FROM node:18-alpine as base
RUN apk add --no-cache libc6-compat
WORKDIR /app

# Install dependencies based on the preferred package manager
COPY package.json pnpm-lock.yaml* ./
RUN corepack enable pnpm && pnpm install --frozen-lockfile

# Build stage
FROM base as builder
WORKDIR /app
COPY . .
RUN pnpm build

# Production stage
FROM node:18-alpine as runner
WORKDIR /app

ENV NODE_ENV=production
ENV NEXT_TELEMETRY_DISABLED=1

RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 nextjs

COPY --from=builder /app/public ./public
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static

USER nextjs

EXPOSE 3000
ENV PORT=3000
ENV HOSTNAME="0.0.0.0"

CMD ["node", "server.js"]

9.2 Docker Compose Configuration

# docker-compose.yaml (excerpt)
version: "3.8"

services:
  www-emblema:
    image: emblema/www:${EMBLEMA_VERSION:-dev}
    restart: always
    depends_on:
      - milvus
      - minio
      - graphql-engine
      - keycloak
      - litellm
      - background-task
    environment:
      - HASURA_API_URL=http://graphql-engine:8080/v1/graphql
      - MILVUS_API_URL=http://milvus:19530/v2/vectordb
      - LITELLM_API_URL=http://litellm:4000/v1
      - BACKGROUND_TASK_API_URL=http://background-task
      - AUTH_KEYCLOAK_ISSUER=${KEYCLOAK_REALM_URL}
      - MAX_FILE_SIZE=${MAX_FILE_SIZE:-1073741824}
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.emblema-web.rule=Host(`${EMBLEMA_WEB_HOSTNAME}`)"
      - "traefik.http.routers.emblema-web.entrypoints=websecure"
      - "traefik.http.routers.emblema-web.tls=true"
      - "traefik.http.services.emblema-web.loadbalancer.server.port=3000"
    networks:
      - emblema

  background-task:
    image: emblema/background-task:${EMBLEMA_VERSION:-dev}
    restart: always
    depends_on:
      - redis
      - milvus
      - minio
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - MILVUS_URI=http://milvus:19530
      - MINIO_ENDPOINT=minio:9000
      - BGE_MODEL_PATH=/app/models/bge-m3
    volumes:
      - ./models:/app/models:ro
      - temp_processing:/tmp/processing
    networks:
      - emblema

  background-task-worker:
    image: emblema/background-task:${EMBLEMA_VERSION:-dev}
    restart: always
    # Celery 5 syntax: the -A option precedes the worker subcommand
    command: ["celery", "-A", "app.celery_app", "worker", "--loglevel=info", "--concurrency=4"]
    depends_on:
      - redis
      - background-task
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    volumes:
      - temp_processing:/tmp/processing
    networks:
      - emblema
    deploy:
      replicas: 2

volumes:
  temp_processing:
    driver: local

networks:
  emblema:
    external: true

9.3 Environment Variables

| Variable | Description | Default |
| --- | --- | --- |
| EMBLEMA_VERSION | Docker image version | dev |
| BASE_DOMAIN | Base domain for the services | emblema.local |
| USE_SSL | Enable SSL/TLS | 1 |
| POSTGRES_USER | PostgreSQL username | emblema |
| POSTGRES_PASSWORD | PostgreSQL password | generated |
| REDIS_MASTER_PASSWORD | Redis master password | generated |
| KEYCLOAK_ADMIN | Keycloak admin user | emblema |
| KEYCLOAK_ADMIN_PASSWORD | Keycloak admin password | generated |
| HASURA_ADMIN_SECRET | Hasura GraphQL admin secret | generated |
| MILVUS_USER | Milvus username | root |
| MILVUS_PASSWORD | Milvus password | Milvus |
| MINIO_ROOT_USER | MinIO admin user | emblema |
| MAX_FILE_SIZE | Max file upload size (bytes) | 1073741824 |
| DEFAULT_CHAT_MODEL | Default LLM model | gpt-4o-mini |
| BGE_MODEL_PATH | BGE-M3 model location | /app/models/bge-m3 |
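
Service code typically maps these variables onto a typed settings object at startup. A minimal sketch, assuming pydantic-settings (illustrative; not the platform's actual configuration module):

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Field names mirror the table above; defaults follow the documented values
    emblema_version: str = "dev"
    base_domain: str = "emblema.local"
    max_file_size: int = 1073741824
    default_chat_model: str = "gpt-4o-mini"
    bge_model_path: str = "/app/models/bge-m3"

settings = Settings()  # values are read from the environment, falling back to defaults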

10. Testing

10.1 Current Status

⚠️ IMPORTANT: The Emblema project currently has no tests implemented. This section describes the testing implementation plan still to be developed.

Current Situation

  • Frontend Tests: 0% coverage - no tests implemented
  • Backend Tests: 0% coverage - no tests implemented
  • Integration Tests: not present
  • E2E Tests: not configured
  • Performance Tests: nonexistent

Technical Debt

The absence of tests poses a significant risk of:

  • Undetected regressions during development
  • Difficulty refactoring safely
  • Lack of executable documentation
  • No reliable CI/CD pipeline

10.2 Testing Implementation Plan

Phase 1: Testing Foundation (Weeks 1-2)

Goal: set up the testing infrastructure

# Frontend testing setup
pnpm add -D jest @testing-library/react @testing-library/jest-dom
pnpm add -D @testing-library/user-event msw whatwg-fetch
pnpm add -D @types/jest jest-environment-jsdom

# Backend testing setup
cd apps/background-task
uv add --dev pytest pytest-asyncio pytest-cov
uv add --dev factory-boy faker pytest-mock
uv add --dev testcontainers pytest-xdist

Jest Configuration (to be implemented):

// jest.config.js
module.exports = {
  testEnvironment: "jsdom",
  setupFilesAfterEnv: ["<rootDir>/jest.setup.js"],
  testPathIgnorePatterns: ["/node_modules/", "/.next/"],
  transform: {
    "^.+\\.(js|jsx|ts|tsx)$": ["babel-jest", { presets: ["next/babel"] }],
  },
  moduleNameMapper: {
    "^@/(.*)$": "<rootDir>/$1",
    "\\.(css|less|sass|scss)$": "identity-obj-proxy",
  },
  collectCoverageFrom: [
    "app/**/*.{js,jsx,ts,tsx}",
    "components/**/*.{js,jsx,ts,tsx}",
    "!**/*.d.ts",
    "!**/node_modules/**",
  ],
};

Pytest Configuration (to be implemented):

# pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
addopts =
    --verbose
    --cov=app
    --cov-report=html
    --cov-report=term-missing
    --cov-fail-under=80

Phase 2: Core Unit Tests (Weeks 3-4)

Frontend - critical components to test:

// To implement: tests for the DocumentUpload component
describe("DocumentUpload", () => {
  it("should validate file types before upload", async () => {
    // Test implementation needed
  });

  it("should handle chunked upload for large files", async () => {
    // Test implementation needed
  });

  it("should show progress during upload", async () => {
    // Test implementation needed
  });
});

// To implement: tests for the Chat API handler
describe("Chat API Handler", () => {
  it("should require authentication", async () => {
    // Test implementation needed
  });

  it("should validate model selection", async () => {
    // Test implementation needed
  });

  it("should stream responses correctly", async () => {
    // Test implementation needed
  });
});

Backend - critical services to test:

# To implement: tests for the Document Processor
class TestDocumentProcessor:
    async def test_pdf_processing(self):
        """Test MinerU PDF processing pipeline"""
        # Implementation needed
        pass

    async def test_audio_transcription(self):
        """Test WhisperX audio processing"""
        # Implementation needed
        pass

    async def test_chunking_strategies(self):
        """Test different chunking strategies"""
        # Implementation needed
        pass

# To implement: tests for the Embedding Service
class TestEmbeddingService:
    async def test_bge_m3_generation(self):
        """Test BGE-M3 1024-dim embeddings"""
        # Implementation needed
        pass

    async def test_batch_processing(self):
        """Test batch embedding generation"""
        # Implementation needed
        pass

Phase 3: Integration Testing (Weeks 5-6)

Database Integration (to be implemented):

# Tests with TestContainers
@pytest.fixture
async def test_db():
    """Spin up test PostgreSQL container"""
    # Implementation needed
    pass

@pytest.fixture
async def test_milvus():
    """Spin up test Milvus container"""
    # Implementation needed
    pass

class TestDatabaseIntegration:
    async def test_document_lifecycle(self, test_db):
        """Test complete document CRUD operations"""
        # Implementation needed
        pass

    async def test_vector_search(self, test_milvus):
        """Test Milvus vector search operations"""
        # Implementation needed
        pass

API Integration (to be implemented):

// E2E tests of the main flows
describe("Document Processing Flow", () => {
  it("should process document from upload to search", async () => {
    // 1. Upload document
    // 2. Wait for processing
    // 3. Verify chunks created
    // 4. Test search functionality
    // Implementation needed
  });
});

Phase 4: Performance & Load Testing (Weeks 7-8)

k6 Performance Tests (to be implemented):

// k6-tests/document-upload.js
import http from "k6/http";
import { check } from "k6";

export const options = {
  stages: [
    { duration: "2m", target: 100 },
    { duration: "5m", target: 100 },
    { duration: "2m", target: 0 },
  ],
  thresholds: {
    http_req_duration: ["p(95)<500"],
    http_req_failed: ["rate<0.05"],
  },
};

export default function () {
  // Test document upload performance
  // Implementation needed
}

10.3 Target Metrics

Once implemented, the test suite should achieve:

  • Unit Test Coverage: ≥ 80% for business logic
  • Integration Test Coverage: all critical flows
  • Performance Benchmarks:
    • API Response: < 200 ms (p95)
    • Document Processing: < 2 min per 100 pages
    • Search Latency: < 100 ms (p95)
  • Reliability: 99.9% test success rate in CI

10.4 CI/CD Integration (To Be Configured)

# .github/workflows/test.yml (to be created)
name: Test Suite

on: [push, pull_request]

jobs:
  frontend-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-node@v3
      - run: pnpm install
      - run: pnpm test:frontend
      - run: pnpm test:frontend:coverage

  backend-tests:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
      redis:
        image: redis:7
      milvus:
        image: milvusdb/milvus:2.4-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
      - run: uv sync
      - run: uv run pytest
      - run: uv run pytest --cov

10.5 Implementation Priorities

  1. Critical Path Testing (first priority):
    • Authentication flow
    • Document upload/processing
    • Search functionality
    • Chat API
  2. Core Services (second priority):
    • Chunking algorithms
    • Embedding generation
    • Permission system
    • Background tasks
  3. UI Components (third priority):
    • Forms validation
    • Error handling
    • Loading states
    • Data tables
  4. Performance (fourth priority):
    • Load testing
    • Stress testing
    • Benchmark suite
10.6 Integration Test Environment

  • API: Supertest for API endpoint testing
  • Message Queue: embedded Redis for Celery testing
  • File Processing: test files in multiple formats

# Integration tests with TestContainers
import pytest
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
from testcontainers.minio import MinioContainer
from sqlalchemy import create_engine
from app.config import get_settings

@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer("postgres:15") as postgres:
        yield postgres

@pytest.fixture(scope="session")
def redis_container():
    with RedisContainer("redis:7-alpine") as redis:
        yield redis

@pytest.fixture(scope="session")
def minio_container():
    with MinioContainer() as minio:
        yield minio

@pytest.fixture
def integration_db(postgres_container):
    """Database engine for integration tests"""
    engine = create_engine(postgres_container.get_connection_url())

    # Run migrations
    from alembic.config import Config
    from alembic import command

    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", postgres_container.get_connection_url())
    command.upgrade(alembic_cfg, "head")

    yield engine
    engine.dispose()

@pytest.mark.integration
def test_full_document_pipeline(integration_db, redis_container, minio_container):
    """Test complete document processing pipeline"""
    # 1. Upload document to MinIO
    # 2. Create document record in PostgreSQL
    # 3. Trigger Celery task
    # 4. Verify chunks in database
    # 5. Check embeddings in Milvus (mocked)
    pass

10.7 Performance Testing

  • Tool: k6 for load testing
  • Scenario: 1000 concurrent users for 10 minutes
  • Criteria: response time < 200 ms for 95% of requests
  • Monitoring: real-time metrics during the test

// k6 performance test script
import http from "k6/http";
import { check, sleep } from "k6";
import { Rate, Trend } from "k6/metrics";

// Custom metrics
const errorRate = new Rate("errors");
const searchLatency = new Trend("search_latency");

export const options = {
  stages: [
    { duration: "2m", target: 100 }, // Ramp up
    { duration: "5m", target: 1000 }, // Stay at 1000 users
    { duration: "2m", target: 0 }, // Ramp down
  ],
  thresholds: {
    http_req_duration: ["p(95)<200"], // 95% of requests under 200ms
    http_req_failed: ["rate<0.02"], // Error rate under 2%
    errors: ["rate<0.02"],
  },
};

const BASE_URL = "https://emblema.local";

export function setup() {
  // Login and get auth token
  const loginResponse = http.post(`${BASE_URL}/api/auth/signin`, {
    email: "test@example.com",
    password: "password123",
  });

  return { token: loginResponse.json("token") };
}

export default function (data) {
  const headers = {
    Authorization: `Bearer ${data.token}`,
    "Content-Type": "application/json",
  };

  // Test document search
  const searchStart = Date.now();
  const searchResponse = http.post(
    `${BASE_URL}/api/v1/search`,
    JSON.stringify({
      query: "machine learning algorithms",
      limit: 10,
      knowledgeBaseIds: ["kb-123"],
    }),
    { headers },
  );

  const searchDuration = Date.now() - searchStart;
  searchLatency.add(searchDuration);

  check(searchResponse, {
    "search status is 200": (r) => r.status === 200,
    "search has results": (r) => JSON.parse(r.body).results.length > 0,
    "search under 500ms": () => searchDuration < 500,
  }) || errorRate.add(1);

  // Test chat API
  const chatResponse = http.post(
    `${BASE_URL}/api/v1/chat`,
    JSON.stringify({
      id: `chat-${__VU}-${__ITER}`,
      messages: [{ role: "user", content: "Explain quantum computing" }],
      selectedChatModel: "gpt-4o-mini",
    }),
    { headers },
  );

  check(chatResponse, {
    "chat status is 200": (r) => r.status === 200,
    "chat response not empty": (r) => r.body.length > 0,
  }) || errorRate.add(1);

  sleep(1);
}

export function teardown(data) {
  // Cleanup operations
  console.log("Performance test completed");
}

10.8 Success Metrics

  • Code Coverage: minimum 80% for business-critical components
  • Test Execution Time: full suite < 15 minutes
  • Flaky Tests: random failure rate < 1%
  • Regression Detection: automatic identification of breaking changes
  • Performance Baseline: response time < 200 ms for 95% of API requests
  • Integration Success: pipelines complete without manual intervention

11. Error Handling and Fallback

11.1 Fallback Strategies

  • Circuit Breaker: Hystrix pattern for external services
  • Retry Policy: exponential backoff with jitter
  • Timeouts: 30 seconds for synchronous operations, 1 hour for processing
  • Graceful Degradation: reduced functionality on failure

// Circuit breaker implementation
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime = 0;
  private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";

  constructor(
    private threshold = 5,
    private timeout = 60000, // 1 minute
    private monitoringPeriod = 10000, // 10 seconds
  ) {}

  async execute<T>(
    operation: () => Promise<T>,
    fallback?: () => Promise<T>,
  ): Promise<T> {
    if (this.state === "OPEN") {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = "HALF_OPEN";
      } else {
        if (fallback) {
          console.warn("Circuit breaker OPEN, executing fallback");
          return await fallback();
        }
        throw new Error("Circuit breaker is OPEN");
      }
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();

      if (fallback && this.state === "OPEN") {
        console.warn("Operation failed, executing fallback");
        return await fallback();
      }
      throw error;
    }
  }

  private onSuccess() {
    this.failures = 0;
    this.state = "CLOSED";
  }

  private onFailure() {
    this.failures++;
    this.lastFailureTime = Date.now();

    if (this.failures >= this.threshold) {
      this.state = "OPEN";
      console.error(`Circuit breaker opened after ${this.failures} failures`);
    }
  }
}

// Retry with exponential backoff
class RetryPolicy {
  static async execute<T>(
    operation: () => Promise<T>,
    maxRetries = 3,
    baseDelay = 1000,
    maxDelay = 30000,
    jitter = true,
  ): Promise<T> {
    let lastError: Error;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return await operation();
      } catch (error) {
        lastError = error as Error;

        if (attempt === maxRetries) {
          break;
        }

        // Calculate delay with exponential backoff
        let delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);

        // Add jitter to prevent thundering herd
        if (jitter) {
          delay = delay * (0.5 + Math.random() * 0.5);
        }

        console.warn(
          `Operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${maxRetries})`,
        );
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }

    throw new Error(
      `Operation failed after ${maxRetries} retries: ${lastError.message}`,
    );
  }
}

11.2 Global Exception Handling

# FastAPI global exception handlers
from datetime import datetime
import logging
import traceback

from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

app = FastAPI()

class EmblemaException(Exception):
    """Base exception for the Emblema platform"""
    def __init__(self, message: str, code: str = "GENERIC_ERROR", status_code: int = 500):
        self.message = message
        self.code = code
        self.status_code = status_code
        super().__init__(message)

class ProcessingError(EmblemaException):
    """Document processing related errors"""
    def __init__(self, message: str, document_id: str | None = None):
        super().__init__(message, "PROCESSING_ERROR", 422)
        self.document_id = document_id

class SearchError(EmblemaException):
    """Search service related errors"""
    def __init__(self, message: str, query: str | None = None):
        super().__init__(message, "SEARCH_ERROR", 500)
        self.query = query

@app.exception_handler(EmblemaException)
async def emblema_exception_handler(request: Request, exc: EmblemaException):
    """Handle custom Emblema exceptions"""
    logger.error(f"Emblema exception: {exc.code} - {exc.message}", extra={
        'error_code': exc.code,
        'path': request.url.path,
        'method': request.method
    })

    return JSONResponse(
        status_code=exc.status_code,
        content={
            'error': {
                'code': exc.code,
                'message': exc.message,
                'timestamp': datetime.utcnow().isoformat()
            }
        }
    )

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Handle Pydantic validation errors"""
    errors = []
    for error in exc.errors():
        errors.append({
            'field': '.'.join(str(x) for x in error['loc'][1:]),
            'message': error['msg'],
            'type': error['type']
        })

    return JSONResponse(
        status_code=422,
        content={
            'error': {
                'code': 'VALIDATION_ERROR',
                'message': 'Request validation failed',
                'details': errors
            }
        }
    )

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch-all exception handler"""
    logger.error(f"Unexpected error: {str(exc)}", extra={
        'path': request.url.path,
        'method': request.method,
        'traceback': traceback.format_exc()
    })

    return JSONResponse(
        status_code=500,
        content={
            'error': {
                'code': 'INTERNAL_SERVER_ERROR',
                'message': 'An unexpected error occurred'
            }
        }
    )
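
With these handlers registered, route code can raise domain exceptions and let the middleware shape the HTTP response. The sketch below is illustrative only: the /documents/{document_id}/process route and the start_processing helper are hypothetical, not part of the documented API surface.

# Hypothetical route demonstrating how the handlers above shape error responses
@app.post("/documents/{document_id}/process")
async def process_document(document_id: str):
    try:
        return await start_processing(document_id)  # placeholder helper, not a real API
    except FileNotFoundError:
        # emblema_exception_handler converts this into HTTP 422 with
        # code PROCESSING_ERROR and a timestamped error envelope:
        # {"error": {"code": "PROCESSING_ERROR", "message": "Source file missing", ...}}
        raise ProcessingError("Source file missing", document_id=document_id)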

12. Non-Functional Considerations

12.1 Maintainability

  • Clean Code: SOLID principles and established design patterns
  • Documentation: Comprehensive inline documentation and READMEs
  • Code Reviews: Mandatory PR reviews with automated checks
  • Refactoring: Continuous code improvement and technical debt reduction
// Example: Clean architecture pattern for business logic
interface DocumentService {
  create(data: CreateDocumentRequest): Promise<Document>;
  process(id: string, config: ProcessingConfig): Promise<Task>;
  search(query: SearchQuery): Promise<SearchResults>;
}

class DocumentServiceImpl implements DocumentService {
  constructor(
    private documentRepository: DocumentRepository,
    private fileStorageService: FileStorageService,
    private processingService: ProcessingService,
    private notificationService: NotificationService,
  ) {}

  async create(data: CreateDocumentRequest): Promise<Document> {
    // 1. Validate input
    await this.validateCreateRequest(data);

    // 2. Store file
    const file = await this.fileStorageService.upload(data.file);

    // 3. Create document record
    const document = await this.documentRepository.create({
      name: data.name,
      type: file.extension,
      status: "pending",
      fileId: file.id,
      createdBy: data.userId,
    });

    // 4. Trigger processing
    const task = await this.processingService.scheduleProcessing(
      document.id,
      data.config,
    );

    // 5. Send notification
    await this.notificationService.notifyDocumentCreated(document, data.userId);

    return document;
  }

  async process(id: string, config: ProcessingConfig): Promise<Task> {
    // Delegates to the processing service (same call used in create, step 4)
    return this.processingService.scheduleProcessing(id, config);
  }

  async search(query: SearchQuery): Promise<SearchResults> {
    // Search delegation is omitted from this excerpt
    throw new Error("search() is not shown in this excerpt");
  }

  private async validateCreateRequest(
    data: CreateDocumentRequest,
  ): Promise<void> {
    if (!data.file) {
      throw new ValidationError("File is required");
    }

    if (data.file.size > MAX_FILE_SIZE) {
      throw new ValidationError("File size exceeds limit");
    }

    const allowedTypes = ["pdf", "mp3", "mp4", "txt", "md"];
    if (!allowedTypes.includes(data.file.extension)) {
      throw new ValidationError("Unsupported file type");
    }
  }
}

12.2 Scalability

  • Horizontal Scaling: Stateless microservices with load balancing
  • Database Sharding: Partitioning strategy for large datasets
  • Caching Layers: Multi-level caching with invalidation
  • Async Processing: Message queues for decoupling
# Horizontal scaling configuration for Celery workers
from celery import Celery
from kombu import Queue

# Queues are specialized by document type
celery_app = Celery('emblema')

celery_app.conf.update(
    # Task routing to specialized workers
    task_routes={
        'process_pdf_document': {'queue': 'pdf_processing'},
        'process_audio_document': {'queue': 'audio_processing'},
        'process_video_document': {'queue': 'video_processing'},
        'generate_embeddings': {'queue': 'embedding_generation'},
        'send_notification': {'queue': 'notifications'}
    },

    # Performance optimizations
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_disable_rate_limits=True,

    # Auto-scaling configuration
    worker_autoscaler='celery.worker.autoscale:Autoscaler',
    worker_max_tasks_per_child=1000,

    # Queue configurations
    task_queues=[
        Queue('pdf_processing', routing_key='pdf'),
        Queue('audio_processing', routing_key='audio'),
        Queue('video_processing', routing_key='video'),
        Queue('embedding_generation', routing_key='embeddings'),
        Queue('notifications', routing_key='notifications')
    ]
)
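
For this routing to take effect, task names must match the keys in task_routes. A minimal sketch of one such task follows; the body and the extract_pdf_text helper are placeholders, not the actual processing pipeline.

# Hypothetical task definition: the explicit name must match the
# 'process_pdf_document' key in task_routes so it lands on pdf_processing
@celery_app.task(name='process_pdf_document', bind=True, max_retries=3)
def process_pdf_document(self, document_id: str):
    try:
        return extract_pdf_text(document_id)  # placeholder for the real pipeline
    except Exception as exc:
        # Exponential backoff between retries; with task_acks_late the message
        # is only acknowledged after success or final failure
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)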

# Database connection pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,         # Base connection pool size
    max_overflow=30,      # Additional connections under burst load
    pool_pre_ping=True,   # Validate connections before use
    pool_recycle=3600     # Recycle connections every hour
)
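
A short usage sketch, assuming a documents table exists: each session borrows a connection from the pool and returns it when the context exits, so with pool_size=20 and max_overflow=30 a single instance never holds more than 50 connections.

from sqlalchemy import text
from sqlalchemy.orm import sessionmaker

SessionLocal = sessionmaker(bind=engine)

def count_documents() -> int:
    # Borrows a pooled connection; returned automatically on context exit
    with SessionLocal() as session:
        return session.execute(text("SELECT count(*) FROM documents")).scalar_one()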

12.3 Reliability

  • Health Checks: Comprehensive service monitoring
  • Graceful Shutdown: Clean service termination
  • Data Consistency: ACID transactions and compensating actions
  • Backup Strategy: Automated database and file backups
# Health check endpoints
from datetime import datetime

from fastapi import HTTPException
from redis.exceptions import ConnectionError as RedisConnectionError
from sqlalchemy import text

@app.get("/health")
async def health_check():
    """Basic health check"""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}

@app.get("/health/detailed")
async def detailed_health_check():
    """Detailed health check covering all critical dependencies"""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "dependencies": {}
    }

    # Database health
    try:
        async with get_db_session() as db:
            await db.execute(text("SELECT 1"))
        health_status["dependencies"]["database"] = "healthy"
    except Exception as e:
        health_status["dependencies"]["database"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"

    # Redis health
    try:
        redis_client = get_redis_client()
        await redis_client.ping()
        health_status["dependencies"]["redis"] = "healthy"
    except RedisConnectionError as e:
        health_status["dependencies"]["redis"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"

    # Milvus health
    try:
        milvus_client = get_milvus_client()
        await milvus_client.list_collections()
        health_status["dependencies"]["milvus"] = "healthy"
    except Exception as e:
        health_status["dependencies"]["milvus"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"

    if health_status["status"] == "degraded":
        raise HTTPException(status_code=503, detail=health_status)

    return health_status

# Graceful shutdown handler
import asyncio
import signal

class GracefulShutdown:
    def __init__(self):
        self.shutdown = False
        self.tasks = set()

    def signal_handler(self, signum, frame):
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown = True

    async def wait_for_tasks(self, timeout=30):
        """Wait for running tasks to complete"""
        if not self.tasks:
            return

        print(f"Waiting for {len(self.tasks)} tasks to complete...")

        try:
            await asyncio.wait_for(
                asyncio.gather(*self.tasks, return_exceptions=True),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            print(f"Timeout reached, cancelling {len(self.tasks)} remaining tasks")
            for task in self.tasks:
                task.cancel()

            # Wait a bit more for cancellation
            await asyncio.sleep(1)

    def register_task(self, task):
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

# Initialize graceful shutdown
shutdown_handler = GracefulShutdown()
signal.signal(signal.SIGTERM, shutdown_handler.signal_handler)
signal.signal(signal.SIGINT, shutdown_handler.signal_handler)
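
A sketch of how long-running work might be wired into the handler so it can be drained on shutdown; process_pending_documents is a hypothetical coroutine standing in for real background work.

# Illustrative wiring: process_pending_documents() is a placeholder coroutine
async def main():
    while not shutdown_handler.shutdown:
        task = asyncio.create_task(process_pending_documents())
        # Track the task so wait_for_tasks() can drain it on SIGTERM/SIGINT
        shutdown_handler.register_task(task)
        await asyncio.sleep(5)

    # Drain in-flight work before the process exits
    await shutdown_handler.wait_for_tasks(timeout=30)

asyncio.run(main())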

13. Risks and Mitigations

| Risk | Probability | Impact | Mitigation |
|---|---|---|---|
| Milvus vector DB overload | Medium | High | Connection pooling, search query optimization, read replicas |
| File processing memory leak | Low | High | Resource monitoring, worker restart policies, memory limits |
| BGE-M3 model unavailability | Low | Very High | Local model caching, fallback embedding services, model replication |
| MinIO storage corruption | Low | High | Backup replication, integrity checks, disaster recovery procedures |
| Keycloak authentication failure | Medium | Very High | HA Keycloak setup, session caching, graceful degradation |
| PostgreSQL connection exhaustion | Medium | High | Connection pooling, query optimization, read replicas |
| Celery worker deadlock | Medium | Medium | Task timeout policies, worker health monitoring, automatic restart |
| LLM API rate limiting | High | Medium | Multiple provider integration, request queuing, caching strategies |
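
As an illustration of the last mitigation, multiple provider integration can be sketched on top of LiteLLM's unified completion interface. This is a minimal sketch: the model identifiers and fallback order below are examples, not the platform's configured models.

# Hedged sketch of provider fallback on rate limiting via LiteLLM
import litellm

FALLBACK_MODELS = ["gpt-4o", "claude-3-5-sonnet-20240620", "ollama/llama3"]  # examples only

def chat_with_fallback(messages: list[dict]) -> str:
    last_error: Exception | None = None
    for model in FALLBACK_MODELS:
        try:
            response = litellm.completion(model=model, messages=messages)
            return response.choices[0].message.content
        except litellm.RateLimitError as exc:
            # This provider is throttled; fall through to the next one
            last_error = exc
    raise RuntimeError(f"All configured providers are rate-limited: {last_error}")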

14. Appendices

14.1 Glossary

  • BGE-M3: Beijing Academy of Artificial Intelligence's multilingual embedding model (1024 dimensions)
  • MCP: Model Context Protocol for AI tool integration
  • Chunking: The process of splitting documents into segments for processing
  • Milvus: Open-source vector database for similarity search
  • Celery: Distributed task queue for Python
  • Keycloak: Identity and access management solution
  • MinIO: S3-compatible object storage
  • Traefik: Modern reverse proxy and load balancer
  • LiteLLM: Unified interface for multiple LLM providers

14.2 Technical References

End of Document

This document is confidential and proprietary to Emblema AI Platform. Unauthorized reproduction or distribution is prohibited.


Low Level Design v1.0 - Complete technical document for the implementation of the Emblema AI Platform system
