# 9.3 MCP服务端开发

本节详细讲解如何从零开始构建MCP Server，包括基本开发步骤、完整的Python实现示例、关键概念说明以及错误处理方案。通过这些实现细节，你将掌握如何定义工具(Tool)、资源(Resource)和提示词(Prompt)，选择合适的传输方式，并确保Server的稳定性。

## 9.3.1 开发MCP Server的基本步骤

创建一个完整的MCP Server需要：

1. 定义Tools（可调用的函数）
2. 定义Resources（可访问的数据）
3. 定义Prompts（提示词模板）
4. 实现处理器
5. 选择传输方式并启动Server

## 9.3.2 完整的Python MCP Server实现

MCP Server的完整Python实现包括四个主要部分：工具定义、资源处理、不同传输方式的实现，以及主函数。下面是教学用的最小协议骨架，用来解释 JSON-RPC 消息结构；生产项目应优先使用官方或社区维护的 MCP SDK。

### 第一部分：数据模型与工具定义

首先需要定义数据模型来表示工具、资源和提示词：

```python
# mcp_server.py
# 一个完整的MCP Server示例,提供文件系统工具和资源访问

import json
import os
import subprocess
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import asyncio
import sys
from pathlib import Path

@dataclass
class MCPTool:
    """MCP工具定义"""
    name: str
    description: str
    inputSchema: Dict[str, Any]

@dataclass
class MCPResource:
    """MCP资源定义"""
    uri: str
    name: str
    description: str
    mimeType: str  # MIME 类型

@dataclass
class MCPPrompt:
    """MCP提示词定义"""
    name: str
    description: str
    arguments: List[Dict[str, Any]]

class MCPServerBase:
    """MCP Server基类"""

    def __init__(self):
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        self.prompts: Dict[str, MCPPrompt] = {}
        self.tool_handlers: Dict[str, callable] = {}
        self.resource_readers: Dict[str, callable] = {}
        self.prompt_generators: Dict[str, callable] = {}

    def register_tool(
        self, name: str, description: str,
        input_schema: Dict[str, Any], handler: callable
    ) -> None:
        """注册工具"""
        self.tools[name] = MCPTool(name, description, input_schema)
        self.tool_handlers[name] = handler

    def register_resource(
        self, uri: str, name: str, description: str,
        mime_type: str, reader: callable
    ) -> None:
        """注册资源"""
        self.resources[uri] = MCPResource(uri, name, description, mime_type)
        self.resource_readers[uri] = reader

    def register_prompt(
        self, name: str, description: str,
        arguments: List[Dict[str, Any]], generator: callable
    ) -> None:
        """注册提示词"""
        self.prompts[name] = MCPPrompt(name, description, arguments)
        self.prompt_generators[name] = generator
```

**设计说明**：基类采用了“注册模式”(Registration Pattern)，允许在运行时动态注册工具、资源和提示词。这提供了灵活性，使不同的Server实例可以有不同的功能集合。

### 第二部分：请求处理与JSON-RPC协议

MCP使用JSON-RPC 2.0协议进行通信。以下是请求处理的核心逻辑：

```python
    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理JSON-RPC请求"""
        method = request.get("method")
        params = request.get("params", {})
        request_id = request.get("id")

        try:
            if method == "initialize":
                result = self._handle_initialize()
            elif method == "tools/list":
                result = self._handle_tools_list()
            elif method == "tools/call":
                result = await self._handle_tools_call(params)
            elif method == "resources/list":
                result = self._handle_resources_list()
            elif method == "resources/read":
                result = await self._handle_resources_read(params)
            elif method == "prompts/list":
                result = self._handle_prompts_list()
            elif method == "prompts/get":
                result = await self._handle_prompts_get(params)
            else:
                return {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }

            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": result,
            }

        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {"code": -32000, "message": str(e)},
            }
```

**设计说明**：请求处理是MCP通信的“中枢”。每个请求必须返回合法的JSON-RPC 2.0响应，包括id、result或error。这确保了客户端可以准确关联请求和响应，即使在异步场景中也不会混淆。

### 第三部分：资源与提示词处理

现在实现具体的处理方法，展示工具、资源和提示词的导出：

