Integrating AI
A complete guide to integrating AI models and artificial-intelligence services into Emblema components.
Overview
Emblema uses a modular architecture for AI integration that includes:
- LiteLLM Gateway - a unified proxy for accessing different LLM providers
- BGE-M3 Embeddings - a multilingual model for vector generation (1024 dimensions)
- Milvus Vector DB - high-performance storage and semantic search
- Background Tasks - asynchronous processing for intensive AI operations
- MCP Support - Model Context Protocol for extending LLM capabilities
AI Architecture
LiteLLM Gateway
Base Configuration
# Environment Variables
LITELLM_MASTER_KEY=your-master-key
LITELLM_SALT_KEY=your-salt-key
DEFAULT_CHAT_MODEL=llama3.3-70b-instruct
DEFAULT_EMBEDDING_MODEL=bge-m3
Supported Models
# config/litellm/config.yaml
model_list:
  - model_name: gpt-4
    litellm_params:
      model: openai/gpt-4
      api_key: ${OPENAI_API_KEY}

  - model_name: claude-3
    litellm_params:
      model: anthropic/claude-3-opus-20240229
      api_key: ${ANTHROPIC_API_KEY}

  - model_name: llama3.3-70b-instruct
    litellm_params:
      model: openai/llama-3.3-70b-instruct
      api_base: ${VLLM_BASE_URL}
      api_key: dummy
API Calls
// lib/ai/litellm-client.ts
import { OpenAI } from "openai";

export const litellm = new OpenAI({
  apiKey: process.env.LITELLM_MASTER_KEY,
  baseURL: `${process.env.LITELLM_BASE_URL}/v1`,
});

// Example chat completion (streamed)
const response = await litellm.chat.completions.create({
  model: "gpt-4",
  messages: [
    { role: "system", content: "You are a helpful assistant." },
    { role: "user", content: "Hello!" },
  ],
  temperature: 0.7,
  stream: true,
});
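With stream: true, the OpenAI-compatible client returns an async iterable of chunks rather than a single completion. A minimal sketch of consuming the stream above (the process.stdout sink is only for illustration):

// Consume the streamed chunks as they arrive
for await (const chunk of response) {
  const delta = chunk.choices[0]?.delta?.content ?? "";
  process.stdout.write(delta); // replace with your own sink (SSE, UI state, ...)
}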
Embeddings with BGE-M3
Generating Embeddings
# apps/background-task/app/handlers/embeddings.py
from typing import List

import numpy as np
import torch
from FlagEmbedding import BGEM3FlagModel


class EmbeddingHandler:
    def __init__(self):
        self.model = BGEM3FlagModel(
            'BAAI/bge-m3',
            use_fp16=True,
            device='cuda' if torch.cuda.is_available() else 'cpu'
        )

    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of texts.

        Returns:
            A list of 1024-dimensional vectors.
        """
        embeddings = self.model.encode(
            texts,
            batch_size=32,
            max_length=512,
            return_dense=True,
            return_sparse=False,
            return_colbert_vecs=False
        )

        # Normalize the vectors
        embeddings = embeddings['dense_vecs']
        embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

        return embeddings.tolist()
Chunking Strategies
# Chunking configuration for optimal embeddings
CHUNKING_PRESETS = {
    "default": {
        "chunk_size": 512,     # Optimal token count for BGE-M3
        "chunk_overlap": 75,   # Overlap to preserve context
        "strategy": "recursive"
    },
    "technical": {
        "chunk_size": 600,     # Larger for technical content
        "chunk_overlap": 120,
        "strategy": "recursive"
    },
    "qa": {
        "chunk_size": 200,     # Small for precise Q&A
        "chunk_overlap": 25,
        "strategy": "sentence"
    }
}
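To illustrate how chunk_size and chunk_overlap interact, here is a minimal sliding-window sketch in TypeScript (the chunkText helper is hypothetical and character-based for simplicity; the real splitter runs in the Python background task and the presets above are token-based):

// Hypothetical illustration: split text into overlapping windows
function chunkText(text: string, chunkSize = 512, chunkOverlap = 75): string[] {
  const chunks: string[] = [];
  const step = chunkSize - chunkOverlap; // each window starts `step` units after the previous one
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break;
  }
  return chunks;
}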
Chat API Implementation
API Route Handler
// app/api/v1/chat/route.ts
import { litellm } from "@/lib/ai/litellm-client";
import { searchDocuments } from "@/lib/ai/search";

export async function POST(req: Request) {
  const { messages, agentId, useKnowledgeBase } = await req.json();

  try {
    // 1. Fetch the agent configuration
    const agent = await getAgent(agentId);

    // 2. RAG - retrieve context if requested
    let context = "";
    if (useKnowledgeBase) {
      const lastMessage = messages[messages.length - 1];
      const documents = await searchDocuments(
        lastMessage.content,
        agent.knowledgeBaseId,
      );
      context = formatDocumentsAsContext(documents);
    }

    // 3. Build the system prompt with the retrieved context
    const systemPrompt = buildSystemPrompt(agent, context);

    // 4. Stream the response
    const stream = await litellm.chat.completions.create({
      model: agent.model || DEFAULT_CHAT_MODEL,
      messages: [{ role: "system", content: systemPrompt }, ...messages],
      temperature: agent.temperature || 0.7,
      stream: true,
    });

    // 5. Return the SSE stream
    return new Response(createSSEStream(stream), {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
    });
  } catch (error) {
    return handleAIError(error);
  }
}
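The route relies on a createSSEStream helper that is not shown above. A plausible sketch (an assumption, not the actual implementation) converts the async iterable returned by the client into a ReadableStream of SSE "data:" lines:

// Hypothetical sketch of createSSEStream: forward each chunk as an SSE event
function createSSEStream(stream: AsyncIterable<unknown>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream({
    async start(controller) {
      try {
        for await (const chunk of stream) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
        }
        controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      } finally {
        controller.close();
      }
    },
  });
}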
Client Integration
// hooks/use-chat.ts
import { useState } from "react";

export const useChat = (agentId: string) => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = async (content: string) => {
    const userMessage = { role: "user", content };
    // Append the user message plus an empty assistant placeholder to fill while streaming
    setMessages((prev) => [...prev, userMessage, { role: "assistant", content: "" }]);
    setIsStreaming(true);

    const response = await fetch("/api/v1/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        messages: [...messages, userMessage],
        agentId,
        useKnowledgeBase: true,
      }),
    });

    // Handle the SSE stream
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let assistantContent = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split("\n");

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const payload = line.slice(6);
        if (payload === "[DONE]") continue;

        const data = JSON.parse(payload);
        assistantContent += data.choices[0]?.delta?.content || "";
        setMessages((prev) => [
          ...prev.slice(0, -1),
          { role: "assistant", content: assistantContent },
        ]);
      }
    }

    setIsStreaming(false);
  };

  return { messages, sendMessage, isStreaming };
};
Semantic Search
Milvus Integration
# apps/background-task/app/handlers/vector_db.py
from typing import Dict, List, Optional

from pymilvus import Collection, connections

# `settings` holds the Milvus connection parameters (application configuration)


class MilvusHandler:
    def __init__(self):
        connections.connect(
            alias="default",
            host=settings.milvus_host,
            port=settings.milvus_port,
            user=settings.milvus_user,
            password=settings.milvus_password,
        )
        self.collection = Collection("document_chunks")

    def search(
        self,
        query_vector: List[float],
        knowledge_base_id: Optional[str] = None,
        limit: int = 10,
        score_threshold: float = 0.7
    ) -> List[Dict]:
        """
        Semantic search over document chunks.
        """
        search_params = {
            "metric_type": "COSINE",
            "params": {"nprobe": 10}
        }

        # Optional scalar filter on the knowledge base
        filter_expr = None
        if knowledge_base_id:
            filter_expr = f"knowledge_base_id == '{knowledge_base_id}'"

        # Run the search
        results = self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=limit,
            expr=filter_expr,
            output_fields=["chunk_id", "content", "metadata"]
        )

        # Format the results, keeping only hits above the similarity threshold
        chunks = []
        for hit in results[0]:
            if hit.score < score_threshold:
                continue
            chunks.append({
                "chunk_id": hit.entity.get("chunk_id"),
                "content": hit.entity.get("content"),
                "metadata": hit.entity.get("metadata"),
                "score": hit.score
            })

        return chunks
Search API
// app/api/v1/search/route.ts
export async function POST(req: Request) {
  const { query, knowledgeBaseId, limit = 10 } = await req.json();

  try {
    // 1. Generate an embedding for the query
    const queryEmbedding = await generateEmbedding(query);

    // 2. Search Milvus
    const results = await milvusClient.search({
      collection_name: "document_chunks",
      vector: queryEmbedding,
      filter: knowledgeBaseId
        ? `knowledge_base_id == "${knowledgeBaseId}"`
        : undefined,
      limit,
      output_fields: ["chunk_id", "content", "metadata"],
    });

    // 3. Enrich the results
    const enrichedResults = await enrichSearchResults(results);

    return Response.json({
      results: enrichedResults,
      query,
      total: enrichedResults.length,
    });
  } catch (error) {
    return handleSearchError(error);
  }
}
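The generateEmbedding helper is referenced but not shown. Since LiteLLM exposes an OpenAI-compatible embeddings endpoint and DEFAULT_EMBEDDING_MODEL=bge-m3 is configured above, a plausible sketch (an assumption, not the actual helper) is:

// Hypothetical sketch of generateEmbedding using the LiteLLM gateway
import { litellm } from "@/lib/ai/litellm-client";

export async function generateEmbedding(text: string): Promise<number[]> {
  const response = await litellm.embeddings.create({
    model: process.env.DEFAULT_EMBEDDING_MODEL ?? "bge-m3",
    input: text,
  });
  return response.data[0].embedding; // 1024-dimensional vector for BGE-M3
}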
MCP (Model Context Protocol)
Server Configuration
// lib/mcp/server-manager.ts
export class MCPServerManager {
  private servers: Map<string, MCPServer> = new Map();

  async initializeServer(config: MCPServerConfig) {
    const server = new MCPServer({
      command: config.command,
      args: config.args,
      env: config.env,
    });

    await server.start();
    this.servers.set(config.id, server);

    return server;
  }

  async callTool(serverId: string, toolName: string, args: any) {
    const server = this.servers.get(serverId);
    if (!server) throw new Error("Server not found");

    return await server.callTool(toolName, args);
  }
}
MCP Integration in Chat
// MCP integration in the chat flow
async function handleMCPTools(message: string, mcpServers: string[]) {
  const tools = await getMCPTools(mcpServers);

  // Expose the tools to the model
  const response = await litellm.chat.completions.create({
    model: "gpt-4",
    messages: [
      {
        role: "system",
        content: `You have access to these tools: ${JSON.stringify(tools)}`,
      },
      { role: "user", content: message },
    ],
    tools: tools,
    tool_choice: "auto",
  });

  // Execute tool calls if the model requested any
  if (response.choices[0].message.tool_calls) {
    for (const toolCall of response.choices[0].message.tool_calls) {
      const result = await executeMCPTool(toolCall);
      // Append the result to the context (see the sketch below)
    }
  }
}
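The function above stops after executing the tool calls. A hedged sketch of the missing step (sending the tool results back to the model for a final answer; the finalizeWithToolResults helper and its argument shapes are hypothetical):

// Hypothetical continuation: feed tool results back and request the final answer
import { litellm } from "@/lib/ai/litellm-client";

async function finalizeWithToolResults(
  messages: any[],          // the original conversation
  assistantMessage: any,    // the assistant message containing tool_calls
  toolResults: { toolCallId: string; result: unknown }[],
) {
  const followUp = await litellm.chat.completions.create({
    model: "gpt-4",
    messages: [
      ...messages,
      assistantMessage,
      ...toolResults.map((r) => ({
        role: "tool" as const,
        tool_call_id: r.toolCallId,
        content: JSON.stringify(r.result),
      })),
    ],
  });
  return followUp.choices[0].message.content;
}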
Monitoring & Analytics
AI Metrics
// lib/ai/metrics.ts
export async function trackAIUsage(params: {
  model: string;
  tokens: number;
  latency: number;
  type: "chat" | "embedding";
}) {
  await db.ai_usage.create({
    data: {
      model: params.model,
      tokens: params.tokens,
      latency: params.latency,
      type: params.type,
      timestamp: new Date(),
      cost: calculateCost(params.model, params.tokens),
    },
  });
}
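For instance, a non-streaming chat call might be instrumented like this (a sketch; response.usage.total_tokens is part of the OpenAI-compatible response, while the surrounding call site is hypothetical):

// Hypothetical instrumentation of a chat completion
import { litellm } from "@/lib/ai/litellm-client";
import { trackAIUsage } from "@/lib/ai/metrics";

const started = Date.now();
const response = await litellm.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Summarize this document." }],
});

await trackAIUsage({
  model: "gpt-4",
  tokens: response.usage?.total_tokens ?? 0,
  latency: Date.now() - started,
  type: "chat",
});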
Performance Optimization
# Inference optimizations
import hashlib
from typing import List

from cachetools import LRUCache
from FlagEmbedding import BGEM3FlagModel


class OptimizedEmbeddingHandler:
    def __init__(self):
        # Batch processing
        self.batch_size = 32

        # Half-precision model
        self.model = BGEM3FlagModel(
            'BAAI/bge-m3',
            use_fp16=True,  # Half precision
            device='cuda'
        )

        # Embedding cache keyed by text hash
        self.cache = LRUCache(maxsize=10000)

    async def process_batch(self, texts: List[str]):
        # Check the cache first
        uncached = []
        cached_results = {}

        for text in texts:
            cache_key = hashlib.md5(text.encode()).hexdigest()
            if cache_key in self.cache:
                cached_results[text] = self.cache[cache_key]
            else:
                uncached.append(text)

        # Process the uncached texts
        if uncached:
            embeddings = self.model.encode(uncached, batch_size=self.batch_size)

            # Update the cache
            for text, embedding in zip(uncached, embeddings['dense_vecs']):
                cache_key = hashlib.md5(text.encode()).hexdigest()
                self.cache[cache_key] = embedding
                cached_results[text] = embedding

        # Return embeddings in the original input order
        return [cached_results[text] for text in texts]
Security Best Practices
API Key Management
// API key validation
const validateAPIKey = async (key: string): Promise<boolean> => {
  // Check the format
  if (!key.match(/^sk-[a-zA-Z0-9]{48}$/)) {
    return false;
  }

  // Check against the database
  const hashedKey = await hashAPIKey(key);
  const exists = await db.api_keys.findFirst({
    where: { hashed_key: hashedKey, active: true },
  });

  return !!exists;
};
Rate Limiting
// middleware/ai-rate-limit.ts
export const aiRateLimit = rateLimit({
  windowMs: 60 * 1000, // 1 minute
  max: async (req) => {
    const user = await getUser(req);
    return user.tier === "premium" ? 100 : 10;
  },
  standardHeaders: true,
  legacyHeaders: false,
});
Testing AI Features
Unit Tests
# tests/test_embeddings.py
import numpy as np
import pytest

from app.handlers.embeddings import EmbeddingHandler


def cosine_similarity(a, b):
    # Cosine similarity between two vectors
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


@pytest.fixture
def embedding_handler():
    return EmbeddingHandler()


def test_embedding_generation(embedding_handler):
    texts = ["Hello world", "Test embedding"]
    embeddings = embedding_handler.generate_embeddings(texts)

    assert len(embeddings) == 2
    assert len(embeddings[0]) == 1024
    assert all(-1 <= val <= 1 for val in embeddings[0])


def test_embedding_similarity(embedding_handler):
    texts = ["cat", "dog", "automobile"]
    embeddings = embedding_handler.generate_embeddings(texts)

    # "cat" and "dog" should be more similar than "cat" and "automobile"
    cat_dog_sim = cosine_similarity(embeddings[0], embeddings[1])
    cat_car_sim = cosine_similarity(embeddings[0], embeddings[2])

    assert cat_dog_sim > cat_car_sim
Integration Tests
// __tests__/api/chat.test.ts
describe("Chat API", () => {
  it("should stream chat response", async () => {
    const response = await fetch("/api/v1/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        messages: [{ role: "user", content: "Hello" }],
        agentId: "test-agent",
      }),
    });

    expect(response.headers.get("content-type")).toBe("text/event-stream");

    const reader = response.body!.getReader();
    const { value } = await reader.read();
    const chunk = new TextDecoder().decode(value);

    expect(chunk).toContain("data: ");
  });
});
Best Practices
1. Streaming Responses
   - Always use streaming for long chats
   - Implement proper error handling
   - Show loading indicators
2. Caching Strategy
   - Cache frequently requested embeddings
   - Cache search results for common queries
   - Use an appropriate TTL for invalidation
3. Error Handling
   - Fall back to alternative models (see the sketch after this list)
   - Show user-friendly error messages
   - Keep detailed logs for debugging
4. Performance
   - Batch processing for embeddings
   - Lazy loading of models
   - Connection pooling for Milvus
5. Security
   - Sanitize user input
   - Rate limiting per endpoint
   - Audit logging for compliance
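As an example of the model-fallback practice, a minimal sketch (the chatWithFallback helper is hypothetical; both model names appear in the gateway configuration above):

// Hypothetical fallback: try the preferred model, then an alternative
import { litellm } from "@/lib/ai/litellm-client";

async function chatWithFallback(
  messages: any[],
  models = ["gpt-4", "llama3.3-70b-instruct"],
) {
  let lastError: unknown;
  for (const model of models) {
    try {
      return await litellm.chat.completions.create({ model, messages });
    } catch (error) {
      lastError = error; // log and try the next model
    }
  }
  throw lastError;
}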
Advanced Examples
Complete RAG Pipeline
// lib/ai/rag-pipeline.ts
export class RAGPipeline {
  async process(query: string, options: RAGOptions) {
    // 1. Query expansion
    const expandedQueries = await this.expandQuery(query);

    // 2. Multi-query search
    const searchResults = await Promise.all(
      expandedQueries.map((q) => this.searchDocuments(q)),
    );

    // 3. Re-ranking
    const rerankedDocs = await this.rerank(query, searchResults.flat());

    // 4. Context building
    const context = this.buildContext(rerankedDocs, options.maxTokens);

    // 5. Generate response
    return await this.generateResponse(query, context, options);
  }
}
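A hedged usage example (maxTokens is referenced by the pipeline above; the query string is hypothetical):

// Hypothetical usage of the RAG pipeline
const pipeline = new RAGPipeline();
const answer = await pipeline.process("How do I configure the LiteLLM gateway?", {
  maxTokens: 2048, // budget for the retrieved context
});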
Custom Agent Implementation
// lib/ai/custom-agent.ts
export class CustomAgent {
  constructor(
    private config: AgentConfig,
    private tools: Tool[],
  ) {}

  async run(input: string): Promise<AgentResponse> {
    let messages = [
      { role: "system", content: this.config.systemPrompt },
      { role: "user", content: input },
    ];

    let iterations = 0;
    const maxIterations = 5;

    while (iterations < maxIterations) {
      const response = await litellm.chat.completions.create({
        model: this.config.model,
        messages,
        tools: this.tools.map((t) => t.schema),
        tool_choice: "auto",
      });

      const message = response.choices[0].message;
      messages.push(message);

      if (!message.tool_calls) {
        return { output: message.content, iterations };
      }

      // Execute tools
      for (const toolCall of message.tool_calls) {
        const result = await this.executeTool(toolCall);
        messages.push({
          role: "tool",
          content: JSON.stringify(result),
          tool_call_id: toolCall.id,
        });
      }

      iterations++;
    }

    throw new Error("Max iterations reached");
  }
}
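A sketch of how the agent might be used (the AgentConfig values and the searchDocumentsTool are hypothetical; the config fields match what the class reads above):

// Hypothetical usage of CustomAgent with a single search tool
const agent = new CustomAgent(
  {
    model: "gpt-4",
    systemPrompt: "You are a documentation assistant for Emblema.",
  },
  [searchDocumentsTool], // a Tool whose schema describes a knowledge-base search
);

const result = await agent.run("Which vector database does Emblema use?");
console.log(result.output, result.iterations);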
Next Steps: see Background Tasks to implement asynchronous AI processing.