import asyncio
import json
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional, Any

import aiohttp
from aiohttp import web
class A2AMessageFormat(Enum):
    """Wire formats an A2A message can be carried in."""

    JSON_RPC = "json-rpc"  # JSON-RPC 2.0
    GRPC = "grpc"  # gRPC with Protocol Buffers
    REST = "rest"  # RESTful API
    MQTT = "mqtt"  # MQTT publish/subscribe
class A2AMessage:
    """Standard A2A message envelope exchanged between agents.

    The constructor stores its arguments unchanged as instance attributes
    and stamps the message with the current UTC time as an ISO-8601 string
    (``timestamp``).
    """

    def __init__(self, message_id: str, from_agent: str, to_agent: str,
                 message_type: str, payload: Dict, priority: int = 5):
        """Create a message.

        Args:
            message_id: Identifier of this message; also used as the
                JSON-RPC ``id``.
            from_agent: Address of the sending agent.
            to_agent: Address of the receiving agent.
            message_type: "request", "response" or "notification".
            payload: Message body.
            priority: Message priority; defaults to 5.
        """
        self.message_id = message_id
        self.from_agent = from_agent
        self.to_agent = to_agent
        self.message_type = message_type  # "request", "response", "notification"
        self.payload = payload
        self.priority = priority
        # datetime.utcnow() is deprecated. Dropping tzinfo afterwards keeps
        # the emitted string byte-compatible with the previous naive format.
        self.timestamp = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )

    def to_json_rpc(self) -> Dict:
        """Return this message as a JSON-RPC 2.0 request dictionary.

        The JSON-RPC ``method`` is ``"<from_agent>.<message_type>"``; every
        other field travels inside ``params``.
        """
        return {
            "jsonrpc": "2.0",
            "method": f"{self.from_agent}.{self.message_type}",
            "params": {
                "to_agent": self.to_agent,
                "message_id": self.message_id,
                "payload": self.payload,
                "priority": self.priority,
                "timestamp": self.timestamp,
            },
            "id": self.message_id,
        }

    @classmethod
    def from_json_rpc(cls, rpc_message: Dict) -> 'A2AMessage':
        """Build a message from a JSON-RPC dictionary (inverse of to_json_rpc).

        Args:
            rpc_message: Dictionary with a ``method`` of the form
                ``"<from_agent>.<message_type>"`` and a ``params`` mapping
                holding ``message_id``, ``to_agent``, ``payload`` and
                optionally ``priority`` and ``timestamp``.

        Returns:
            The reconstructed message. When ``params`` carries a
            ``timestamp`` it is kept; otherwise the parse time is used.

        Raises:
            KeyError: A required field is missing.
            ValueError: ``method`` has no ``"."`` separating the sender from
                the message type.
        """
        # Split on the LAST dot: sender addresses may themselves contain
        # dots (e.g. "remote://host.example.com:8080/agent"), while the
        # message type appended by to_json_rpc never does.
        from_agent, separator, message_type = rpc_message["method"].rpartition(".")
        if not separator:
            raise ValueError(
                f"Invalid A2A method {rpc_message['method']!r}: "
                "expected '<from_agent>.<message_type>'"
            )
        params = rpc_message["params"]
        message = cls(
            message_id=params["message_id"],
            from_agent=from_agent,
            to_agent=params["to_agent"],
            message_type=message_type,
            payload=params["payload"],
            priority=params.get("priority", 5),
        )
        # Preserve the sender's timestamp instead of silently replacing it
        # with the time of parsing.
        message.timestamp = params.get("timestamp", message.timestamp)
        return message
