# 9.4 Harness中的MCP集成模式

本节讨论在Harness系统级别集成多个MCP Server时的核心架构、动态工具发现与注册、Schema缓存策略、权限与审计管理，以及错误处理与降级方案。这些模式确保大规模智能体系统能够高效、安全地管理分布式的MCP Server。

## 9.4.1 系统级集成的核心挑战

当Harness需要集成多个MCP Server时，面临以下挑战：

1. **动态发现与注册**：新的Server如何自动被Harness发现？
2. **Schema缓存**：如何避免每次都重新获取Schema（省去Token和延迟）？
3. **权限与隔离**：不同Agent应该访问哪些Server？
4. **错误处理与降级**：某个Server故障时如何继续工作？
5. **审计与日志**：如何记录所有Tool调用以供审计？

## 9.4.2 动态工具注册与发现

### MCPToolRegistry

工具注册中心的实现代码如下：

```python
import asyncio
import hashlib
import inspect
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple

@dataclass
class MCPServerConfig:
    """Configuration for one MCP server known to the registry."""
    server_id: str  # key under which the registry stores this server
    server_name: str  # name printed in the registry's log line on registration
    transport_type: str  # "stdio" | "http"
    endpoint: str  # path or URL
    enabled: bool = True  # disabled servers are skipped during tool discovery
    priority: int = 0  # priority (used when several servers provide the same tool)
    timeout_seconds: int = 30  # timeout, in seconds
    max_retries: int = 2  # retry budget -- NOTE(review): confirm how consumers apply it
    tags: List[str] = field(default_factory=list)  # tags for categorization

@dataclass
class ToolSchema:
    """Cached schema of one tool exposed by an MCP server."""
    server_id: str  # server the schema was fetched from
    tool_name: str
    description: str
    input_schema: Dict  # the tool's "inputSchema" object from a tools/list response
    cached_at: datetime  # time the schema was fetched; compared against the cache TTL
    schema_hash: str  # hex digest used to detect schema changes

class MCPToolRegistry:
    """Registry of MCP servers, their clients, and cached tool schemas.

    Servers are registered explicitly with ``register_server``; their tools
    are then discovered on demand through ``tools/list`` requests. Schemas
    are cached in memory under ``"<server_id>#<tool_name>"`` for
    ``cache_ttl_seconds`` seconds.
    """

    def __init__(self):
        self.servers: Dict[str, MCPServerConfig] = {}
        # Keyed by "<server_id>#<tool_name>".
        self.tool_cache: Dict[str, ToolSchema] = {}
        self.server_clients: Dict[str, object] = {}
        self.cache_ttl_seconds = 3600  # schemas are cached for one hour
        self.lock = asyncio.Lock()
        # agent_id -> server_id -> [tool_names]; "*" grants every tool.
        self.permission_config: Dict[str, Dict[str, List[str]]] = {}

    async def register_server(self, config: MCPServerConfig) -> None:
        """Register (or replace) an MCP server under ``config.server_id``."""
        async with self.lock:
            self.servers[config.server_id] = config
            print(f"[Registry] Registered MCP Server: {config.server_name}")

    async def unregister_server(self, server_id: str) -> None:
        """Remove a server, its cached schemas, and its client connection.

        Unknown ``server_id`` values are ignored. Registry state is cleaned
        up *before* the client is closed, so an exception raised by
        ``close()`` (which still propagates to the caller) cannot leave a
        dangling client entry or stale cached schemas behind.
        """
        async with self.lock:
            if server_id not in self.servers:
                return

            del self.servers[server_id]
            client = self.server_clients.pop(server_id, None)
            self.tool_cache = {
                k: v for k, v in self.tool_cache.items()
                if v.server_id != server_id
            }

            if client is not None and hasattr(client, 'close'):
                # close() may be a coroutine function or a plain method,
                # depending on the client; only await when needed.
                outcome = client.close()
                if inspect.isawaitable(outcome):
                    await outcome

    async def discover_tools(self) -> Dict[str, List[str]]:
        """Return the tool names offered by every enabled server.

        Returns:
            Mapping of server_id to tool names. A server whose ``tools/list``
            request fails is reported with an empty list.
        """
        tools_by_server: Dict[str, List[str]] = {}

        # Iterate over a snapshot: the awaits below yield to the event loop,
        # and a concurrent register/unregister would otherwise raise
        # "dictionary changed size during iteration".
        for server_id, config in list(self.servers.items()):
            if not config.enabled:
                continue

            try:
                response = await self._send_request(server_id, "tools/list")
                tools_by_server[server_id] = [
                    tool["name"]
                    for tool in response.get("result", {}).get("tools", [])
                ]
            except Exception as e:
                print(f"[Registry] Error discovering tools from {server_id}: {e}")
                tools_by_server[server_id] = []

        return tools_by_server

    async def get_tool_schema(self, tool_name: str, server_id: Optional[str] = None) -> Optional[ToolSchema]:
        """Return the schema of ``tool_name``, using the cache when possible.

        Args:
            tool_name: Name of the tool to look up.
            server_id: Restrict the lookup to this server. When omitted,
                every enabled server is considered, highest priority first
                (registration order breaks ties).

        Returns:
            The schema, or ``None`` if no candidate server lists the tool.
        """
        async with self.lock:
            # NOTE: the lock is held across network requests; each request
            # is bounded by the server's timeout_seconds.
            if server_id:
                servers_to_try = [server_id]
            else:
                enabled = [
                    sid for sid, config in self.servers.items()
                    if config.enabled
                ]
                # sorted() is stable, so equal priorities keep registration order.
                servers_to_try = sorted(
                    enabled,
                    key=lambda sid: self.servers[sid].priority,
                    reverse=True,
                )

            # Entries are always stored as "<server_id>#<tool_name>", so the
            # lookup must use the same key shape even when the caller did not
            # name a server; otherwise that path can never hit the cache.
            for sid in servers_to_try:
                cached = self.tool_cache.get(f"{sid}#{tool_name}")
                if cached is not None and self._is_fresh(cached):
                    return cached

            # Cache miss: ask the servers.
            for sid in servers_to_try:
                try:
                    response = await self._send_request(sid, "tools/list")

                    for tool in response.get("result", {}).get("tools", []):
                        if tool["name"] == tool_name:
                            schema = ToolSchema(
                                server_id=sid,
                                tool_name=tool_name,
                                # A tool may omit its description.
                                description=tool.get("description", ""),
                                input_schema=tool["inputSchema"],
                                cached_at=datetime.now(),
                                schema_hash=self._hash_schema(tool),
                            )
                            self.tool_cache[f"{sid}#{tool_name}"] = schema
                            return schema

                except Exception as e:
                    print(f"[Registry] Error getting schema from {sid}: {e}")
                    continue

            return None

    async def call_tool(
        self,
        tool_name: str,
        arguments: Dict,
        agent_id: Optional[str] = None,
        server_id: Optional[str] = None,
    ) -> Tuple[bool, Any]:
        """Invoke a tool on behalf of an agent.

        Args:
            tool_name: Tool to call.
            arguments: Arguments forwarded in the ``tools/call`` request.
            agent_id: Calling agent; anonymous callers are always denied.
            server_id: Target server. When omitted, the highest-priority
                server that lists the tool is used.

        Returns:
            ``(True, result)`` on success, ``(False, message)`` on failure.
            This method does not raise; exceptions are reported as a message.
        """
        try:
            # Decide which server handles the call.
            if not server_id:
                server_id = await self._find_server_for_tool(tool_name)

            if not server_id:
                return False, f"Tool {tool_name} not found in any server"

            if not await self._check_permission(agent_id, server_id, tool_name):
                return False, f"Agent {agent_id} not authorized to call {tool_name}"

            response = await self._send_request(
                server_id,
                "tools/call",
                {"name": tool_name, "arguments": arguments},
            )

            if "error" in response:
                error = response["error"]
                # Tolerate error payloads without a "message" field instead
                # of surfacing a cryptic KeyError string.
                if isinstance(error, dict):
                    return False, error.get("message", str(error))
                return False, str(error)

            # NOTE(review): the MCP tools spec also reports tool-level
            # failures as result["isError"]; those are still returned as a
            # success here -- confirm the desired handling.
            return True, response.get("result")

        except Exception as e:
            # Some exceptions stringify to "", which would hide the failure.
            return False, str(e) or type(e).__name__

    async def _send_request(self, server_id: str, method: str, params: Optional[Dict] = None) -> Dict:
        """Send one request to a server, bounded by its ``timeout_seconds``.

        Raises:
            ValueError: If the server is unknown or its transport unsupported.
            TimeoutError: If the server does not answer in time.
        """
        client = await self._get_client(server_id)
        if params is None:
            pending = client.send_request(method)
        else:
            pending = client.send_request(method, params)

        # The server may have been unregistered while the client was created.
        config = self.servers.get(server_id)
        timeout = config.timeout_seconds if config is not None else None
        try:
            return await asyncio.wait_for(pending, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Request {method} to server {server_id} timed out after {timeout}s"
            ) from None

    async def _get_client(self, server_id: str):
        """Return the cached client for a server, creating it on first use.

        Raises:
            ValueError: If the server is not registered or its transport
                type is not "stdio" or "http".
        """
        if server_id in self.server_clients:
            return self.server_clients[server_id]

        config = self.servers.get(server_id)
        if not config:
            raise ValueError(f"Server {server_id} not found")

        # NOTE(review): two coroutines reaching this point for the same
        # server can each create a client; the later one overwrites the
        # earlier entry. Confirm whether creation needs serializing.
        if config.transport_type == "stdio":
            from mcp_client import StdioMCPClient
            client = StdioMCPClient(config.endpoint)
            client.start()
        elif config.transport_type == "http":
            from mcp_client import HttpMCPClient
            client = HttpMCPClient(config.endpoint)
            await client.connect()
        else:
            raise ValueError(f"Unknown transport type: {config.transport_type}")

        self.server_clients[server_id] = client
        return client

    async def _find_server_for_tool(self, tool_name: str) -> Optional[str]:
        """Return the highest-priority server listing ``tool_name``, if any."""
        tools_by_server = await self.discover_tools()

        # A server can be unregistered while discovery is awaiting, so only
        # keep candidates that are still registered.
        candidates = [
            sid for sid, tools in tools_by_server.items()
            if tool_name in tools and sid in self.servers
        ]
        if not candidates:
            return None

        # max() returns the first maximal element, so ties resolve to the
        # earliest discovered server.
        return max(candidates, key=lambda sid: self.servers[sid].priority)

    async def _check_permission(self, agent_id: Optional[str], server_id: str, tool_name: str) -> bool:
        """Return whether ``agent_id`` may call ``tool_name`` on ``server_id``."""
        if agent_id is None:
            return False  # anonymous calls are denied by default

        allowed_tools = self.permission_config.get(agent_id, {}).get(server_id, [])

        # "*" grants every tool on that server.
        if "*" in allowed_tools or tool_name in allowed_tools:
            return True

        print(f"[Permission Denied] agent={agent_id}, server={server_id}, tool={tool_name}")
        return False

    def _is_fresh(self, schema: ToolSchema) -> bool:
        """Return whether a cached schema is still within the cache TTL."""
        return datetime.now() - schema.cached_at < timedelta(seconds=self.cache_ttl_seconds)

    def _hash_schema(self, tool: Dict) -> str:
        """Return the MD5 hex digest of the tool's key-sorted input schema.

        The digest is used only to detect schema changes, not for security.
        """
        schema_str = json.dumps(tool["inputSchema"], sort_keys=True)
        return hashlib.md5(schema_str.encode()).hexdigest()

    def get_cache_stats(self) -> Dict:
        """Return counters describing the registry and its schema cache."""
        return {
            "total_cached_tools": len(self.tool_cache),
            "registered_servers": len(self.servers),
            "active_clients": len(self.server_clients),
            # Size of the JSON text of each cached input schema, in characters.
            "cache_memory_bytes": sum(len(json.dumps(v.input_schema)) for v in self.tool_cache.values()),
        }
```

