# 2.4 层间接口设计

良好的接口设计是模块化系统的基础。本节讨论Harness系统中各层之间的通信接口，包括消息格式、工具调用协议、事件流规范等。

## 2.4.1 统一的消息格式

Harness系统中流动的核心数据对象是消息(Message)。无论是智能体与LLM的通信，还是运行时引擎与工具层的通信，都通过结构化的消息传递。

### 消息的通用结构

消息对象是系统内部通信的基本单位，其定义如下：

```python
from typing import Optional, Any, Literal
from datetime import datetime
from enum import Enum
import uuid

class MessageRole(str, Enum):
    """消息的角色"""
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"
    SYSTEM = "system"

class MessageType(str, Enum):
    """消息的类型"""
    TEXT = "text"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    FUNCTION_CALL = "function_call"
    EVENT = "event"

class Message:
    """通用消息基类"""

    def __init__(
        self,
        role: MessageRole,
        type: MessageType,
        content: str,
        message_id: str = None,
        parent_id: Optional[str] = None,
        metadata: dict = None,
        timestamp: datetime = None
    ):
        self.message_id = message_id or str(uuid.uuid4())
        self.role = role
        self.type = type
        self.content = content
        self.parent_id = parent_id
        self.metadata = metadata or {}
        self.timestamp = timestamp or datetime.now()

    def to_dict(self) -> dict:
        """转换为字典,便于序列化"""
        return {
            "message_id": self.message_id,
            "role": self.role.value,
            "type": self.type.value,
            "content": self.content,
            "parent_id": self.parent_id,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat()
        }

    @classmethod
    def from_dict(cls, data: dict) -> "Message":
        """从字典反序列化"""
        return cls(
            role=MessageRole(data["role"]),
            type=MessageType(data["type"]),
            content=data["content"],
            message_id=data.get("message_id"),
            parent_id=data.get("parent_id"),
            metadata=data.get("metadata", {}),
            timestamp=datetime.fromisoformat(data["timestamp"])
        )
```

### 工具调用消息

当智能体决定调用一个工具时，它发送一个`TOOL_CALL`类型的消息：

```python
class ToolCallMessage(Message):
    """工具调用消息"""

    def __init__(
        self,
        tool_name: str,
        tool_params: dict,
        agent_id: str,
        **kwargs
    ):
        super().__init__(
            role=MessageRole.ASSISTANT,
            type=MessageType.TOOL_CALL,
            content=f"Calling tool: {tool_name}",
            metadata={
                "tool_name": tool_name,
                "tool_params": tool_params,
                "agent_id": agent_id
            },
            **kwargs
        )

    @property
    def tool_name(self) -> str:
        return self.metadata["tool_name"]

    @property
    def tool_params(self) -> dict:
        return self.metadata["tool_params"]
```

### 工具结果消息

当工具执行完成后，返回一个`TOOL_RESULT`类型的消息：

```python
class ToolResultMessage(Message):
    """工具执行结果消息"""

    def __init__(
        self,
        tool_call_id: str,
        tool_name: str,
        status: str,
        output: Any = None,
        error: str = None,
        **kwargs
    ):
        super().__init__(
            role=MessageRole.TOOL,
            type=MessageType.TOOL_RESULT,
            content=output if isinstance(output, str) else str(output),
            parent_id=tool_call_id,
            metadata={
                "tool_name": tool_name,
                "status": status,  # "success", "error", "timeout"
                "output": output,
                "error": error
            },
            **kwargs
        )

    @property
    def status(self) -> str:
        return self.metadata["status"]

    @property
    def is_success(self) -> bool:
        return self.status == "success"
```

## 2.4.2 工具调用协议

工具调用协议定义了如何在Agent和工具之间进行通信。

### 调用请求的标准格式

Agent向工具发起调用时使用的请求格式定义如下：

