
Integrating AI

A complete guide to integrating AI models and artificial-intelligence services into Emblema components.

🎯 Overview

Emblema uses a modular architecture for AI integration, built around:

  • LiteLLM Gateway - unified proxy for accessing different LLM providers
  • BGE-M3 Embeddings - multilingual model for vector generation (1024 dimensions)
  • Milvus Vector DB - high-performance storage and semantic search
  • Background Tasks - asynchronous processing for intensive AI operations
  • MCP Support - Model Context Protocol for extending LLM capabilities

πŸ—οΈ Architettura AI​


🔧 LiteLLM Gateway

Base Configuration

# Environment Variables
LITELLM_MASTER_KEY=your-master-key
LITELLM_SALT_KEY=your-salt-key
DEFAULT_CHAT_MODEL=llama3.3-70b-instruct
DEFAULT_EMBEDDING_MODEL=bge-m3

Supported Models

# config/litellm/config.yaml
model_list:
  - model_name: gpt-4
    litellm_params:
      model: openai/gpt-4
      api_key: ${OPENAI_API_KEY}

  - model_name: claude-3
    litellm_params:
      model: anthropic/claude-3-opus-20240229
      api_key: ${ANTHROPIC_API_KEY}

  - model_name: llama3.3-70b-instruct
    litellm_params:
      model: openai/llama-3.3-70b-instruct
      api_base: ${VLLM_BASE_URL}
      api_key: dummy

API Calls

// lib/ai/litellm-client.ts
import { OpenAI } from "openai";

export const litellm = new OpenAI({
  apiKey: process.env.LITELLM_MASTER_KEY,
  baseURL: `${process.env.LITELLM_BASE_URL}/v1`,
});

// Example chat completion
const response = await litellm.chat.completions.create({
  model: "gpt-4",
  messages: [
    { role: "system", content: "You are a helpful assistant." },
    { role: "user", content: "Hello!" },
  ],
  temperature: 0.7,
  stream: true,
});

πŸ” Embeddings con BGE-M3​

Generazione Embeddings​

# apps/background-task/app/handlers/embeddings.py
from typing import List

import numpy as np
import torch
from FlagEmbedding import BGEM3FlagModel

class EmbeddingHandler:
    def __init__(self):
        self.model = BGEM3FlagModel(
            'BAAI/bge-m3',
            use_fp16=True,
            device='cuda' if torch.cuda.is_available() else 'cpu'
        )

    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of texts.

        Returns:
            List of 1024-dimensional vectors
        """
        embeddings = self.model.encode(
            texts,
            batch_size=32,
            max_length=512,
            return_dense=True,
            return_sparse=False,
            return_colbert_vecs=False
        )

        # Normalize vectors to unit length
        embeddings = embeddings['dense_vecs']
        embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

        return embeddings.tolist()

Chunking Strategies

# Chunking configuration for optimal embeddings
CHUNKING_PRESETS = {
    "default": {
        "chunk_size": 512,     # Optimal token count for BGE-M3
        "chunk_overlap": 75,   # Overlap preserves context across chunks
        "strategy": "recursive"
    },
    "technical": {
        "chunk_size": 600,     # Larger chunks for technical content
        "chunk_overlap": 120,
        "strategy": "recursive"
    },
    "qa": {
        "chunk_size": 200,     # Small chunks for precise Q&A
        "chunk_overlap": 25,
        "strategy": "sentence"
    }
}
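To make the presets concrete, here is a minimal sliding-window splitter applying the default preset. This is an illustrative sketch only: Emblema's actual splitter presumably operates on tokens and honors the `strategy` field, which this sketch ignores.

```python
# Illustrative character-window splitter; the real pipeline likely splits on
# tokens and dispatches on "strategy", which **_ignored swallows here.
def chunk_text(text: str, chunk_size: int, chunk_overlap: int, **_ignored) -> list[str]:
    """Split text into overlapping windows of at most chunk_size characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks

# Apply the "default" preset to a sample document
preset = {"chunk_size": 512, "chunk_overlap": 75, "strategy": "recursive"}
chunks = chunk_text("lorem ipsum " * 200, **preset)
```

Each chunk starts `chunk_size - chunk_overlap` characters after the previous one, so consecutive chunks share a 75-character window of context.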

🎨 Chat API Implementation

API Route Handler

// app/api/v1/chat/route.ts
import { litellm } from "@/lib/ai/litellm-client";
import { searchDocuments } from "@/lib/ai/search";

export async function POST(req: Request) {
  const { messages, agentId, useKnowledgeBase } = await req.json();

  try {
    // 1. Load the agent configuration
    const agent = await getAgent(agentId);

    // 2. RAG - retrieve context when requested
    let context = "";
    if (useKnowledgeBase) {
      const lastMessage = messages[messages.length - 1];
      const documents = await searchDocuments(
        lastMessage.content,
        agent.knowledgeBaseId,
      );
      context = formatDocumentsAsContext(documents);
    }

    // 3. Build the prompt with the retrieved context
    const systemPrompt = buildSystemPrompt(agent, context);

    // 4. Stream the response
    const stream = await litellm.chat.completions.create({
      model: agent.model || DEFAULT_CHAT_MODEL,
      messages: [{ role: "system", content: systemPrompt }, ...messages],
      temperature: agent.temperature || 0.7,
      stream: true,
    });

    // 5. Return an SSE stream
    return new Response(createSSEStream(stream), {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
    });
  } catch (error) {
    return handleAIError(error);
  }
}

Client Integration

// hooks/use-chat.ts
export const useChat = (agentId: string) => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = async (content: string) => {
    const userMessage = { role: "user", content };
    // Append the user message plus an empty assistant placeholder
    setMessages((prev) => [
      ...prev,
      userMessage,
      { role: "assistant", content: "" },
    ]);
    setIsStreaming(true);

    const response = await fetch("/api/v1/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        messages: [...messages, userMessage],
        agentId,
        useKnowledgeBase: true,
      }),
    });

    // Handle the SSE stream
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let assistantContent = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split("\n");

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const payload = line.slice(6);
        if (payload === "[DONE]") continue; // end-of-stream sentinel

        const data = JSON.parse(payload);
        assistantContent += data.choices[0]?.delta?.content || "";
        // Replace the placeholder with a fresh object so React re-renders
        setMessages((prev) => [
          ...prev.slice(0, -1),
          { role: "assistant", content: assistantContent },
        ]);
      }
    }

    setIsStreaming(false);
  };

  return { messages, sendMessage, isStreaming };
};

🔎 Semantic Search

Milvus Integration

# apps/background-task/app/handlers/vector_db.py
from typing import Dict, List, Optional

from pymilvus import Collection, connections

class MilvusHandler:
    def __init__(self):
        # `settings` is the application's configuration object (import not shown)
        connections.connect(
            alias="default",
            host=settings.milvus_host,
            port=settings.milvus_port,
            user=settings.milvus_user,
            password=settings.milvus_password,
        )
        self.collection = Collection("document_chunks")

    def search(
        self,
        query_vector: List[float],
        knowledge_base_id: Optional[str] = None,
        limit: int = 10,
        score_threshold: float = 0.7
    ) -> List[Dict]:
        """
        Semantic search over document chunks.
        """
        search_params = {
            "metric_type": "COSINE",
            "params": {"nprobe": 10}
        }

        # Build the scalar filter. The similarity score is computed at search
        # time, not stored, so the threshold is applied after the search.
        filter_expr = None
        if knowledge_base_id:
            filter_expr = f"knowledge_base_id == '{knowledge_base_id}'"

        # Run the search
        results = self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=limit,
            expr=filter_expr,
            output_fields=["chunk_id", "content", "metadata"]
        )

        # Format results, dropping hits below the score threshold
        chunks = []
        for hit in results[0]:
            if hit.score < score_threshold:
                continue
            chunks.append({
                "chunk_id": hit.entity.get("chunk_id"),
                "content": hit.entity.get("content"),
                "metadata": hit.entity.get("metadata"),
                "score": hit.score
            })

        return chunks

Search API

// app/api/v1/search/route.ts
export async function POST(req: Request) {
  const { query, knowledgeBaseId, limit = 10 } = await req.json();

  try {
    // 1. Generate an embedding for the query
    const queryEmbedding = await generateEmbedding(query);

    // 2. Search Milvus
    const results = await milvusClient.search({
      collection_name: "document_chunks",
      vector: queryEmbedding,
      filter: knowledgeBaseId
        ? `knowledge_base_id == "${knowledgeBaseId}"`
        : undefined,
      limit,
      output_fields: ["chunk_id", "content", "metadata"],
    });

    // 3. Enrich the results
    const enrichedResults = await enrichSearchResults(results);

    return Response.json({
      results: enrichedResults,
      query,
      total: enrichedResults.length,
    });
  } catch (error) {
    return handleSearchError(error);
  }
}

🤖 MCP (Model Context Protocol)

Server Configuration

// lib/mcp/server-manager.ts
export class MCPServerManager {
  private servers: Map<string, MCPServer> = new Map();

  async initializeServer(config: MCPServerConfig) {
    const server = new MCPServer({
      command: config.command,
      args: config.args,
      env: config.env,
    });

    await server.start();
    this.servers.set(config.id, server);

    return server;
  }

  async callTool(serverId: string, toolName: string, args: any) {
    const server = this.servers.get(serverId);
    if (!server) throw new Error("Server not found");

    return await server.callTool(toolName, args);
  }
}

MCP Integration in Chat

// MCP integration in the chat flow
async function handleMCPTools(message: string, mcpServers: string[]) {
  const tools = await getMCPTools(mcpServers);

  // Expose the tools to the model
  const response = await litellm.chat.completions.create({
    model: "gpt-4",
    messages: [
      {
        role: "system",
        content: `You have access to these tools: ${JSON.stringify(tools)}`,
      },
      { role: "user", content: message },
    ],
    tools: tools,
    tool_choice: "auto",
  });

  // Execute tool calls when requested
  if (response.choices[0].message.tool_calls) {
    for (const toolCall of response.choices[0].message.tool_calls) {
      const result = await executeMCPTool(toolCall);
      // Append the result to the conversation context
    }
  }
}

📊 Monitoring & Analytics

AI Metrics

// lib/ai/metrics.ts
export async function trackAIUsage(params: {
  model: string;
  tokens: number;
  latency: number;
  type: "chat" | "embedding";
}) {
  await db.ai_usage.create({
    data: {
      model: params.model,
      tokens: params.tokens,
      latency: params.latency,
      type: params.type,
      timestamp: new Date(),
      cost: calculateCost(params.model, params.tokens),
    },
  });
}
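The `calculateCost` helper referenced above is not shown; conceptually it is just a lookup of a per-model rate. Here is a minimal sketch (in Python for brevity) — the rates and model keys are placeholders, not real provider pricing.

```python
# Placeholder per-1k-token rates; values and keys are illustrative only.
PRICE_PER_1K_TOKENS = {
    "gpt-4": 0.03,
    "claude-3": 0.015,
    "llama3.3-70b-instruct": 0.0,  # self-hosted, no per-token cost
    "bge-m3": 0.0,
}

def calculate_cost(model: str, tokens: int) -> float:
    """Estimate request cost from a per-model rate table (0 for unknown models)."""
    rate = PRICE_PER_1K_TOKENS.get(model, 0.0)
    return rate * tokens / 1000
```

Keeping the table in one place makes it easy to update when provider pricing changes.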

Performance Optimization

# Inference optimizations
import hashlib
from typing import List

from cachetools import LRUCache
from FlagEmbedding import BGEM3FlagModel

class OptimizedEmbeddingHandler:
    def __init__(self):
        # Batch processing
        self.batch_size = 32

        # Model quantization
        self.model = BGEM3FlagModel(
            'BAAI/bge-m3',
            use_fp16=True,  # Half precision
            device='cuda'
        )

        # Embedding cache keyed by text hash
        self.cache = LRUCache(maxsize=10000)

    async def process_batch(self, texts: List[str]):
        # Check the cache first
        uncached = []
        cached_results = {}

        for text in texts:
            cache_key = hashlib.md5(text.encode()).hexdigest()
            if cache_key in self.cache:
                cached_results[text] = self.cache[cache_key]
            else:
                uncached.append(text)

        # Encode only the uncached texts
        if uncached:
            embeddings = self.model.encode(
                uncached, batch_size=self.batch_size
            )['dense_vecs']

            # Update the cache
            for text, embedding in zip(uncached, embeddings):
                cache_key = hashlib.md5(text.encode()).hexdigest()
                self.cache[cache_key] = embedding
                cached_results[text] = embedding

        # Return embeddings in input order
        return [cached_results[text] for text in texts]

πŸ” Security Best Practices​

API Key Management​

// API key validation
const validateAPIKey = async (key: string): Promise<boolean> => {
  // Check the format
  if (!key.match(/^sk-[a-zA-Z0-9]{48}$/)) {
    return false;
  }

  // Check against the database
  const hashedKey = await hashAPIKey(key);
  const exists = await db.api_keys.findFirst({
    where: { hashed_key: hashedKey, active: true },
  });

  return !!exists;
};

Rate Limiting

// middleware/ai-rate-limit.ts
export const aiRateLimit = rateLimit({
  windowMs: 60 * 1000, // 1 minute
  max: async (req) => {
    const user = await getUser(req);
    return user.tier === "premium" ? 100 : 10;
  },
  standardHeaders: true,
  legacyHeaders: false,
});

🧪 Testing AI Features

Unit Tests

# tests/test_embeddings.py
import numpy as np
import pytest

from app.handlers.embeddings import EmbeddingHandler

def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

@pytest.fixture
def embedding_handler():
    return EmbeddingHandler()

def test_embedding_generation(embedding_handler):
    texts = ["Hello world", "Test embedding"]
    embeddings = embedding_handler.generate_embeddings(texts)

    assert len(embeddings) == 2
    assert len(embeddings[0]) == 1024
    assert all(-1 <= val <= 1 for val in embeddings[0])

def test_embedding_similarity(embedding_handler):
    texts = ["cat", "dog", "automobile"]
    embeddings = embedding_handler.generate_embeddings(texts)

    # "cat" and "dog" should be more similar than "cat" and "automobile"
    cat_dog_sim = cosine_similarity(embeddings[0], embeddings[1])
    cat_car_sim = cosine_similarity(embeddings[0], embeddings[2])

    assert cat_dog_sim > cat_car_sim

Integration Tests

// __tests__/api/chat.test.ts
describe("Chat API", () => {
  it("should stream chat response", async () => {
    const response = await fetch("/api/v1/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        messages: [{ role: "user", content: "Hello" }],
        agentId: "test-agent",
      }),
    });

    expect(response.headers.get("content-type")).toBe("text/event-stream");

    const reader = response.body!.getReader();
    const { value } = await reader.read();
    const chunk = new TextDecoder().decode(value);

    expect(chunk).toContain("data: ");
  });
});

🚀 Best Practices

  1. Streaming Responses

    • Always use streaming for long chats
    • Implement proper error handling
    • Show loading indicators
  2. Caching Strategy

    • Cache frequently used embeddings
    • Cache search results for common queries
    • Set appropriate TTLs for invalidation
  3. Error Handling

    • Fall back to alternative models
    • Show user-friendly error messages
    • Log in detail for debugging
  4. Performance

    • Batch processing for embeddings
    • Lazy-load models
    • Connection pooling for Milvus
  5. Security

    • Sanitize user input
    • Rate-limit each endpoint
    • Audit logging for compliance
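The "fall back to alternative models" practice above can be sketched as a small wrapper that walks an ordered model list. This is illustrative only: the `call` parameter stands in for a completion function such as `litellm.chat.completions.create`, and the model names are examples.

```python
# Hypothetical fallback wrapper; `call` is any completion-style callable.
def complete_with_fallback(call, models, **kwargs):
    """Try each model in order; return the first successful response."""
    last_error = None
    for model in models:
        try:
            return call(model=model, **kwargs)
        except Exception as exc:  # narrow to provider-specific errors in practice
            last_error = exc
    raise RuntimeError("all fallback models failed") from last_error

# Usage with a stub whose primary model is unavailable
def fake_call(model, **kwargs):
    if model == "gpt-4":
        raise TimeoutError("primary provider down")
    return f"answer from {model}"

result = complete_with_fallback(fake_call, ["gpt-4", "llama3.3-70b-instruct"])
```

Ordering the list from the preferred to the cheapest or most available model gives graceful degradation instead of a hard failure.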

📚 Advanced Examples

Complete RAG Pipeline

// lib/ai/rag-pipeline.ts
export class RAGPipeline {
  async process(query: string, options: RAGOptions) {
    // 1. Query expansion
    const expandedQueries = await this.expandQuery(query);

    // 2. Multi-query search
    const searchResults = await Promise.all(
      expandedQueries.map((q) => this.searchDocuments(q)),
    );

    // 3. Re-ranking
    const rerankedDocs = await this.rerank(query, searchResults.flat());

    // 4. Context building
    const context = this.buildContext(rerankedDocs, options.maxTokens);

    // 5. Generate the response
    return await this.generateResponse(query, context, options);
  }
}

Custom Agent Implementation

// lib/ai/custom-agent.ts
export class CustomAgent {
  constructor(
    private config: AgentConfig,
    private tools: Tool[],
  ) {}

  async run(input: string): Promise<AgentResponse> {
    let messages = [
      { role: "system", content: this.config.systemPrompt },
      { role: "user", content: input },
    ];

    let iterations = 0;
    const maxIterations = 5;

    while (iterations < maxIterations) {
      const response = await litellm.chat.completions.create({
        model: this.config.model,
        messages,
        tools: this.tools.map((t) => t.schema),
        tool_choice: "auto",
      });

      const message = response.choices[0].message;
      messages.push(message);

      if (!message.tool_calls) {
        return { output: message.content, iterations };
      }

      // Execute tools
      for (const toolCall of message.tool_calls) {
        const result = await this.executeTool(toolCall);
        messages.push({
          role: "tool",
          content: JSON.stringify(result),
          tool_call_id: toolCall.id,
        });
      }

      iterations++;
    }

    throw new Error("Max iterations reached");
  }
}

💡 Next Steps: See Background Tasks to implement asynchronous AI processing.
