Passa al contenuto principale

Layer Dati

Il layer dati di Emblema è basato su un'architettura distribuita che combina diversi database specializzati per gestire efficacemente diversi tipi di dati. Questa sezione descrive in dettaglio i componenti principali e le loro configurazioni.

Loading diagram...

PostgreSQL

PostgreSQL serve come database principale per i dati relazionali e metadati dell'applicazione. Utilizza l'estensione pgcrypto per la generazione di UUID e supporta timestamptz per la gestione precisa dei tempi.

Schema Database

Il database è strutturato attorno a entità core con un sistema di permessi avanzato:

Entità Principali

-- Tabella File - gestione dei file caricati
CREATE TABLE "public"."file" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
"filename" text NOT NULL,
"extension" text NOT NULL,
"size" bigint NOT NULL,
"mimetype" text NOT NULL,
"duration" numeric,
"hash" text NOT NULL,
"storage_key" text NOT NULL,
"entity_type" text NOT NULL DEFAULT 'File',
PRIMARY KEY ("id")
);

-- Tabella Document - documenti processati
CREATE TABLE "public"."document" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"name" text NOT NULL,
"file_id" uuid NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
"status" text NOT NULL DEFAULT 'PENDING',
"parser_name" text,
"parser_config" jsonb,
"entity_type" text NOT NULL DEFAULT 'Document',
PRIMARY KEY ("id"),
FOREIGN KEY ("file_id") REFERENCES "public"."file"("id")
);

-- Tabella Knowledge Base - contenitori logici
CREATE TABLE "public"."knowledge_base" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"name" text NOT NULL,
"description" text,
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
"status" text NOT NULL DEFAULT 'ACTIVE',
"custom_data" jsonb,
"begin_at" timestamptz,
"end_at" timestamptz,
"entity_type" text NOT NULL DEFAULT 'KnowledgeBase',
PRIMARY KEY ("id")
);

-- Tabella Task - gestione processi asincroni
CREATE TABLE "public"."task" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"name" text NOT NULL,
"status" text NOT NULL DEFAULT 'PENDING',
"progress" numeric DEFAULT 0,
"entity_id" uuid,
"entity_type" text,
"parent_id" uuid,
"type" text NOT NULL,
"config" jsonb,
"ref_id" text,
"max_retry" integer DEFAULT 3,
"result_url" text,
"created_at" timestamptz NOT NULL DEFAULT now(),
"created_by" uuid NOT NULL,
PRIMARY KEY ("id"),
UNIQUE ("ref_id")
);

Sistema di Permessi

Emblema implementa un sistema di permessi a 3 livelli:

-- Permessi su singoli record
CREATE TABLE "public"."record_permission" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"entity_id" uuid NOT NULL,
"entity_type" text NOT NULL,
"user_id" uuid NOT NULL,
"permission" text NOT NULL,
"parent_id" uuid,
"created_by" uuid NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY ("id")
);

-- Permessi su interi moduli
CREATE TABLE "public"."entity_permission" (
"id" uuid NOT NULL DEFAULT gen_random_uuid(),
"entity_type" text NOT NULL,
"user_id" uuid NOT NULL,
"permission" text NOT NULL,
"created_by" uuid NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY ("id")
);

Configurazione Docker

# docker-compose-data-source.yaml
postgres-vector:
image: postgres:16.9
restart: always
volumes:
- emblema-hasura-data:/var/lib/postgresql/data
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
networks:
- emblema

Integrazione con Hasura

Hasura GraphQL Engine fornisce un'API GraphQL real-time sopra PostgreSQL:

graphql-engine:
image: hasura/graphql-engine:v2.45.0
environment:
HASURA_GRAPHQL_METADATA_DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres-vector:5432/postgres
PG_DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres-vector:5432/postgres
HASURA_GRAPHQL_EXPERIMENTAL_FEATURES: "naming_convention"
HASURA_GRAPHQL_ADMIN_SECRET: ${HASURA_ADMIN_SECRET}

Milvus

Milvus è il database vettoriale utilizzato per la ricerca semantica. Gestisce embeddings di documenti chunked con dimensioni di 1024 (BGE-M3 model).

Architettura Milvus

Loading diagram...

Configurazione

# docker-compose-data-source.yaml
milvus:
image: milvusdb/milvus:v2.5.12
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
MINIO_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
MINIO_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
MINIO_REGION: ${MINIO_REGION}
volumes:
- ./config/milvus/config.yaml:/milvus/configs/milvus.yaml
- emblema-milvus-data:/var/lib/milvus
ports:
- "19530:19530" # Milvus gRPC
- "9091:9091" # Milvus HTTP

Client Configuration

# apps/spp-retrieval/app/helper/milvus.py
from pymilvus import MilvusClient

milvus_client = MilvusClient(
uri=MILVUS_URI,
token=MILVUS_TOKEN,
db_name=MILVUS_DB_NAME
)

Schema Collections

Le collections seguono questo schema:

# Schema per embeddings di chunk
{
"collection_name": "chunks_embeddings",
"dimension": 1024, # BGE-M3 model
"fields": [
{"name": "id", "type": "varchar", "is_primary": True},
{"name": "vector", "type": "float_vector", "dim": 1024},
{"name": "text", "type": "varchar"},
{"name": "document_id", "type": "varchar"},
{"name": "chunk_order", "type": "int64"},
{"name": "created_by", "type": "varchar"},
{"name": "metadata", "type": "json"}
],
"metric_type": "COSINE",
"index_type": "IVF_FLAT"
}

Amministrazione

Attu fornisce un'interfaz web per la gestione:

# docker-compose-admin-tool.yml
attu:
image: zilliz/attu:dev
environment:
MILVUS_URL: milvus:19530
labels:
- "traefik.http.routers.attu.rule=Host(`${MILVUS_ATTU_HOSTNAME}`)"

