
Backend Development

A complete guide to developing the Python backend services with FastAPI, Celery, and GraphQL integration in the Emblema ecosystem.

πŸš€ Technology Stack

Core Framework

  • FastAPI: API framework moderno con validazione automatica
  • Celery: Task queue per elaborazioni asincrone
  • Redis: Message broker e caching
  • PostgreSQL: Database relazionale principale
  • Milvus: Database vettoriale per ricerca semantica

AI and Machine Learning

  • OpenAI/LiteLLM: LLM API gateway
  • Transformers: Modelli Hugging Face
  • WhisperX: Speech-to-text con diarizzazione
  • MinerU: Estrazione avanzata PDF
  • BGE-M3: Embeddings multilingue

Dependency Management

  • uv: Package manager Python moderno
  • Docker: Containerizzazione
  • pyproject.toml: Configurazione dipendenze

πŸ“ Struttura Servizi Backend​

apps/
β”œβ”€β”€ background-task/        # Main AI worker
β”‚   β”œβ”€β”€ app/
β”‚   β”‚   β”œβ”€β”€ main.py         # FastAPI application
β”‚   β”‚   β”œβ”€β”€ celery_app.py   # Celery configuration
β”‚   β”‚   β”œβ”€β”€ tasks.py        # Celery tasks
β”‚   β”‚   β”œβ”€β”€ chunkers/       # Document chunking
β”‚   β”‚   β”œβ”€β”€ handlers/       # Business logic
β”‚   β”‚   β”œβ”€β”€ routers/        # API endpoints
β”‚   β”‚   └── utils/          # Utilities
β”‚   β”œβ”€β”€ pyproject.toml      # Dependencies
β”‚   └── Dockerfile          # Container config
β”œβ”€β”€ document-render/        # Document rendering
β”œβ”€β”€ novu-bridge/            # Notification bridge
└── mcp-demo/               # Model Context Protocol

πŸ—οΈ Pattern Servizi Specializzati​

Each backend service is dedicated to a specific function.

1. Basic Service Structure

# apps/my-service/app/main.py
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from .dependencies import get_settings
from .routers import my_router

def create_app() -> FastAPI:
    app = FastAPI(
        title="My Service",
        description="Service description",
        version="1.0.0",
    )

    # CORS middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # Configure appropriately
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Include routers
    app.include_router(
        my_router.router,
        prefix="/api/v1",
        tags=["my-service"]
    )

    @app.get("/health")
    async def health_check():
        return {"status": "healthy", "service": "my-service"}

    return app

app = create_app()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2. Dependency Injection

# apps/my-service/app/dependencies.py
from functools import lru_cache
from pydantic_settings import BaseSettings
import redis
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class Settings(BaseSettings):
    database_url: str = "postgresql://user:pass@localhost/db"
    redis_url: str = "redis://localhost:6379"
    api_key: str = ""

    class Config:
        env_file = ".env"

@lru_cache()
def get_settings() -> Settings:
    return Settings()

# Database connection
engine = create_engine(get_settings().database_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Redis connection
@lru_cache()
def get_redis():
    return redis.from_url(get_settings().redis_url)

3. Router Structure

# apps/my-service/app/routers/my_router.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from ..dependencies import get_db, get_settings
from ..handlers import my_handler
from ..schemas import MySchema, MyResponse

router = APIRouter()

@router.post("/items", response_model=MyResponse)
async def create_item(
    item: MySchema,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    settings = Depends(get_settings),
):
    """Create a new item with background processing."""
    try:
        # Validate and create item
        db_item = await my_handler.create_item(db, item)

        # Add background task
        background_tasks.add_task(
            my_handler.process_item_async,
            db_item.id,
            settings
        )

        return MyResponse(
            id=db_item.id,
            status="created",
            message="Item created successfully"
        )

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail="Internal server error")

@router.get("/items", response_model=List[MyResponse])
async def get_items(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
):
    """Get all items with pagination."""
    items = await my_handler.get_items(db, skip=skip, limit=limit)
    return items

@router.get("/items/{item_id}", response_model=MyResponse)
async def get_item(
    item_id: str,
    db: Session = Depends(get_db),
):
    """Get single item by ID."""
    item = await my_handler.get_item(db, item_id)
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return item
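
The router imports MySchema and MyResponse from a schemas module that is not shown here. A minimal sketch of what those Pydantic models could look like, assuming only the fields used in the endpoint responses above (id, status, message); the input fields are illustrative:

# apps/my-service/app/schemas.py (hypothetical sketch)
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field

class MySchema(BaseModel):
    """Input payload for creating an item (fields are illustrative)."""
    name: str = Field(..., max_length=255)
    description: Optional[str] = None

class MyResponse(BaseModel):
    """Response returned by the /items endpoints."""
    model_config = ConfigDict(from_attributes=True)  # allow building from ORM objects

    id: str
    status: str
    message: Optional[str] = None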

🎯 Background Task Service

The main service for intensive AI processing.

1. Celery Configuration

# apps/background-task/app/celery_app.py
import os
from celery import Celery
from .config import get_settings

settings = get_settings()

celery_app = Celery(
    "background-task",
    broker=settings.redis_url,
    result_backend=settings.redis_url,
    include=["app.tasks"],
)

# Configuration
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,

    # Task routing
    task_routes={
        "app.tasks.process_document": {"queue": "document_processing"},
        "app.tasks.generate_embeddings": {"queue": "ai_processing"},
        "app.tasks.transcribe_audio": {"queue": "audio_processing"},
    },

    # Concurrency settings
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,

    # Result expiration
    result_expires=3600,

    # Task time limits
    task_time_limit=1800,  # 30 minutes
    task_soft_time_limit=1500,  # 25 minutes
)

if __name__ == "__main__":
    celery_app.start()
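
Tasks registered with this app are normally enqueued from an API endpoint rather than called directly. A minimal sketch of dispatching the process_document task and returning its id to the client (the endpoint path and payload shape are assumptions):

# Hypothetical endpoint that enqueues a document-processing task
from fastapi import APIRouter
from .celery_app import celery_app

router = APIRouter()

@router.post("/documents/{document_id}/process")
async def enqueue_document_processing(document_id: str):
    # send_task dispatches by registered task name, so the API process
    # does not need to import the worker code
    result = celery_app.send_task(
        "process_document",
        args=[document_id, {"chunking_strategy": "document",
                            "chunk_size": 512, "chunk_overlap": 75}],
    )
    return {"task_id": result.id, "status": "queued"}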

2. Task Definitions

# apps/background-task/app/tasks.py
from celery import current_task
from .celery_app import celery_app
from .handlers.document import DocumentHandler
from .handlers.storage import StorageHandler
from .handlers.vector_db import VectorDBHandler
from .chunkers import get_chunker
import logging

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, name="process_document")
def process_document(self, document_id: str, config: dict):
    """
    Process document: optimization, chunking, embeddings.

    Args:
        document_id: UUID of document to process
        config: Processing configuration
    """
    try:
        # Update task status
        self.update_state(
            state="PROCESSING",
            meta={"phase": "initialization", "progress": 0}
        )

        doc_handler = DocumentHandler()
        storage_handler = StorageHandler()
        vector_handler = VectorDBHandler()

        # Phase 1: File optimization
        self.update_state(
            state="PROCESSING",
            meta={"phase": "optimization", "progress": 10}
        )

        optimized_file = doc_handler.optimize_file(document_id)

        # Phase 2: Content extraction and chunking
        self.update_state(
            state="PROCESSING",
            meta={"phase": "chunking", "progress": 30}
        )

        chunker = get_chunker(config["chunking_strategy"])
        chunks = chunker.chunk_document(
            optimized_file,
            chunk_size=config["chunk_size"],
            overlap=config["chunk_overlap"]
        )

        # Phase 3: Generate embeddings
        self.update_state(
            state="PROCESSING",
            meta={"phase": "embeddings", "progress": 60}
        )

        embeddings = []
        for i, chunk in enumerate(chunks):
            embedding = doc_handler.generate_embedding(chunk.content)
            embeddings.append({
                "chunk_id": chunk.id,
                "embedding": embedding,
                "metadata": chunk.metadata
            })

            # Update progress
            progress = 60 + (i / len(chunks)) * 30
            self.update_state(
                state="PROCESSING",
                meta={"phase": "embeddings", "progress": int(progress)}
            )

        # Phase 4: Store in vector database
        self.update_state(
            state="PROCESSING",
            meta={"phase": "storage", "progress": 90}
        )

        vector_handler.store_embeddings(document_id, embeddings)

        # Final update
        return {
            "status": "completed",
            "document_id": document_id,
            "chunks_processed": len(chunks),
            "embeddings_generated": len(embeddings)
        }

    except Exception as exc:
        logger.error(f"Task failed for document {document_id}: {exc}")
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc), "document_id": document_id}
        )
        raise

@celery_app.task(bind=True, name="transcribe_audio")
def transcribe_audio(self, file_path: str, language: str = "it"):
    """
    Transcribe audio file with speaker diarization.

    Args:
        file_path: Path to audio file
        language: Language code for transcription
    """
    try:
        from .handlers.audio import AudioHandler

        self.update_state(
            state="PROCESSING",
            meta={"phase": "loading", "progress": 0}
        )

        audio_handler = AudioHandler()

        # Load and preprocess audio
        self.update_state(
            state="PROCESSING",
            meta={"phase": "preprocessing", "progress": 20}
        )

        audio_data = audio_handler.load_audio(file_path)

        # Transcription with WhisperX
        self.update_state(
            state="PROCESSING",
            meta={"phase": "transcription", "progress": 40}
        )

        transcript = audio_handler.transcribe_with_diarization(
            audio_data,
            language=language
        )

        # Post-processing
        self.update_state(
            state="PROCESSING",
            meta={"phase": "postprocessing", "progress": 80}
        )

        processed_transcript = audio_handler.process_transcript(transcript)

        return {
            "status": "completed",
            "transcript": processed_transcript,
            "speakers": len(set(s["speaker"] for s in processed_transcript)),
            "duration": audio_handler.get_duration(audio_data)
        }

    except Exception as exc:
        logger.error(f"Audio transcription failed: {exc}")
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise
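
Because both tasks report progress through update_state, a client or another service can poll the result backend for the current phase. A minimal sketch using Celery's AsyncResult (the endpoint itself is an assumption, not part of the service code above):

# Hypothetical status endpoint built on Celery's result backend
from celery.result import AsyncResult
from fastapi import APIRouter
from .celery_app import celery_app

router = APIRouter()

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "state": result.state}
    if result.state == "PROCESSING":
        # meta dict set via update_state (phase, progress)
        response["info"] = result.info
    elif result.successful():
        response["result"] = result.result
    elif result.failed():
        response["error"] = str(result.info)
    return response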

3. Chunking System

# apps/background-task/app/chunkers/base.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class Chunk:
    id: str
    content: str
    metadata: Dict[str, Any]
    tokens: int

class BaseChunker(ABC):
    """Base class for all chunkers."""

    def __init__(self, chunk_size: int = 512, overlap: int = 75):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.tokenizer = self._load_tokenizer()

    def _load_tokenizer(self):
        """Load BGE-M3 tokenizer for consistent token counting."""
        from transformers import AutoTokenizer
        return AutoTokenizer.from_pretrained("BAAI/bge-m3")

    def count_tokens(self, text: str) -> int:
        """Count tokens using BGE-M3 tokenizer."""
        return len(self.tokenizer.encode(text))

    @abstractmethod
    def chunk_document(
        self,
        content: str,
        metadata: Dict[str, Any] = None
    ) -> List[Chunk]:
        """Chunk document content into smaller pieces."""
        pass

# apps/background-task/app/chunkers/document.py
import logging
import uuid
from typing import Dict, List

from langchain_text_splitters import RecursiveCharacterTextSplitter
from .base import BaseChunker, Chunk

logger = logging.getLogger(__name__)

class DocumentChunker(BaseChunker):
    """Chunker for PDF, TXT, MD files using MinerU for PDFs."""

    def __init__(self, chunk_size: int = 512, overlap: int = 75):
        super().__init__(chunk_size, overlap)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size * 4,  # Approximate character count
            chunk_overlap=overlap * 4,
            length_function=self.count_tokens,
            separators=["\n\n", "\n", ". ", "? ", "! ", " ", ""]
        )

    def chunk_document(self, file_path: str, metadata: Dict = None) -> List[Chunk]:
        """
        Chunk document file.

        Args:
            file_path: Path to optimized document file
            metadata: Additional metadata

        Returns:
            List of chunks with content and metadata
        """
        # Extract text based on file type
        if file_path.endswith('.pdf'):
            content = self._extract_pdf_with_mineru(file_path)
        else:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

        # Create chunks
        text_chunks = self.text_splitter.split_text(content)

        chunks = []
        for i, chunk_content in enumerate(text_chunks):
            chunk_metadata = {
                "chunk_index": i,
                "source_file": file_path,
                "token_count": self.count_tokens(chunk_content),
                **(metadata or {})
            }

            chunks.append(Chunk(
                id=str(uuid.uuid4()),
                content=chunk_content.strip(),
                metadata=chunk_metadata,
                tokens=chunk_metadata["token_count"]
            ))

        return chunks

    def _extract_pdf_with_mineru(self, pdf_path: str) -> str:
        """Extract text from PDF using MinerU."""
        try:
            from mineru.core import PDFProcessor

            processor = PDFProcessor()
            result = processor.process_pdf(pdf_path)

            # Extract text from MinerU result
            text_content = ""
            for page in result.pages:
                for block in page.blocks:
                    if block.type == "text":
                        text_content += block.content + "\n"

            return text_content

        except Exception as e:
            logger.error(f"MinerU extraction failed: {e}")
            # Fallback to PyPDF
            import PyPDF2
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                text = ""
                for page in reader.pages:
                    text += page.extract_text() + "\n"
                return text

# apps/background-task/app/chunkers/audio.py
import uuid
from typing import Dict, List

from .base import BaseChunker, Chunk

class AudioChunker(BaseChunker):
    """Chunker for MP3 files using WhisperX transcription."""

    def chunk_document(self, audio_path: str, metadata: Dict = None) -> List[Chunk]:
        """
        Chunk audio file based on speaker diarization.

        Args:
            audio_path: Path to audio file
            metadata: Additional metadata including language

        Returns:
            List of chunks based on speaker segments
        """
        from ..handlers.audio import AudioHandler

        audio_handler = AudioHandler()

        # Transcribe with diarization
        transcript = audio_handler.transcribe_with_diarization(
            audio_path,
            language=(metadata or {}).get("language", "it")
        )

        chunks = []
        current_speaker = None
        current_content = ""
        current_start = None

        for segment in transcript:
            if segment["speaker"] != current_speaker:
                # Save previous chunk
                if current_content:
                    chunks.append(self._create_audio_chunk(
                        current_content,
                        current_speaker,
                        current_start,
                        segment["start"],
                        metadata
                    ))

                # Start new chunk
                current_speaker = segment["speaker"]
                current_content = segment["text"]
                current_start = segment["start"]
            else:
                current_content += " " + segment["text"]

        # Add final chunk
        if current_content:
            chunks.append(self._create_audio_chunk(
                current_content,
                current_speaker,
                current_start,
                transcript[-1]["end"],
                metadata
            ))

        return chunks

    def _create_audio_chunk(
        self,
        content: str,
        speaker: str,
        start_time: float,
        end_time: float,
        metadata: Dict
    ) -> Chunk:
        """Create audio chunk with timing metadata."""
        chunk_metadata = {
            "speaker": speaker,
            "start_time": start_time,
            "end_time": end_time,
            "duration": end_time - start_time,
            "media_type": "audio",
            "token_count": self.count_tokens(content),
            **(metadata or {})
        }

        return Chunk(
            id=str(uuid.uuid4()),
            content=f"[{start_time:.1f}s - {end_time:.1f}s] {speaker}: {content}",
            metadata=chunk_metadata,
            tokens=chunk_metadata["token_count"]
        )
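
tasks.py selects a chunker through get_chunker(config["chunking_strategy"]), which is not shown above. A plausible sketch of that factory, assuming strategies are simply registered by name in the chunkers package (the strategy names are assumptions):

# apps/background-task/app/chunkers/__init__.py (hypothetical sketch)
from .base import BaseChunker
from .document import DocumentChunker
from .audio import AudioChunker

# Map strategy names to chunker classes; the names are illustrative
_CHUNKERS = {
    "document": DocumentChunker,
    "audio": AudioChunker,
}

def get_chunker(strategy: str, **kwargs) -> BaseChunker:
    """Return a chunker instance for the given strategy name."""
    try:
        chunker_cls = _CHUNKERS[strategy]
    except KeyError:
        raise ValueError(f"Unknown chunking strategy: {strategy}")
    return chunker_cls(**kwargs)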

πŸ” Ricerca Semantica con Milvus​

Semantic search is handled directly through Milvus as an infrastructure service, accessible from the applications via its API.

1. Milvus Integration

# Example of Milvus integration in the applications
# Milvus is accessed directly rather than through a separate service

from pymilvus import connections, Collection
from typing import List, Dict, Any
import os

# Connect to Milvus (done at app startup)
connections.connect(
    alias="default",
    host=os.getenv("MILVUS_HOSTNAME", "milvus"),
    port=int(os.getenv("MILVUS_PORT", "19530")),
    user=os.getenv("MILVUS_USER", "root"),
    password=os.getenv("MILVUS_PASSWORD", "milvus")
)

# Example search in an API endpoint
@router.post("/api/v1/search")
async def search_documents(
    query: str,
    limit: int = 10,
    filters: Dict[str, Any] = None
):
    """
    Semantic search over documents.
    Milvus is accessed directly, not through a separate service.
    """
    # Generate the embedding for the query
    query_embedding = await generate_embedding(query)

    # Search in Milvus
    collection = Collection("documents")
    results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        limit=limit,
        # Other parameters...
    )

    return format_search_results(results)
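
The search above assumes a "documents" collection with an "embedding" field already exists. A hedged sketch of creating it with pymilvus, assuming BGE-M3 dense embeddings (1024 dimensions) and an HNSW index with cosine similarity; field names and index parameters are illustrative:

# Hypothetical one-time setup for the "documents" collection
from pymilvus import (
    Collection, CollectionSchema, DataType, FieldSchema, connections, utility
)

connections.connect(alias="default", host="milvus", port=19530)

if not utility.has_collection("documents"):
    schema = CollectionSchema(
        fields=[
            FieldSchema(name="chunk_id", dtype=DataType.VARCHAR, max_length=64, is_primary=True),
            FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=64),
            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),  # BGE-M3 dense size
        ],
        description="Document chunks with dense embeddings",
    )
    collection = Collection(name="documents", schema=schema)
    collection.create_index(
        field_name="embedding",
        index_params={"index_type": "HNSW", "metric_type": "COSINE",
                      "params": {"M": 16, "efConstruction": 200}},
    )
    collection.load()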

πŸ”§ Development Workflow

1. Local Development

# Start the service in development mode
cd apps/background-task
uv run fastapi dev app/main.py --reload --port 8001

# Start the Celery worker
uv run celery -A app.celery_app worker --loglevel=info --pool=solo

# Monitor tasks
uv run celery -A app.celery_app flower --port=5555

2. Testing

# tests/test_chunkers.py
import pytest
from app.chunkers.document import DocumentChunker

def test_document_chunker(tmp_path):
    """Test document chunking functionality."""
    chunker = DocumentChunker(chunk_size=100, overlap=20)

    # chunk_document expects a file path, so write the sample text to a temp file
    sample_file = tmp_path / "sample.txt"
    sample_file.write_text("This is a test document. " * 100, encoding="utf-8")

    chunks = chunker.chunk_document(str(sample_file))

    assert len(chunks) > 1
    assert all(chunk.tokens <= 100 for chunk in chunks)

    # Test overlap
    if len(chunks) > 1:
        first_chunk_end = chunker.tokenizer.decode(
            chunker.tokenizer.encode(chunks[0].content)[-20:]
        )
        second_chunk_start = chunker.tokenizer.decode(
            chunker.tokenizer.encode(chunks[1].content)[:20]
        )
        # Should have some overlap
        assert any(word in second_chunk_start for word in first_chunk_end.split())

# Run tests
pytest tests/ -v
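
Endpoint behaviour can be tested the same way without running a server. A small sketch using FastAPI's TestClient against the health endpoint from the basic service structure above (the import path assumes the my-service layout):

# tests/test_health.py (hypothetical sketch)
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_health_check():
    """The /health endpoint should report the service as healthy."""
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"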

3. Docker Development

# Dockerfile for development with hot reload
FROM python:3.10-slim

WORKDIR /app

# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

# Install dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync --dev

# Copy source code
COPY app/ ./app/

# Development command with reload
CMD ["uv", "run", "fastapi", "dev", "app/main.py", "--host", "0.0.0.0", "--reload"]

4. Debugging

# Debugging with rich and structlog
import structlog
from rich.logging import RichHandler
from rich.traceback import install

# Install rich traceback
install(show_locals=True)

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger(__name__)

# Usage in code
logger.info(
    "document_processed",
    document_id=doc_id,
    processing_time=elapsed_time,
    chunk_count=len(chunks),
    user_id=user_id
)

πŸ“Š Monitoring and Logging

1. Health Checks

# Health check endpoint
from datetime import datetime
from fastapi import status
from fastapi.responses import JSONResponse
from sqlalchemy import text

@app.get("/health", status_code=status.HTTP_200_OK)
async def health_check(db: Session = Depends(get_db)):
    """Comprehensive health check."""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "checks": {}
    }

    # Database check
    try:
        db.execute(text("SELECT 1"))
        health_status["checks"]["database"] = "healthy"
    except Exception as e:
        health_status["checks"]["database"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"

    # Redis check
    try:
        redis_client = get_redis()
        redis_client.ping()
        health_status["checks"]["redis"] = "healthy"
    except Exception as e:
        health_status["checks"]["redis"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"

    # Celery check
    try:
        from .celery_app import celery_app
        stats = celery_app.control.inspect().stats()
        if stats:
            health_status["checks"]["celery"] = "healthy"
        else:
            health_status["checks"]["celery"] = "no workers"
    except Exception as e:
        health_status["checks"]["celery"] = f"unhealthy: {str(e)}"

    if health_status["status"] != "healthy":
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content=health_status
        )

    return health_status

2. Metrics and Observability

# Prometheus metrics
import time

from fastapi import Request, Response
from prometheus_client import Counter, Histogram, generate_latest

# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
TASK_COUNT = Counter('celery_tasks_total', 'Total Celery tasks', ['task', 'status'])

# Middleware for metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    # Record metrics
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path
    ).inc()

    REQUEST_DURATION.observe(time.time() - start_time)

    return response

@app.get("/metrics")
async def get_metrics():
    """Prometheus metrics endpoint."""
    return Response(generate_latest(), media_type="text/plain")
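
The TASK_COUNT counter above is declared but never incremented in the snippet. One hedged way to feed it, assuming the worker process also exposes or pushes its metrics, is through Celery signal handlers:

# Hypothetical wiring of TASK_COUNT via Celery signals (runs in the worker)
from celery.signals import task_failure, task_success

@task_success.connect
def record_task_success(sender=None, **kwargs):
    # sender is the task instance; its registered name labels the metric
    TASK_COUNT.labels(task=sender.name, status="success").inc()

@task_failure.connect
def record_task_failure(sender=None, **kwargs):
    TASK_COUNT.labels(task=sender.name, status="failure").inc()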

This backend setup provides:

  • βœ… ScalabilitΓ : Pattern SPP e task queue Celery
  • βœ… Performance: Processamento asincrono e caching Redis
  • βœ… Reliability: Health checks, retry logic, error handling
  • βœ… Observability: Logging strutturato, metriche, tracing
  • βœ… AI Integration: Pipeline complete per elaborazione AI
  • βœ… Type Safety: Pydantic models e FastAPI validation