```python
    def _handle_initialize(self) -> Dict[str, Any]:
        """处理初始化请求"""
        return {
            "protocolVersion": "2025-11-25",
            "capabilities": {
                "tools": {},
                "resources": {},
                "prompts": {},
            },
            "serverInfo": {
                "name": "example-mcp-server",
                "version": "1.0.0",
            },
        }

    def _handle_tools_list(self) -> Dict[str, Any]:
        """列出所有工具"""
        tools_list = [asdict(tool) for tool in self.tools.values()]
        return {"tools": tools_list}

    async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """调用工具"""
        tool_name = params.get("name")
        arguments = params.get("arguments", {})

        if tool_name not in self.tool_handlers:
            raise ValueError(f"Tool not found: {tool_name}")

        handler = self.tool_handlers[tool_name]
        result = await handler(arguments)

        return {
            "content": [
                {
                    "type": "text",
                    "text": result,
                }
            ]
        }

    def _handle_resources_list(self) -> Dict[str, Any]:
        """列出所有资源"""
        resources_list = [asdict(res) for res in self.resources.values()]
        return {"resources": resources_list}

    async def _handle_resources_read(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """读取资源"""
        uri = params.get("uri")

        if uri not in self.resource_readers:
            raise ValueError(f"Resource not found: {uri}")

        reader = self.resource_readers[uri]
        content = await reader()

        return {
            "contents": [
                {
                    "uri": uri,
                    "mimeType": self.resources[uri].mimeType,
                    "text": content,
                }
            ]
        }

    def _handle_prompts_list(self) -> Dict[str, Any]:
        """列出所有提示词"""
        prompts_list = [asdict(prompt) for prompt in self.prompts.values()]
        return {"prompts": prompts_list}

    async def _handle_prompts_get(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """获取提示词"""
        name = params.get("name")
        arguments = params.get("arguments", {})

        if name not in self.prompt_generators:
            raise ValueError(f"Prompt not found: {name}")

        generator = self.prompt_generators[name]
        messages = await generator(arguments)

        return {"messages": messages}
```

**设计说明**：这些处理方法遵循了一致的模式：列表方法返回所有已注册的对象、调用方法执行对应的处理器。注意 `_handle_prompts_get` 支持参数，使提示词能够根据不同的上下文动态生成。

### 第四部分：文件系统实现与传输

最后，我们实现一个具体的文件系统Server和两种传输方式。以下展示文件系统工具的实现：

```python
class FileSystemMCPServer(MCPServerBase):
    """提供文件系统访问的MCP Server"""

    def __init__(self, root_path: str = "."):
        super().__init__()
        self.root_path = Path(root_path).resolve()

        # 注册三个文件系统工具
        self.register_tool(
            name="read_file",
            description="Read contents of a file",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Path to the file"},
                    "encoding": {
                        "type": "string",
                        "enum": ["utf-8", "ascii", "latin-1"],
                        "description": "File encoding (default: utf-8)",
                    },
                },
                "required": ["path"],
            },
            handler=self.handle_read_file,
        )

        self.register_tool(
            name="write_file",
            description="Write contents to a file",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Path to the file"},
                    "contents": {"type": "string", "description": "File contents"},
                },
                "required": ["path", "contents"],
            },
            handler=self.handle_write_file,
        )

        self.register_tool(
            name="list_directory",
            description="List files in a directory",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Directory path"},
                },
                "required": ["path"],
            },
            handler=self.handle_list_directory,
        )

    async def handle_read_file(self, args: Dict[str, Any]) -> str:
        """处理读文件请求"""
        file_path = self._resolve_path(args["path"])
        encoding = args.get("encoding", "utf-8")
        with open(file_path, "r", encoding=encoding) as f:
            contents = f.read()
        return f"File contents:\n\n{contents}"

    async def handle_write_file(self, args: Dict[str, Any]) -> str:
        """处理写文件请求"""
        file_path = self._resolve_path(args["path"])
        contents = args["contents"]
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, "w") as f:
            f.write(contents)
        return f"File written: {file_path}"

    async def handle_list_directory(self, args: Dict[str, Any]) -> str:
        """处理列出目录请求"""
        dir_path = self._resolve_path(args["path"])
        if not dir_path.is_dir():
            return f"Not a directory: {dir_path}"
        items = []
        for item in dir_path.iterdir():
            item_type = "dir" if item.is_dir() else "file"
            items.append(f"  [{item_type}] {item.name}")
        return f"Contents of {dir_path}:\n" + "\n".join(items)

    def _resolve_path(self, path: str) -> Path:
        """解析路径并严格校验不逃逸根目录"""
        resolved = (self.root_path / path).resolve()
        try:
            resolved.relative_to(self.root_path.resolve())
        except ValueError as e:
            raise ValueError(f"Path traversal not allowed: {path}") from e
        return resolved
```