MinIO

MinIO gestisce l'object storage per tutti i file della piattaforma, organizzati in una struttura gerarchica basata su data e UUID.

Struttura Storage

bucket/
├── 2024/01/15/uuid/ # File post-ottimizzazione
│ ├── original.pdf # File originale
│ ├── optimized.pdf # File ottimizzato (se diverso)
│ ├── manifest.json # Manifest v2.0
│ └── artifacts/ # Artifacts processor-specific
│ ├── mineru/ # Per documenti PDF
│ │ ├── _layout.pdf
│ │ ├── _middle.json
│ │ └── content_list.json
│ └── whisperx/ # Per file audio/video
│ └── diarization.json
└── temp/ # Upload temporanei
└── uuid/
└── chunked-upload-parts/

Manifest v2.0

{
"version": "2.0",
"file_id": "uuid",
"storage_key": "2024/01/15/uuid",
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:35:00Z",
"files": {
"original": { "name": "document.pdf", "size": 1024000 },
"optimized": "optimized.pdf"
},
"artifacts": {
"mineru": ["_layout.pdf", "_middle.json"],
"diarization": ["diarization.json"]
},
"processing": {
"mineru": { "pages": 10, "model": "MinerU-0.9.4" },
"audio": { "model": "whisperx-large-v3-turbo", "speakers": 3 }
},
"metadata": {
"type": "document",
"pages": 10,
"duration": 300.5
}
}

Configurazione Docker

# docker-compose-data-source.yaml
minio:
image: quay.io/minio/minio
command: server /data --console-address ":9090"
volumes:
- emblema-minio-data:/data
environment:
- MINIO_ROOT_USER=${MINIO_ROOT_USER}
- MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}
- MINIO_REGION=${MINIO_REGION}
labels:
- "traefik.http.routers.minio.rule=Host(`${MINIO_HOSTNAME}`)"
- "traefik.http.services.minio.loadbalancer.server.port=9000"
- "traefik.http.routers.minioadmin.rule=Host(`${MINIO_ADMIN_HOSTNAME}`)"
- "traefik.http.services.minioadmin.loadbalancer.server.port=9090"

API Integration

# Esempio di utilizzo nell'applicazione
from minio import Minio

client = Minio(
endpoint=MINIO_ENDPOINT,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=MINIO_SECURE
)

# Upload con struttura gerarchica
def upload_file(file_path, file_id):
date_path = datetime.now().strftime("%Y/%m/%d")
storage_key = f"{date_path}/{file_id}"

client.fput_object(
bucket_name=BUCKET_NAME,
object_name=f"{storage_key}/original.{ext}",
file_path=file_path
)

Redis

Redis funge da cache distribuita e message broker per Celery, gestendo code di task asincroni e cache di sessione.

Architettura Redis

Loading diagram...

Configurazione Master

# docker-compose-base.yaml
redis-master:
image: redis:7.4.4
command: redis-server --requirepass ${REDIS_MASTER_PASSWORD} --appendonly yes
volumes:
- emblema-redis-master-data:/data
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "-a", "${REDIS_MASTER_PASSWORD}", "ping"]
interval: 5s
timeout: 2s
retries: 3

Code Celery

Redis gestisce diverse code per i task asincroni:

# Configurazione Celery
CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@redis-master:6379/0"
CELERY_RESULT_BACKEND = f"redis://:{REDIS_PASSWORD}@redis-master:6379/0"

# Code specializzate
CELERY_ROUTES = {
'tasks.document_processing': {'queue': 'document'},
'tasks.audio_processing': {'queue': 'audio'},
'tasks.embedding_generation': {'queue': 'embedding'},
'tasks.artifact_generation': {'queue': 'artifact'}
}

Cache Management

# Esempio di cache service
class CacheService:
def __init__(self):
self.redis = redis.Redis(
host='redis-master',
port=6379,
password=REDIS_PASSWORD,
decode_responses=True
)

def cache_search_results(self, query_hash, results, ttl=3600):
"""Cache risultati di ricerca semantica"""
key = f"search:{query_hash}"
self.redis.setex(key, ttl, json.dumps(results))

def get_cached_results(self, query_hash):
"""Recupera risultati cached"""
key = f"search:{query_hash}"
cached = self.redis.get(key)
return json.loads(cached) if cached else None

Monitoring con RedisInsight

# docker-compose-admin-tool.yml
redisinsight:
image: redislabs/redisinsight:latest
environment:
- RI_HOST=redis-master
labels:
- "traefik.http.routers.redisinsight.rule=Host(`${REDIS_INSIGHT_HOSTNAME}`)"

Integrazione Complessiva

Flusso Dati Tipico

Loading diagram...

Performance e Scalabilità

  • PostgreSQL: Ottimizzato per OLTP con indici su campi frequently queried
  • Milvus: Auto-scaling delle collections, indici IVF per ricerca veloce
  • MinIO: Distribuzione automatica su più drive, erasure coding
  • Redis: Replica read-only per load balancing, persistenza AOF

Backup e Disaster Recovery

# Backup automatizzato PostgreSQL
./scripts/hasura-backup/scheduled-backup.sh

# Backup Milvus collections
./scripts/milvus-backup/scheduled-backup.sh

# Sync MinIO verso storage esterno
mc mirror minio/emblema-bucket s3/backup-bucket

Riferimenti

Prossimi Passi

  1. Esplora Autenticazione: Sistema Keycloak e JWT
  2. Comprendi Architettura: Servizi backend e API
  3. Analizza Frontend: Applicazione Next.js e componenti UI
  4. Approfondisci AI Services: Pipeline di processing e chunking

Questa pagina ti è stata utile?