### Schema缓存策略

**缓存的多层设计**

```python
import os
from dataclasses import asdict

class SchemaCache:
    """Multi-tier schema cache: memory (L1), disk (L2), optional remote (L3).

    Args:
        disk_cache_dir: Directory for the L2 JSON files; created if missing.
        memory_cache_ttl: Lifetime of L1 entries, in seconds.

    NOTE(review): L2 entries have no expiry of their own -- confirm whether
    disk entries should also honor a TTL.
    """

    def __init__(self, disk_cache_dir: str = "./mcp_schema_cache", memory_cache_ttl: int = 3600):
        # L1: in-memory cache (hot tools)
        self.memory_cache: Dict[str, ToolSchema] = {}
        self.memory_cache_ttl = memory_cache_ttl

        # L2: disk cache (all tools)
        self.disk_cache_dir = disk_cache_dir
        os.makedirs(self.disk_cache_dir, exist_ok=True)

        # L3: remote cache (distributed)
        self.remote_cache_enabled = False
        self.remote_cache_url = None

        # Statistics
        self.hits = 0
        self.misses = 0

    async def get(self, server_id: str, tool_name: str) -> Optional[ToolSchema]:
        """Look a schema up in L1, then L2, then (if enabled) L3.

        A hit in a lower tier is promoted to L1. Lookup failures in L2/L3
        are logged and treated as misses; this method does not raise for
        a corrupt or unreadable cache entry.
        """
        key = f"{server_id}#{tool_name}"

        # L1
        schema = self.memory_cache.get(key)
        if schema is not None and datetime.now() - schema.cached_at < timedelta(seconds=self.memory_cache_ttl):
            self.hits += 1
            return schema

        # L2
        schema = self._load_from_disk(server_id, tool_name)
        if schema is not None:
            self.memory_cache[key] = schema
            self.hits += 1
            return schema

        # L3 (optional)
        if self.remote_cache_enabled:
            try:
                schema = await self._fetch_from_remote(server_id, tool_name)
            except Exception as e:
                print(f"[SchemaCache] Remote lookup failed for {key}: {e}")
                schema = None
            if schema:
                self.memory_cache[key] = schema
                self.hits += 1
                return schema

        self.misses += 1
        return None

    async def put(self, schema: ToolSchema) -> None:
        """Store a schema in L1 and persist it to L2."""
        key = f"{schema.server_id}#{schema.tool_name}"

        # L1
        self.memory_cache[key] = schema

        # L2
        disk_path = self._get_disk_cache_path(schema.server_id, schema.tool_name)
        os.makedirs(self.disk_cache_dir, exist_ok=True)

        payload = asdict(schema)
        # Store the timestamp in ISO format so it can be parsed back into a
        # datetime on load.
        payload["cached_at"] = schema.cached_at.isoformat()

        # Write to a temporary file and rename it into place, so a reader
        # never sees a half-written JSON document.
        tmp_path = f"{disk_path}.{os.getpid()}.tmp"
        with open(tmp_path, 'w', encoding="utf-8") as f:
            # default=str keeps the original tolerance for non-JSON values
            # inside input_schema.
            json.dump(payload, f, default=str)
        os.replace(tmp_path, disk_path)

    def _load_from_disk(self, server_id: str, tool_name: str) -> Optional[ToolSchema]:
        """Read one schema from L2; return ``None`` if absent or unreadable."""
        disk_path = self._get_disk_cache_path(server_id, tool_name)
        try:
            with open(disk_path, 'r', encoding="utf-8") as f:
                data = json.load(f)
            # JSON has no datetime type: without this conversion cached_at
            # stays a string and the next L1 freshness check raises TypeError.
            data["cached_at"] = datetime.fromisoformat(data["cached_at"])
            return ToolSchema(**data)
        except FileNotFoundError:
            return None
        except (OSError, ValueError, TypeError, KeyError) as e:
            print(f"[SchemaCache] Ignoring unreadable cache file {disk_path}: {e}")
            return None

    def _get_disk_cache_path(self, server_id: str, tool_name: str) -> str:
        """Return the L2 file path for a (server_id, tool_name) pair.

        The file name is a digest of both identifiers rather than the raw
        strings: tool names come from remote servers and may contain path
        separators, and "a_b"+"c" must not collide with "a"+"b_c".
        """
        identity = json.dumps([server_id, tool_name])
        digest = hashlib.sha256(identity.encode("utf-8")).hexdigest()
        return os.path.join(self.disk_cache_dir, f"{digest}.json")

    async def _fetch_from_remote(self, server_id: str, tool_name: str) -> Optional[ToolSchema]:
        """Fetch a schema from the remote cache (e.g. Redis).

        Not implemented: always returns ``None``.
        """
        return None

    def get_stats(self) -> Dict:
        """Return hit/miss counters and the number of cached items per tier."""
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": f"{hit_rate:.2%}",
            "memory_items": len(self.memory_cache),
            # Count only completed cache files, not in-flight temp files.
            "disk_items": sum(
                1 for name in os.listdir(self.disk_cache_dir)
                if name.endswith(".json")
            ),
        }
```