class OpenClawA2AAdapter:
    """Adapter that routes A2A messages between OpenClaw agents.

    Outgoing messages are dispatched by the scheme of ``to_agent``:

    - ``local://agent_id``: direct call on an agent in ``agent_registry``
    - ``remote://host:port/agent_id``: HTTP POST of the JSON-RPC form
    - ``crewai://agent_id`` / ``langgraph://graph_id/node_id``: converted
      for an external framework (transport is still a placeholder)

    Incoming messages (``receive_a2a_message``) are parsed from JSON-RPC and
    handed to the addressed agent in ``agent_registry``.
    """

    _LOCAL_PREFIX = "local://"
    _REMOTE_PREFIX = "remote://"
    _CREWAI_PREFIX = "crewai://"
    _LANGGRAPH_PREFIX = "langgraph://"
    _EXTERNAL_FRAMEWORKS = ("crewai", "langgraph")

    def __init__(self, agent_registry: Dict[str, Any]):
        """Store the registry mapping agent names to agent objects.

        Registered agents are expected to expose an awaitable
        ``process_external_message(message)`` method.
        """
        self.agent_registry = agent_registry
        self.message_format = A2AMessageFormat.JSON_RPC
        self.http_session = None

    async def initialize(self):
        """Create the shared HTTP session used for remote delivery.

        Calling this again while a session is open is a no-op, so an
        existing session is never leaked.
        """
        if self.http_session is None or self.http_session.closed:
            self.http_session = aiohttp.ClientSession()

    async def close(self):
        """Close the HTTP session, if one was created."""
        if self.http_session is not None and not self.http_session.closed:
            await self.http_session.close()
        self.http_session = None

    async def send_a2a_message(self, message: "A2AMessage") -> Dict:
        """Send an A2A message to another agent.

        Supported addressing:
            1. Local agent: direct call through the agent registry.
            2. Remote agent: HTTP call.
            3. External-framework agent: converted through an adapter.

        Returns:
            A result dictionary whose ``status`` is ``"success"``,
            ``"forwarded"`` or ``"error"``. For remote targets answering
            HTTP 200 the remote JSON body is returned unchanged.
        """
        # Step 1: resolve the target agent.
        target_agent = await self._resolve_agent(message.to_agent)
        if not target_agent:
            return {
                "status": "error",
                "error": f"Target agent {message.to_agent} not found"
            }

        # Step 2: pick the transport that matches the agent's location.
        location = target_agent["location"]
        if location == "local":
            return await self._send_local_message(message, target_agent)
        if location == "remote":
            return await self._send_remote_message(message, target_agent)
        if target_agent.get("framework") in self._EXTERNAL_FRAMEWORKS:
            return await self._send_external_framework_message(message, target_agent)

        # Always return a dict: a resolver override may yield a location this
        # method does not know, which previously fell through and gave None.
        return {
            "status": "error",
            "error": f"Unsupported target location: {location}"
        }

    async def receive_a2a_message(self, request_data: Dict) -> Dict:
        """Handle an A2A message received from another agent.

        Intended to back the HTTP endpoint of an OpenClaw agent.

        Args:
            request_data: The message in JSON-RPC form.

        Returns:
            ``{"status": "success", ...}`` with the agent's result, or
            ``{"status": "error", "error": ...}`` when the message cannot be
            parsed, the agent is unknown, or the agent raises.
        """
        try:
            # Parse the message.
            message = A2AMessage.from_json_rpc(request_data)

            # Find the target agent. Senders address it as
            # "remote://host:port/name" or "local://name", while the registry
            # is keyed by the bare name.
            registry_key = self._registry_key(message.to_agent)
            if registry_key is None:
                return {
                    "status": "error",
                    "error": f"Agent {message.to_agent} not found"
                }

            # Route the message to the agent.
            agent = self.agent_registry[registry_key]
            result = await agent.process_external_message(message)

            # Return the response.
            return {
                "status": "success",
                "message_id": message.message_id,
                "result": result
            }
        except Exception as e:
            # Endpoint boundary: report any failure to the caller as an error
            # result instead of raising.
            return {
                "status": "error",
                "error": str(e)
            }

    def _registry_key(self, address: str) -> Optional[str]:
        """Map an agent address to its key in ``agent_registry``.

        The address is tried as-is first; otherwise a ``local://`` prefix or
        a ``remote://host/`` prefix is stripped. Returns ``None`` when no
        registered agent matches.
        """
        if address in self.agent_registry:
            return address
        if address.startswith(self._LOCAL_PREFIX):
            name = address[len(self._LOCAL_PREFIX):]
        elif address.startswith(self._REMOTE_PREFIX):
            _, _, name = address[len(self._REMOTE_PREFIX):].partition("/")
        else:
            return None
        return name if name in self.agent_registry else None

    async def _resolve_agent(self, agent_id: str) -> Optional[Dict]:
        """Resolve an agent address into a location/transport description.

        Supported address formats:
            - ``local://agent_id``: local agent
            - ``remote://host:port/agent_id``: remote OpenClaw agent
            - ``crewai://agent_id``: CrewAI agent
            - ``langgraph://graph_id/node_id``: LangGraph node

        Returns:
            A dictionary with at least a ``location`` key, or ``None`` when
            the scheme is unknown, a local agent is not registered, or a
            remote address lacks a host or an agent name.
        """
        # Prefixes are removed by slicing: str.replace would also delete any
        # later occurrence of the scheme inside the name.
        if agent_id.startswith(self._LOCAL_PREFIX):
            agent_name = agent_id[len(self._LOCAL_PREFIX):]
            if agent_name in self.agent_registry:
                return {
                    "location": "local",
                    "agent_id": agent_name,
                    "agent": self.agent_registry[agent_name]
                }
            return None

        if agent_id.startswith(self._REMOTE_PREFIX):
            # Parse the remote address: everything before the first "/" is
            # host[:port], the rest is the agent name.
            remainder = agent_id[len(self._REMOTE_PREFIX):]
            host_port, _, agent_name = remainder.partition("/")
            if not host_port or not agent_name:
                # Would otherwise build e.g. "http:///agents//message".
                return None
            return {
                "location": "remote",
                "agent_id": agent_name,
                "endpoint": f"http://{host_port}/agents/{agent_name}/message"
            }

        if agent_id.startswith(self._CREWAI_PREFIX):
            return {
                "location": "external",
                "framework": "crewai",
                "agent_id": agent_id[len(self._CREWAI_PREFIX):]
            }

        if agent_id.startswith(self._LANGGRAPH_PREFIX):
            return {
                "location": "external",
                "framework": "langgraph",
                "graph_info": agent_id[len(self._LANGGRAPH_PREFIX):]
            }

        return None

    async def _send_local_message(self, message: "A2AMessage",
                                  target_agent: Dict) -> Dict:
        """Deliver a message locally through a direct call on the agent.

        Exceptions raised by the agent propagate to the caller.
        """
        agent = target_agent["agent"]
        # Call the agent's message handler directly.
        result = await agent.process_external_message(message)
        return {
            "status": "success",
            "message_id": message.message_id,
            "result": result
        }

    async def _send_remote_message(self, message: "A2AMessage",
                                   target_agent: Dict) -> Dict:
        """Deliver a message to a remote agent with an HTTP POST.

        Returns the remote JSON body on HTTP 200; any other status, a
        timeout (30 s total) or any other failure yields an error result.
        """
        endpoint = target_agent["endpoint"]
        try:
            # Create the session on demand so sending works even when
            # initialize() was never called (or the session was closed).
            if self.http_session is None or self.http_session.closed:
                await self.initialize()
            async with self.http_session.post(
                endpoint,
                json=message.to_json_rpc(),
                timeout=aiohttp.ClientTimeout(total=30)
            ) as resp:
                if resp.status == 200:
                    return await resp.json()
                return {
                    "status": "error",
                    "error": f"HTTP {resp.status}",
                    "message_id": message.message_id
                }
        except asyncio.TimeoutError:
            return {
                "status": "error",
                "error": "Remote agent timeout",
                "message_id": message.message_id
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "message_id": message.message_id
            }

    async def _send_external_framework_message(self, message: "A2AMessage",
                                               target_agent: Dict) -> Dict:
        """Send to an external-framework agent through the matching converter."""
        framework = target_agent["framework"]
        if framework == "crewai":
            return await self._send_to_crewai(message, target_agent)
        if framework == "langgraph":
            return await self._send_to_langgraph(message, target_agent)
        return {
            "status": "error",
            "error": f"Unsupported framework: {framework}"
        }

    async def _send_to_crewai(self, message: "A2AMessage",
                              target_agent: Dict) -> Dict:
        """Send to a CrewAI agent.

        Format conversion: A2A message -> CrewAI task input.

        NOTE: the transport is not implemented yet. The converted input is
        built but never transmitted, so ``"forwarded"`` currently only means
        the message was accepted for this framework.
        """
        # Example: convert the A2A message into a CrewAI task.
        crewai_task_input = {
            "task_description": message.payload.get("task", ""),
            "context": message.payload.get("context", {}),
            "callback_url": f"openclaw://callback/{message.message_id}"
        }
        # TODO: POST crewai_task_input to the CrewAI agent service
        # (assuming the CrewAI agent exposes an HTTP interface).
        return {
            "status": "forwarded",
            "message_id": message.message_id,
            "target_framework": "crewai"
        }

    async def _send_to_langgraph(self, message: "A2AMessage",
                                 target_agent: Dict) -> Dict:
        """Send to a LangGraph node.

        Format conversion: A2A message -> LangGraph state.

        NOTE: the transport is not implemented yet. The converted state is
        built but never transmitted, so ``"forwarded"`` currently only means
        the message was accepted for this framework.
        """
        graph_info = target_agent.get("graph_info", "")
        # Convert the message into LangGraph state input.
        langgraph_input = {
            "messages": [
                {
                    "role": "user",
                    "content": message.payload.get("task", "")
                }
            ],
            "metadata": {
                "source_agent": message.from_agent,
                "message_id": message.message_id,
                "callback_url": f"openclaw://callback/{message.message_id}"
            }
        }
        # TODO: invoke the graph named by graph_info with langgraph_input via
        # LangServe or a direct API call (depends on how LangGraph is deployed).
        return {
            "status": "forwarded",
            "message_id": message.message_id,
            "target_framework": "langgraph"
        }