```python
import uuid
import json

class ToolCallRequest:
    """工具调用请求"""

    def __init__(
        self,
        tool_id: str,                 # 工具的唯一标识
        parameters: dict,              # 工具参数
        call_id: str = None,          # 调用的唯一标识
        timeout_seconds: int = 30,
        retry_count: int = 0,
        metadata: dict = None
    ):
        self.tool_id = tool_id
        self.parameters = parameters
        self.call_id = call_id or str(uuid.uuid4())
        self.timeout_seconds = timeout_seconds
        self.retry_count = retry_count
        self.metadata = metadata or {}

    def to_json(self) -> str:
        """转换为JSON格式"""
        return json.dumps({
            "tool_id": self.tool_id,
            "parameters": self.parameters,
            "call_id": self.call_id,
            "timeout_seconds": self.timeout_seconds,
            "retry_count": self.retry_count,
            "metadata": self.metadata
        })

    @classmethod
    def from_json(cls, json_str: str) -> "ToolCallRequest":
        """从JSON反序列化"""
        data = json.loads(json_str)
        return cls(
            tool_id=data["tool_id"],
            parameters=data["parameters"],
            call_id=data.get("call_id"),
            timeout_seconds=data.get("timeout_seconds", 30),
            retry_count=data.get("retry_count", 0)
        )
```

### 调用响应的标准格式

工具执行完毕后返回的响应格式定义如下：

```python
from typing import Any, Literal
import json

class ToolCallResponse:
    """工具调用响应"""

    def __init__(
        self,
        call_id: str,
        status: Literal["success", "error", "timeout", "permission_denied"],
        result: Any = None,
        error_message: str = None,
        error_type: str = None,
        execution_time_ms: float = None,
        metadata: dict = None
    ):
        self.call_id = call_id
        self.status = status
        self.result = result
        self.error_message = error_message
        self.error_type = error_type
        self.execution_time_ms = execution_time_ms
        self.metadata = metadata or {}

    def to_json(self) -> str:
        """转换为JSON格式"""
        return json.dumps({
            "call_id": self.call_id,
            "status": self.status,
            "result": self.result,
            "error_message": self.error_message,
            "error_type": self.error_type,
            "execution_time_ms": self.execution_time_ms,
            "metadata": self.metadata
        }, default=str)

    @classmethod
    def from_json(cls, json_str: str) -> "ToolCallResponse":
        """从JSON反序列化"""
        data = json.loads(json_str)
        return cls(
            call_id=data["call_id"],
            status=data["status"],
            result=data.get("result"),
            error_message=data.get("error_message"),
            error_type=data.get("error_type"),
            execution_time_ms=data.get("execution_time_ms")
        )
```

## 2.4.3 事件流规范

Harness系统内部通过事件流进行异步通信。这允许不同的子系统在解耦的状态下协作。

### 事件的通用结构

事件是系统内部异步通信的基本单位，其结构定义如下：

```python
import uuid
from datetime import datetime

class Event:
    """事件基类"""

    def __init__(
        self,
        event_type: str,
        source: str,  # 事件来源,如"runtime_engine", "tool_executor"
        data: dict,
        event_id: str = None,
        timestamp: datetime = None
    ):
        self.event_id = event_id or str(uuid.uuid4())
        self.event_type = event_type
        self.source = source
        self.data = data
        self.timestamp = timestamp or datetime.now()

    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "event_type": self.event_type,
            "source": self.source,
            "data": self.data,
            "timestamp": self.timestamp.isoformat()
        }

# 定义不同的事件类型
class EventType:
    """事件类型常量"""

    # 运行时引擎事件
    TASK_STARTED = "task_started"
    STEP_STARTED = "step_started"
    STEP_COMPLETED = "step_completed"
    TASK_COMPLETED = "task_completed"
    TASK_FAILED = "task_failed"

    # 工具执行事件
    TOOL_CALL_REQUESTED = "tool_call_requested"
    TOOL_CALL_APPROVED = "tool_call_approved"
    TOOL_CALL_REJECTED = "tool_call_rejected"
    TOOL_EXECUTION_STARTED = "tool_execution_started"
    TOOL_EXECUTION_SUCCEEDED = "tool_execution_succeeded"
    TOOL_EXECUTION_FAILED = "tool_execution_failed"

    # 权限事件
    PERMISSION_CHECK_PASSED = "permission_check_passed"
    PERMISSION_CHECK_FAILED = "permission_check_failed"
    APPROVAL_REQUEST_CREATED = "approval_request_created"
    APPROVAL_REQUEST_APPROVED = "approval_request_approved"
    APPROVAL_REQUEST_REJECTED = "approval_request_rejected"

    # 记忆事件
    MEMORY_UPDATED = "memory_updated"
```

