Low Level Design
1. Introduction
1.1 Purpose of the Document
This document describes the detailed low-level design of the Emblema AI Platform, specifying the technical architecture, software components, interfaces, and implementations required for a complete enterprise AI platform.
1.2 System Objectives
- Provide an integrated AI platform for document management, intelligent chat, and knowledge bases
- Support asynchronous processing of multimedia documents with advanced chunking
- Implement high-performance semantic search with the Milvus vector database
- Guarantee horizontal scalability through a containerized microservices architecture
- Ensure enterprise-grade security with Keycloak authentication and granular access controls
1.3 References
- High Level Design (HLD): System Architecture
- API Documentation: API Reference
- Company standards: CLAUDE.md project guidelines
2. System Architecture
2.1 Architectural Overview
Emblema implements a containerized microservices architecture based on Next.js 14 for the frontend and specialized Python/Node.js services for the backend. The system uses PostgreSQL for relational data, Milvus for vector search, Redis for caching, and MinIO for file storage.
2.2 Architecture Diagram
2.3 Technologies Used
- Frontend: Next.js 14, React 18, TypeScript, Tailwind CSS, Shadcn UI
- Backend: Python FastAPI, Node.js, Celery, Redis
- Database: PostgreSQL 15, Milvus 2.4, Redis 7
- Storage: MinIO S3-compatible object storage
- Authentication: Keycloak OpenID Connect
- AI/ML: LiteLLM, BGE-M3 embeddings, MinerU, WhisperX
- Infrastructure: Docker Compose, Traefik, Grafana/Prometheus
- Message Queue: Celery with a Redis broker
- Monitoring: Grafana, Prometheus, Loki
- Notifications: Novu for multi-channel notifications
3. System Components
3.1 www-emblema Component (Frontend Application)
3.1.1 Responsibilities
The main Next.js 14 application, providing the user interface for document management, AI chat, knowledge bases, and system administration. It implements SSR/SSG for optimal performance and SEO.
3.1.2 Interfaces
Input: HTTP requests, form submissions, file uploads
Output: server-side rendered pages, API responses, real-time notifications
Exposed APIs:
- /api/v1/chat - AI chat endpoint
- /api/v1/file/upload - file upload with chunking
- /api/v1/[entityType] - generic CRUD operations
- /api/auth/[...nextauth] - authentication flows
3.1.3 Class Structure
// Modular architecture, Next.js App Router
interface APIEndpoint<TRequest, TResponse> {
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
path: string;
handler: (req: TRequest) => Promise<TResponse>;
middleware: Middleware[];
validation: ValidationSchema;
}
// Chat API implementation
export const POST = withErrorHandling(async (req: Request) => {
const {
messages,
selectedChatModel,
knowledgeBaseIds,
agentIds,
contextItemIds,
parentId,
id,
} = await req.json();
const session = await auth();
if (!session) {
throw new RouteHandlerError("Unauthorized", "UNAUTHORIZED", 401);
}
// Context optimization with model-specific limits
const modelLimits = getModelContextLimits(selectedChatModel);
const contextOptimizer = createContextOptimizer(modelLimits);
// The optimized messages, system prompt, and MCP tools are produced by the
// context optimizer and MCP setup, elided here for brevity
// Stream AI response with MCP integration
return streamText({
model: aiModelProvider(selectedChatModel),
messages: optimizedMessages,
system: systemPrompt,
tools: mcpTools,
maxTokens: modelLimits.maxOutputTokens
});
});
// Form system with ref-based pattern
interface FormRef<T> {
submit: () => Promise<void>;
reset: (values?: T) => void;
getValues: () => T;
isDirty: () => boolean;
isValid: () => boolean;
}
export const DocumentForm = forwardRef<FormRef<DocumentSchema>, DocumentFormProps>(
({ defaultValues, onSubmit, variant = 'page' }, ref) => {
const form = useForm({
resolver: zodResolver(createDocumentSchema(t)),
defaultValues,
});
useImperativeHandle(ref, () => ({
submit: async () => await form.handleSubmit(onSubmit)(),
reset: (values) => form.reset(values || defaultValues),
getValues: () => form.getValues(),
isDirty: () => form.formState.isDirty,
isValid: () => form.formState.isValid,
}));
return <Form {...form}>...</Form>;
}
);
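To show how the ref-based pattern is consumed, here is a hypothetical usage sketch (DocumentDialog and the handler names are illustrative, not components from the codebase): a parent component drives the form through the ref, for example to submit from a dialog footer that lives outside the form markup.
import { useRef } from "react";

function DocumentDialog({ doc, onSave }: { doc: DocumentSchema; onSave: (d: DocumentSchema) => Promise<void> }) {
  // The imperative handle exposed by DocumentForm via forwardRef
  const formRef = useRef<FormRef<DocumentSchema>>(null);
  return (
    <>
      <DocumentForm ref={formRef} defaultValues={doc} onSubmit={onSave} />
      {/* Submission is triggered from outside the form markup */}
      <Button onClick={() => formRef.current?.submit()}>Save</Button>
    </>
  );
}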
3.1.4 Sequence Diagram
3.2 background-task Component (Processing Service)
3.2.1 Responsibilities
Python FastAPI service that handles asynchronous processing of documents, audio, and video through Celery workers. It implements intelligent chunking, embedding generation, and notifications.
3.2.2 Interfaces
Input: file uploads, processing configurations, task parameters
Output: processed chunks, embeddings, notifications, progress updates
Exposed APIs:
- /tasks/ - task management endpoints
- /artifacts/ - AI artifact generation
- /cache/ - cache management utilities
3.2.3 Class Structure
# Pipeline pattern for document processing
import asyncio
import logging

from celery import shared_task

logger = logging.getLogger(__name__)

class DocumentProcessingPipeline:
    def __init__(self):
        self.stages = [
            FileValidationStage(),
            FormatOptimizationStage(),
            ContentExtractionStage(),
            ChunkingStage(),
            EmbeddingGenerationStage(),
            StorageStage(),
            NotificationStage()
        ]

    async def process(self, document_id: str) -> ProcessingResult:
        context = ProcessingContext(document_id)
        for stage in self.stages:
            try:
                context = await stage.execute(context)
                await self.update_progress(document_id, stage.name)
            except StageError as e:
                await self.handle_stage_error(document_id, stage.name, e)
                raise
        return context.result

# Factory pattern for specialized processors
class ProcessorFactory:
    processors = {
        'pdf': DocumentProcessor,  # MinerU-based PDF extraction
        'mp3': AudioProcessor,     # WhisperX transcription + diarization
        'mp4': VideoProcessor      # Audio extraction + transcription
    }

    @classmethod
    def get_processor(cls, file_type: str) -> BaseProcessor:
        processor_class = cls.processors.get(file_type)
        if not processor_class:
            raise UnsupportedFileTypeError(f"No processor for {file_type}")
        return processor_class()

# Celery task configuration
@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3, 'countdown': 60},
    time_limit=3600,      # 1 hour
    soft_time_limit=3000  # 50 minutes
)
def process_document_task(self, document_id: str, config: dict):
    try:
        pipeline = DocumentProcessingPipeline()
        # process() is a coroutine: drive it to completion inside the worker
        result = asyncio.run(pipeline.process(document_id))
        return result
    except Exception as exc:
        logger.error(f"Document processing failed: {exc}")
        raise self.retry(exc=exc)
4. Database Design
4.1 Logical Schema
The PostgreSQL database uses a normalized design that supports multi-tenancy through user-based ownership. Every entity carries standard audit fields (created_by, created_at, updated_at) and an entity-type field to support GraphQL introspection, as sketched below.
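Illustratively, these shared columns map onto a common base shape; the following TypeScript interface is a hypothetical rendering of that contract (the names BaseEntity and DocumentEntity are not taken from the codebase):
// Hypothetical sketch of the shared audit/ownership contract every entity carries
interface BaseEntity {
  id: string;          // UUID primary key
  entityType: string;  // e.g. 'User', 'Document' - supports GraphQL introspection
  createdBy?: string;  // owning user's UUID (user-based multi-tenancy)
  createdAt: string;   // ISO timestamp set on insert
  updatedAt: string;   // ISO timestamp refreshed on update
}

// A concrete entity extends the shared shape with its own fields
interface DocumentEntity extends BaseEntity {
  name: string;
  status: "pending" | "processing" | "completed" | "failed";
  chunkCount: number;
}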
4.2 Main Tables
4.2.1 users Table
CREATE TABLE users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    entity_type VARCHAR(50) NOT NULL DEFAULT 'User',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Field Descriptions:
- id: UUID primary key (synchronized with Keycloak)
- email: unique user email
- first_name, last_name: user's personal details
- entity_type: entity type for the GraphQL schema
- created_at, updated_at: audit timestamps
4.2.2 document Table
CREATE TABLE document (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'pending',
    created_by UUID REFERENCES users(id) ON DELETE CASCADE,
    knowledge_base_id UUID REFERENCES knowledge_base(id) ON DELETE SET NULL,
    file_id UUID REFERENCES file(id) ON DELETE SET NULL,
    chunk_count INTEGER DEFAULT 0,
    chunker_name VARCHAR(100),
    chunker_config JSONB DEFAULT '{}',
    summary TEXT,
    entity_type VARCHAR(50) NOT NULL DEFAULT 'Document',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Field Descriptions:
- id: UUID primary key
- name: user-defined document name
- type: document type (pdf, audio, video, etc.)
- status: processing status (pending, processing, completed, failed)
- created_by: FK to the owning user
- knowledge_base_id: FK to the associated knowledge base (optional)
- file_id: FK to the associated stored file
- chunk_count: number of generated chunks
- chunker_name: name of the chunker used
- chunker_config: chunking configuration (JSONB)
- summary: AI-generated document summary
4.2.3 Performance Indexes
-- Indexes for frequent queries
CREATE INDEX idx_document_created_by ON document(created_by);
CREATE INDEX idx_document_kb ON document(knowledge_base_id);
CREATE INDEX idx_document_status ON document(status);
CREATE INDEX idx_document_type ON document(type);
CREATE INDEX idx_document_created_at ON document(created_at DESC);
-- JSONB GIN index for configuration searches
CREATE INDEX idx_document_chunker_config_gin ON document USING GIN(chunker_config);
-- Composite index for efficient pagination
CREATE INDEX idx_document_owner_status_created ON document(created_by, status, created_at DESC);
4.3 Milvus Schema (Vector Database)
# Collection schema for document chunks with BGE-M3 embeddings
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

chunk_fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=36, is_primary=True),
    FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=36),
    FieldSchema(name="chunk_id", dtype=DataType.VARCHAR, max_length=36),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),  # BGE-M3
    FieldSchema(name="token_count", dtype=DataType.INT32),
    FieldSchema(name="position", dtype=DataType.INT32),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="metadata", dtype=DataType.JSON),
    FieldSchema(name="created_by", dtype=DataType.VARCHAR, max_length=36),
    FieldSchema(name="created_at", dtype=DataType.INT64)
]

chunk_schema = CollectionSchema(
    fields=chunk_fields,
    description="Document chunks with BGE-M3 embeddings for semantic search"
)

# Index tuning for performance
index_params = {
    "metric_type": "COSINE",   # Cosine similarity
    "index_type": "IVF_FLAT",  # Index type for balanced performance
    "params": {"nlist": 1024}  # Number of cluster centers
}

collection = Collection(
    name="document_chunks",
    schema=chunk_schema,
    using='default'
)
collection.create_index(
    field_name="embedding",
    index_params=index_params
)
Semantic Search with Milvus
Document retrieval based on similarity search is handled directly by Milvus, without intermediary services. Semantic search is implemented in the components that need it:
# Example of direct Milvus search (used in background-task and API endpoints)
import os
from typing import Dict, List, Optional

from pymilvus import MilvusClient

class MilvusSearchService:
    def __init__(self):
        self.client = MilvusClient(
            uri="http://milvus:19530",
            user=os.getenv("MILVUS_USER", "root"),
            password=os.getenv("MILVUS_PASSWORD", "Milvus")
        )

    async def semantic_search(
        self,
        query_embedding: List[float],
        collection_name: str = "document_chunks",
        limit: int = 10,
        filters: Optional[str] = None
    ) -> List[Dict]:
        """
        Runs a semantic search directly against Milvus.

        Args:
            query_embedding: Query embedding (1024-dimensional BGE-M3)
            collection_name: Name of the Milvus collection
            limit: Maximum number of results
            filters: Milvus filter expression (e.g. "document_id in ['doc1', 'doc2']")

        Returns:
            List of chunks with similarity scores
        """
        search_params = {
            "metric_type": "COSINE",
            "params": {"nprobe": 10}
        }
        results = self.client.search(
            collection_name=collection_name,
            data=[query_embedding],
            anns_field="embedding",
            search_params=search_params,
            limit=limit,
            filter=filters or "",
            output_fields=["id", "content", "metadata", "document_id", "position"]
        )
        # Format results with scores; MilvusClient returns one hit list per query,
        # each hit being a dict with "id", "distance", and the requested "entity" fields
        formatted_results = []
        for hits in results:
            for hit in hits:
                entity = hit.get("entity", {})
                formatted_results.append({
                    "id": hit.get("id"),
                    "content": entity.get("content"),
                    "metadata": entity.get("metadata"),
                    "document_id": entity.get("document_id"),
                    "position": entity.get("position"),
                    "score": hit.get("distance")  # Cosine similarity
                })
        return formatted_results
Integration in Components:
- API Endpoints (/api/v1/search): use MilvusSearchService for direct queries
- Background Tasks: insert embeddings during processing
- Chat API: retrieves relevant context for RAG (Retrieval-Augmented Generation); a sketch follows this list
- Knowledge Base Search: filters results for specific KBs
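To make the RAG step concrete, here is a minimal TypeScript sketch of how the Chat API could assemble context from Milvus hits; embedQuery and milvusSearch are hypothetical helpers standing in for the BGE-M3 embedding call and the search shown above.
declare function embedQuery(query: string): Promise<number[]>;
declare function milvusSearch(args: {
  embedding: number[];
  filter: string;
  limit: number;
}): Promise<Array<{ documentId: string; content: string }>>;

// Hypothetical sketch: build RAG context for the Chat API from Milvus results
async function buildRagContext(query: string, documentIds: string[]): Promise<string> {
  // 1. Embed the user query with BGE-M3 (1024 dimensions)
  const queryEmbedding = await embedQuery(query);
  // 2. Restrict the search to the accessible documents via a Milvus filter expression
  const filter = `document_id in [${documentIds.map((id) => `'${id}'`).join(", ")}]`;
  const hits = await milvusSearch({ embedding: queryEmbedding, filter, limit: 10 });
  // 3. Concatenate the top-scoring chunks into a context block for the system prompt
  return hits.map((h) => `[source: ${h.documentId}] ${h.content}`).join("\n\n");
}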
4.4 Hasura GraphQL Engine
Hasura is the GraphQL layer that provides automatic real-time APIs on top of the PostgreSQL database, with advanced authorization and federation capabilities.
Key Features
- Automatic GraphQL APIs: instant generation of queries, mutations, and subscriptions from the database schema
- Multi-Database Support: PostgreSQL, MySQL, SQL Server, BigQuery, and others
- Remote Schema Stitching: integration of external GraphQL endpoints into a single unified schema
- Granular Permissions: row- and column-level access control based on roles and conditions
- Real-time Subscriptions: live updates over WebSocket for database changes (see the sketch after this list)
- Event Triggers: automatic webhooks on database operations
- Actions & Remote Joins: schema extension with custom business logic
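As an illustration of the real-time subscriptions above, a frontend client might consume live document-status updates over WebSocket; this is a hedged sketch using the graphql-ws client, with the endpoint URL and token handling assumed rather than taken from the project.
// Hypothetical sketch: live document-status updates via a Hasura subscription
import { createClient } from "graphql-ws";

const token = "<session access token>"; // assumed to come from the NextAuth session
const client = createClient({
  url: "wss://emblema.local/v1/graphql", // assumed Hasura WebSocket endpoint
  connectionParams: { headers: { Authorization: `Bearer ${token}` } },
});

// Hasura pushes a fresh result set whenever matching rows change
const dispose = client.subscribe(
  {
    query: `subscription PendingDocuments {
      document(where: { status: { _in: ["pending", "processing"] } }) {
        id
        name
        status
      }
    }`,
  },
  {
    next: ({ data }) => console.log("documents update", data),
    error: (err) => console.error(err),
    complete: () => console.log("subscription closed"),
  },
);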
Metadata Configuration
# hasura/metadata/databases/default/tables/public_document.yaml
table:
  name: document
  schema: public
object_relationships:
  - name: creator
    using:
      foreign_key_constraint_on: created_by
  - name: knowledge_base
    using:
      foreign_key_constraint_on: knowledge_base_id
array_relationships:
  - name: chunks
    using:
      foreign_key_constraint_on:
        column: document_id
        table:
          name: chunk
          schema: public
insert_permissions:
  - role: user
    permission:
      check:
        created_by:
          _eq: X-Hasura-User-Id
      columns:
        - name
        - type
        - knowledge_base_id
        - chunker_config
select_permissions:
  - role: user
    permission:
      columns: "*"
      filter:
        _or:
          - created_by:
              _eq: X-Hasura-User-Id
          - knowledge_base:
              permissions:
                user_id:
                  _eq: X-Hasura-User-Id
                action:
                  _eq: read
update_permissions:
  - role: user
    permission:
      columns:
        - name
        - summary
        - chunker_config
      filter:
        created_by:
          _eq: X-Hasura-User-Id
      check: null
Remote Schema Stitching
// Integration with an external search service
const searchSchema = makeRemoteExecutableSchema({
schema: await introspectSchema(searchLink),
link: searchLink,
});
// Hasura schema extension
extend type Query {
searchDocuments(
query: String!
knowledgeBaseIds: [uuid!]
limit: Int = 10
): DocumentSearchResult @remote
}
type DocumentSearchResult {
results: [DocumentSearchHit!]!
total: Int!
processingTime: Float!
}
Event-Driven Architecture
# Event trigger for automatic processing
- table:
    name: document
    schema: public
  event_triggers:
    - name: process_new_document
      definition:
        enable_manual: true
        insert:
          columns: "*"
      retry_conf:
        num_retries: 3
        interval_sec: 10
        timeout_sec: 60
      webhook: http://background-task:8000/webhooks/process-document
      headers:
        - name: X-Webhook-Secret
          value_from_env: WEBHOOK_SECRET
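The webhook payload follows Hasura's standard event-trigger format. The actual receiver is the Python background-task service; the TypeScript sketch below only illustrates the contract and the shared-secret check (the types and function are hypothetical).
// Sketch of Hasura's event-trigger payload shape and the secret verification
interface HasuraInsertEvent<T> {
  id: string; // delivery id
  trigger: { name: string }; // "process_new_document"
  table: { schema: string; name: string };
  event: {
    op: "INSERT" | "UPDATE" | "DELETE" | "MANUAL";
    data: { old: T | null; new: T };
    session_variables: Record<string, string> | null;
  };
}

function verifyAndExtract(
  headers: Record<string, string>,
  body: HasuraInsertEvent<{ id: string }>,
): string {
  // Reject deliveries that do not carry the secret configured in the trigger
  if (headers["x-webhook-secret"] !== process.env.WEBHOOK_SECRET) {
    throw new Error("Invalid webhook secret");
  }
  return body.event.data.new.id; // the freshly inserted document's id
}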
5. API Design
5.1 Hierarchical API Architecture
The Emblema API uses a hierarchical architecture that mirrors the relationships between entities, optimizing security checks while keeping flexibility for specific operations.
Core Principles
- Resource Hierarchy: entities are organized in parent-child relationships
- Permission Check on the Root: the authorization check happens only on the root element, to optimize performance
- Standardized CRUD: uniform create, read, update, and delete operations for all entities
- Custom Actions: dedicated endpoints for entity-specific operations
- Specialized APIs: endpoints not bound to the hierarchy, for complex features
Hierarchical URL Scheme
/api/v1/{Entity1}/{id1}/{Entity2}/{id2}/{Entity3}/{id3}
Concrete example:
/api/v1/Meeting/123/Document/456/Chunk/789
This structure indicates:
- Meeting with ID 123 (root entity - the permission check happens here)
- Document with ID 456 (child of Meeting)
- Chunk with ID 789 (child of Document)
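A minimal sketch of turning such a path into typed segments (the helper is hypothetical; in practice the Next.js catch-all route params already deliver the segments):
// Hypothetical sketch: parse /api/v1/Meeting/123/Document/456 into segments
function parseEntityPath(pathname: string): { entityType: string; entityId?: string }[] {
  // Drop the /api/v1 prefix, then read alternating type/id pairs
  const parts = pathname.replace(/^\/api\/v1\//, "").split("/").filter(Boolean);
  const segments: { entityType: string; entityId?: string }[] = [];
  for (let i = 0; i < parts.length; i += 2) {
    segments.push({ entityType: parts[i], entityId: parts[i + 1] });
  }
  return segments;
}

// parseEntityPath("/api/v1/Meeting/123/Document/456/Chunk/789") →
// [{ entityType: "Meeting", entityId: "123" },
//  { entityType: "Document", entityId: "456" },
//  { entityType: "Chunk", entityId: "789" }]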
5.2 Hierarchical REST Endpoints
5.2.1 CRUD Operations
// Generic CRUD handler for hierarchical entities
interface HierarchicalRequest {
entityPath: EntityPathSegment[];
operation: "GET" | "POST" | "PUT" | "DELETE";
body?: any;
}
interface EntityPathSegment {
entityType: string;
entityId?: string;
}
// Example: GET /api/v1/Meeting/123/Document
const handler = async (req: HierarchicalRequest) => {
// 1. Parse entity path
const rootEntity = req.entityPath[0]; // Meeting
const childEntity = req.entityPath[1]; // Document
// 2. Check permissions ONLY on root entity
const hasAccess = await checkPermission(
req.userId,
rootEntity.entityType,
"read",
{ entityId: rootEntity.entityId },
);
if (!hasAccess) {
throw new ForbiddenError("Access denied to Meeting");
}
// 3. Execute operation on child entity
switch (req.operation) {
case "GET":
return await getDocumentsForMeeting(rootEntity.entityId);
case "POST":
return await createDocumentInMeeting(rootEntity.entityId, req.body);
// ... other methods
}
};
5.2.2 Hierarchical Endpoint Examples
GET /api/v1/KnowledgeBase/kb-123/Document
- Lists all documents in knowledge base kb-123
- Permission check on KnowledgeBase
POST /api/v1/Meeting/meet-456/Document
- Creates a new document in meeting meet-456
- Permission check on Meeting
GET /api/v1/Document/doc-789/Chunk?page=1&size=20
- Lists the chunks of document doc-789, paginated
- Permission check on Document
PUT /api/v1/Task/task-111/Artifact/art-222
- Updates artifact art-222 of task task-111
- Permission check on Task
5.2.3 Custom Actions
For operations beyond standard CRUD, we use action endpoints:
/api/v1/{Entity1}/{id1}/{Entity2}/{id2}/action/{actionName}
Examples:
// POST /api/v1/Document/doc-123/Chunk/chunk-456/action/regenerate
// Regenerates the embeddings for a specific chunk
export const regenerateChunkEmbeddings = async (
documentId: string,
chunkId: string,
config?: RegenerateConfig,
) => {
// Check permission on document (root)
await checkDocumentAccess(documentId, "update");
// Execute specialized action
const chunk = await getChunk(chunkId);
const newEmbeddings = await generateEmbeddings(chunk.content, config);
await updateChunkEmbeddings(chunkId, newEmbeddings);
return { success: true, chunkId, embeddingDimensions: newEmbeddings.length };
};
// POST /api/v1/KnowledgeBase/kb-123/action/reindex
// Re-indexes all documents in a knowledge base
export const reindexKnowledgeBase = async (
knowledgeBaseId: string,
options?: ReindexOptions,
) => {
await checkKnowledgeBaseAccess(knowledgeBaseId, "admin");
const taskId = await scheduleReindexTask(knowledgeBaseId, options);
return { taskId, status: "scheduled" };
};
// POST /api/v1/Meeting/meet-123/Document/doc-456/action/export
// Exports a document in a specific format
export const exportDocument = async (
meetingId: string,
documentId: string,
format: "pdf" | "docx" | "markdown",
) => {
await checkMeetingAccess(meetingId, "read");
const exportUrl = await generateExport(documentId, format);
return { exportUrl, expiresAt: new Date(Date.now() + 3600000) };
};
5.3 Specialized Non-Hierarchical APIs
Some features require dedicated endpoints that do not follow the hierarchical structure:
5.3.1 Chat API
POST /api/v1/chat
A complex endpoint for AI conversations that integrates multiple resources:
interface ChatRequest {
id: string;
messages: Message[];
selectedChatModel: string;
knowledgeBaseIds?: string[]; // Multiple KB access
agentIds?: string[]; // Multiple agents
contextItemIds?: string[]; // Documents, chunks, etc.
parentId?: string; // Thread management
}
// Implementation with multiple permission checks
export const chatHandler = async (req: ChatRequest) => {
// 1. Check access to all referenced resources (each id list is optional)
const accessChecks = await Promise.all([
...(req.knowledgeBaseIds ?? []).map((id) => checkKBAccess(id, "read")),
...(req.agentIds ?? []).map((id) => checkAgentAccess(id, "use")),
...(req.contextItemIds ?? []).map((id) => checkContextAccess(id, "read")),
]);
if (!accessChecks.every((check) => check === true)) {
throw new ForbiddenError("Access denied to some resources");
}
// 2. Build context from all sources
const context = await buildChatContext(req);
// 3. Stream AI response
return streamAIResponse(context, req.selectedChatModel);
};
5.3.2 File Upload API
POST /api/v1/file/upload/init
PUT /api/v1/file/upload/chunk
POST /api/v1/file/upload/complete
A multi-part upload system for large files (a client-side usage sketch follows the handlers below):
// Initialize an upload session
export const initUpload = async (req: UploadInitRequest) => {
const session = await createUploadSession({
fileName: req.fileName,
fileSize: req.fileSize,
chunks: Math.ceil(req.fileSize / req.chunkSize),
metadata: req.metadata,
});
return {
uploadId: session.id,
chunkUrls: generateSignedUrls(session),
expiresAt: session.expiresAt,
};
};
// Upload a single chunk
export const uploadChunk = async (
uploadId: string,
chunkIndex: number,
data: Buffer,
userId: string, // caller identity resolved from the session
) => {
const session = await getUploadSession(uploadId);
if (session.userId !== userId) {
throw new ForbiddenError("Not your upload session");
}
await storeChunk(uploadId, chunkIndex, data);
await updateProgress(uploadId, chunkIndex);
return {
chunkIndex,
received: data.length,
progress: calculateProgress(session),
};
};
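A client could drive the three endpoints like this; the sketch is hypothetical (the query-parameter names and response fields are assumptions, not the project's actual wire format):
// Hypothetical client-side sketch of the init → chunk → complete flow
async function uploadFile(file: File, chunkSize = 5 * 1024 * 1024) {
  // 1. Open an upload session
  const init = await fetch("/api/v1/file/upload/init", {
    method: "POST",
    body: JSON.stringify({ fileName: file.name, fileSize: file.size, chunkSize }),
  }).then((r) => r.json());

  // 2. Send each chunk in order (could also run in parallel with a concurrency cap)
  const chunkCount = Math.ceil(file.size / chunkSize);
  for (let i = 0; i < chunkCount; i++) {
    const blob = file.slice(i * chunkSize, (i + 1) * chunkSize);
    await fetch(`/api/v1/file/upload/chunk?uploadId=${init.uploadId}&index=${i}`, {
      method: "PUT",
      body: blob,
    });
  }

  // 3. Finalize so the server can assemble and store the file
  return fetch("/api/v1/file/upload/complete", {
    method: "POST",
    body: JSON.stringify({ uploadId: init.uploadId }),
  }).then((r) => r.json());
}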
5.3.3 Search API
POST /api/v1/search
Cross-entity semantic search with complex filters:
interface SearchRequest {
query: string;
filters: {
entityTypes?: string[];
knowledgeBaseIds?: string[];
dateRange?: { from: Date; to: Date };
tags?: string[];
};
options: {
limit?: number;
offset?: number;
includeHighlights?: boolean;
minScore?: number;
};
}
export const searchHandler = async (req: SearchRequest) => {
// Build permission-aware search scope
const accessibleResources = await getUserAccessibleResources(req.userId);
const searchScope = intersectFilters(req.filters, accessibleResources);
// Execute vector search with Milvus
const results = await vectorSearch({
query: req.query,
scope: searchScope,
options: req.options,
});
// Enrich with metadata and highlights
return enrichSearchResults(results, req.options);
};
5.4 Optimized Access Controls
// Permission cache to optimize performance
class PermissionCache {
private cache = new LRUCache<string, boolean>({
max: 10000,
ttl: 300000, // 5 minutes
});
async checkAccess(
userId: string,
entityType: string,
entityId: string,
action: string,
): Promise<boolean> {
const cacheKey = `${userId}:${entityType}:${entityId}:${action}`;
// Check cache first
const cached = this.cache.get(cacheKey);
if (cached !== undefined) return cached;
// Load from database
const hasAccess = await this.loadPermission(
userId,
entityType,
entityId,
action,
);
this.cache.set(cacheKey, hasAccess);
return hasAccess;
}
private async loadPermission(
userId: string,
entityType: string,
entityId: string,
action: string,
): Promise<boolean> {
// 1. Check ownership
const entity = await getEntity(entityType, entityId);
if (entity.createdBy === userId) return true;
// 2. Check group permissions
const userGroups = await getUserGroups(userId);
const groupPerms = await getGroupPermissions(userGroups, entityType);
// 3. Check specific permissions
const specificPerms = await getEntityPermissions(
userId,
entityType,
entityId,
);
return evaluatePermissions(action, groupPerms, specificPerms);
}
}
5.5 API Error Handling
// Error handling middleware for API routes
interface APIError {
code: string;
message: string;
details?: any;
statusCode: number;
}
class RouteHandlerError extends Error {
constructor(
public message: string,
public code: string,
public statusCode: number = 500,
public details?: any,
) {
super(message);
this.name = "RouteHandlerError";
}
}
export const withErrorHandling = (handler: Function) => {
return async (req: Request, context?: any) => {
try {
return await handler(req, context);
} catch (error) {
if (error instanceof RouteHandlerError) {
return NextResponse.json(
{
error: {
code: error.code,
message: error.message,
details: error.details,
},
},
{ status: error.statusCode },
);
}
// Log unexpected errors
console.error("Unexpected API error:", error);
return NextResponse.json(
{
error: {
code: "INTERNAL_SERVER_ERROR",
message: "An unexpected error occurred",
},
},
{ status: 500 },
);
}
};
};
6. Security
6.1 Authentication
- Type: OpenID Connect via Keycloak
- JWT Access Token: valid for 15 minutes
- Refresh Token: valid for 7 days
- Session Management: NextAuth.js with secure cookies (a token-refresh sketch follows the configuration below)
// NextAuth configuration for Keycloak
export const authConfig: NextAuthConfig = {
providers: [
Keycloak({
clientId: process.env.AUTH_KEYCLOAK_ID!,
clientSecret: process.env.AUTH_KEYCLOAK_SECRET!,
issuer: process.env.AUTH_KEYCLOAK_ISSUER!,
authorization: {
params: {
scope: "openid email profile",
response_type: "code",
grant_type: "authorization_code",
},
},
}),
],
callbacks: {
jwt({ token, account, profile }) {
if (account) {
token.accessToken = account.access_token;
token.refreshToken = account.refresh_token;
token.expiresAt = account.expires_at;
}
return token;
},
session({ session, token }) {
session.accessToken = token.accessToken;
session.user.id = token.sub;
return session;
},
},
session: {
strategy: "jwt",
maxAge: 7 * 24 * 60 * 60, // 7 days
},
};
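The configuration above stores the refresh token but never uses it; to honor the 15-minute access-token validity, the jwt callback would need to rotate it. A hedged sketch using the standard OIDC refresh_token grant against Keycloak's token endpoint (the helper and its wiring are assumptions):
// Hypothetical sketch: rotate an expired access token via the OIDC refresh grant;
// call from the jwt callback when token.expiresAt has passed
async function refreshAccessToken(token: { refreshToken: string; [k: string]: unknown }) {
  const response = await fetch(
    `${process.env.AUTH_KEYCLOAK_ISSUER}/protocol/openid-connect/token`,
    {
      method: "POST",
      headers: { "Content-Type": "application/x-www-form-urlencoded" },
      body: new URLSearchParams({
        grant_type: "refresh_token",
        client_id: process.env.AUTH_KEYCLOAK_ID!,
        client_secret: process.env.AUTH_KEYCLOAK_SECRET!,
        refresh_token: token.refreshToken,
      }),
    },
  );
  if (!response.ok) throw new Error("Refresh token rotation failed");
  const refreshed = await response.json();
  return {
    ...token,
    accessToken: refreshed.access_token,
    refreshToken: refreshed.refresh_token ?? token.refreshToken,
    expiresAt: Math.floor(Date.now() / 1000) + refreshed.expires_in,
  };
}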
6.2 Authorization
- Model: RBAC (Role-Based Access Control) + ABAC (Attribute-Based Access Control)
- Roles: Admin, Manager, User, Guest
- Permissions: resource-level with granular conditions
// Permission system with conditions
interface Permission {
resource: string; // 'document', 'knowledge_base', etc.
action: string; // 'create', 'read', 'update', 'delete'
conditions?: AccessCondition[];
}
interface AccessCondition {
field: string; // 'created_by', 'status', 'knowledge_base_id'
operator: "eq" | "ne" | "in" | "contains" | "gt" | "lt";
value: any;
}
// Example: User can only read their own completed documents
const userDocumentReadPermission: Permission = {
resource: "document",
action: "read",
conditions: [
{ field: "created_by", operator: "eq", value: "{{user.id}}" },
{ field: "status", operator: "eq", value: "completed" },
],
};
class PermissionService {
async checkPermission(
userId: string,
resource: string,
action: string,
context?: Record<string, any>,
): Promise<boolean> {
const userPermissions = await this.getUserPermissions(userId);
for (const permission of userPermissions) {
if (
this.matchesResource(permission.resource, resource) &&
this.matchesAction(permission.action, action)
) {
if (!permission.conditions) return true;
const conditionsMet = await this.evaluateConditions(
permission.conditions,
{ ...context, user: { id: userId } },
);
if (conditionsMet) return true;
}
}
return false;
}
}
6.3 Encryption
- Sensitive data: AES-256-GCM
- Passwords: bcrypt (12 rounds)
- Transport: TLS 1.3 (custom certificates or Let's Encrypt)
- Key rotation: support for multiple encryption keys
# Encryption service with key rotation
# Note: Fernet uses AES-128-CBC with HMAC; this snippet illustrates the
# key-rotation pattern rather than the AES-256-GCM cipher choice above
from cryptography.fernet import Fernet, MultiFernet
import os
from typing import List

class DataEncryptionService:
    def __init__(self):
        # Current key for new encryptions
        self.current_key = os.getenv('ENCRYPTION_KEY_CURRENT')
        # Previous keys for decryption (key rotation)
        previous_keys_str = os.getenv('ENCRYPTION_KEYS_PREVIOUS', '')
        self.previous_keys = [k.strip() for k in previous_keys_str.split(',') if k.strip()]
        # MultiFernet enables automatic key rotation
        all_keys = [self.current_key] + self.previous_keys
        self.cipher = MultiFernet([Fernet(key.encode()) for key in all_keys])

    def encrypt(self, data: str) -> str:
        """Encrypt with current key"""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt with any available key (automatic rotation)"""
        try:
            return self.cipher.decrypt(encrypted_data.encode()).decode()
        except Exception as e:
            raise DecryptionError(f"Unable to decrypt data: {e}")

    def rotate_encryption(self, encrypted_data: str) -> str:
        """Re-encrypt data with current key"""
        decrypted = self.decrypt(encrypted_data)
        return self.encrypt(decrypted)
6.4 Input Validation
- Schema-based validation: Zod schemas with i18n
- SQL Injection: prepared statements and ORM escaping
- XSS Protection: Content Security Policy and sanitization
- File Upload: MIME type validation, virus scanning, size limits (see the upload-validation sketch after the schema below)
// Schema validation with internationalization
export const createDocumentSchema = (t: TFunction) => {
const v = createSchemaFactory("document")(t);
return z.object({
name: z
.string({
required_error: v.string("name").required(),
invalid_type_error: v.string("name").required(),
})
.min(1, v.string("name").required())
.max(255, v.string("name").maxLength(255)),
knowledgeBaseId: z
.string()
.uuid(v.string("knowledgeBaseId").uuid())
.optional(),
tags: z
.array(z.string().min(1).max(50))
.max(10, v.string("tags").maxItems(10))
.optional(),
chunkingConfig: z
.object({
strategy: z.enum([
"recursive",
"semantic",
"agentic",
"sentence",
"fixed",
]),
chunkSize: z.number().min(100).max(2048).default(512),
chunkOverlap: z.number().min(0).max(500).default(75),
})
.optional(),
});
};
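The MIME-type and size checks from the list above are not part of the Zod schema; a minimal server-side sketch could look like the following (the allow-list and the scanForViruses hook are assumptions, not existing project APIs):
// Hypothetical sketch of server-side upload validation (MIME allow-list + size cap)
const ALLOWED_MIME = new Set([
  "application/pdf",
  "audio/mpeg",
  "video/mp4",
  "text/plain",
  "text/markdown",
]);
const MAX_UPLOAD_BYTES = 1_073_741_824; // 1 GiB, matching the MAX_FILE_SIZE default

export async function validateUpload(file: { type: string; size: number }) {
  if (!ALLOWED_MIME.has(file.type)) {
    throw new RouteHandlerError(`Unsupported MIME type: ${file.type}`, "INVALID_FILE_TYPE", 415);
  }
  if (file.size > MAX_UPLOAD_BYTES) {
    throw new RouteHandlerError("File exceeds the maximum allowed size", "FILE_TOO_LARGE", 413);
  }
  await scanForViruses(file); // virus-scanning hook, e.g. a ClamAV sidecar (assumed)
}

declare function scanForViruses(file: { type: string; size: number }): Promise<void>;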
7. Performance and Scalability
7.1 Performance Requirements
- Response Time: < 200ms for 95% of API requests
- Throughput: 1000 requests/second on the main endpoints
- Search Latency: < 100ms for semantic searches (Milvus)
- File Processing: < 2 minutes for standard PDF documents
- Availability: 99.9% uptime with graceful degradation
7.2 Caching Strategies
// Multi-layer caching system
class CacheService {
private l1Cache = new LRUCache<string, any>({ max: 1000, ttl: 300000 }); // 5-minute L1 TTL
private l2Cache: RedisClient; // distributed L2 (Redis)
constructor() {
this.l2Cache = new Redis({
host: "redis",
port: 6379,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3,
});
}
async get<T>(key: string): Promise<T | null> {
// L1 Cache (in-memory)
const l1Value = this.l1Cache.get(key);
if (l1Value !== undefined) {
this.recordCacheHit("L1", key);
return l1Value;
}
// L2 Cache (Redis)
try {
const l2Value = await this.l2Cache.get(key);
if (l2Value) {
const parsed = JSON.parse(l2Value);
this.l1Cache.set(key, parsed); // Warm L1
this.recordCacheHit("L2", key);
return parsed;
}
} catch (error) {
console.warn("L2 cache error:", error);
}
this.recordCacheMiss(key);
return null;
}
async set<T>(key: string, value: T, ttl: number = 3600): Promise<void> {
// Store in both levels
this.l1Cache.set(key, value, { ttl: Math.min(ttl, 300) * 1000 });
try {
await this.l2Cache.setex(key, ttl, JSON.stringify(value));
} catch (error) {
console.warn("L2 cache write error:", error);
}
}
// Cache invalidation patterns
async invalidatePattern(pattern: string): Promise<void> {
// L1 wildcard invalidation
const l1Keys = Array.from(this.l1Cache.keys());
const matchingL1Keys = l1Keys.filter((key) => key.includes(pattern));
matchingL1Keys.forEach((key) => this.l1Cache.delete(key));
// L2 pattern-based invalidation
const stream = this.l2Cache.scanStream({ match: pattern });
const pipeline = this.l2Cache.pipeline();
stream.on("data", (keys: string[]) => {
keys.forEach((key) => pipeline.del(key));
});
stream.on("end", () => {
pipeline.exec();
});
}
}
7.3 Database Optimization
-- Connection pooling configuration
-- postgresql.conf optimizations
max_connections = 200
shared_buffers = 256MB
effective_cache_size = 1GB
work_mem = 4MB
maintenance_work_mem = 64MB
wal_buffers = 16MB
checkpoint_completion_target = 0.9
random_page_cost = 1.1 -- For SSD storage
-- Query optimization example
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT
d.id,
d.name,
d.status,
d.chunk_count,
kb.name as knowledge_base_name,
f.size as file_size
FROM document d
LEFT JOIN knowledge_base kb ON d.knowledge_base_id = kb.id
LEFT JOIN file f ON d.file_id = f.id
WHERE d.created_by = $1
AND d.status = ANY($2)
AND d.created_at >= $3
ORDER BY d.created_at DESC
LIMIT $4 OFFSET $5;
-- Materialized view for analytics
CREATE MATERIALIZED VIEW document_analytics AS
SELECT
DATE_TRUNC('day', created_at) as date,
type,
status,
COUNT(*) as document_count,
AVG(chunk_count) as avg_chunks,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_count
FROM document
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE_TRUNC('day', created_at), type, status;
-- Automatic refresh job
CREATE OR REPLACE FUNCTION refresh_analytics()
RETURNS void AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY document_analytics;
ANALYZE document_analytics;
END;
$$ LANGUAGE plpgsql;
-- Scheduled refresh (via pg_cron extension)
SELECT cron.schedule('refresh-analytics', '*/15 * * * *', 'SELECT refresh_analytics();');
8. Monitoring and Logging
8.1 Structured Logging
- Format: JSON structured logging
- Levels: ERROR, WARN, INFO, DEBUG
- Retention: 30 days with automatic rotation
- Aggregation: Loki for log centralization
// Structured logging service
interface LogContext {
userId?: string;
requestId: string;
action: string;
resource?: string;
duration?: number;
metadata?: Record<string, any>;
}
class Logger {
private winston: winston.Logger;
constructor() {
this.winston = winston.createLogger({
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json(),
),
transports: [
new winston.transports.Console(),
new winston.transports.File({
filename: "app.log",
maxsize: 100 * 1024 * 1024, // 100MB
maxFiles: 5,
}),
],
});
}
info(message: string, context: LogContext) {
this.winston.info(message, {
...context,
timestamp: new Date().toISOString(),
level: "info",
});
}
error(message: string, error: Error, context: LogContext) {
this.winston.error(message, {
...context,
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
timestamp: new Date().toISOString(),
level: "error",
});
}
// Performance logging
performance(action: string, duration: number, context: Partial<LogContext>) {
this.info(`Performance: ${action}`, {
...context,
action: `perf:${action}`,
duration,
requestId: context.requestId || "unknown",
});
}
}
8.2 Metrics (Prometheus)
// Prometheus metrics configuration
import prometheus from "prom-client";
// Application metrics
const httpRequestDuration = new prometheus.Histogram({
name: "http_request_duration_seconds",
help: "HTTP request duration in seconds",
labelNames: ["method", "route", "status_code", "user_id"],
buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10],
});
const documentProcessingDuration = new prometheus.Histogram({
name: "document_processing_duration_seconds",
help: "Document processing duration in seconds",
labelNames: ["document_type", "chunker_name", "status"],
buckets: [1, 5, 10, 30, 60, 300, 600, 1800, 3600],
});
const searchLatency = new prometheus.Histogram({
name: "search_latency_seconds",
help: "Search query latency in seconds",
labelNames: ["search_type", "collection", "cache_hit"],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
});
const activeUsers = new prometheus.Gauge({
name: "active_users_total",
help: "Number of active users in the last 5 minutes",
});
// Database connection pool metrics
const dbConnections = new prometheus.Gauge({
name: "database_connections_active",
help: "Number of active database connections",
labelNames: ["database", "state"],
});
// Custom middleware for HTTP metrics
export const metricsMiddleware = (
req: Request,
res: Response,
next: Function,
) => {
const start = Date.now();
res.on("finish", () => {
const duration = (Date.now() - start) / 1000;
httpRequestDuration
.labels(
req.method,
req.route?.path || req.path,
res.statusCode.toString(),
req.user?.id || "anonymous",
)
.observe(duration);
});
next();
};
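For Prometheus to scrape these metrics, the registry must be exposed over HTTP; a minimal sketch using prom-client's default registry (the route location is an assumption):
// Hypothetical sketch: expose the collected metrics for Prometheus scraping
export async function GET() {
  const body = await prometheus.register.metrics();
  return new Response(body, {
    headers: { "Content-Type": prometheus.register.contentType },
  });
}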
8.3 Alerting (Grafana)
# Grafana alert rules configuration
apiVersion: 1
groups:
  - name: emblema-alerts
    folder: Emblema
    interval: 1m
    rules:
      - uid: high-error-rate
        title: High Error Rate
        condition: C
        data:
          - refId: A
            queryType: ""
            model:
              expr: 'rate(http_requests_total{status_code=~"5.."}[5m]) > 0.1'
              interval: ""
              refId: A
        noDataState: NoData
        execErrState: Alerting
        for: 5m
        annotations:
          summary: "High error rate detected: {{ $value }} errors/sec"
          description: "Error rate is above 10% for the last 5 minutes"
        labels:
          severity: critical
          team: platform
      - uid: document-processing-failures
        title: Document Processing Failures
        condition: C
        data:
          - refId: A
            queryType: ""
            model:
              expr: "increase(document_processing_failures_total[10m]) > 5"
              interval: ""
              refId: A
        for: 5m
        annotations:
          summary: "Multiple document processing failures"
          description: "{{ $value }} documents failed processing in the last 10 minutes"
        labels:
          severity: warning
          team: ai-services
      - uid: search-latency-high
        title: Search Latency High
        condition: C
        data:
          - refId: A
            queryType: ""
            model:
              expr: "histogram_quantile(0.95, rate(search_latency_seconds_bucket[5m])) > 0.5"
              interval: ""
              refId: A
        for: 3m
        annotations:
          summary: "Search latency is high: {{ $value }}s"
          description: "95th percentile search latency is above 500ms"
        labels:
          severity: warning
          team: search
9. Deployment and Configuration
9.1 Deployment Environment
- Containerization: Docker multi-stage builds
- Orchestration: Docker Compose for development, Kubernetes for production
- CI/CD: GitHub Actions with security scanning
- Reverse Proxy: Traefik with automatic SSL/TLS
# Multi-stage Dockerfile for www-emblema
FROM node:18-alpine as base
RUN apk add --no-cache libc6-compat
WORKDIR /app
# Install dependencies based on the preferred package manager
COPY package.json pnpm-lock.yaml* ./
RUN corepack enable pnpm && pnpm install --frozen-lockfile
# Build stage
FROM base as builder
WORKDIR /app
COPY . .
RUN pnpm build
# Production stage
FROM node:18-alpine as runner
WORKDIR /app
ENV NODE_ENV=production
ENV NEXT_TELEMETRY_DISABLED=1
RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 nextjs
COPY --from=builder /app/public ./public
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
USER nextjs
EXPOSE 3000
ENV PORT=3000
ENV HOSTNAME="0.0.0.0"
CMD ["node", "server.js"]
9.2 Docker Compose Configuration
# docker-compose.yaml (excerpt)
version: "3.8"
services:
  www-emblema:
    image: emblema/www:${EMBLEMA_VERSION:-dev}
    restart: always
    depends_on:
      - milvus
      - minio
      - graphql-engine
      - keycloak
      - litellm
      - background-task
    environment:
      - HASURA_API_URL=http://graphql-engine:8080/v1/graphql
      - MILVUS_API_URL=http://milvus:19530/v2/vectordb
      - LITELLM_API_URL=http://litellm:4000/v1
      - BACKGROUND_TASK_API_URL=http://background-task
      - AUTH_KEYCLOAK_ISSUER=${KEYCLOAK_REALM_URL}
      - MAX_FILE_SIZE=${MAX_FILE_SIZE:-1073741824}
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.emblema-web.rule=Host(`${EMBLEMA_WEB_HOSTNAME}`)"
      - "traefik.http.routers.emblema-web.entrypoints=websecure"
      - "traefik.http.routers.emblema-web.tls=true"
      - "traefik.http.services.emblema-web.loadbalancer.server.port=3000"
    networks:
      - emblema

  background-task:
    image: emblema/background-task:${EMBLEMA_VERSION:-dev}
    restart: always
    depends_on:
      - redis
      - milvus
      - minio
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - MILVUS_URI=http://milvus:19530
      - MINIO_ENDPOINT=minio:9000
      - BGE_MODEL_PATH=/app/models/bge-m3
    volumes:
      - ./models:/app/models:ro
      - temp_processing:/tmp/processing
    networks:
      - emblema

  background-task-worker:
    image: emblema/background-task:${EMBLEMA_VERSION:-dev}
    restart: always
    # Celery 5 requires the -A option before the "worker" subcommand
    command:
      [
        "celery",
        "-A",
        "app.celery_app",
        "worker",
        "--loglevel=info",
        "--concurrency=4",
      ]
    depends_on:
      - redis
      - background-task
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    volumes:
      - temp_processing:/tmp/processing
    networks:
      - emblema
    deploy:
      replicas: 2

volumes:
  temp_processing:
    driver: local

networks:
  emblema:
    external: true
9.3 Environment Variables
| Variable | Description | Default Value |
|----------|-------------|---------------|
| EMBLEMA_VERSION | Docker image version | dev |
| BASE_DOMAIN | Base domain for services | emblema.local |
| USE_SSL | Enable SSL/TLS | 1 |
| POSTGRES_USER | PostgreSQL username | emblema |
| POSTGRES_PASSWORD | PostgreSQL password | generated |
| REDIS_MASTER_PASSWORD | Redis master password | generated |
| KEYCLOAK_ADMIN | Keycloak admin user | emblema |
| KEYCLOAK_ADMIN_PASSWORD | Keycloak admin password | generated |
| HASURA_ADMIN_SECRET | Hasura GraphQL admin secret | generated |
| MILVUS_USER | Milvus username | root |
| MILVUS_PASSWORD | Milvus password | Milvus |
| MINIO_ROOT_USER | MinIO admin user | emblema |
| MAX_FILE_SIZE | Max file upload size (bytes) | 1073741824 |
| DEFAULT_CHAT_MODEL | Default LLM model | gpt-4o-mini |
| BGE_MODEL_PATH | BGE-M3 model location | /app/models/bge-m3 |
10. Testing
10.1 Current State
⚠️ IMPORTANT: the Emblema project currently has no tests implemented. This section describes the testing implementation plan still to be developed.
Current Situation
- Frontend Tests: 0% coverage - no tests implemented
- Backend Tests: 0% coverage - no tests implemented
- Integration Tests: not present
- E2E Tests: not configured
- Performance Tests: non-existent
Technical Debt
The absence of tests is a significant risk because of:
- Regressions going undetected during development
- Difficulty refactoring safely
- Lack of executable documentation
- Impossibility of reliable CI/CD
10.2 Testing Implementation Plan
Phase 1: Testing Foundation (Weeks 1-2)
Goal: set up the testing infrastructure
# Frontend testing setup
pnpm add -D jest @testing-library/react @testing-library/jest-dom
pnpm add -D @testing-library/user-event msw whatwg-fetch
pnpm add -D @types/jest jest-environment-jsdom
# Backend testing setup
cd apps/background-task
uv add --dev pytest pytest-asyncio pytest-cov
uv add --dev factory-boy faker pytest-mock
uv add --dev testcontainers pytest-xdist
Jest Configuration (To Be Implemented):
// jest.config.js
module.exports = {
testEnvironment: "jsdom",
setupFilesAfterEnv: ["<rootDir>/jest.setup.js"],
testPathIgnorePatterns: ["/node_modules/", "/.next/"],
transform: {
"^.+\\.(js|jsx|ts|tsx)$": ["babel-jest", { presets: ["next/babel"] }],
},
moduleNameMapper: {
"^@/(.*)$": "<rootDir>/$1",
"\\.(css|less|sass|scss)$": "identity-obj-proxy",
},
collectCoverageFrom: [
"app/**/*.{js,jsx,ts,tsx}",
"components/**/*.{js,jsx,ts,tsx}",
"!**/*.d.ts",
"!**/node_modules/**",
],
};
Pytest Configuration (To Be Implemented):
# pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
addopts =
    --verbose
    --cov=app
    --cov-report=html
    --cov-report=term-missing
    --cov-fail-under=80
Phase 2: Core Unit Tests (Weeks 3-4)
Frontend - critical components to test:
// To implement: tests for the DocumentUpload component
describe("DocumentUpload", () => {
it("should validate file types before upload", async () => {
// Test implementation needed
});
it("should handle chunked upload for large files", async () => {
// Test implementation needed
});
it("should show progress during upload", async () => {
// Test implementation needed
});
});
// To implement: tests for the Chat API handler
describe("Chat API Handler", () => {
it("should require authentication", async () => {
// Test implementation needed
});
it("should validate model selection", async () => {
// Test implementation needed
});
it("should stream responses correctly", async () => {
// Test implementation needed
});
});
Backend - critical services to test:
# To implement: tests for the Document Processor
class TestDocumentProcessor:
    async def test_pdf_processing(self):
        """Test MinerU PDF processing pipeline"""
        # Implementation needed
        pass

    async def test_audio_transcription(self):
        """Test WhisperX audio processing"""
        # Implementation needed
        pass

    async def test_chunking_strategies(self):
        """Test different chunking strategies"""
        # Implementation needed
        pass

# To implement: tests for the Embedding Service
class TestEmbeddingService:
    async def test_bge_m3_generation(self):
        """Test BGE-M3 1024-dim embeddings"""
        # Implementation needed
        pass

    async def test_batch_processing(self):
        """Test batch embedding generation"""
        # Implementation needed
        pass
Phase 3: Integration Testing (Weeks 5-6)
Database Integration (To Be Implemented):
# Tests with TestContainers
@pytest.fixture
async def test_db():
    """Spin up test PostgreSQL container"""
    # Implementation needed
    pass

@pytest.fixture
async def test_milvus():
    """Spin up test Milvus container"""
    # Implementation needed
    pass

class TestDatabaseIntegration:
    async def test_document_lifecycle(self, test_db):
        """Test complete document CRUD operations"""
        # Implementation needed
        pass

    async def test_vector_search(self, test_milvus):
        """Test Milvus vector search operations"""
        # Implementation needed
        pass
API Integration (To Be Implemented):
// E2E tests for the main flows
describe("Document Processing Flow", () => {
it("should process document from upload to search", async () => {
// 1. Upload document
// 2. Wait for processing
// 3. Verify chunks created
// 4. Test search functionality
// Implementation needed
});
});
Phase 4: Performance & Load Testing (Weeks 7-8)
k6 Performance Tests (To Be Implemented):
// k6-tests/document-upload.js
import http from "k6/http";
import { check } from "k6";
export const options = {
stages: [
{ duration: "2m", target: 100 },
{ duration: "5m", target: 100 },
{ duration: "2m", target: 0 },
],
thresholds: {
http_req_duration: ["p(95)<500"],
http_req_failed: ["rate<0.05"],
},
};
export default function () {
// Test document upload performance
// Implementation needed
}
10.3 Target Metrics
Once implemented, the tests should reach:
- Unit Test Coverage: ≥ 80% for business logic
- Integration Test Coverage: all critical flows
- Performance Benchmarks:
  - API Response: < 200ms (p95)
  - Document Processing: < 2 min per 100 pages
  - Search Latency: < 100ms (p95)
- Reliability: 99.9% test success rate in CI
10.4 CI/CD Integration (To Be Configured)
# .github/workflows/test.yml (to be created)
name: Test Suite
on: [push, pull_request]
jobs:
  frontend-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-node@v3
      - run: pnpm install
      - run: pnpm test:frontend
      - run: pnpm test:frontend:coverage
  backend-tests:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
      redis:
        image: redis:7
      milvus:
        image: milvusdb/milvus:2.4-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
      - run: uv sync
      - run: uv run pytest
      - run: uv run pytest --cov
10.5 Implementation Priorities
1. Critical Path Testing (first priority):
   - Authentication flow
   - Document upload/processing
   - Search functionality
   - Chat API
2. Core Services (second priority):
   - Chunking algorithms
   - Embedding generation
   - Permission system
   - Background tasks
3. UI Components (third priority):
   - Form validation
   - Error handling
   - Loading states
   - Data tables
4. Performance (fourth priority):
   - Load testing
   - Stress testing
   - Benchmark suite
Supporting test tooling:
- API: Supertest for API endpoint testing
- Message Queue: embedded Redis for Celery testing
- File Processing: test files in multiple formats
# Integration tests with TestContainers
import pytest
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
from testcontainers.minio import MinioContainer
from sqlalchemy import create_engine
from app.config import get_settings

@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer("postgres:15") as postgres:
        yield postgres

@pytest.fixture(scope="session")
def redis_container():
    with RedisContainer("redis:7-alpine") as redis:
        yield redis

@pytest.fixture(scope="session")
def minio_container():
    with MinioContainer() as minio:
        yield minio

@pytest.fixture
def integration_db(postgres_container):
    """Database engine for integration tests"""
    engine = create_engine(postgres_container.get_connection_url())
    # Run migrations
    from alembic.config import Config
    from alembic import command
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", postgres_container.get_connection_url())
    command.upgrade(alembic_cfg, "head")
    yield engine
    engine.dispose()

@pytest.mark.integration
def test_full_document_pipeline(integration_db, redis_container, minio_container):
    """Test complete document processing pipeline"""
    # 1. Upload document to MinIO
    # 2. Create document record in PostgreSQL
    # 3. Trigger Celery task
    # 4. Verify chunks in database
    # 5. Check embeddings in Milvus (mocked)
    pass
10.6 Performance Testing
- Tool: k6 for load testing
- Scenario: 1000 concurrent users for 10 minutes
- Criteria: response time < 200ms for 95% of requests
- Monitoring: real-time metrics during the test
// k6 performance test script
import http from "k6/http";
import { check, sleep } from "k6";
import { Rate, Trend } from "k6/metrics";
// Custom metrics
const errorRate = new Rate("errors");
const searchLatency = new Trend("search_latency");
export const options = {
stages: [
{ duration: "2m", target: 100 }, // Ramp up
{ duration: "5m", target: 1000 }, // Stay at 1000 users
{ duration: "2m", target: 0 }, // Ramp down
],
thresholds: {
http_req_duration: ["p(95)<200"], // 95% of requests under 200ms
http_req_failed: ["rate<0.02"], // Error rate under 2%
errors: ["rate<0.02"],
},
};
const BASE_URL = "https://emblema.local";
const AUTH_TOKEN = "your-jwt-token";
export function setup() {
// Login and get auth token
const loginResponse = http.post(`${BASE_URL}/api/auth/signin`, {
email: "test@example.com",
password: "password123",
});
return { token: loginResponse.json("token") };
}
export default function (data) {
const headers = {
Authorization: `Bearer ${data.token}`,
"Content-Type": "application/json",
};
// Test document search
const searchStart = Date.now();
const searchResponse = http.post(
`${BASE_URL}/api/v1/search`,
JSON.stringify({
query: "machine learning algorithms",
limit: 10,
knowledgeBaseIds: ["kb-123"],
}),
{ headers },
);
const searchDuration = Date.now() - searchStart;
searchLatency.add(searchDuration);
check(searchResponse, {
"search status is 200": (r) => r.status === 200,
"search has results": (r) => JSON.parse(r.body).results.length > 0,
"search under 500ms": () => searchDuration < 500,
}) || errorRate.add(1);
// Test chat API
const chatResponse = http.post(
`${BASE_URL}/api/v1/chat`,
JSON.stringify({
id: `chat-${__VU}-${__ITER}`,
messages: [{ role: "user", content: "Explain quantum computing" }],
selectedChatModel: "gpt-4o-mini",
}),
{ headers },
);
check(chatResponse, {
"chat status is 200": (r) => r.status === 200,
"chat response not empty": (r) => r.body.length > 0,
}) || errorRate.add(1);
sleep(1);
}
export function teardown(data) {
// Cleanup operations
console.log("Performance test completed");
}
10.7 Implementation Priorities by Phase
Phase 1: Foundation Testing (Weeks 1-2)
- Testing infrastructure setup:
  - Jest configuration for the frontend
  - pytest setup for the backend
  - TestContainers for integration tests
  - CI/CD pipeline for automated testing
Phase 2: Core Unit Tests (Weeks 3-4)
1. Critical API endpoints:
   - Hierarchical API endpoints
   - Chat API functionality
   - File upload/processing
   - Search API
2. Business logic:
   - Document chunking strategies
   - BGE-M3 embedding generation
   - Permission/authorization logic
   - File processing pipeline
Phase 3: Integration Testing (Weeks 5-6)
- End-to-end workflows:
  - Document upload → processing → search
  - User authentication → API access
  - Chat with knowledge base integration
  - Hierarchical entity relationships
Phase 4: Performance & Load Testing (Weeks 7-8)
- Performance baselines:
  - Hierarchical API response times
  - Search latency optimization
  - File processing throughput
  - Concurrent user handling
10.8 Target Success Metrics
- Code Coverage: minimum 80% for business-critical components
- Test Execution Time: full suite < 15 minutes
- Flaky Tests: random failure rate < 1%
- Regression Detection: automatic identification of breaking changes
- Performance Baseline: response time < 200ms for 95% of API requests
- Integration Success: complete pipelines without manual intervention
11. Error Handling and Fallback
11.1 Fallback Strategies
- Circuit Breaker: Hystrix-style pattern for external services
- Retry Policy: exponential backoff with jitter
- Timeout: 30 seconds for synchronous operations, 1 hour for processing (a timeout wrapper sketch follows the retry policy below)
- Graceful Degradation: reduced functionality on failure
// Circuit breaker implementation
class CircuitBreaker {
private failures = 0;
private lastFailureTime = 0;
private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";
constructor(
private threshold = 5,
private timeout = 60000, // 1 minute
private monitoringPeriod = 10000, // 10 seconds
) {}
async execute<T>(
operation: () => Promise<T>,
fallback?: () => Promise<T>,
): Promise<T> {
if (this.state === "OPEN") {
if (Date.now() - this.lastFailureTime > this.timeout) {
this.state = "HALF_OPEN";
} else {
if (fallback) {
console.warn("Circuit breaker OPEN, executing fallback");
return await fallback();
}
throw new Error("Circuit breaker is OPEN");
}
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
if (fallback && this.state === "OPEN") {
console.warn("Operation failed, executing fallback");
return await fallback();
}
throw error;
}
}
private onSuccess() {
this.failures = 0;
this.state = "CLOSED";
}
private onFailure() {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= this.threshold) {
this.state = "OPEN";
console.error(`Circuit breaker opened after ${this.failures} failures`);
}
}
}
// Retry with exponential backoff
class RetryPolicy {
static async execute<T>(
operation: () => Promise<T>,
maxRetries = 3,
baseDelay = 1000,
maxDelay = 30000,
jitter = true,
): Promise<T> {
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;
if (attempt === maxRetries) {
break;
}
// Calculate delay with exponential backoff
let delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
// Add jitter to prevent thundering herd
if (jitter) {
delay = delay * (0.5 + Math.random() * 0.5);
}
console.warn(
`Operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${maxRetries})`,
);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
throw new Error(
`Operation failed after ${maxRetries} retries: ${lastError.message}`,
);
}
}
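The 30-second synchronous timeout from 11.1 is not shown above; a generic wrapper sketch (not project code) that composes with the retry policy:
// Generic sketch: enforce the 30-second synchronous-operation timeout
async function withTimeout<T>(operation: () => Promise<T>, timeoutMs = 30_000): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Operation timed out after ${timeoutMs}ms`)), timeoutMs);
  });
  try {
    // Whichever settles first wins; the loser is discarded
    return await Promise.race([operation(), timeout]);
  } finally {
    clearTimeout(timer);
  }
}

// Usage: compose with RetryPolicy above
// const result = await RetryPolicy.execute(() => withTimeout(() => callExternalService()));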
11.2 Global Exception Handling
# FastAPI global exception handler
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import traceback
import logging

logger = logging.getLogger(__name__)
app = FastAPI()

class EmblemaException(Exception):
    """Base exception for Emblema platform"""
    def __init__(self, message: str, code: str = "GENERIC_ERROR", status_code: int = 500):
        self.message = message
        self.code = code
        self.status_code = status_code
        super().__init__(message)

class ProcessingError(EmblemaException):
    """Document processing related errors"""
    def __init__(self, message: str, document_id: str = None):
        super().__init__(message, "PROCESSING_ERROR", 422)
        self.document_id = document_id

class SearchError(EmblemaException):
    """Search service related errors"""
    def __init__(self, message: str, query: str = None):
        super().__init__(message, "SEARCH_ERROR", 500)
        self.query = query

@app.exception_handler(EmblemaException)
async def emblema_exception_handler(request: Request, exc: EmblemaException):
    """Handle custom Emblema exceptions"""
    logger.error(f"Emblema exception: {exc.code} - {exc.message}", extra={
        'error_code': exc.code,
        'path': request.url.path,
        'method': request.method
    })
    return JSONResponse(
        status_code=exc.status_code,
        content={
            'error': {
                'code': exc.code,
                'message': exc.message,
                'timestamp': datetime.utcnow().isoformat()
            }
        }
    )

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Handle Pydantic validation errors"""
    errors = []
    for error in exc.errors():
        errors.append({
            'field': '.'.join(str(x) for x in error['loc'][1:]),
            'message': error['msg'],
            'type': error['type']
        })
    return JSONResponse(
        status_code=422,
        content={
            'error': {
                'code': 'VALIDATION_ERROR',
                'message': 'Request validation failed',
                'details': errors
            }
        }
    )

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch-all exception handler"""
    logger.error(f"Unexpected error: {str(exc)}", extra={
        'path': request.url.path,
        'method': request.method,
        'traceback': traceback.format_exc()
    })
    return JSONResponse(
        status_code=500,
        content={
            'error': {
                'code': 'INTERNAL_SERVER_ERROR',
                'message': 'An unexpected error occurred'
            }
        }
    )
12. Non-Functional Considerations
12.1 Maintainability
- Clean Code: SOLID principles and established design patterns
- Documentation: comprehensive inline documentation and READMEs
- Code Reviews: mandatory PR reviews with automated checks
- Refactoring: continuous code improvement and debt reduction
// Example: clean architecture pattern for business logic
interface DocumentService {
create(data: CreateDocumentRequest): Promise<Document>;
process(id: string, config: ProcessingConfig): Promise<Task>;
search(query: SearchQuery): Promise<SearchResults>;
}
class DocumentServiceImpl implements DocumentService {
constructor(
private documentRepository: DocumentRepository,
private fileStorageService: FileStorageService,
private processingService: ProcessingService,
private notificationService: NotificationService,
) {}
async create(data: CreateDocumentRequest): Promise<Document> {
// 1. Validate input
await this.validateCreateRequest(data);
// 2. Store file
const file = await this.fileStorageService.upload(data.file);
// 3. Create document record
const document = await this.documentRepository.create({
name: data.name,
type: file.extension,
status: "pending",
fileId: file.id,
createdBy: data.userId,
});
// 4. Trigger processing
const task = await this.processingService.scheduleProcessing(
document.id,
data.config,
);
// 5. Send notification
await this.notificationService.notifyDocumentCreated(document, data.userId);
return document;
}
private async validateCreateRequest(
data: CreateDocumentRequest,
): Promise<void> {
if (!data.file) {
throw new ValidationError("File is required");
}
if (data.file.size > MAX_FILE_SIZE) {
throw new ValidationError("File size exceeds limit");
}
const allowedTypes = ["pdf", "mp3", "mp4", "txt", "md"];
if (!allowedTypes.includes(data.file.extension)) {
throw new ValidationError("Unsupported file type");
}
}
}
12.2 Scalability
- Horizontal Scaling: Stateless microservices behind load balancing
- Database Sharding: Partitioning strategy for large datasets
- Caching Layers: Multi-level caching with invalidation (see the caching sketch at the end of this subsection)
- Async Processing: Message queues for decoupling
# Horizontal scaling configuration for Celery workers
from celery import Celery
from kombu import Queue

# Queue specialization by document type
celery_app = Celery('emblema')
celery_app.conf.update(
    # Task routing to specialized workers
    task_routes={
        'process_pdf_document': {'queue': 'pdf_processing'},
        'process_audio_document': {'queue': 'audio_processing'},
        'process_video_document': {'queue': 'video_processing'},
        'generate_embeddings': {'queue': 'embedding_generation'},
        'send_notification': {'queue': 'notifications'}
    },
    # Performance optimizations
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_disable_rate_limits=True,
    # Auto-scaling configuration
    worker_autoscaler='celery.worker.autoscale:Autoscaler',
    worker_max_tasks_per_child=1000,
    # Queue definitions
    task_queues=[
        Queue('pdf_processing', routing_key='pdf'),
        Queue('audio_processing', routing_key='audio'),
        Queue('video_processing', routing_key='video'),
        Queue('embedding_generation', routing_key='embeddings'),
        Queue('notifications', routing_key='notifications')
    ]
)
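Each queue is then served by a dedicated worker pool, so that heavy media processing cannot starve lightweight notification tasks. Assuming the Celery application module is named emblema, a typical per-node invocation would be: celery -A emblema worker -Q pdf_processing --autoscale=10,2 (scaling between 2 and 10 worker processes).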
# Database connection pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,        # Connection pool size
    max_overflow=30,     # Additional connections beyond the pool
    pool_pre_ping=True,  # Validate connections before use
    pool_recycle=3600    # Recycle connections every hour
)
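To make the "Caching Layers" bullet concrete, a minimal read-through sketch: a process-local L1 dictionary in front of Redis as L2, with explicit invalidation on writes. The get_redis_client() helper (assumed to return a redis.asyncio client created with decode_responses=True) and both TTL values are illustrative assumptions, not the platform's final cache design:
# Two-level read-through cache sketch (assumptions noted above)
import json
import time

L1_TTL = 30      # seconds, illustrative
L2_TTL = 3600    # seconds, illustrative

_l1_cache: dict[str, tuple[float, str]] = {}  # key -> (expires_at, payload)

async def cached_get(key: str, loader) -> dict:
    """Check the in-process L1 cache, then Redis, then fall back to the loader."""
    now = time.monotonic()
    hit = _l1_cache.get(key)
    if hit and hit[0] > now:
        return json.loads(hit[1])
    redis = get_redis_client()  # assumed shared helper, as in the health checks below
    payload = await redis.get(key)
    if payload is None:
        payload = json.dumps(await loader())
        await redis.set(key, payload, ex=L2_TTL)
    _l1_cache[key] = (now + L1_TTL, payload)
    return json.loads(payload)

async def invalidate(key: str) -> None:
    """Invalidate both cache levels after a write."""
    _l1_cache.pop(key, None)
    await get_redis_client().delete(key)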
12.3 Reliability
- Health Checks: Comprehensive service monitoring
- Graceful Shutdown: Clean service termination
- Data Consistency: ACID transactions and compensating actions
- Backup Strategy: Automated database and file backups (see the backup sketch at the end of this subsection)
# Health check endpoints
from datetime import datetime

from fastapi import FastAPI, HTTPException
from sqlalchemy import text
from redis.exceptions import ConnectionError as RedisConnectionError

@app.get("/health")
async def health_check():
    """Basic health check"""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}

@app.get("/health/detailed")
async def detailed_health_check():
    """Detailed health check with dependencies"""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "dependencies": {}
    }
    # Database health
    try:
        async with get_db_session() as db:
            await db.execute(text("SELECT 1"))
        health_status["dependencies"]["database"] = "healthy"
    except Exception as e:
        health_status["dependencies"]["database"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"
    # Redis health
    try:
        redis_client = get_redis_client()
        await redis_client.ping()
        health_status["dependencies"]["redis"] = "healthy"
    except RedisConnectionError as e:
        health_status["dependencies"]["redis"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"
    # Milvus health
    try:
        milvus_client = get_milvus_client()
        await milvus_client.list_collections()
        health_status["dependencies"]["milvus"] = "healthy"
    except Exception as e:
        health_status["dependencies"]["milvus"] = f"unhealthy: {str(e)}"
        health_status["status"] = "degraded"
    if health_status["status"] == "degraded":
        raise HTTPException(status_code=503, detail=health_status)
    return health_status
# Graceful shutdown handler
import signal
import asyncio

class GracefulShutdown:
    def __init__(self):
        self.shutdown = False
        self.tasks = set()

    def signal_handler(self, signum, frame):
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown = True

    async def wait_for_tasks(self, timeout=30):
        """Wait for running tasks to complete"""
        if not self.tasks:
            return
        print(f"Waiting for {len(self.tasks)} tasks to complete...")
        try:
            await asyncio.wait_for(
                asyncio.gather(*self.tasks, return_exceptions=True),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            print(f"Timeout reached, cancelling {len(self.tasks)} remaining tasks")
            for task in self.tasks:
                task.cancel()
            # Give cancelled tasks a moment to unwind
            await asyncio.sleep(1)

    def register_task(self, task):
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

# Initialize graceful shutdown
shutdown_handler = GracefulShutdown()
signal.signal(signal.SIGTERM, shutdown_handler.signal_handler)
signal.signal(signal.SIGINT, shutdown_handler.signal_handler)
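To illustrate the backup strategy, a minimal sketch of a job that dumps PostgreSQL with pg_dump and ships the archive to a MinIO bucket via the official Python SDK. Host names, the database user and name, the credentials placeholders, and the emblema-backups bucket are assumptions; in production this would run on a cron or Celery beat schedule:
# Backup sketch: pg_dump archive uploaded to MinIO (assumptions noted above)
import datetime
import subprocess

from minio import Minio  # official MinIO Python SDK

def backup_postgres_to_minio() -> str:
    """Dump the database with pg_dump and upload the archive to MinIO."""
    stamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    dump_path = f"/tmp/emblema-{stamp}.dump"
    # Custom-format dump (-Fc) supports selective, parallel restore via pg_restore
    subprocess.run(
        ["pg_dump", "-h", "postgres", "-U", "emblema", "-Fc", "-f", dump_path, "emblema"],
        check=True,
    )
    client = Minio("minio:9000", access_key="...", secret_key="...", secure=False)
    client.fput_object("emblema-backups", f"postgres/{stamp}.dump", dump_path)
    return dump_path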
13. Risks and Mitigations
| Risk | Probability | Impact | Mitigation |
| --- | --- | --- | --- |
| Milvus vector DB overload | Medium | High | Connection pooling, search query optimization, read replicas |
| File processing memory leak | Low | High | Resource monitoring, worker restart policies, memory limits |
| BGE-M3 model unavailability | Low | Very High | Local model caching, fallback embedding services, model replication |
| MinIO storage corruption | Low | High | Backup replication, integrity checks, disaster recovery procedures |
| Keycloak authentication failure | Medium | Very High | HA Keycloak setup, session caching, graceful degradation |
| PostgreSQL connection exhaustion | Medium | High | Connection pooling, query optimization, read replicas |
| Celery worker deadlock | Medium | Medium | Task timeout policies, worker health monitoring, automatic restart |
| LLM API rate limiting | High | Medium | Multiple provider integration, request queuing, caching strategies |
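As a sketch of the "multiple provider integration" mitigation for LLM rate limiting, a LiteLLM Router can pool providers behind one logical model name with an explicit fallback chain; the model names and credential placeholders below are illustrative assumptions:
# LiteLLM Router sketch with a provider fallback chain (assumptions noted above)
from litellm import Router

router = Router(
    model_list=[
        {"model_name": "chat-primary",
         "litellm_params": {"model": "openai/gpt-4o", "api_key": "..."}},
        {"model_name": "chat-fallback",
         "litellm_params": {"model": "anthropic/claude-3-5-sonnet-20240620", "api_key": "..."}},
    ],
    # When chat-primary fails (e.g. rate limited), retry against chat-fallback
    fallbacks=[{"chat-primary": ["chat-fallback"]}],
)

async def ask(prompt: str):
    return await router.acompletion(
        model="chat-primary",
        messages=[{"role": "user", "content": prompt}],
    )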
14. Appendices
14.1 Glossary
- BGE-M3: Beijing Academy of Artificial Intelligence's multilingual embedding model (1024 dimensions)
- MCP: Model Context Protocol for AI tool integration
- Chunking: The process of splitting documents into segments for processing
- Milvus: Open-source vector database for similarity search
- Celery: Distributed task queue for Python
- Keycloak: Identity and access management solution
- MinIO: S3-compatible object storage
- Traefik: Modern reverse proxy and load balancer
- LiteLLM: Unified interface for multiple LLM providers
14.2 Technical References
- Next.js 14 Documentation
- FastAPI Documentation
- Milvus Documentation
- Celery Documentation
- Keycloak Documentation
- Docker Compose Reference
- BGE-M3 Model
- MinerU PDF Processing
- WhisperX Audio Transcription
End of Document
This document is confidential and proprietary to Emblema AI Platform. Unauthorized reproduction or distribution is prohibited.
Low Level Design v1.0 - Complete technical document for the implementation of the Emblema AI Platform system