**设计说明**：文件系统Server展示了三个关键的安全做法：(1) 限制可访问的根路径；(2) 验证路径不会逃出根目录；(3) 在执行操作前解析并验证所有路径。这是处理用户输入的敏感操作（如文件访问）的标准防御方式。上面代码使用 `resolved.relative_to(self.root_path.resolve())` 进行边界检查是最佳实践——相比字符串前缀比较（`resolved.startswith()`），它能正确处理Windows路径、符号链接和相对路径。完整的五层递进式路径校验实践（长度检查、URL编码双解码、Unicode规范化、平台特定规范化、符号链接解析与边界检查）详见第 12.4 节。

### 第五部分：传输层实现

MCP支持多种传输方式。以下展示stdio和HTTP两种传输的简化实现：

```python
class StdioMCPServer:
    """使用stdio传输的MCP Server"""

    def __init__(self, server: MCPServerBase):
        self.server = server

    async def run(self) -> None:
        """运行服务器"""
        loop = asyncio.get_running_loop()

        while True:
            try:
                # 读取一行JSON
                line = await loop.run_in_executor(None, sys.stdin.readline)
                if not line:
                    break

                request = json.loads(line.strip())
                response = await self.server.handle_request(request)

                # 写入响应
                json_line = json.dumps(response)
                sys.stdout.write(json_line + "\n")
                sys.stdout.flush()

            except json.JSONDecodeError as e:
                sys.stderr.write(f"JSON decode error: {e}\n")
            except Exception as e:
                sys.stderr.write(f"Error: {e}\n")
```

**stdio传输的优势**：(1) 无需网络配置，完全通过管道通信；(2) 天然支持进程隔离；(3) 无序列化开销，只需逐行JSON处理。这使stdio成为本地工具集成的理想选择。

```python
try:
    from aiohttp import web
    HAS_AIOHTTP = True
except ImportError:
    HAS_AIOHTTP = False

class HttpMCPServer:
    """使用Streamable HTTP传输的MCP Server"""

    def __init__(
        self,
        server: MCPServerBase,
        host: str = "127.0.0.1",
        port: int = 8000,
        bearer_token: Optional[str] = None,
        allowed_origins: Optional[set] = None,
    ):
        self.server = server
        self.host = host
        self.port = port
        self.bearer_token = bearer_token
        self.allowed_origins = allowed_origins or {"http://localhost:8000"}
        self.clients = {}
        if not HAS_AIOHTTP:
            raise RuntimeError("aiohttp not installed. Install it with: pip install aiohttp")

    def _check_security(self, request):
        origin = request.headers.get("Origin")
        if origin and origin not in self.allowed_origins:
            return web.Response(status=403, text="Origin not allowed")
        if self.bearer_token:
            expected = f"Bearer {self.bearer_token}"
            if request.headers.get("Authorization") != expected:
                return web.Response(status=401, text="Unauthorized")
        return None

    async def handle_mcp_post(self, request):
        """处理POST /mcp(接收请求并返回响应)"""
        security_error = self._check_security(request)
        if security_error:
            return security_error
        accept = request.headers.get("Accept", "")
        if "application/json" not in accept or "text/event-stream" not in accept:
            return web.Response(status=406, text="Accept must include JSON and SSE")
        data = await request.json()
        response = await self.server.handle_request(data)
        return web.json_response(response)

    async def handle_mcp_get(self, request):
        """处理GET /mcp(建立流连接)"""
        security_error = self._check_security(request)
        if security_error:
            return security_error
        if "text/event-stream" not in request.headers.get("Accept", ""):
            return web.Response(status=405, text="SSE stream requires text/event-stream")
        response = web.StreamResponse()
        response.content_type = "text/event-stream"
        response.headers["Cache-Control"] = "no-cache"
        await response.prepare(request)
        client_id = id(request)
        self.clients[client_id] = response
        try:
            while True:
                await asyncio.sleep(1)
        finally:
            del self.clients[client_id]
        return response

    async def run(self):
        """启动Streamable HTTP服务器"""
        app = web.Application()
        app.router.add_post("/mcp", self.handle_mcp_post)
        app.router.add_get("/mcp", self.handle_mcp_get)
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, self.host, self.port)
        await site.start()
        print(f"Streamable HTTP MCP Server running at http://{self.host}:{self.port}")
        await asyncio.Event().wait()
```