### 事件订阅系统

事件总线实现发布-订阅模式，允许多个订阅者监听特定事件：

```python
from typing import Callable
import asyncio

class EventBus:
    """事件总线,支持发布-订阅模式"""

    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = {}
        self.event_history: list[Event] = []

    def subscribe(
        self,
        event_type: str,
        handler: Callable[[Event], None]
    ) -> str:
        """
        订阅某个事件类型。

        返回subscription_id,可用于后续取消订阅。
        """
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []

        self.subscribers[event_type].append(handler)
        return f"sub_{event_type}_{len(self.subscribers[event_type])}"

    def unsubscribe(self, event_type: str, handler: Callable) -> None:
        """取消订阅"""
        if event_type in self.subscribers:
            self.subscribers[event_type].remove(handler)

    async def publish(self, event: Event) -> None:
        """发布事件"""
        # 存储到事件历史
        self.event_history.append(event)

        # 调用所有订阅者
        handlers = self.subscribers.get(event.event_type, [])
        for handler in handlers:
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(event)
                else:
                    handler(event)
            except Exception as e:
                # 记录处理器的异常,但不中断其他处理器
                print(f"Error in event handler: {e}")

    def get_event_history(
        self,
        event_type: str = None,
        source: str = None,
        limit: int = 100
    ) -> list[Event]:
        """查询事件历史"""
        results = self.event_history
        if event_type:
            results = [e for e in results if e.event_type == event_type]
        if source:
            results = [e for e in results if e.source == source]
        return results[-limit:]
```

### 事件驱动的架构示例

以下是一个完整的事件驱动架构使用示例，展示了多个子系统如何通过事件进行协作：

```python
class EventDrivenRuntime:
    """事件驱动的运行时引擎"""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

        # 订阅权限事件
        self.event_bus.subscribe(
            EventType.APPROVAL_REQUEST_APPROVED,
            self._on_approval_approved
        )

        # 订阅工具执行事件
        self.event_bus.subscribe(
            EventType.TOOL_EXECUTION_SUCCEEDED,
            self._on_tool_succeeded
        )

    async def execute_task(self, task: Task) -> None:
        """执行任务"""
        # 发布任务开始事件
        await self.event_bus.publish(Event(
            event_type=EventType.TASK_STARTED,
            source="runtime_engine",
            data={"task_id": task.id, "task_description": task.description}
        ))

        # ... 执行任务 ...

        # 发布任务完成事件
        await self.event_bus.publish(Event(
            event_type=EventType.TASK_COMPLETED,
            source="runtime_engine",
            data={"task_id": task.id, "result": "success"}
        ))

    async def _on_approval_approved(self, event: Event) -> None:
        """处理审批批准事件"""
        tool_call_id = event.data["tool_call_id"]
        # 继续执行被暂停的工具调用
        ...

    async def _on_tool_succeeded(self, event: Event) -> None:
        """处理工具执行成功事件"""
        # 更新内部状态
        ...
```

## 2.4.4 三大参考系统的接口对比

三个参考系统在接口设计上体现了不同语言和架构范式的取舍。

### OpenAI Codex的接口设计

Codex 使用 Rust 的 trait 和 enum 定义接口，通过类型系统在编译期保证安全性：

