14-01-03 — Caching (Semantic Cache), Batching e Streaming
TL;DR
Três técnicas de performance e custo: Semantic Cache (retorna resposta em cache quando pergunta é semanticamente similar — reduz custo em até 40%), Batching (agrupa requests para APIs com desconto por lote, ideal para processamento offline), e Streaming (envia tokens conforme gerados — reduz percepção de latência para zero mesmo com TTL alto).
Semantic Cache
Cache tradicional é exato: "Qual é o preço do produto X?" só bate no cache se digitado identicamente. Semantic Cache usa embeddings para detectar perguntas semanticamente equivalentes e retornar a resposta em cache.
Como funciona:
- Query entra → gera embedding da query
- Busca no cache (Redis com RediSearch ou Azure Cache for Redis + vector search)
- Se encontrar resposta com similaridade > threshold → retorna do cache (0ms + 0 tokens)
- Se não encontrar → processa normalmente → armazena no cache com embedding
import redis
import numpy as np
from openai import AzureOpenAI
client = AzureOpenAI(...)
redis_client = redis.Redis(host="seu-redis.redis.cache.windows.net", ssl=True)
SIMILARITY_THRESHOLD = 0.92 # Ajustar por caso de uso
def get_embedding(text: str) -> list[float]:
resp = client.embeddings.create(model="text-embedding-3-large", input=text)
return resp.data[0].embedding
def semantic_cache_get(query: str) -> str | None:
query_embedding = get_embedding(query)
# Busca no Redis usando FT.SEARCH com vector similarity
results = redis_client.ft("cache-idx").search(
f"*=>[KNN 1 @embedding $vec AS score]",
query_params={"vec": np.array(query_embedding, dtype=np.float32).tobytes()}
)
if results.total > 0:
doc = results.docs[0]
score = float(doc.score)
if score >= SIMILARITY_THRESHOLD:
return doc.response # Cache hit!
return None # Cache miss
def semantic_cache_set(query: str, response: str):
embedding = get_embedding(query)
redis_client.hset(f"cache:{hash(query)}", mapping={
"query": query,
"response": response,
"embedding": np.array(embedding, dtype=np.float32).tobytes()
})
redis_client.expire(f"cache:{hash(query)}", 3600 * 24) # TTL 24h
Taxa de cache hit por tipo de aplicação:
- FAQ interno / helpdesk: 40-60% de hit rate → economia significativa
- Chatbot de produto com perguntas similares: 25-40%
- Análise de documentos únicos: 5-10% (pouco ganho)
- Código gerado: baixíssimo (queries muito específicas)
Batching
Para workloads assíncronos e não urgentes, agrupar requests em lotes reduz custo.
Azure OpenAI Batch API: Processa requests em arquivo JSONL e retorna resultados em até 24h. Custo 50% menor que API síncrona. Ideal para:
- Indexação de documentos para RAG (milhares de arquivos)
- Geração de embeddings em massa
- Avaliação de qualidade de dataset em bulk
- Extração de dados de documentos históricos
import json
from openai import AzureOpenAI
client = AzureOpenAI(...)
# Cria arquivo de batch
requests = [
{"custom_id": f"doc-{i}", "method": "POST", "url": "/chat/completions",
"body": {"model": "gpt-4o", "messages": [
{"role": "user", "content": f"Resuma: {doc}"}
]}}
for i, doc in enumerate(documents)
]
# Upload do arquivo JSONL
with open("batch_requests.jsonl", "w") as f:
for req in requests:
f.write(json.dumps(req) + "\n")
file = client.files.create(file=open("batch_requests.jsonl", "rb"), purpose="batch")
# Submete batch
batch = client.batches.create(
input_file_id=file.id,
endpoint="/chat/completions",
completion_window="24h"
)
print(f"Batch ID: {batch.id} — Status: {batch.status}")
# Verificar status depois com client.batches.retrieve(batch.id)
Streaming
Em vez de esperar a resposta completa, o servidor envia tokens conforme são gerados. Para o usuário, a experiência parece instantânea mesmo que o tempo total seja o mesmo.
// Streaming com Azure OpenAI SDK (.NET)
using Azure.AI.OpenAI;
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureKeyCredential(apiKey));
var chatClient = client.GetChatClient("gpt-4o");
var streamingUpdates = chatClient.CompleteChatStreamingAsync(
new ChatMessage[] {
new SystemChatMessage("Você é um assistente de análise jurídica."),
new UserChatMessage(userQuery)
}
);
await foreach (var update in streamingUpdates)
{
foreach (var part in update.ContentUpdate)
{
Console.Write(part.Text); // Tokens chegam em tempo real
// No contexto web: yield via SSE ou WebSocket
}
}
Quando NÃO usar streaming:
- Quando você precisa pós-processar a resposta completa antes de exibir (ex: Content Safety check, JSON parsing)
- APIs batch onde latência não importa
- Quando o cliente não suporta SSE/WebSocket