### 权限与审计网关

权限与审计网关的实现代码如下：

```python
class PermissionGateway:
    """Permission checks, human approval for risky calls, and audit logging.

    Args:
        approval_timeout_seconds: How long a high-risk call waits for a
            human decision before it is denied.
    """

    def __init__(self, approval_timeout_seconds: float = 30):
        # agent_id -> [tool_names]; "*" grants every tool.
        self.permissions: Dict[str, List[str]] = {}
        # NOTE(review): the log is kept in memory and grows without bound.
        self.audit_log: List[Dict] = []
        self.approval_queue: asyncio.Queue = asyncio.Queue()
        self.approval_timeout_seconds = approval_timeout_seconds

    def register_permission(self, agent_id: str, tool_names: List[str]) -> None:
        """Set (replace) the tools an agent may call; "*" allows all tools."""
        # Copy so later mutation of the caller's list cannot change grants.
        self.permissions[agent_id] = list(tool_names)

    async def check_and_audit(
        self,
        agent_id: Optional[str],
        tool_name: str,
        arguments: Dict,
        risk_level: str = "low",
    ) -> Tuple[bool, Optional[str]]:
        """Decide whether a call may proceed and record the decision.

        Args:
            agent_id: Calling agent; unknown agents have no permissions.
            tool_name: Tool being requested.
            arguments: Call arguments, stored in the audit entry.
            risk_level: ``"high"`` requires human approval first.

        Returns:
            ``(True, None)`` when allowed, otherwise ``(False, reason)``.
        """
        # 1. Permission check ("*" matches any tool, as in the registry).
        allowed_tools = self.permissions.get(agent_id, [])
        if "*" not in allowed_tools and tool_name not in allowed_tools:
            self._log_audit("denied", agent_id, tool_name, arguments, "Permission denied")
            return False, "Permission denied"

        # 2. Risk assessment: high-risk calls need human approval.
        if risk_level == "high":
            approval_request = {
                "agent_id": agent_id,
                "tool_name": tool_name,
                "arguments": arguments,
                "timestamp": datetime.now(),
            }
            await self.approval_queue.put(approval_request)

            try:
                approved = await asyncio.wait_for(
                    self._wait_for_approval(agent_id, tool_name),
                    timeout=self.approval_timeout_seconds,
                )
            except asyncio.TimeoutError:
                self._log_audit("timeout", agent_id, tool_name, arguments, "Approval timeout")
                return False, "Approval timeout"

            if not approved:
                self._log_audit("rejected", agent_id, tool_name, arguments, "Manual rejection")
                return False, "Request rejected by human"

        # 3. Record the positive decision.
        self._log_audit("allowed", agent_id, tool_name, arguments, "")
        return True, None

    def _log_audit(
        self,
        action: str,
        agent_id: Optional[str],
        tool_name: str,
        arguments: Dict,
        reason: str,
    ) -> None:
        """Append one audit entry and print a one-line summary."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "action": action,
            "agent_id": agent_id,
            "tool_name": tool_name,
            "arguments": arguments,
            "reason": reason,
        }
        self.audit_log.append(entry)

        # Could also be forwarded to an external audit system (e.g. ELK).
        print(f"[Audit] {action.upper()}: {agent_id}/{tool_name}")

    async def _wait_for_approval(self, agent_id: str, tool_name: str) -> bool:
        """Wait for a human decision on a queued approval request.

        Placeholder: returns ``True`` immediately, i.e. every high-risk
        call is approved. A real implementation should connect to an
        approval system and consume ``approval_queue``.
        """
        return True

    def export_audit_log(self, start_time: datetime, end_time: datetime) -> List[Dict]:
        """Return audit entries whose timestamp lies in [start_time, end_time].

        Entry timestamps are naive local datetimes, so the bounds must be
        naive as well.
        """
        return [
            entry for entry in self.audit_log
            if start_time <= datetime.fromisoformat(entry["timestamp"]) <= end_time
        ]
```