**HTTP传输的优势**：(1) 支持远程访问，跨机器通信；(2) GET流连接用于Server推送事件；(3) 标准的HTTP协议，易于在网络中部署和监控。

### 第六部分：启动函数

最后，主函数根据命令行参数选择传输方式：

```python
async def main():
    """主函数"""
    # 创建文件系统MCP Server
    fs_server = FileSystemMCPServer(root_path=".")

    # 选择传输方式
    if len(sys.argv) > 1 and sys.argv[1] == "--http":
        # 使用HTTP传输
        transport = HttpMCPServer(fs_server)
        await transport.run()
    else:
        # 使用stdio传输(默认)
        transport = StdioMCPServer(fs_server)
        await transport.run()

if __name__ == "__main__":
    asyncio.run(main())
```

### Server开发的关键概念

#### 1. 工具定义的JSON Schema

工具的`inputSchema`应该是完整的JSON Schema，包含：

* `type`: 必须是“object”
* `properties`: 参数定义
* `required`: 必需参数列表

```json
{
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "description": "User's full name"
    },
    "age": {
      "type": "integer",
      "minimum": 0,
      "maximum": 150
    },
    "tags": {
      "type": "array",
      "items": {"type": "string"}
    }
  },
  "required": ["name"],
  "additionalProperties": false
}
```

#### 2. 资源URI规范

Resources应该有结构化的URI（统一资源标识符）：

```yaml
file:///path/to/document.md
db://postgres/public/users/123
notion://page_abc123def456
github://owner/repo/issues/42
```

#### 3. 提示词参数化

Prompts可以接受参数，在生成消息时使用这些参数：

```python
async def generate_code_review_prompt(self, args: Dict[str, Any]) -> List[Dict]:
    """生成代码审查提示词"""
    language = args.get("language", "Python")
    focus_areas = args.get("focus_areas", "correctness,performance")

    prompt = f"""You are an expert code reviewer for {language}.
Review the following code focusing on: {focus_areas}

Provide:
1. Issues found (if any)
2. Suggestions for improvement
3. Overall assessment
"""

    return [
        {
            "role": "user",
            "content": {
                "type": "text",
                "text": prompt,
            }
        }
    ]
```

### 错误处理

MCP Server应该正确处理和报告错误：

```python
class MCPServerError(Exception):
    """MCP Server错误"""

    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message

    def to_json_rpc_error(self) -> Dict[str, Any]:
        return {
            "code": self.code,
            "message": self.message,
        }

# 标准错误码
class MCPErrorCode(Enum):
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603
    SERVER_ERROR = -32000  # 到 -32099
```

### 本小节小结

开发MCP Server的核心是：

1. 定义Tool、Resource和Prompt的Schema
2. 实现对应的处理器
3. 选择合适的传输方式
4. 正确处理错误和异常

关键要点：

* JSON Schema应该准确且完整
* 资源URI应该有明确的结构
* 提示词应该支持参数化
* 错误响应应该遵循JSON-RPC 2.0规范

下一节将讨论Harness如何在系统级别集成多个MCP Server。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yeasy.gitbook.io/harness_engineering_guide/di-san-bu-fen-xi-tong-ji-cheng-yu-gong-cheng-shi-jian/09_mcp/9.3_server_dev.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
