14-01-03 — Caching (Semantic Cache), Batching e Streaming

⏱ 12 minFontes validadas em: 2026-04-29

TL;DR

Três técnicas de performance e custo: Semantic Cache (retorna resposta em cache quando pergunta é semanticamente similar — reduz custo em até 40%), Batching (agrupa requests para APIs com desconto por lote, ideal para processamento offline), e Streaming (envia tokens conforme gerados — reduz percepção de latência para zero mesmo com TTL alto).

Semantic Cache

Cache tradicional é exato: "Qual é o preço do produto X?" só bate no cache se digitado identicamente. Semantic Cache usa embeddings para detectar perguntas semanticamente equivalentes e retornar a resposta em cache.

Como funciona:

  1. Query entra → gera embedding da query
  2. Busca no cache (Redis com RediSearch ou Azure Cache for Redis + vector search)
  3. Se encontrar resposta com similaridade > threshold → retorna do cache (0ms + 0 tokens)
  4. Se não encontrar → processa normalmente → armazena no cache com embedding
import redis
import numpy as np
from openai import AzureOpenAI

client = AzureOpenAI(...)
redis_client = redis.Redis(host="seu-redis.redis.cache.windows.net", ssl=True)

SIMILARITY_THRESHOLD = 0.92  # Ajustar por caso de uso

def get_embedding(text: str) -> list[float]:
    resp = client.embeddings.create(model="text-embedding-3-large", input=text)
    return resp.data[0].embedding

def semantic_cache_get(query: str) -> str | None:
    query_embedding = get_embedding(query)
    
    # Busca no Redis usando FT.SEARCH com vector similarity
    results = redis_client.ft("cache-idx").search(
        f"*=>[KNN 1 @embedding $vec AS score]",
        query_params={"vec": np.array(query_embedding, dtype=np.float32).tobytes()}
    )
    
    if results.total > 0:
        doc = results.docs[0]
        score = float(doc.score)
        if score >= SIMILARITY_THRESHOLD:
            return doc.response  # Cache hit!
    
    return None  # Cache miss

def semantic_cache_set(query: str, response: str):
    embedding = get_embedding(query)
    redis_client.hset(f"cache:{hash(query)}", mapping={
        "query": query,
        "response": response,
        "embedding": np.array(embedding, dtype=np.float32).tobytes()
    })
    redis_client.expire(f"cache:{hash(query)}", 3600 * 24)  # TTL 24h
🔷 Azure AI Foundry + Semantic Cache: O Azure API Management tem um policy de semantic cache nativo desde 2024 — configura-se no portal sem código. Para APIs de Azure OpenAI via APIM, isso é a solução mais simples.

Taxa de cache hit por tipo de aplicação:

  • FAQ interno / helpdesk: 40-60% de hit rate → economia significativa
  • Chatbot de produto com perguntas similares: 25-40%
  • Análise de documentos únicos: 5-10% (pouco ganho)
  • Código gerado: baixíssimo (queries muito específicas)

Batching

Para workloads assíncronos e não urgentes, agrupar requests em lotes reduz custo.

Azure OpenAI Batch API: Processa requests em arquivo JSONL e retorna resultados em até 24h. Custo 50% menor que API síncrona. Ideal para:

  • Indexação de documentos para RAG (milhares de arquivos)
  • Geração de embeddings em massa
  • Avaliação de qualidade de dataset em bulk
  • Extração de dados de documentos históricos
import json
from openai import AzureOpenAI

client = AzureOpenAI(...)

# Cria arquivo de batch
requests = [
    {"custom_id": f"doc-{i}", "method": "POST", "url": "/chat/completions",
     "body": {"model": "gpt-4o", "messages": [
         {"role": "user", "content": f"Resuma: {doc}"}
     ]}}
    for i, doc in enumerate(documents)
]

# Upload do arquivo JSONL
with open("batch_requests.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

file = client.files.create(file=open("batch_requests.jsonl", "rb"), purpose="batch")

# Submete batch
batch = client.batches.create(
    input_file_id=file.id,
    endpoint="/chat/completions",
    completion_window="24h"
)

print(f"Batch ID: {batch.id} — Status: {batch.status}")
# Verificar status depois com client.batches.retrieve(batch.id)

Streaming

Em vez de esperar a resposta completa, o servidor envia tokens conforme são gerados. Para o usuário, a experiência parece instantânea mesmo que o tempo total seja o mesmo.

💡 Percepção de latência: Um modelo que gera 500 tokens em 10s parece "lento" se o usuário espera 10s para ver algo. Com streaming, o primeiro token aparece em ~0.5s e o usuário lê enquanto o modelo gera. A percepção de velocidade melhora drasticamente.
// Streaming com Azure OpenAI SDK (.NET)
using Azure.AI.OpenAI;

var client = new AzureOpenAIClient(new Uri(endpoint), new AzureKeyCredential(apiKey));
var chatClient = client.GetChatClient("gpt-4o");

var streamingUpdates = chatClient.CompleteChatStreamingAsync(
    new ChatMessage[] {
        new SystemChatMessage("Você é um assistente de análise jurídica."),
        new UserChatMessage(userQuery)
    }
);

await foreach (var update in streamingUpdates)
{
    foreach (var part in update.ContentUpdate)
    {
        Console.Write(part.Text); // Tokens chegam em tempo real
        // No contexto web: yield via SSE ou WebSocket
    }
}

Quando NÃO usar streaming:

  • Quando você precisa pós-processar a resposta completa antes de exibir (ex: Content Safety check, JSON parsing)
  • APIs batch onde latência não importa
  • Quando o cliente não suporta SSE/WebSocket

Como isso se conecta

  • 14-02-01 — caching e batching são técnicas centrais de otimização de custo
  • 14-01-01 — RAG se beneficia de semantic cache nas queries frequentes
  • 14-03-01 — não usar streaming em UIs interativas é anti-pattern de UX

Fontes

  1. Azure OpenAI — Batch API
  2. Azure APIM — Semantic Cache Policy
  3. Azure OpenAI — Streaming completions
  4. Redis — Vector Similarity Search