### 错误处理与降级策略

错误处理与降级策略的实现代码如下：

```python
class ToolCallWithFallback:
    """Tool invocation with a permission gate and per-tool fallbacks."""

    def __init__(self, registry: MCPToolRegistry, gateway: PermissionGateway):
        self.registry = registry
        self.gateway = gateway
        # tool_name -> handler taking the call arguments; sync or async.
        self.fallback_handlers: Dict[str, Callable[[Dict], Any]] = {}

    def register_fallback(self, tool_name: str, handler: Callable[[Dict], Any]) -> None:
        """Register the fallback for a tool.

        Args:
            tool_name: Tool the fallback stands in for.
            handler: Called with the original arguments when the primary
                call fails. May be a plain function or a coroutine function.
        """
        self.fallback_handlers[tool_name] = handler

    async def call(
        self,
        tool_name: str,
        arguments: Dict,
        agent_id: Optional[str] = None,
        risk_level: str = "low",
        use_fallback_on_error: bool = True,
    ) -> Tuple[bool, Any, Dict]:
        """Call a tool, falling back to the registered handler on failure.

        Returns:
            ``(success, payload, metadata)``. ``payload`` is the result on
            success or an error description on failure. ``metadata`` always
            has ``"fallback"`` (whether the fallback ran); when the fallback
            ran it also carries ``"primary_error"``.
        """
        # Permission check (also writes the audit entry).
        allowed, reason = await self.gateway.check_and_audit(
            agent_id, tool_name, arguments, risk_level
        )
        if not allowed:
            return False, reason, {"fallback": False}

        # Primary call.
        success, result = await self.registry.call_tool(
            tool_name, arguments, agent_id
        )
        if success:
            return True, result, {"fallback": False, "source": "primary"}

        # NOTE(review): the fallback also runs when the registry itself
        # denied the call (its permission check is separate from the
        # gateway's) -- confirm that this is intended.
        handler = self.fallback_handlers.get(tool_name)
        if not use_fallback_on_error or handler is None:
            return False, result, {"fallback": False, "source": "primary"}

        try:
            fallback_result = handler(arguments)
            # Only await when the handler actually returned an awaitable;
            # awaiting a plain value would raise after the handler already
            # ran its side effects.
            if inspect.isawaitable(fallback_result):
                fallback_result = await fallback_result
        except Exception as e:
            return False, str(e), {
                "fallback": True,
                "source": "fallback",
                "error": str(e),
                "primary_error": result,
            }

        return True, fallback_result, {
            "fallback": True,
            "source": "fallback",
            "primary_error": result,
        }
```

### 本小节小结

Harness级别的MCP集成需要考虑：

1. **动态发现**：通过MCPToolRegistry注册Server，并动态发现各Server提供的工具
2. **Schema缓存**：多层缓存设计（内存、磁盘、远程），显著降低延迟和Token消耗
3. **权限隔离**：PermissionGateway确保Agent只能访问授权的工具
4. **审计追踪**：所有Tool调用都被记录用于合规性和调试
5. **错误降级**：Server故障时支持后备方案

Schema缓存可以减少重复工具描述带来的Token消耗；具体节省比例取决于工具数量、Schema 大小和缓存命中率，对于大规模智能体系统尤其重要。

下一节将在MiniHarness中实现完整的MCP客户端集成。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yeasy.gitbook.io/harness_engineering_guide/di-san-bu-fen-xi-tong-ji-cheng-yu-gong-cheng-shi-jian/09_mcp/9.4_integration_patterns.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