class OpenClawA2AHTTPServer:
    """OpenClaw A2A HTTP server: receives messages sent by other agents.

    Handlers follow the aiohttp convention: each takes a single ``request``
    and returns ``web.json_response(...)``.
    """

    def __init__(self, adapter: "OpenClawA2AAdapter"):
        """Keep the adapter used to process incoming messages."""
        self.adapter = adapter
        self.app = None

    def setup_routes(self, app):
        """Register the HTTP routes on ``app`` and remember it as ``self.app``.

        Routes:
            POST /agents/{agent_id}/message   receive an A2A message
            GET  /agents                      list registered agents
            GET  /agents/{agent_id}/info      describe one agent

        Args:
            app: Either an object exposing ``router.add_route`` (such as an
                aiohttp ``web.Application``) or one exposing ``post(path)``
                and ``get(path)`` route decorators (such as an aiohttp
                ``web.RouteTableDef``).

        Example request for the message endpoint::

            POST /agents/research_coordinator/message
            {
                "jsonrpc": "2.0",
                "method": "data_analyst.request",
                "params": {
                    "to_agent": "research_coordinator",
                    "message_id": "msg-123",
                    "payload": {
                        "task": "Analyze the sales data"
                    },
                    "priority": 7,
                    "timestamp": "2026-03-05T10:00:00Z"
                },
                "id": "msg-123"
            }
        """

        async def receive_message(request):
            """Endpoint that receives an A2A message."""
            try:
                body = await request.json()
            except ValueError:
                # json.JSONDecodeError is a ValueError: answer a malformed
                # body with 400 instead of an unhandled server error.
                return web.json_response(
                    {"status": "error", "error": "Request body is not valid JSON"},
                    status=400
                )
            # Let the adapter process the message.
            response = await self.adapter.receive_a2a_message(body)
            return web.json_response(response)

        async def list_agents(request):
            """List every available agent."""
            agents = list(self.adapter.agent_registry.keys())
            return web.json_response({
                "agents": agents,
                "count": len(agents)
            })

        async def get_agent_info(request):
            """Return information about one agent."""
            # aiohttp passes path parameters through request.match_info, not
            # as extra handler arguments.
            agent_id = request.match_info["agent_id"]
            if agent_id not in self.adapter.agent_registry:
                return web.json_response(
                    {"error": f"Agent {agent_id} not found"},
                    status=404
                )
            agent = self.adapter.agent_registry[agent_id]
            return web.json_response({
                "agent_id": agent_id,
                "framework": "openclaw",
                "capabilities": agent.get_capabilities(),
                "address": f"local://{agent_id}"
            })

        self._add_route(app, "POST", "/agents/{agent_id}/message", receive_message)
        self._add_route(app, "GET", "/agents", list_agents)
        self._add_route(app, "GET", "/agents/{agent_id}/info", get_agent_info)
        self.app = app

    @staticmethod
    def _add_route(app, method, path, handler):
        """Register ``handler`` for ``method path`` on either kind of ``app``."""
        router = getattr(app, "router", None)
        if router is not None:
            router.add_route(method, path, handler)
        else:
            # Decorator-style registration: app.post(path)(handler).
            getattr(app, method.lower())(path)(handler)