# Background Tasks

A complete guide to the asynchronous processing system built on Celery and the background-task service.
## 🎯 Overview

The background-task service handles all of Emblema's compute-intensive operations:

- Document chunking (PDF, TXT, MD)
- Audio/video transcription (MP3, MP4)
- Embedding generation (BGE-M3)
- File optimization (format conversion)
- Advanced text extraction (MinerU for PDFs)
## 🏗️ Architecture
### Main Components

- **FastAPI Server** - API for submitting tasks and querying their status
- **Celery Workers** - asynchronous task processing
- **Redis** - message broker and result backend
- **Flower** - monitoring UI for Celery
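To make the flow concrete, here is a minimal sketch of how the FastAPI server could hand work off to Celery. The route path and request model are assumptions; the `process_document` task name follows the examples later in this guide.

```python
# Hypothetical submit endpoint: FastAPI enqueues the task on Redis,
# a Celery worker picks it up, and the client tracks status by task id.
from fastapi import FastAPI
from pydantic import BaseModel

from app.celery_app import celery_app  # the Celery app defined below

app = FastAPI()

class SubmitRequest(BaseModel):
    document_id: str
    config: dict

@app.post("/api/v1/tasks")
async def submit_task(req: SubmitRequest):
    # send_task avoids importing the task function into the API process
    result = celery_app.send_task(
        "process_document", args=[req.document_id, req.config]
    )
    return {"task_id": result.id}
```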
## 📦 Configuration

### Environment Variables
```bash
# Redis configuration
CELERY_BROKER_URL=redis://redis-master:6379/6
CELERY_RESULT_URL=redis://redis-master:6379/6

# Worker configuration
CELERY_MAX_CONCURRENCY=1    # Number of concurrent workers
WORKING_DIR_MAX_AGE=604800  # 7-day retention

# Model configuration
DEFAULT_EMBEDDING_MODEL=bge-m3
MODEL_ROOT_DIR=./models

# External services
MILVUS_API_URL=http://milvus:19530/v2/vectordb
MINIO_HOSTNAME=minio
HASURA_API_URL=http://graphql-engine:8080/v1/graphql
```
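The Celery configuration below reads these values from a `settings` object. A minimal sketch of how such an object could be defined with pydantic-settings (the class name and module path are assumptions):

```python
# app/config.py (path assumed) - loads the environment variables above
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    celery_broker_url: str = "redis://redis-master:6379/6"
    celery_result_url: str = "redis://redis-master:6379/6"
    celery_max_concurrency: int = 1
    working_dir_max_age: int = 604800  # seconds (7 days)
    default_embedding_model: str = "bge-m3"
    model_root_dir: str = "./models"

settings = Settings()  # reads CELERY_BROKER_URL etc. from the environment
```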
### Celery Configuration
```python
# app/celery_app.py
from celery import Celery

from app.config import settings  # project settings object (import path assumed)

celery_app = Celery(
    "background-task",
    broker=settings.celery_broker_url,
    result_backend=settings.celery_result_url,
    include=["app.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    # Route tasks to specialized queues
    task_routes={
        "app.tasks.process_document": {"queue": "document_processing"},
        "app.tasks.generate_embeddings": {"queue": "ai_processing"},
        "app.tasks.transcribe_audio": {"queue": "audio_processing"},
    },
    # Performance settings
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,
    # Time limits
    task_time_limit=1800,       # 30 minutes (hard limit)
    task_soft_time_limit=1500,  # 25 minutes (raises SoftTimeLimitExceeded)
)
```
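Workers only consume queues they subscribe to, so each routed queue above needs a worker started with a matching `-Q`/`--queues` flag. A quick sanity check of the loaded configuration, e.g. from a REPL (a minimal sketch):

```python
from app.celery_app import celery_app

# Confirm routing and limits took effect as defined in app/celery_app.py
assert celery_app.conf.task_time_limit == 1800
print(celery_app.conf.task_routes["app.tasks.process_document"]["queue"])
# -> document_processing
```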
## 🔧 Task Implementation

### 1. Defining a Task
```python
import logging
import time

from app.celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, name="process_document")
def process_document(self, document_id: str, config: dict):
    """
    Process a document with progress tracking.

    Args:
        document_id: UUID of the document
        config: Chunking configuration
    """
    start_time = time.time()
    try:
        # Initial status update
        self.update_state(
            state="PROCESSING",
            meta={
                "phase": "initialization",
                "progress": 0,
                "document_id": document_id,
            },
        )

        # Phase 1: download and optimization
        self.update_state(
            state="PROCESSING",
            meta={"phase": "optimization", "progress": 10},
        )
        doc_handler = DocumentHandler()
        optimized_file = doc_handler.optimize_file(document_id)

        # Phase 2: chunking
        self.update_state(
            state="PROCESSING",
            meta={"phase": "chunking", "progress": 30},
        )
        chunker = get_chunker(config["chunking_strategy"])
        chunks = chunker.chunk_document(
            optimized_file,
            chunk_size=config["chunk_size"],
            overlap=config["chunk_overlap"],
        )

        # Phase 3: embeddings
        for i, chunk in enumerate(chunks):
            progress = 30 + (i / len(chunks)) * 60
            self.update_state(
                state="PROCESSING",
                meta={
                    "phase": "embeddings",
                    "progress": int(progress),
                    "current_chunk": i + 1,
                    "total_chunks": len(chunks),
                },
            )
            # Generate the embedding for this chunk
            embedding = generate_embedding(chunk.content)
            store_to_milvus(chunk, embedding)

        # Done
        return {
            "status": "completed",
            "document_id": document_id,
            "chunks_processed": len(chunks),
            "processing_time": time.time() - start_time,
        }

    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)},
        )
        raise
```
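Once submitted, the task can be polled from any process that shares the result backend. A minimal sketch (the config keys follow the task above):

```python
from celery.result import AsyncResult

from app.celery_app import celery_app

result = celery_app.send_task("process_document", args=["doc-123", {
    "chunking_strategy": "recursive", "chunk_size": 512, "chunk_overlap": 64,
}])

res = AsyncResult(result.id, app=celery_app)
print(res.state)  # PENDING -> PROCESSING -> SUCCESS
print(res.info)   # the `meta` dict while running, the return value on success
```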
### 2. Task with Retry
```python
@celery_app.task(
    bind=True,
    name="transcribe_audio",
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 3, "countdown": 60},
)
def transcribe_audio(self, audio_path: str, language: str = "it"):
    """
    Audio transcription with automatic retry.
    """
    try:
        audio_handler = AudioHandler()

        # Load and preprocess the audio
        audio_data = audio_handler.load_audio(audio_path)

        # Transcribe with WhisperX
        transcript = audio_handler.transcribe_with_diarization(
            audio_data,
            language=language,
        )

        return {
            "status": "completed",
            "transcript": transcript,
            "duration": audio_handler.get_duration(audio_data),
        }

    except WhisperError as exc:
        # Model-level failure: retry manually (still bounded by max_retries)
        logger.error(f"Whisper error: {exc}")
        raise self.retry(exc=exc)
```
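For transient network errors, Celery can also compute the backoff for you instead of a fixed `countdown`: `retry_backoff`, `retry_backoff_max`, and `retry_jitter` are standard task options. A sketch (`fetch_remote_resource` and `download` are hypothetical stand-ins):

```python
@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    max_retries=5,
    retry_backoff=True,     # 1s, 2s, 4s, 8s, ... between attempts
    retry_backoff_max=600,  # cap the delay at 10 minutes
    retry_jitter=True,      # randomize delays to avoid thundering herds
)
def fetch_remote_resource(self, url: str):
    # Stands in for any flaky network call
    return download(url)
```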
### 3. Task Chain
```python
from celery import chain, group


def process_document_pipeline(document_id: str):
    """
    Full document-processing pipeline.
    """
    # Sequential chain; the group runs its tasks in parallel, and Celery
    # upgrades "group followed by a task" into a chord automatically
    workflow = chain(
        download_file.s(document_id),
        optimize_document.s(),
        extract_text.s(),
        generate_chunks.s(),
        group(
            generate_embeddings.s(),
            extract_metadata.s(),
            create_preview.s(),
        ),
        store_results.s(document_id),
    )
    return workflow.apply_async()


# Usage
result = process_document_pipeline("doc-123")
```
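The returned `AsyncResult` points at the last task in the chain, so waiting on it waits for the whole pipeline (sketch):

```python
# Block until store_results finishes (fine in scripts;
# avoid calling .get() from inside another task)
final = result.get(timeout=1800)
print(final)

# Walk back through the chain if intermediate results are needed
parent = result.parent  # the group/chord stage before store_results
```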
## 📊 Progress Tracking

### Server-Side (SSE)
```python
import asyncio
import json

from celery.result import AsyncResult
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse

router = APIRouter()


@router.get("/tasks/{task_id}/progress")
async def task_progress(task_id: str):
    """
    Stream progress updates via SSE.
    """
    async def event_generator():
        while True:
            # Get the task status from Celery
            result = AsyncResult(task_id)

            if result.state == "PENDING":
                yield {
                    "data": json.dumps({
                        "state": "pending",
                        "progress": 0,
                    })
                }
            elif result.state == "PROCESSING":
                yield {
                    "data": json.dumps({
                        "state": "processing",
                        **result.info,
                    })
                }
            elif result.state == "SUCCESS":
                yield {
                    "data": json.dumps({
                        "state": "completed",
                        "result": result.result,
                    })
                }
                break
            elif result.state == "FAILURE":
                yield {
                    "data": json.dumps({
                        "state": "failed",
                        "error": str(result.info),
                    })
                }
                break

            await asyncio.sleep(1)

    return EventSourceResponse(event_generator())
```
### Client-Side (React)
```tsx
import { useEffect, useState } from "react";

// Mirrors the states emitted by the SSE endpoint above
type TaskStatus = "pending" | "processing" | "completed" | "failed";

const useTaskProgress = (taskId: string) => {
  const [progress, setProgress] = useState(0);
  const [status, setStatus] = useState<TaskStatus>("pending");
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    if (!taskId) return;

    const eventSource = new EventSource(`/api/v1/tasks/${taskId}/progress`);

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setStatus(data.state);
      setProgress(data.progress || 0);

      if (data.state === "failed") {
        setError(data.error);
        eventSource.close();
      } else if (data.state === "completed") {
        setProgress(100);
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setError("Connection lost");
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId]);

  return { progress, status, error };
};
```
## 🧩 Chunking Strategies

### 1. Document Chunker
```python
from typing import Dict


class DocumentChunker(BaseChunker):
    """Chunker for PDF, TXT, and MD files."""

    def chunk_document(self, file_path: str, metadata: Dict = None):
        # Text extraction
        if file_path.endswith(".pdf"):
            text = self._extract_with_mineru(file_path)
        else:
            text = self._read_text_file(file_path)

        # Chunking strategy
        if self.strategy == "recursive":
            chunks = self._recursive_chunk(text)
        elif self.strategy == "semantic":
            chunks = self._semantic_chunk(text)
        elif self.strategy == "sentence":
            chunks = self._sentence_chunk(text)
        else:
            raise ValueError(f"Unknown chunking strategy: {self.strategy}")

        return self._add_metadata(chunks, metadata)
```
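The private helpers are not shown in this guide. As an illustration, a minimal `_recursive_chunk` could split on progressively finer separators until every piece fits `self.chunk_size` (an assumed attribute; character counts used for simplicity, and overlap handling omitted):

```python
def _recursive_chunk(self, text, separators=("\n\n", "\n", ". ", " ")):
    """Split on the coarsest separator that yields pieces within chunk_size."""
    if len(text) <= self.chunk_size:
        return [text]
    for sep in separators:
        parts = [p for p in text.split(sep) if p]
        if len(parts) > 1:
            chunks, current = [], ""
            for part in parts:
                candidate = f"{current}{sep}{part}" if current else part
                if len(candidate) <= self.chunk_size:
                    current = candidate          # keep packing this chunk
                    continue
                if current:
                    chunks.append(current)       # flush the packed chunk
                if len(part) <= self.chunk_size:
                    current = part
                else:
                    # Oversized part: recurse with finer separators
                    chunks.extend(self._recursive_chunk(part, separators))
                    current = ""
            if current:
                chunks.append(current)
            return chunks
    # No separator present: fall back to a hard split
    n = self.chunk_size
    return [text[i:i + n] for i in range(0, len(text), n)]
```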
### 2. Audio Chunker
```python
class AudioChunker(BaseChunker):
    """Chunker for audio, with speaker diarization."""

    def chunk_document(self, audio_path: str, metadata: Dict = None):
        metadata = metadata or {}

        # Transcribe with WhisperX
        transcript = self._transcribe_with_whisperx(
            audio_path,
            language=metadata.get("language", "it"),
        )

        # One chunk per speaker turn
        chunks = []
        current_speaker = None
        current_text = ""
        segment_start = 0.0

        for segment in transcript:
            if segment["speaker"] != current_speaker:
                if current_text:
                    chunks.append({
                        "speaker": current_speaker,
                        "text": current_text,
                        "start": segment_start,
                        "end": segment["start"],
                    })
                current_speaker = segment["speaker"]
                current_text = segment["text"]
                segment_start = segment["start"]
            else:
                current_text += " " + segment["text"]

        # Flush the final speaker turn
        if current_text:
            chunks.append({
                "speaker": current_speaker,
                "text": current_text,
                "start": segment_start,
                "end": transcript[-1]["end"] if transcript else segment_start,
            })

        return chunks
```
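For reference, the WhisperX-style segment list this chunker expects looks roughly like this (field names inferred from the code above):

```python
transcript = [
    {"speaker": "SPEAKER_00", "start": 0.0, "end": 4.2,  "text": "Good morning everyone."},
    {"speaker": "SPEAKER_00", "start": 4.2, "end": 7.9,  "text": "Let's get started."},
    {"speaker": "SPEAKER_01", "start": 8.1, "end": 12.4, "text": "Thanks. First item..."},
]
# chunk_document merges the first two segments into a single SPEAKER_00 chunk.
```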
## 📈 Monitoring

### Flower UI
```bash
# Start Flower
cd apps/background-task
uv run celery -A app.celery_app flower --port=5555

# Open http://localhost:5555
```
### Celery CLI
```bash
# Worker status
docker compose exec background-task celery -A app.celery_app inspect active

# Task statistics
docker compose exec background-task celery -A app.celery_app inspect stats

# Scheduled tasks
docker compose exec background-task celery -A app.celery_app inspect scheduled

# Revoke a task
docker compose exec background-task celery -A app.celery_app control revoke <task_id>
```
### Custom Monitoring
```python
@router.get("/tasks/stats")
async def get_task_stats():
    """Get Celery statistics."""
    inspect = celery_app.control.inspect()
    return {
        "workers": inspect.stats(),
        "active_tasks": inspect.active(),
        "queue_length": get_queue_length(),
    }


def get_queue_length():
    """Get the length of the default Redis queue."""
    import redis

    r = redis.from_url(settings.celery_broker_url)
    return r.llen("celery")
```
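`llen("celery")` only covers the default queue. With the Redis broker, each Celery queue is a Redis list keyed by its name, so a per-queue variant could look like this (queue names taken from the `task_routes` configured earlier):

```python
import redis

QUEUES = ["celery", "document_processing", "ai_processing", "audio_processing"]


def get_queue_lengths() -> dict[str, int]:
    """Report the backlog of each named queue."""
    r = redis.from_url(settings.celery_broker_url)
    return {queue: r.llen(queue) for queue in QUEUES}
```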
## 🐛 Debugging

### Task Debugging
```python
import functools
import logging
import time

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


# Debug decorator
def debug_task(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.debug(f"Starting {func.__name__} with args={args}, kwargs={kwargs}")
        start = time.time()
        try:
            result = func(*args, **kwargs)
            logger.debug(f"Completed {func.__name__} in {time.time() - start:.2f}s")
            return result
        except Exception as e:
            logger.error(f"Failed {func.__name__}: {e}")
            raise
    return wrapper


@celery_app.task
@debug_task
def my_task(x, y):
    return x + y
```
### Memory Profiling
```python
import tracemalloc

import psutil


@celery_app.task(bind=True)
def memory_intensive_task(self, data):
    # Start tracing allocations
    tracemalloc.start()
    process = psutil.Process()

    # Log memory before
    mem_before = process.memory_info().rss / 1024 / 1024
    logger.info(f"Memory before: {mem_before:.2f} MB")

    # Process
    result = process_large_data(data)

    # Log memory after
    mem_after = process.memory_info().rss / 1024 / 1024
    logger.info(f"Memory after: {mem_after:.2f} MB")
    logger.info(f"Memory delta: {mem_after - mem_before:.2f} MB")

    # Top memory allocations
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics("lineno")[:10]
    for stat in top_stats:
        logger.debug(stat)

    return result
```
## 🚨 Error Handling

### Task Error Types
```python
class TaskError(Exception):
    """Base task error."""


class RetryableError(TaskError):
    """Error that can be retried."""


class FatalError(TaskError):
    """Non-recoverable error."""


@celery_app.task(bind=True)
def robust_task(self, document_id: str):
    try:
        # Processing...
        pass
    except ConnectionError as e:
        # Retry with a linearly increasing backoff
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
    except ValidationError as e:
        # Fatal: do not retry
        logger.error(f"Validation failed: {e}")
        raise FatalError(str(e))
    except Exception as e:
        # Log, then retry a limited number of times
        logger.error(f"Unexpected error: {e}", exc_info=True)
        if self.request.retries < 3:
            raise self.retry(exc=e, countdown=300)
        raise
```
### Dead Letter Queue
```python
# DLQ configuration: failed tasks are re-routed to a dedicated queue
celery_app.conf.task_routes = {
    "app.tasks.*": {"queue": "default"},
    "app.tasks.*.retry": {"queue": "retry"},
    "app.tasks.*.dlq": {"queue": "dead_letter"},
}


# Task with a DLQ
@celery_app.task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        return process_data(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Retries exhausted: move to the DLQ and give up
            dlq_task.apply_async(
                args=[self.request.id, data, str(exc)],
                queue="dead_letter",
            )
            raise
        raise self.retry(exc=exc)
```
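`dlq_task` itself is not shown in this guide; a minimal sketch that just records the failure for later inspection (the logging destination is an assumption):

```python
@celery_app.task(name="dlq_task")
def dlq_task(original_task_id: str, payload, error: str):
    """Consume dead-lettered tasks: keep enough context to replay them later."""
    logger.error(
        "Dead-lettered task %s failed with %r; payload=%r",
        original_task_id, error, payload,
    )
    # Optionally persist to a database table or object store for manual replay
```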
## 🚀 Performance Tips

### 1. Task Optimization
```python
from typing import List


# Batch processing
@celery_app.task
def process_batch(document_ids: List[str]):
    """Process multiple documents over a single shared connection."""
    with get_db_connection() as conn:
        for doc_id in document_ids:
            process_single(doc_id, conn)


# Prefetch data
@celery_app.task
def optimized_task(doc_id: str):
    # Prefetch everything the task needs up front
    doc_data = fetch_document_data(doc_id)
    config = load_config()
    models = preload_models()

    # Process with the cached data
    return process_with_cache(doc_data, config, models)
```
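Celery also ships a built-in primitive for batching: `task.chunks()` splits an iterable of argument tuples into groups of n, each processed by a single task message (`process_single_doc` is a hypothetical task):

```python
# Split the document ids into tasks of 50 ids each
doc_args = [(doc_id,) for doc_id in document_ids]  # one args-tuple per call
process_single_doc.chunks(doc_args, 50).apply_async()
```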
### 2. Worker Configuration
```bash
# Worker for CPU-intensive tasks
celery -A app.celery_app worker \
    --loglevel=info \
    --concurrency=4 \
    --pool=prefork \
    --queues=cpu_intensive

# Worker for I/O-intensive tasks
celery -A app.celery_app worker \
    --loglevel=info \
    --concurrency=100 \
    --pool=gevent \
    --queues=io_intensive
```
### 3. Resource Management
```python
import resource
from contextlib import contextmanager


# Per-task memory limit
@celery_app.task
def memory_limited_task(data):
    # Cap the address space at 2 GB (hard limit left unlimited)
    resource.setrlimit(
        resource.RLIMIT_AS,
        (2 * 1024 * 1024 * 1024, resource.RLIM_INFINITY),
    )
    return process_data(data)


# Connection pooling
@contextmanager
def get_milvus_connection():
    conn = connection_pool.get()
    try:
        yield conn
    finally:
        connection_pool.put(conn)
```
## 🔧 Development Tips

### Local Development
```bash
# Run a worker locally
cd apps/background-task
uv run celery -A app.celery_app worker --loglevel=debug

# With auto-reload (watchmedo comes from the watchdog package)
uv run watchmedo auto-restart \
    --directory=./app \
    --pattern="*.py" \
    --recursive \
    -- celery -A app.celery_app worker --loglevel=info
```
### Testing Tasks
```python
import pytest


# Unit test
def test_chunking_task():
    from app.tasks import chunk_document

    # Synchronous call for tests
    result = chunk_document.apply(
        args=["doc-123", {"chunk_size": 512}]
    ).get()

    assert result["status"] == "completed"
    assert result["chunks_processed"] > 0


# Integration test with Celery in eager mode
@pytest.fixture
def celery_app():
    from app.celery_app import celery_app

    celery_app.conf.update(
        task_always_eager=True,      # run tasks inline, no worker needed
        task_eager_propagates=True,  # re-raise task exceptions in the test
    )
    return celery_app


def test_async_task(celery_app):
    result = my_task.delay(1, 2)
    assert result.get() == 3
```
## 📚 Best Practices

1. **Task Design**
   - Keep tasks idempotent
   - Use meaningful task names
   - Include progress tracking
   - Handle errors gracefully

2. **Performance**
   - Batch similar operations
   - Use connection pooling
   - Prefetch data when possible
   - Monitor memory usage

3. **Monitoring**
   - Use Flower for real-time monitoring
   - Set up alerts for failed tasks
   - Track task duration metrics
   - Monitor queue lengths

4. **Error Handling**
   - Distinguish retriable vs fatal errors
   - Use exponential backoff
   - Implement dead letter queues
   - Log errors with context
💡 **Next Steps**: See *Integrare AI* to add AI models to your tasks.