Layer Dati
Il layer dati di Emblema è basato su un'architettura distribuita che combina diversi database specializzati per gestire efficacemente diversi tipi di dati. Questa sezione descrive in dettaglio i componenti principali e le loro configurazioni.
PostgreSQL
PostgreSQL serve come database principale per i dati relazionali e metadati dell'applicazione. Utilizza l'estensione pgcrypto per la generazione di UUID e supporta timestamptz per la gestione precisa dei tempi.
Schema Database
Il database è strutturato attorno a entità core con un sistema di permessi avanzato:
Entità Principali
-- Tabella File - gestione dei file caricati
CREATE TABLE "public"."file" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
"filename" text NOT NULL,
"extension" text NOT NULL,
"size" bigint NOT NULL,
"mimetype" text NOT NULL,
"duration" numeric,
"hash" text NOT NULL,
"storage_key" text NOT NULL,
"entity_type" text NOT NULL DEFAULT 'File',
PRIMARY KEY ("id")
);
-- Tabella Document - documenti processati
CREATE TABLE "public"."document" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"name" text NOT NULL,
"file_id" uuid NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
"status" text NOT NULL DEFAULT 'PENDING',
"parser_name" text,
"parser_config" jsonb,
"entity_type" text NOT NULL DEFAULT 'Document',
PRIMARY KEY ("id"),
FOREIGN KEY ("file_id") REFERENCES "public"."file"("id")
);
-- Tabella Knowledge Base - contenitori logici
CREATE TABLE "public"."knowledge_base" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"name" text NOT NULL,
"description" text,
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
"status" text NOT NULL DEFAULT 'ACTIVE',
"custom_data" jsonb,
"begin_at" timestamptz,
"end_at" timestamptz,
"entity_type" text NOT NULL DEFAULT 'KnowledgeBase',
PRIMARY KEY ("id")
);
-- Tabella Task - gestione processi asincroni
CREATE TABLE "public"."task" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"name" text NOT NULL,
"status" text NOT NULL DEFAULT 'PENDING',
"progress" numeric DEFAULT 0,
"entity_id" uuid,
"entity_type" text,
"parent_id" uuid,
"type" text NOT NULL,
"config" jsonb,
"ref_id" text,
"max_retry" integer DEFAULT 3,
"result_url" text,
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
PRIMARY KEY ("id"),
UNIQUE ("ref_id")
);
Sistema di Permessi
Emblema implementa un sistema di permessi a 3 livelli:
-- Permessi su singoli record
CREATE TABLE "public"."record_permission" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"entity_id" uuid NOT NULL,
"entity_type" text NOT NULL,
"user_id" uuid NOT NULL,
"permission" text NOT NULL,
"parent_id" uuid,
"created_by" uuid NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY ("id")
);
-- Permessi su interi moduli
CREATE TABLE "public"."entity_permission" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"entity_type" text NOT NULL,
"user_id" uuid NOT NULL,
"permission" text NOT NULL,
"created_by" uuid NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY ("id")
);
Configurazione Docker
# docker-compose-data-source.yaml
postgres-vector:
image: postgres:16.9
restart: always
volumes:
- emblema-hasura-data:/var/lib/postgresql/data
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
networks:
- emblema
Integrazione con Hasura
Hasura GraphQL Engine fornisce un'API GraphQL real-time sopra PostgreSQL:
graphql-engine:
image: hasura/graphql-engine:v2.45.0
environment:
HASURA_GRAPHQL_METADATA_DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres-vector:5432/postgres
PG_DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres-vector:5432/postgres
HASURA_GRAPHQL_EXPERIMENTAL_FEATURES: "naming_convention"
HASURA_GRAPHQL_ADMIN_SECRET: ${HASURA_ADMIN_SECRET}
Milvus
Milvus è il database vettoriale utilizzato per la ricerca semantica. Gestisce embeddings di documenti chunked con dimensioni di 1024 (BGE-M3 model).
Architettura Milvus
Configurazione
# docker-compose-data-source.yaml
milvus:
image: milvusdb/milvus:v2.5.12
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
MINIO_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
MINIO_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
MINIO_REGION: ${MINIO_REGION}
volumes:
- ./config/milvus/config.yaml:/milvus/configs/milvus.yaml
- emblema-milvus-data:/var/lib/milvus
ports:
- "19530:19530" # Milvus gRPC
- "9091:9091" # Milvus HTTP
Client Configuration
# apps/spp-retrieval/app/helper/milvus.py
from pymilvus import MilvusClient
milvus_client = MilvusClient(
uri=MILVUS_URI,
token=MILVUS_TOKEN,
db_name=MILVUS_DB_NAME
)
Schema Collections
Le collections seguono questo schema:
# Schema per embeddings di chunk
{
"collection_name": "chunks_embeddings",
"dimension": 1024, # BGE-M3 model
"fields": [
{"name": "id", "type": "varchar", "is_primary": True},
{"name": "vector", "type": "float_vector", "dim": 1024},
{"name": "text", "type": "varchar"},
{"name": "document_id", "type": "varchar"},
{"name": "chunk_order", "type": "int64"},
{"name": "created_by", "type": "varchar"},
{"name": "metadata", "type": "json"}
],
"metric_type": "COSINE",
"index_type": "IVF_FLAT"
}
Amministrazione
Attu fornisce un'interfaz web per la gestione:
# docker-compose-admin-tool.yml
attu:
image: zilliz/attu:dev
environment:
MILVUS_URL: milvus:19530
labels:
- "traefik.http.routers.attu.rule=Host(`${MILVUS_ATTU_HOSTNAME}`)"
MinIO
MinIO gestisce l'object storage per tutti i file della piattaforma, organizzati in una struttura gerarchica basata su data e UUID.
Struttura Storage
bucket/
├── 2024/01/15/uuid/ # File post-ottimizzazione
│ ├── original.pdf # File originale
│ ├── optimized.pdf # File ottimizzato (se diverso)
│ ├── manifest.json # Manifest v2.0
│ └── artifacts/ # Artifacts processor-specific
│ ├── mineru/ # Per documenti PDF
│ │ ├── _layout.pdf
│ │ ├── _middle.json
│ │ └── content_list.json
│ └── whisperx/ # Per file audio/video
│ └── diarization.json
└── temp/ # Upload temporanei
└── uuid/
└── chunked-upload-parts/
Manifest v2.0
{
"version": "2.0",
"file_id": "uuid",
"storage_key": "2024/01/15/uuid",
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:35:00Z",
"files": {
"original": { "name": "document.pdf", "size": 1024000 },
"optimized": "optimized.pdf"
},
"artifacts": {
"mineru": ["_layout.pdf", "_middle.json"],
"diarization": ["diarization.json"]
},
"processing": {
"mineru": { "pages": 10, "model": "MinerU-0.9.4" },
"audio": { "model": "whisperx-large-v3-turbo", "speakers": 3 }
},
"metadata": {
"type": "document",
"pages": 10,
"duration": 300.5
}
}
Configurazione Docker
# docker-compose-data-source.yaml
minio:
image: quay.io/minio/minio
command: server /data --console-address ":9090"
volumes:
- emblema-minio-data:/data
environment:
- MINIO_ROOT_USER=${MINIO_ROOT_USER}
- MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}
- MINIO_REGION=${MINIO_REGION}
labels:
- "traefik.http.routers.minio.rule=Host(`${MINIO_HOSTNAME}`)"
- "traefik.http.services.minio.loadbalancer.server.port=9000"
- "traefik.http.routers.minioadmin.rule=Host(`${MINIO_ADMIN_HOSTNAME}`)"
- "traefik.http.services.minioadmin.loadbalancer.server.port=9090"
API Integration
# Esempio di utilizzo nell'applicazione
from minio import Minio
client = Minio(
endpoint=MINIO_ENDPOINT,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=MINIO_SECURE
)
# Upload con struttura gerarchica
def upload_file(file_path, file_id):
date_path = datetime.now().strftime("%Y/%m/%d")
storage_key = f"{date_path}/{file_id}"
client.fput_object(
bucket_name=BUCKET_NAME,
object_name=f"{storage_key}/original.{ext}",
file_path=file_path
)
Redis
Redis funge da cache distribuita e message broker per Celery, gestendo code di task asincroni e cache di sessione.
Architettura Redis
Configurazione Master
# docker-compose-base.yaml
redis-master:
image: redis:7.4.4
command: redis-server --requirepass ${REDIS_MASTER_PASSWORD} --appendonly yes
volumes:
- emblema-redis-master-data:/data
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "-a", "${REDIS_MASTER_PASSWORD}", "ping"]
interval: 5s
timeout: 2s
retries: 3
Code Celery
Redis gestisce diverse code per i task asincroni:
# Configurazione Celery
CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@redis-master:6379/0"
CELERY_RESULT_BACKEND = f"redis://:{REDIS_PASSWORD}@redis-master:6379/0"
# Code specializzate
CELERY_ROUTES = {
'tasks.document_processing': {'queue': 'document'},
'tasks.audio_processing': {'queue': 'audio'},
'tasks.embedding_generation': {'queue': 'embedding'},
'tasks.artifact_generation': {'queue': 'artifact'}
}
Cache Management
# Esempio di cache service
class CacheService:
def __init__(self):
self.redis = redis.Redis(
host='redis-master',
port=6379,
password=REDIS_PASSWORD,
decode_responses=True
)
def cache_search_results(self, query_hash, results, ttl=3600):
"""Cache risultati di ricerca semantica"""
key = f"search:{query_hash}"
self.redis.setex(key, ttl, json.dumps(results))
def get_cached_results(self, query_hash):
"""Recupera risultati cached"""
key = f"search:{query_hash}"
cached = self.redis.get(key)
return json.loads(cached) if cached else None
Monitoring con RedisInsight
# docker-compose-admin-tool.yml
redisinsight:
image: redislabs/redisinsight:latest
environment:
- RI_HOST=redis-master
labels:
- "traefik.http.routers.redisinsight.rule=Host(`${REDIS_INSIGHT_HOSTNAME}`)"
Integrazione Complessiva
Flusso Dati Tipico
Performance e Scalabilità
- PostgreSQL: Ottimizzato per OLTP con indici su campi frequently queried
- Milvus: Auto-scaling delle collections, indici IVF per ricerca veloce
- MinIO: Distribuzione automatica su più drive, erasure coding
- Redis: Replica read-only per load balancing, persistenza AOF
Backup e Disaster Recovery
# Backup automatizzato PostgreSQL
./scripts/hasura-backup/scheduled-backup.sh
# Backup Milvus collections
./scripts/milvus-backup/scheduled-backup.sh
# Sync MinIO verso storage esterno
mc mirror minio/emblema-bucket s3/backup-bucket
Riferimenti
- PostgreSQL 16 Documentation
- Milvus v2.5 Documentation
- MinIO Documentation
- Redis Documentation
- Hasura GraphQL Engine
Prossimi Passi
- Esplora Autenticazione: Sistema Keycloak e JWT
- Comprendi Architettura: Servizi backend e API
- Analizza Frontend: Applicazione Next.js e componenti UI
- Approfondisci AI Services: Pipeline di processing e chunking