```rust
/// 执行策略的决策结果
enum Decision {
    Allow,                    // 直接执行
    Prompt(String),           // 需要用户审批,附带理由
    Forbidden(String),        // 禁止执行
}

/// 技能(Skill)的标准元数据
struct SkillMetadata {
    name: String,
    description: String,
    instructions: String,
    dependencies: Vec<String>,
}

/// 沙箱执行的统一接口
trait SandboxExecutor {
    fn execute(&self, command: &str, working_dir: &Path) -> Result<Output>;
    fn is_available(&self) -> bool;
}
```

特点：

* Rust 的所有权系统和 `Result` 类型保证内存安全和错误处理的完备性
* execpolicy 通过 Starlark DSL 定义规则，与 Rust 核心解耦
* MCP 工具和内置技能共享统一的调用接口

### Claude Code的接口设计

Claude Code使用TypeScript的类型系统定义接口，确保编译时的类型安全：

```typescript
interface SDKMessage {
  id: string;
  role: "user" | "assistant" | "tool";
  content: string | ContentBlock[];
  metadata?: Record<string, any>;
}

interface ToolUseBlock {
  type: "tool_use";
  id: string;
  name: string;
  input: Record<string, any>;
}

interface ToolResultBlock {
  type: "tool_result";
  tool_use_id: string;
  content: string | Record<string, any>;
  is_error?: boolean;
}
```

特点：

* 类型安全的接口定义
* 支持流式处理(streaming)
* 灵活的内容块(ContentBlock)设计

### OpenClaw的接口设计

OpenClaw使用WebSocket协议进行通信，消息格式基于帧(Frame)：

**WebSocket帧结构：**

| 帧类型       | 负载                           |
| --------- | ---------------------------- |
| HEARTBEAT | {agent\_id, timestamp}       |
| TASK      | {task\_id, description, ...} |
| ACTION    | {action\_type, tool\_call}   |
| RESULT    | {action\_id, result\_data}   |
| CONTROL   | {command: pause/resume/stop} |

特点：

* 长连接保证高实时性
* 帧类型的设计明确了各种通信场景
* Heartbeat机制支持长期运行的Agent

## 2.4.5 版本控制和向后兼容性

当Harness系统演进时，接口的向后兼容性变得重要。

```python
class VersionedMessage:
    """带版本的消息,支持向后兼容"""

    VERSION = "1.0"

    def __init__(
        self,
        role: MessageRole,
        type: MessageType,
        content: str,
        version: str = None,
        **kwargs
    ):
        self.version = version or self.VERSION
        # ... 其他初始化 ...

    @classmethod
    def from_dict(cls, data: dict) -> "VersionedMessage":
        """从字典反序列化,处理版本兼容性"""
        version = data.get("version", "1.0")

        if version == "1.0":
            return cls._from_v1_0(data)
        elif version == "2.0":
            return cls._from_v2_0(data)
        else:
            raise ValueError(f"Unsupported message version: {version}")

    @classmethod
    def _from_v1_0(cls, data: dict) -> "VersionedMessage":
        """从v1.0格式反序列化"""
        # 处理v1.0特定的字段和格式
        return cls(
            role=MessageRole(data.get("role")),
            type=MessageType(data.get("type")),
            content=data.get("content"),
            version="1.0"
        )

    @classmethod
    def _from_v2_0(cls, data: dict) -> "VersionedMessage":
        """从v2.0格式反序列化"""
        # 处理v2.0特定的字段和格式
        return cls(
            role=MessageRole(data.get("role")),
            type=MessageType(data.get("type")),
            content=data.get("content"),
            version="2.0"
        )
```

## 2.4.6 总结

良好的接口设计包括：

1. **统一的消息格式**：所有层间通信都使用结构化的消息
2. **工具调用协议**：标准化的请求和响应格式
3. **事件流规范**：异步通信的统一事件定义
4. **版本控制**：为系统演进预留向后兼容的机制

这些接口的清晰定义，使得Harness系统的各个组件可以独立开发和测试，同时保持整体的一致性。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yeasy.gitbook.io/harness_engineering_guide/di-yi-bu-fen-harness-gong-cheng-ji-chu/02_architecture/2.4_interfaces.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
