07-02-03 — A2A na Prática
TL;DR
Implementar A2A envolve três partes: publicar um Agent Card em /.well-known/agent.json, criar endpoint POST /tasks para receber tarefas, e opcionalmente suportar SSE streaming para updates em tempo real. O SDK Python oficial abstrai a maior parte do boilerplate. Este tópico mostra código real para um agente de análise de documentos.
Setup: SDK Python do A2A
pip install a2a-sdk
# Ou diretamente do GitHub:
pip install git+https://github.com/google-a2a/a2a-python-sdk.git
1. Agent Card: publicando sua identidade
O Agent Card é servido como endpoint HTTP estático. Todo cliente A2A faz GET em https://seu-agente.com/.well-known/agent.json antes de enviar uma task.
from a2a.types import AgentCard, AgentSkill, AgentCapabilities
agent_card = AgentCard(
name="document-analyzer",
description="Analisa documentos PDF e Word, extrai estrutura e gera resumos executivos.",
version="1.0.0",
url="https://document-agent.suaempresa.com",
capabilities=AgentCapabilities(
streaming=True,
pushNotifications=False,
stateTransitionHistory=True
),
skills=[
AgentSkill(
id="analyze_document",
name="Analisar Documento",
description="Extrai entidades, estrutura e gera resumo de documentos corporativos.",
tags=["document", "analysis", "summarization"],
examples=[
"Analise este contrato e identifique as cláusulas de rescisão",
"Resuma este relatório em 5 bullet points executivos"
],
inputModes=["text", "file"],
outputModes=["text", "data"]
)
],
defaultInputMode="text",
defaultOutputMode="text"
)
2. Implementando o Task Handler
from a2a.server import A2AServer
from a2a.types import Task, TaskStatus, TaskState, Message, TextPart, DataPart
from a2a.server.request_handlers import DefaultRequestHandler
import asyncio
class DocumentAnalyzerHandler(DefaultRequestHandler):
async def on_message_send(self, task: Task, context) -> None:
"""Handler principal — chamado quando uma task chega."""
# Pegar a mensagem do usuário
user_message = task.history[-1] if task.history else None
if not user_message:
await context.update_status(TaskState.failed, "Nenhuma mensagem recebida.")
return
# Extrair texto
text_content = ""
for part in user_message.parts:
if isinstance(part, TextPart):
text_content = part.text
break
# Reportar que estamos trabalhando
await context.update_status(
TaskState.working,
"Iniciando análise do documento..."
)
# Simulação de processamento com updates de progresso
await asyncio.sleep(1)
await context.send_message(Message(
role="agent",
parts=[TextPart(text="📄 Documento carregado. Extraindo estrutura...")]
))
await asyncio.sleep(2)
# Resultado final
analysis_result = {
"summary": f"Análise de: '{text_content[:50]}...'",
"sections": ["Introdução", "Desenvolvimento", "Conclusão"],
"key_entities": ["Partes envolvidas", "Valores", "Prazos"],
"risk_level": "médio"
}
# Enviar resultado como artefato estruturado
await context.add_artifact(
name="analysis-result",
description="Resultado da análise do documento",
parts=[DataPart(data=analysis_result, mimeType="application/json")]
)
# Concluir
await context.update_status(
TaskState.completed,
"Análise concluída com sucesso."
)
# Criar e iniciar o server
handler = DocumentAnalyzerHandler()
server = A2AServer(
agent_card=agent_card,
handler=handler,
host="0.0.0.0",
port=8080
)
if __name__ == "__main__":
server.run()
context.update_status vs context.send_message
update_status muda o estado do task (working → completed) e pode incluir uma mensagem de status. send_message adiciona uma mensagem ao histórico da conversa sem mudar o estado — use para updates intermediários visíveis ao usuário.
3. Cliente A2A: enviando uma task
from a2a.client import A2AClient
from a2a.types import Message, TextPart, SendMessageRequest
import asyncio
async def main():
# Descobrir o agente via Agent Card
client = await A2AClient.from_agent_card_url(
"https://document-agent.suaempresa.com"
)
# Criar uma task
request = SendMessageRequest(
message=Message(
role="user",
parts=[TextPart(
text="Analise este contrato de prestação de serviços e identifique "
"as principais cláusulas de risco para o contratante."
)]
)
)
# Enviar e receber updates via streaming
async with client.send_message_streaming(request) as stream:
async for update in stream:
if update.status:
print(f"[STATUS] {update.status.state}: {update.status.message}")
if update.message:
for part in update.message.parts:
if isinstance(part, TextPart):
print(f"[AGENTE] {part.text}")
if update.artifact:
print(f"[ARTEFATO] {update.artifact.name}: {update.artifact.parts}")
asyncio.run(main())
4. Multi-turn: quando o agente precisa de mais informação
Um dos diferenciais do A2A vs MCP é o suporte a input-required — o agente pode pausar e pedir mais dados:
# No handler do agente:
async def on_message_send(self, task: Task, context) -> None:
# Verificar se temos o documento
has_file = any(
isinstance(part, FilePart)
for msg in task.history
for part in msg.parts
)
if not has_file:
# Pausar e pedir o arquivo
await context.update_status(
TaskState.input_required,
"Por favor, anexe o documento que deseja analisar."
)
return
# Continuar com a análise...
await context.update_status(TaskState.working, "Analisando documento...")
Ao expor um agente A2A publicamente, valide sempre o token Bearer do cliente. A spec v1.1+ padroniza OAuth 2.0 — use o Entra ID (Azure AD) para emitir tokens se estiver no ecossistema Microsoft. Nunca exponha endpoints A2A sem autenticação em ambientes corporativos.
5. Discovery: encontrando agentes disponíveis
import httpx
async def discover_agent(base_url: str) -> dict:
"""Faz discovery do Agent Card de um agente A2A."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{base_url}/.well-known/agent.json")
response.raise_for_status()
return response.json()
# Listar skills disponíveis
async def list_available_agents(registry_url: str) -> list[dict]:
"""Consulta um registry de agentes A2A corporativo."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{registry_url}/agents")
return response.json()["agents"]