import aiohttp
import json
import asyncio
from typing import Dict, Any, Callable, Optional
class StreamableHttpMCPClient:
    """MCP client speaking JSON-RPC 2.0 over a single ``/mcp`` HTTP endpoint.

    Requests are sent with ``POST {server_url}/mcp``. A response is accepted
    from either of two places:

    * the JSON body of the POST response itself, or
    * a newline-delimited JSON message arriving on the long-lived
      ``GET {server_url}/mcp`` stream opened by :meth:`connect`.

    Responses arriving on the stream are matched to callers by their
    JSON-RPC ``id`` through ``pending_responses``.
    """

    def __init__(self, server_url: str):
        """Store the base URL (without trailing slash); no I/O is performed."""
        self.server_url = server_url.rstrip('/')
        self.request_id = 0
        self.session: Optional[aiohttp.ClientSession] = None
        # Maps an in-flight request id to the future its caller is awaiting.
        self.pending_responses: Dict[int, asyncio.Future] = {}
        self._listener_task: Optional[asyncio.Task] = None

    async def connect(self) -> None:
        """Open the HTTP session and start the background stream listener."""
        self.session = aiohttp.ClientSession()
        # Keep a reference to the task: the event loop holds tasks only
        # weakly, so an unreferenced task may be garbage-collected mid-run.
        # The reference also lets close() cancel the listener.
        self._listener_task = asyncio.create_task(self._listen_stream())

    async def _listen_stream(self) -> None:
        """Read newline-delimited JSON from the GET stream and resolve futures.

        Blank lines, lines that are not valid JSON, and JSON values that are
        not objects are skipped. Any other error ends the listener after
        printing a diagnostic.
        """
        try:
            # Streamable HTTP uses one endpoint for both directions.
            async with self.session.get(f"{self.server_url}/mcp") as resp:
                async for raw_line in resp.content:
                    text = raw_line.decode().strip()
                    if not text:
                        continue
                    try:
                        data = json.loads(text)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(data, dict):
                        continue
                    future = self.pending_responses.pop(data.get("id"), None)
                    # The waiter may already have timed out (wait_for cancels
                    # the future); set_result on a finished future would raise
                    # InvalidStateError and take the whole listener down.
                    if future is not None and not future.done():
                        future.set_result(data)
        except Exception as e:
            print(f"Stream listener error: {e}")

    async def send_request(
        self, method: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send one JSON-RPC request and return the matching response object.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters; omitted from the request when falsy.

        Returns:
            The decoded JSON-RPC response whose ``id`` matches this request.

        Raises:
            RuntimeError: If the client is not connected, the POST returns a
                non-200 status, or no response arrives within 30 seconds.
        """
        if self.session is None:
            raise RuntimeError("Client is not connected; call connect() first")

        self.request_id += 1
        # Capture the id locally: self.request_id can be advanced by another
        # concurrent send_request() while this one is suspended.
        request_id = self.request_id
        request: Dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
        }
        if params:
            request["params"] = params

        # Register the future before sending so a fast stream reply is not lost.
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self.pending_responses[request_id] = future
        try:
            async with self.session.post(
                f"{self.server_url}/mcp",
                json=request,
            ) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"HTTP {resp.status}")
                # The server may answer directly in the POST body; use that
                # answer instead of waiting on the stream for a message that
                # will never come.
                if resp.content_type == "application/json":
                    body = await resp.json()
                    if isinstance(body, dict) and body.get("id") == request_id:
                        return body

            # Otherwise wait for the response to arrive on the GET stream.
            try:
                return await asyncio.wait_for(future, timeout=30)
            except asyncio.TimeoutError as exc:
                raise RuntimeError(f"Request {request_id} timeout") from exc
        finally:
            # Always drop the bookkeeping entry, whatever the outcome.
            self.pending_responses.pop(request_id, None)

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Invoke ``tools/call`` for ``tool_name`` with ``arguments``."""
        return await self.send_request(
            "tools/call",
            {"name": tool_name, "arguments": arguments}
        )

    async def close(self) -> None:
        """Stop the stream listener and close the HTTP session."""
        task, self._listener_task = self._listener_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        if self.session:
            await self.session.close()
            self.session = None

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
# Example server-side implementation
from aiohttp import web
import asyncio
class StreamableHttpMCPServer:
    """MCP server exposing JSON-RPC 2.0 over a single ``/mcp`` HTTP endpoint.

    ``POST /mcp`` carries one JSON-RPC request and is answered in the
    response body. ``GET /mcp`` opens a long-lived newline-delimited JSON
    stream that :meth:`broadcast_message` writes to.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        self.host = host
        self.port = port
        self.clients = {}  # open stream responses, keyed by id(request)
        self.tools = {}  # tool name -> handler callable
        self._runner = None  # set by start(), released by stop()

    def register_tool(self, name: str, handler):
        """Register ``handler`` under ``name``.

        The handler is called with the ``arguments`` dict of a ``tools/call``
        request and may be either a plain function or a coroutine function.
        """
        self.tools[name] = handler

    async def handle_mcp(self, request):
        """Handle both directions of the ``/mcp`` endpoint.

        POST: decode a JSON-RPC request, dispatch it and return the result as
        a JSON response. A body that is not valid JSON, or not a JSON object,
        is answered with HTTP 400 and a JSON-RPC error object.

        GET: open a stream response, register it for broadcasts, and hold the
        connection open until the handler is cancelled.
        """
        if request.method == "POST":
            try:
                data = await request.json()
            except json.JSONDecodeError:
                return web.json_response(
                    {
                        "jsonrpc": "2.0",
                        "id": None,
                        # -32700 is the JSON-RPC 2.0 "Parse error" code.
                        "error": {"code": -32700, "message": "Parse error"},
                    },
                    status=400,
                )
            if not isinstance(data, dict):
                return web.json_response(
                    {
                        "jsonrpc": "2.0",
                        "id": None,
                        # -32600 is the JSON-RPC 2.0 "Invalid Request" code.
                        "error": {"code": -32600, "message": "Invalid Request"},
                    },
                    status=400,
                )
            method = data.get("method")
            request_id = data.get("id")
            result = await self._process_request(method, data.get("params", {}))
            response = {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": result,
            }
            return web.json_response(response)

        if request.method == "GET":
            # Open the stream connection.
            response = web.StreamResponse()
            response.content_type = "application/x-ndjson"
            response.headers["Cache-Control"] = "no-cache"
            await response.prepare(request)
            client_id = id(request)
            self.clients[client_id] = response
            try:
                # Keep the connection open; this loop only ends when the
                # handler task is cancelled or an exception propagates.
                while True:
                    await asyncio.sleep(1)
            finally:
                self.clients.pop(client_id, None)

        # Only GET and POST are routed here by start(); reject anything else
        # explicitly instead of returning None from the handler.
        raise web.HTTPMethodNotAllowed(request.method, ["GET", "POST"])

    async def broadcast_message(self, message: Dict[str, Any]) -> None:
        """Write ``message`` as one JSON line to every open stream.

        Delivery is best-effort: a failed write to one client is reported and
        does not prevent delivery to the others.
        """
        payload = (json.dumps(message) + "\n").encode()
        # Iterate over a snapshot: clients can connect or disconnect while a
        # write is awaited, and mutating a dict during iteration raises
        # RuntimeError.
        for response in list(self.clients.values()):
            try:
                await response.write(payload)
            except Exception as e:
                print(f"Broadcast write error: {e}")

    async def _process_request(self, method: str, params: Dict) -> Dict[str, Any]:
        """Dispatch one JSON-RPC method and return its result payload.

        Args:
            method: JSON-RPC method name (``tools/list`` or ``tools/call``).
            params: Request parameters; anything that is not a dict (for
                example ``None``) is treated as empty.

        Returns:
            ``{"tools": [...]}`` for ``tools/list``, the tool handler's return
            value for ``tools/call``, ``{"error": ...}`` for an unknown tool,
            and ``{}`` for any other method.
        """
        import inspect

        if not isinstance(params, dict):
            params = {}

        if method == "tools/list":
            return {"tools": list(self.tools)}

        if method == "tools/call":
            tool_name = params.get("name")
            if tool_name not in self.tools:
                return {"error": f"Tool {tool_name} not found"}
            outcome = self.tools[tool_name](params.get("arguments", {}))
            # Support both coroutine and plain-function handlers.
            if inspect.isawaitable(outcome):
                outcome = await outcome
            return outcome

        return {}

    async def start(self):
        """Bind the HTTP server to ``host:port`` and begin serving ``/mcp``."""
        app = web.Application()
        # Streamable HTTP uses a single endpoint.
        app.router.add_post("/mcp", self.handle_mcp)
        app.router.add_get("/mcp", self.handle_mcp)
        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except Exception:
            # Do not leak the runner if binding the socket fails.
            await runner.cleanup()
            raise
        self._runner = runner
        print(f"Streamable HTTP MCP Server started at http://{self.host}:{self.port}")

    async def stop(self):
        """Shut down the server started by :meth:`start`, if any."""
        runner, self._runner = self._runner, None
        if runner is not None:
            await runner.cleanup()
# Usage example for the Streamable HTTP transport
async def main():
    """Connect to the local MCP server, request ``tools/list`` and print the reply."""
    mcp_client = StreamableHttpMCPClient("http://localhost:8000")
    async with mcp_client as mcp:
        tools_response = await mcp.send_request("tools/list")
        print("Tools:", tools_response)