
Background Tasks

A complete guide to asynchronous processing with Celery and the background-task service.

🎯 Overview​

The background-task service handles all of Emblema's resource-intensive operations:

  • Document chunking (PDF, TXT, MD)
  • Audio/video transcription (MP3, MP4)
  • Embedding generation (BGE-M3)
  • File optimization (format conversion)
  • Advanced text extraction (MinerU for PDF)

πŸ—οΈ Architettura​


Main Components

  1. FastAPI Server - API for submitting tasks and querying their status
  2. Celery Workers - asynchronous task processing
  3. Redis - message broker and result backend
  4. Flower - monitoring UI for Celery
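To make the division of labor concrete, here is a toy in-process sketch of the broker/worker handoff, with a plain `queue.Queue` standing in for Redis and a thread standing in for a Celery worker (this is an illustration of the message flow only, not Celery itself; `submit` and `worker_loop` are invented names):

```python
import queue
import threading

# A plain in-process queue stands in for the Redis broker.
broker: "queue.Queue" = queue.Queue()
results: dict = {}  # stands in for the result backend

def submit(task_id: str, payload: str) -> None:
    """FastAPI side: enqueue a task message on the broker."""
    broker.put({"id": task_id, "payload": payload})

def worker_loop() -> None:
    """Worker side: consume messages and store results."""
    while True:
        msg = broker.get()
        if msg is None:  # shutdown sentinel
            break
        # "Process" the task and record the result
        results[msg["id"]] = msg["payload"].upper()

worker = threading.Thread(target=worker_loop, daemon=True)
worker.start()
submit("t1", "hello")
broker.put(None)  # stop the worker
worker.join()
print(results["t1"])  # HELLO
```

In the real service, FastAPI plays the producer role, Redis persists the queue across processes, and Celery workers replace the thread.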

📦 Configuration

Environment Variables​

# Redis Configuration
CELERY_BROKER_URL=redis://redis-master:6379/6
CELERY_RESULT_URL=redis://redis-master:6379/6

# Worker Configuration
CELERY_MAX_CONCURRENCY=1 # Number of concurrent workers
WORKING_DIR_MAX_AGE=604800 # Retention: 7 days

# Model Configuration
DEFAULT_EMBEDDING_MODEL=bge-m3
MODEL_ROOT_DIR=./models

# External Services
MILVUS_API_URL=http://milvus:19530/v2/vectordb
MINIO_HOSTNAME=minio
HASURA_API_URL=http://graphql-engine:8080/v1/graphql
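These variables are exposed to the code through a settings object (the `settings.celery_broker_url` used below). A minimal stdlib sketch of that mapping, assuming plain `os.environ` lookups rather than the project's actual config loader:

```python
import os
from dataclasses import dataclass, field

def _env(name: str, default: str) -> str:
    """Read an environment variable with a fallback default."""
    return os.environ.get(name, default)

@dataclass
class Settings:
    """Maps environment variables to the attributes used in the code samples."""
    celery_broker_url: str = field(
        default_factory=lambda: _env("CELERY_BROKER_URL", "redis://localhost:6379/6"))
    celery_result_url: str = field(
        default_factory=lambda: _env("CELERY_RESULT_URL", "redis://localhost:6379/6"))
    celery_max_concurrency: int = field(
        default_factory=lambda: int(_env("CELERY_MAX_CONCURRENCY", "1")))
    working_dir_max_age: int = field(
        default_factory=lambda: int(_env("WORKING_DIR_MAX_AGE", "604800")))

settings = Settings()
```

In practice a validated loader (e.g. pydantic settings) would replace the raw lookups, but the attribute names match what the Celery configuration below expects.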

Celery Configuration​

# app/celery_app.py
from celery import Celery

from app.config import settings  # assumed location of the project settings module

celery_app = Celery(
    "background-task",
    broker=settings.celery_broker_url,
    result_backend=settings.celery_result_url,
    include=["app.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,

    # Route tasks to specialized queues
    task_routes={
        "app.tasks.process_document": {"queue": "document_processing"},
        "app.tasks.generate_embeddings": {"queue": "ai_processing"},
        "app.tasks.transcribe_audio": {"queue": "audio_processing"},
    },

    # Performance settings
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,

    # Time limits
    task_time_limit=1800,  # 30 minutes
    task_soft_time_limit=1500,  # 25 minutes
)

🔧 Task Implementation

1. Defining a Task

import logging
import time

from app.celery_app import celery_app

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, name="process_document")
def process_document(self, document_id: str, config: dict):
    """
    Process a document with progress tracking.

    Args:
        document_id: UUID of the document
        config: chunking configuration
    """
    try:
        start_time = time.time()

        # Initial status update
        self.update_state(
            state="PROCESSING",
            meta={
                "phase": "initialization",
                "progress": 0,
                "document_id": document_id
            }
        )

        # Phase 1: download and optimization
        self.update_state(
            state="PROCESSING",
            meta={"phase": "optimization", "progress": 10}
        )

        doc_handler = DocumentHandler()
        optimized_file = doc_handler.optimize_file(document_id)

        # Phase 2: chunking
        self.update_state(
            state="PROCESSING",
            meta={"phase": "chunking", "progress": 30}
        )

        chunker = get_chunker(config["chunking_strategy"])
        chunks = chunker.chunk_document(
            optimized_file,
            chunk_size=config["chunk_size"],
            overlap=config["chunk_overlap"]
        )

        # Phase 3: embeddings
        for i, chunk in enumerate(chunks):
            progress = 30 + (i / len(chunks)) * 60
            self.update_state(
                state="PROCESSING",
                meta={
                    "phase": "embeddings",
                    "progress": int(progress),
                    "current_chunk": i + 1,
                    "total_chunks": len(chunks)
                }
            )

            # Generate the embedding for this chunk
            embedding = generate_embedding(chunk.content)
            store_to_milvus(chunk, embedding)

        # Done
        return {
            "status": "completed",
            "document_id": document_id,
            "chunks_processed": len(chunks),
            "processing_time": time.time() - start_time
        }

    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise

2. Tasks with Retry

@celery_app.task(
    bind=True,
    name="transcribe_audio",
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={'max_retries': 3, 'countdown': 60}
)
def transcribe_audio(self, audio_path: str, language: str = "it"):
    """
    Audio transcription with automatic retry.
    """
    try:
        audio_handler = AudioHandler()

        # Load and process the audio
        audio_data = audio_handler.load_audio(audio_path)

        # Transcription with WhisperX
        transcript = audio_handler.transcribe_with_diarization(
            audio_data,
            language=language
        )

        return {
            "status": "completed",
            "transcript": transcript,
            "duration": audio_handler.get_duration(audio_data)
        }

    except WhisperError as exc:
        # Model-level error: retry manually (connection and timeout
        # errors are already retried via autoretry_for)
        logger.error(f"Whisper error: {exc}")
        raise self.retry(exc=exc)

3. Task Chain​

from celery import chain, group, chord

def process_document_pipeline(document_id: str):
    """
    Full document-processing pipeline.
    """
    # Sequential chain; the group runs its tasks in parallel and, followed
    # by store_results, is upgraded to a chord by Celery automatically
    workflow = chain(
        download_file.s(document_id),
        optimize_document.s(),
        extract_text.s(),
        generate_chunks.s(),
        group(  # parallel processing
            generate_embeddings.s(),
            extract_metadata.s(),
            create_preview.s()
        ),
        store_results.s(document_id)
    )

    return workflow.apply_async()

# Usage
result = process_document_pipeline("doc-123")

📊 Progress Tracking

Server-Side (SSE)​

import asyncio
import json

from celery.result import AsyncResult
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.get("/tasks/{task_id}/progress")
async def task_progress(task_id: str):
    """
    Stream progress updates via SSE.
    """
    async def event_generator():
        while True:
            # Get task status from Celery
            result = AsyncResult(task_id)

            if result.state == 'PENDING':
                yield {
                    "data": json.dumps({
                        "state": "pending",
                        "progress": 0
                    })
                }
            elif result.state == 'PROCESSING':
                yield {
                    "data": json.dumps({
                        "state": "processing",
                        **result.info
                    })
                }
            elif result.state == 'SUCCESS':
                yield {
                    "data": json.dumps({
                        "state": "completed",
                        "result": result.result
                    })
                }
                break
            elif result.state == 'FAILURE':
                yield {
                    "data": json.dumps({
                        "state": "failed",
                        "error": str(result.info)
                    })
                }
                break

            await asyncio.sleep(1)

    return EventSourceResponse(event_generator())

Client-Side (React)​

const useTaskProgress = (taskId: string) => {
  const [progress, setProgress] = useState(0);
  const [status, setStatus] = useState<TaskStatus>("pending");
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    if (!taskId) return;

    const eventSource = new EventSource(`/api/v1/tasks/${taskId}/progress`);

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);

      setStatus(data.state);
      setProgress(data.progress || 0);

      if (data.state === "failed") {
        setError(data.error);
        eventSource.close();
      } else if (data.state === "completed") {
        setProgress(100);
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setError("Connection lost");
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId]);

  return { progress, status, error };
};

🧩 Chunking Strategies​

1. Document Chunker​

class DocumentChunker(BaseChunker):
    """Chunker for PDF, TXT, and MD files."""

    def chunk_document(self, file_path: str, metadata: Dict = None):
        # Text extraction
        if file_path.endswith('.pdf'):
            text = self._extract_with_mineru(file_path)
        else:
            text = self._read_text_file(file_path)

        # Chunking strategy
        if self.strategy == "recursive":
            chunks = self._recursive_chunk(text)
        elif self.strategy == "semantic":
            chunks = self._semantic_chunk(text)
        elif self.strategy == "sentence":
            chunks = self._sentence_chunk(text)
        else:
            raise ValueError(f"Unknown chunking strategy: {self.strategy}")

        return self._add_metadata(chunks, metadata)
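The `_recursive_chunk` strategy splits on progressively finer separators until every piece fits the chunk size. A self-contained sketch of that idea, independent of the class (the function and parameter names here are illustrative, not the project's actual implementation):

```python
def recursive_chunk(text, chunk_size=512, separators=("\n\n", "\n", ". ", " ")):
    """Split text on the coarsest separator that yields pieces <= chunk_size."""
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    for sep in separators:
        if sep not in text:
            continue
        chunks, current = [], ""
        for part in text.split(sep):
            candidate = current + sep + part if current else part
            if len(candidate) <= chunk_size:
                current = candidate
            else:
                if current:
                    chunks.append(current)
                if len(part) > chunk_size:
                    # A single piece is still too big: recurse with finer separators
                    chunks.extend(recursive_chunk(part, chunk_size, separators))
                    current = ""
                else:
                    current = part
        if current:
            chunks.append(current)
        return chunks
    # No separator applies: hard split by character count
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
```

Splitting on paragraphs first and only falling back to sentences and words keeps semantically related text together, which is why recursive splitting is a common default.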

2. Audio Chunker​

class AudioChunker(BaseChunker):
    """Chunker for audio, with diarization."""

    def chunk_document(self, audio_path: str, metadata: Dict = None):
        # Transcription with WhisperX
        transcript = self._transcribe_with_whisperx(
            audio_path,
            language=metadata.get("language", "it")
        )

        # Chunk by speaker
        chunks = []
        current_speaker = None
        current_text = ""
        segment_start = 0.0
        segment_end = 0.0

        for segment in transcript:
            if segment["speaker"] != current_speaker:
                if current_text:
                    chunks.append({
                        "speaker": current_speaker,
                        "text": current_text,
                        "start": segment_start,
                        "end": segment["start"]
                    })
                current_speaker = segment["speaker"]
                current_text = segment["text"]
                segment_start = segment["start"]
            else:
                current_text += " " + segment["text"]
            segment_end = segment["end"]  # assumes segments carry an "end" timestamp

        # Flush the final speaker's chunk, which the loop never emits
        if current_text:
            chunks.append({
                "speaker": current_speaker,
                "text": current_text,
                "start": segment_start,
                "end": segment_end
            })

        return chunks
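The speaker-grouping loop is easier to verify as a standalone function. A sketch that merges consecutive same-speaker segments and keeps the trailing chunk (the `speaker`/`text`/`start`/`end` field names mirror WhisperX-style segments; the function name is illustrative):

```python
def group_by_speaker(segments):
    """Merge consecutive segments from the same speaker into one chunk each."""
    chunks = []
    current = None
    for seg in segments:
        if current is None or seg["speaker"] != current["speaker"]:
            if current is not None:
                chunks.append(current)
            # Start a new chunk for the new speaker
            current = {
                "speaker": seg["speaker"],
                "text": seg["text"],
                "start": seg["start"],
                "end": seg["end"],
            }
        else:
            # Same speaker: extend the running chunk
            current["text"] += " " + seg["text"]
            current["end"] = seg["end"]
    if current is not None:
        chunks.append(current)  # do not drop the final chunk
    return chunks
```

A pure function over a list of dicts like this can be unit-tested without any audio files or models.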

🔍 Monitoring

Flower UI​

# Start Flower
cd apps/background-task
uv run celery -A app.celery_app flower --port=5555

# Open http://localhost:5555

Celery CLI​

# Status workers
docker compose exec background-task celery -A app.celery_app inspect active

# Task statistics
docker compose exec background-task celery -A app.celery_app inspect stats

# Scheduled tasks
docker compose exec background-task celery -A app.celery_app inspect scheduled

# Revoke task
docker compose exec background-task celery -A app.celery_app control revoke <task_id>

Custom Monitoring​

@router.get("/tasks/stats")
async def get_task_stats():
"""Get Celery statistics."""
stats = celery_app.control.inspect().stats()
active = celery_app.control.inspect().active()

return {
"workers": stats,
"active_tasks": active,
"queue_length": get_queue_length()
}

def get_queue_length():
"""Get Redis queue length."""
import redis
r = redis.from_url(settings.celery_broker_url)
return r.llen("celery")

πŸ› Debugging​

Task Debugging​

# Enable debug logging
import functools
import logging
import time

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Debug decorator
def debug_task(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.debug(f"Starting {func.__name__} with args={args}, kwargs={kwargs}")
        start = time.time()
        try:
            result = func(*args, **kwargs)
            logger.debug(f"Completed {func.__name__} in {time.time() - start}s")
            return result
        except Exception as e:
            logger.error(f"Failed {func.__name__}: {e}")
            raise
    return wrapper

@celery_app.task
@debug_task
def my_task(x, y):
    return x + y

Memory Profiling​

import tracemalloc
import psutil

@celery_app.task(bind=True)
def memory_intensive_task(self, data):
    # Start tracing
    tracemalloc.start()
    process = psutil.Process()

    # Log memory before
    mem_before = process.memory_info().rss / 1024 / 1024
    logger.info(f"Memory before: {mem_before:.2f} MB")

    # Process
    result = process_large_data(data)

    # Log memory after
    mem_after = process.memory_info().rss / 1024 / 1024
    logger.info(f"Memory after: {mem_after:.2f} MB")
    logger.info(f"Memory delta: {mem_after - mem_before:.2f} MB")

    # Top memory allocations
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')[:10]
    for stat in top_stats:
        logger.debug(stat)
    tracemalloc.stop()

    return result

🚨 Error Handling​

Task Error Types​

class TaskError(Exception):
    """Base task error."""
    pass

class RetryableError(TaskError):
    """Error that can be retried."""
    pass

class FatalError(TaskError):
    """Unrecoverable error."""
    pass

@celery_app.task(bind=True)
def robust_task(self, document_id: str):
    try:
        # Processing...
        pass
    except ConnectionError as e:
        # Retry with a growing countdown
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
    except ValidationError as e:
        # Fatal - do not retry
        logger.error(f"Validation failed: {e}")
        raise FatalError(str(e))
    except Exception as e:
        # Log, then retry a limited number of times
        logger.error(f"Unexpected error: {e}", exc_info=True)
        if self.request.retries < 3:
            raise self.retry(exc=e, countdown=300)
        raise
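The `countdown=60 * (retries + 1)` above grows linearly; the exponential backoff recommended in the best practices can be computed by a small helper and passed as the `countdown`. A sketch with full jitter and a delay cap (the function and parameter names are illustrative):

```python
import random

def backoff_delay(retries: int, base: float = 60.0, cap: float = 3600.0) -> float:
    """Exponential backoff with full jitter.

    Returns a delay drawn uniformly from [0, min(cap, base * 2**retries)],
    so repeated failures spread out instead of retrying in lockstep.
    """
    return random.uniform(0.0, min(cap, base * (2 ** retries)))
```

Jitter matters when many tasks fail at once (e.g. a downstream outage): without it, all of them retry at the same instant and hammer the recovering service again.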

Dead Letter Queue​

# DLQ configuration
celery_app.conf.task_routes = {
    'app.tasks.*': {'queue': 'default'},
    'app.tasks.*.retry': {'queue': 'retry'},
    'app.tasks.*.dlq': {'queue': 'dead_letter'},
}

# Task with a DLQ
@celery_app.task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        return process_data(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Move to the DLQ and fail permanently
            dlq_task.apply_async(
                args=[self.request.id, data, str(exc)],
                queue='dead_letter'
            )
            raise
        raise self.retry(exc=exc)

📈 Performance Tips

1. Task Optimization​

from typing import List

# Batch processing
@celery_app.task
def process_batch(document_ids: List[str]):
    """Process multiple documents efficiently over one connection."""
    with get_db_connection() as conn:
        for doc_id in document_ids:
            process_single(doc_id, conn)

# Prefetch data
@celery_app.task
def optimized_task(doc_id: str):
    # Prefetch all data
    doc_data = fetch_document_data(doc_id)
    config = load_config()
    models = preload_models()

    # Process with the cached data
    return process_with_cache(doc_data, config, models)

2. Worker Configuration​

# Worker for CPU-intensive tasks
celery -A app.celery_app worker \
    --loglevel=info \
    --concurrency=4 \
    --pool=prefork \
    --queues=cpu_intensive

# Worker for I/O-intensive tasks
celery -A app.celery_app worker \
    --loglevel=info \
    --concurrency=100 \
    --pool=gevent \
    --queues=io_intensive

3. Resource Management​

import resource

# Limit memory per task
@celery_app.task
def memory_limited_task(data):
    # 2 GB address-space limit (note: applies to the whole worker process)
    resource.setrlimit(
        resource.RLIMIT_AS,
        (2 * 1024 * 1024 * 1024, resource.RLIM_INFINITY)
    )

    return process_data(data)

# Connection pooling
from contextlib import contextmanager

@contextmanager
def get_milvus_connection():
    conn = connection_pool.get()
    try:
        yield conn
    finally:
        connection_pool.put(conn)

🔧 Development Tips

Local Development​

# Run a worker locally
cd apps/background-task
uv run celery -A app.celery_app worker --loglevel=debug

# With auto-reload
uv run watchmedo auto-restart \
    --directory=./app \
    --pattern="*.py" \
    --recursive \
    -- celery -A app.celery_app worker --loglevel=info

Testing Tasks​

# Unit test
def test_chunking_task():
    from app.tasks import chunk_document

    # Synchronous call for testing
    result = chunk_document.apply(
        args=["doc-123", {"chunk_size": 512}]
    ).get()

    assert result["status"] == "completed"
    assert result["chunks_processed"] > 0

# Integration test with Celery
@pytest.fixture
def celery_app():
    from app.celery_app import celery_app
    celery_app.conf.update(
        task_always_eager=True,
        task_eager_propagates=True,
    )
    return celery_app

def test_async_task(celery_app):
    result = my_task.delay(1, 2)
    assert result.get() == 3

📚 Best Practices

  1. Task Design

    • Keep tasks idempotent
    • Use meaningful task names
    • Include progress tracking
    • Handle errors gracefully
  2. Performance

    • Batch similar operations
    • Use connection pooling
    • Prefetch data when possible
    • Monitor memory usage
  3. Monitoring

    • Use Flower for real-time monitoring
    • Set up alerts for failed tasks
    • Track task duration metrics
    • Monitor queue lengths
  4. Error Handling

    • Distinguish retryable vs fatal errors
    • Use exponential backoff
    • Implement dead letter queues
    • Log errors with context
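"Keep tasks idempotent" matters because `task_acks_late=True` can redeliver a task that died mid-run. A minimal sketch of an idempotency guard, using an in-memory set as the processed-marker store (in production this would be a Redis set or a database unique key; the names here are illustrative):

```python
_processed: set = set()  # stands in for a Redis SET or DB unique constraint

def idempotent_process(task_id: str, work) -> str:
    """Run `work` at most once per task_id; redeliveries become no-ops."""
    if task_id in _processed:
        return "skipped"
    result = work()
    _processed.add(task_id)  # mark only after the work succeeds
    return result
```

Marking only after success means a crash mid-task leaves no marker, so the redelivered task runs the work again rather than silently skipping it.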

💡 Next Steps: See the Integrare AI guide to add AI models to your tasks.
