# 8.4 Inter-Agent Communication

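In a multi-agent system, the communication design directly determines how reliable and how scalable the system is. This section covers the two paradigms, message passing and shared memory, then Claude Code's hybrid communication scheme and OpenClaw's multi-channel routing mechanism.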

## 8.4.1 Two Communication Paradigms

The two main paradigms trade off reliability, scalability, and latency in different ways:

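| Dimension                     | Message passing                                                                  | Shared memory                                                                     |
| ----------------------------- | -------------------------------------------------------------------------------- | --------------------------------------------------------------------------------- |
| **Principle**                 | Agents communicate by sending and receiving messages that carry data and operation instructions | Agents communicate through shared data structures, typically within one process |
| **Coupling**                  | Loose; agents need not know each other's internal state                          | Tight; requires synchronization (locks)                                           |
| **Latency**                   | Higher                                                                           | Low                                                                               |
| **Scalability**               | Easy to add new agents; distribution-friendly                                    | Hard to distribute; prone to data races                                           |
| **Ordering guarantees**       | Hard; needs extra mechanisms                                                     | Strong, since accesses are serialized by locks                                    |
| **Implementation complexity** | Higher (message queues, retries, etc.)                                           | Lower                                                                             |
| **Async support**             | Non-blocking operation comes naturally                                           | Needs extra handling                                                              |
| **Typical use cases**         | Distributed systems, asynchronous processing, system-level isolation             | Single-process multithreading, low-latency requirements, tightly coupled agents   |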
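To make the contrast concrete, here is a minimal single-process sketch using only the Python standard library. The message-passing side hands values over a `queue.Queue`, while the shared-memory side updates a dict guarded by a `threading.Lock`. The function names are illustrative and not taken from any framework.

```python
import queue
import threading


def message_passing_demo() -> list:
    """Producer and consumer share nothing except a queue of messages."""
    inbox = queue.Queue()  # thread-safe FIFO channel
    received = []

    def producer():
        for i in range(3):
            # Each message carries both the operation and its data
            inbox.put({"op": "store", "value": i})
        inbox.put({"op": "stop"})  # sentinel tells the consumer to exit

    def consumer():
        while True:
            msg = inbox.get()  # blocks until a message arrives
            if msg["op"] == "stop":
                break
            received.append(msg["value"])

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


def shared_memory_demo() -> dict:
    """Several workers update one shared dict; the lock prevents lost updates."""
    state = {"counter": 0}
    lock = threading.Lock()

    def worker():
        for _ in range(1000):
            with lock:  # the read-modify-write must be atomic
                state["counter"] += 1

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state


print(message_passing_demo())  # [0, 1, 2]
print(shared_memory_demo())    # {'counter': 4000}
```

The queue version needs a sentinel to shut down and is naturally asynchronous; the dict version is shorter and faster, but it is correct only because every update holds the lock.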

## 8.4.2 Claude Code's Communication Mechanism

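Claude Code uses a **hybrid approach**:

* Between Tasks: message-based notifications (task-notification XML)
* Between Workers: shared memory through the Scratchpad
* Across processes: Streamable HTTP (bidirectional streaming over HTTP)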

### Task-Notification XML Message Format

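Claude Code passes XML notification messages between Tasks. **Note**: the XML below is Claude Code's internal task-notification format and is distinct from the MCP protocol. MCP itself uses JSON-RPC 2.0 (see Chapter 9); the XML shown here only carries state between Tasks inside the framework: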

```xml
<task-notification>
  <id>root#local_agent#0</id>
  <state>completed</state>
  <timestamp>2025-04-04T10:30:45Z</timestamp>
  <result>
    <success>true</success>
    <output>
      <key>value</key>
    </output>
    <metrics>
      <tokens_used>1250</tokens_used>
      <duration_ms>3450</duration_ms>
    </metrics>
  </result>
  <next_tasks>
    <task_id>root#local_agent#1</task_id>
    <task_id>root#local_agent#2</task_id>
  </next_tasks>
</task-notification>
```

**Message structure in detail**:

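| Field       | Meaning                                                                                          | Type     |
| ----------- | ------------------------------------------------------------------------------------------------ | -------- |
| id          | Unique identifier of the Task                                                                    | string   |
| state       | Task state (pending/running/completed/failed/killed)                                             | enum     |
| timestamp   | Time the message was generated (ISO 8601)                                                        | datetime |
| result      | Execution result: `success`, `output`, and `metrics` (present only when state is completed or failed) | object   |
| metrics     | Execution metrics nested inside `result` (tokens used, duration in ms, etc.)                     | object   |
| next\_tasks | IDs of the follow-up Tasks that can now run                                                      | array    |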
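A consumer of these notifications has to turn the XML into structured data. The sketch below uses the standard library's `xml.etree.ElementTree`; the function name `parse_task_notification` and the shape of the returned dict are assumptions for illustration, not Claude Code's internal API.

```python
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Any, Dict, List

VALID_STATES = {"pending", "running", "completed", "failed", "killed"}


def _children(parent: ET.Element, tag: str) -> List[ET.Element]:
    """Child elements of parent/tag, or [] if tag is absent."""
    node = parent.find(tag)
    return list(node) if node is not None else []


def parse_task_notification(xml_text: str) -> Dict[str, Any]:
    """Parse a <task-notification> document into a plain dict (hypothetical helper)."""
    root = ET.fromstring(xml_text)
    if root.tag != "task-notification":
        raise ValueError(f"unexpected root element: {root.tag}")

    state = root.findtext("state")
    if state not in VALID_STATES:
        raise ValueError(f"unknown state: {state}")

    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on
    timestamp = root.findtext("timestamp", default="").replace("Z", "+00:00")
    parsed: Dict[str, Any] = {
        "id": root.findtext("id"),
        "state": state,
        "timestamp": datetime.fromisoformat(timestamp),
        "next_tasks": [t.text for t in root.findall("next_tasks/task_id")],
        "result": None,
    }

    result = root.find("result")
    # Per the field table, result is only meaningful for completed/failed tasks
    if result is not None and state in ("completed", "failed"):
        parsed["result"] = {
            "success": result.findtext("success") == "true",
            "output": {c.tag: c.text for c in _children(result, "output")},
            "metrics": {m.tag: int(m.text) for m in _children(result, "metrics")},
        }
    return parsed


SAMPLE = """<task-notification>
  <id>root#local_agent#0</id>
  <state>completed</state>
  <timestamp>2025-04-04T10:30:45Z</timestamp>
  <result>
    <success>true</success>
    <output><key>value</key></output>
    <metrics><tokens_used>1250</tokens_used><duration_ms>3450</duration_ms></metrics>
  </result>
  <next_tasks>
    <task_id>root#local_agent#1</task_id>
    <task_id>root#local_agent#2</task_id>
  </next_tasks>
</task-notification>"""

info = parse_task_notification(SAMPLE)
print(info["state"], info["next_tasks"])
# completed ['root#local_agent#1', 'root#local_agent#2']
print(info["result"]["metrics"])  # {'tokens_used': 1250, 'duration_ms': 3450}
```

Validating `state` against the enum up front means a malformed notification fails loudly instead of silently stalling the Tasks listed in `next_tasks`.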

### Sharing Across Workers with the Scratchpad

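The Scratchpad is Claude Code's shared memory region, which multiple Workers can read and write. The Python below is an illustrative implementation of the idea: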

```python
from dataclasses import dataclass
from datetime import datetime
from threading import RLock
from typing import Any, Callable, Dict, List, Optional

# Signature of a watch callback: callback(key, new_value)
WatchCallback = Callable[[str, Any], None]


@dataclass
class ScratchpadEntry:
    """A single entry in the Scratchpad."""
    key: str
    value: Any
    created_at: datetime
    updated_at: datetime
    writer_id: str  # ID of the most recent writer
    version: int = 0
    read_only: bool = False


class Scratchpad:
    """Shared memory across Workers; every operation holds a single lock."""

    def __init__(self):
        self._data: Dict[str, ScratchpadEntry] = {}
        self._lock = RLock()
        self._access_log: List[dict] = []
        self._watchers: Dict[str, List[WatchCallback]] = {}

    def put(self, key: str, value: Any, writer_id: str, read_only: bool = False) -> None:
        """Write a value; overwriting an existing key bumps its version."""
        with self._lock:
            now = datetime.now()
            new_version = 0
            created_at = now

            if key in self._data:
                if self._data[key].read_only:
                    raise ValueError(f"Key {key} is read-only")
                new_version = self._data[key].version + 1
                created_at = self._data[key].created_at

            self._data[key] = ScratchpadEntry(
                key=key,
                value=value,
                created_at=created_at,
                updated_at=now,
                writer_id=writer_id,
                version=new_version,
                read_only=read_only,
            )
            self._access_log.append({
                "timestamp": now,
                "operation": "put",
                "key": key,
                "writer_id": writer_id,
            })
            callbacks = list(self._watchers.get(key, []))

        # Notify watchers after releasing the lock so a slow callback cannot block other Workers
        for callback in callbacks:
            callback(key, value)

    def get(self, key: str, reader_id: str, default: Optional[Any] = None) -> Any:
        """Read a value, or return default if the key is absent."""
        with self._lock:
            if key not in self._data:
                return default

            entry = self._data[key]
            self._access_log.append({
                "timestamp": datetime.now(),
                "operation": "get",
                "key": key,
                "reader_id": reader_id,
                "version": entry.version,
            })
            return entry.value

    def get_all(self, reader_id: str) -> Dict[str, Any]:
        """Return a shallow snapshot of all key-value pairs."""
        with self._lock:
            self._access_log.append({
                "timestamp": datetime.now(),
                "operation": "get_all",
                "reader_id": reader_id,
            })
            return {entry.key: entry.value for entry in self._data.values()}

    def delete(self, key: str, writer_id: str) -> bool:
        """Delete a key; return False if it did not exist."""
        with self._lock:
            if key not in self._data:
                return False

            if self._data[key].read_only:
                raise ValueError(f"Key {key} is read-only")

            del self._data[key]
            self._access_log.append({
                "timestamp": datetime.now(),
                "operation": "delete",
                "key": key,
                "writer_id": writer_id,
            })
            return True

    def batch_get(self, keys: List[str], reader_id: str) -> Dict[str, Any]:
        """Read several keys at once under one lock; missing keys are skipped."""
        with self._lock:
            result = {}
            for key in keys:
                if key in self._data:
                    result[key] = self._data[key].value

            self._access_log.append({
                "timestamp": datetime.now(),
                "operation": "batch_get",
                "keys": keys,
                "reader_id": reader_id,
                "found": len(result),
            })
            return result

    def exists(self, key: str) -> bool:
        """Check whether a key exists."""
        with self._lock:
            return key in self._data

    def get_version(self, key: str) -> int:
        """Return the key's version number, or -1 if it does not exist."""
        with self._lock:
            return self._data[key].version if key in self._data else -1

    def watch(self, key: str, callback: WatchCallback, reader_id: str) -> None:
        """Register callback to run after every put() on key (a simple observer pattern)."""
        with self._lock:
            self._watchers.setdefault(key, []).append(callback)
            self._access_log.append({
                "timestamp": datetime.now(),
                "operation": "watch",
                "key": key,
                "reader_id": reader_id,
            })

    def get_access_log(self, limit: int = 100) -> List[dict]:
        """Return the most recent access-log entries."""
        with self._lock:
            return self._access_log[-limit:]

    def clear(self) -> None:
        """Remove all data and the access log."""
        with self._lock:
            self._data.clear()
            self._access_log.clear()

# Usage example: multiple Workers sharing data through one Scratchpad
class Worker:
    """A Worker that runs one or more pipeline phases."""

    def __init__(self, worker_id: str, scratchpad: Scratchpad):
        self.worker_id = worker_id
        self.scratchpad = scratchpad

    def research_phase(self):
        """Research phase: record findings for later phases."""
        print(f"[{self.worker_id}] Running Research phase")
        findings = {
            "key_insights": ["insight 1", "insight 2"],
            "requirements": ["requirement 1", "requirement 2"],
        }
        self.scratchpad.put("research_findings", findings, self.worker_id)
        print(f"[{self.worker_id}] Research done, findings written to Scratchpad")

    def synthesis_phase(self):
        """Synthesis phase: turn the research findings into an execution plan."""
        print(f"[{self.worker_id}] Running Synthesis phase")
        # Read the Research output (possibly written by another Worker)
        findings = self.scratchpad.get("research_findings", self.worker_id)
        print(f"[{self.worker_id}] Read Research findings: {findings}")

        plan = {
            "steps": ["step 1", "step 2", "step 3"],
            "resources": ["resource 1", "resource 2"],
        }
        self.scratchpad.put("execution_plan", plan, self.worker_id)
        print(f"[{self.worker_id}] Synthesis done, plan written to Scratchpad")

    def implementation_phase(self):
        """Implementation phase: execute the plan and record the artifacts."""
        print(f"[{self.worker_id}] Running Implementation phase")
        # Read the Synthesis output
        plan = self.scratchpad.get("execution_plan", self.worker_id)
        print(f"[{self.worker_id}] Read execution plan: {plan}")

        # Execute and write the results
        results = {"artifacts": ["artifact 1", "artifact 2"]}
        self.scratchpad.put("implementation_results", results, self.worker_id)

    def run(self):
        """Run all phases in order within a single Worker."""
        self.research_phase()
        self.synthesis_phase()
        self.implementation_phase()


if __name__ == "__main__":
    # Create the shared Scratchpad
    scratchpad = Scratchpad()

    # Create the Workers
    worker1 = Worker("Worker-1", scratchpad)
    worker2 = Worker("Worker-2", scratchpad)

    # Split the phases across Workers so data actually crosses Worker boundaries.
    # Phases run sequentially here for a deterministic demo; real Workers run concurrently.
    worker1.research_phase()
    worker2.synthesis_phase()
    worker2.implementation_phase()

    print("\n=== Final Scratchpad contents ===")
    all_data = scratchpad.get_all("demo")
    for key, value in all_data.items():
        print(f"{key}: {value}")

    print("\n=== Access log ===")
    logs = scratchpad.get_access_log()
    for log in logs:
        print(log)
```

## 8.4.3 OpenClaw's Channel Routing

OpenClaw supports multi-channel communication routing and can distribute messages to more than 20 platforms.

### Channel Definitions

An example YAML definition for multi-channel communication:

```yaml
channels:
  # HTTP webhook channel
  webhooks:
    type: http_post
    endpoints:
      - url: https://api.example.com/webhook
        method: POST
        headers:
          Authorization: "Bearer token"
    retry_policy:
      max_retries: 3
      backoff: exponential

  # Email channel
  email:
    type: email
    smtp_config:
      host: smtp.example.com
      port: 587
      use_tls: true
    from_address: agent@example.com

  # Slack notifications
  slack:
    type: slack
    webhook_url: "https://hooks.slack.com/services/..."
    channel: "#agent-notifications"

  # Database storage
  database:
    type: database
    connection_string: "postgresql://..."
    table: agent_messages

# Routing rules
routing_rules:
  - trigger: workflow_start
    channels: [webhooks, slack]
    format: json

  - trigger: workflow_complete
    channels: [email, slack]
    format: plain

  - trigger: approval_needed
    channels: [slack, database]
    format: json

  - trigger: error_occurred
    channels: [email, database]
    priority: high
    format: json
```
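Resolving a rule is essentially a lookup from trigger to channel list. A minimal sketch, assuming the YAML has already been loaded into a dict, that a rule without `priority` defaults to `normal`, and that a missing `format` defaults to `json`; `resolve_routes` is a hypothetical helper, not an OpenClaw API.

```python
from typing import Any, Dict, List

# The config above as it would look after YAML loading (channel settings omitted)
CONFIG: Dict[str, Any] = {
    "channels": {"webhooks": {}, "email": {}, "slack": {}, "database": {}},
    "routing_rules": [
        {"trigger": "workflow_start", "channels": ["webhooks", "slack"], "format": "json"},
        {"trigger": "workflow_complete", "channels": ["email", "slack"], "format": "plain"},
        {"trigger": "approval_needed", "channels": ["slack", "database"], "format": "json"},
        {"trigger": "error_occurred", "channels": ["email", "database"],
         "priority": "high", "format": "json"},
    ],
}


def resolve_routes(config: Dict[str, Any], trigger: str) -> List[Dict[str, str]]:
    """Return one delivery plan per target channel for the given trigger."""
    known = config["channels"]
    plans = []
    for rule in config["routing_rules"]:
        if rule["trigger"] != trigger:
            continue
        for name in rule["channels"]:
            # Fail fast on a typo instead of silently dropping messages
            if name not in known:
                raise KeyError(f"rule references undefined channel: {name}")
            plans.append({
                "channel": name,
                "format": rule.get("format", "json"),        # assumed default
                "priority": rule.get("priority", "normal"),  # assumed default
            })
    return plans


for plan in resolve_routes(CONFIG, "error_occurred"):
    print(plan)
# {'channel': 'email', 'format': 'json', 'priority': 'high'}
# {'channel': 'database', 'format': 'json', 'priority': 'high'}
```

Checking channel names against the `channels` section at resolution time catches configuration mistakes that would otherwise surface only as missing notifications.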

### Message Format Specification

To support many channels, OpenClaw defines a unified internal message format and converts it for each channel:

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List


class MessagePriority(Enum):
    """Message priority."""
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


def _utc_now_iso() -> str:
    """Current UTC time in ISO 8601 format."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class Message:
    """Unified internal message format."""
    id: str
    source_agent_id: str
    target_agent_ids: List[str]
    message_type: str  # e.g., "task_result", "approval_request"
    payload: Dict[str, Any]
    priority: MessagePriority = MessagePriority.NORMAL
    timestamp: str = field(default_factory=_utc_now_iso)  # filled in at creation time
    ttl_seconds: int = 3600  # time-to-live
    require_acknowledgment: bool = False

    def to_json(self) -> str:
        """Serialize to JSON, including delivery metadata."""
        return json.dumps({
            "id": self.id,
            "source_agent_id": self.source_agent_id,
            "target_agent_ids": self.target_agent_ids,
            "message_type": self.message_type,
            "payload": self.payload,
            "priority": self.priority.value,
            "timestamp": self.timestamp,
            "ttl_seconds": self.ttl_seconds,
            "require_acknowledgment": self.require_acknowledgment,
        }, ensure_ascii=False)


class ChannelAdapter:
    """Converts a Message into each channel's payload format."""

    def adapt_to_slack(self, msg: Message) -> Dict[str, Any]:
        """Adapt to a Slack Block Kit payload."""
        return {
            "text": f"Message ID: {msg.id}",  # fallback text for notifications
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*{msg.message_type}*\n\n{json.dumps(msg.payload, indent=2, ensure_ascii=False)}",
                    },
                }
            ],
        }

    def adapt_to_email(self, msg: Message) -> Dict[str, str]:
        """Adapt to an email subject and plain-text body."""
        lines = [
            f"Message ID: {msg.id}",
            f"Source: {msg.source_agent_id}",
            f"Type: {msg.message_type}",
            f"Priority: {msg.priority.name}",
            "",
            "Content:",
            json.dumps(msg.payload, indent=2, ensure_ascii=False),
        ]
        return {
            "subject": f"[{msg.message_type}] Message {msg.id}",
            "body": "\n".join(lines),
        }

    def adapt_to_webhook(self, msg: Message) -> Dict[str, Any]:
        """Adapt to a generic webhook JSON body."""
        return {
            "event": msg.message_type,
            "message_id": msg.id,
            "source": msg.source_agent_id,
            "data": msg.payload,
            "timestamp": msg.timestamp,
        }
```
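The `Message` class carries `ttl_seconds`, but none of the adapters check it. One way to honor it, sketched here as an assumption rather than OpenClaw's documented behavior, is to drop expired messages before adapting them. The hypothetical `is_expired` helper expects an ISO 8601 timestamp that includes a UTC offset.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(timestamp: str, ttl_seconds: int, now: Optional[datetime] = None) -> bool:
    """True if a message created at `timestamp` (ISO 8601 with offset) has outlived its TTL."""
    created = datetime.fromisoformat(timestamp)
    now = now or datetime.now(timezone.utc)
    return now - created > timedelta(seconds=ttl_seconds)


created = "2025-04-04T10:30:45+00:00"
later = datetime(2025, 4, 4, 11, 0, 0, tzinfo=timezone.utc)  # 29 min 15 s after creation
print(is_expired(created, 3600, now=later))  # False: still within the 1-hour default TTL
print(is_expired(created, 600, now=later))   # True: a 10-minute TTL has elapsed
```

Passing `now` explicitly keeps the check deterministic in tests; in a dispatcher it would simply default to the current time.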

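With a unified message format and per-channel adapters in place, the communication layer still has to deliver messages reliably and hold up under load. The design principles below address exactly that.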

## 8.4.4 Communication Protocol Design Principles

### 1. Reliability

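Message delivery must provide:

* **At-least-once delivery**: a failed send is retried until the receiver acknowledges it
* **Idempotency**: because retries can deliver the same message more than once, processing a duplicate must not repeat its side effects
* **Acknowledgment**: the receiver confirms receipt so the sender knows when to stop retrying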

```python
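from __future__ import annotations  # "Message" is the class from the message-format example above

import threading
import time
from typing import Callable, Dict, Set


class ReliableMessageQueue:
    """Reliable message queue: at-least-once delivery via acks and retries."""

    def __init__(self, transport: Callable[[Message], None]):
        self._transport = transport  # the underlying send function (HTTP, broker, ...)
        self.pending_messages: Dict[str, Message] = {}  # msg_id -> message awaiting ack
        self.confirmed_messages: Set[str] = set()
        self._ack_events: Dict[str, threading.Event] = {}
        self._lock = threading.Lock()

    def send(self, message: Message, max_retries: int = 3, ack_timeout: float = 5.0) -> bool:
        """Send a message, retrying with exponential backoff until it is acknowledged."""
        for attempt in range(max_retries):
            try:
                self._transmit(message)
                if self._wait_for_ack(message.id, timeout=ack_timeout):
                    with self._lock:
                        self._ack_events.pop(message.id, None)
                    return True
                print(f"No ack within {ack_timeout}s (attempt {attempt + 1})")
            except Exception as e:
                print(f"Send failed (attempt {attempt + 1}): {e}")
            # Back off after both exceptions and ack timeouts
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
        return False

    def _transmit(self, message: Message) -> None:
        """Record the message as pending, then hand it to the transport."""
        with self._lock:
            self.pending_messages[message.id] = message
            # One event per message id, reused across retries so a late ack still counts
            self._ack_events.setdefault(message.id, threading.Event())
        self._transport(message)

    def _wait_for_ack(self, msg_id: str, timeout: float) -> bool:
        """Block until acknowledge(msg_id) is called or the timeout expires."""
        with self._lock:
            event = self._ack_events.get(msg_id)
        return event.wait(timeout) if event is not None else msg_id in self.confirmed_messages

    def acknowledge(self, msg_id: str) -> None:
        """Called by the receiver once it has processed the message."""
        with self._lock:
            self.pending_messages.pop(msg_id, None)
            self.confirmed_messages.add(msg_id)
            event = self._ack_events.get(msg_id)
        if event is not None:
            event.set()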
```
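The queue above covers the sender side of at-least-once delivery; the receiver side needs idempotency. A minimal sketch of receiver-side deduplication by message ID follows. `IdempotentReceiver` is a hypothetical class, and the in-memory set of processed IDs is a simplification: a real system would persist it and bound its size.

```python
from typing import Callable, Dict, Set


class IdempotentReceiver:
    """Deduplicates by message ID so that redelivered messages are harmless."""

    def __init__(self, handler: Callable[[Dict], None], ack: Callable[[str], None]):
        self._handler = handler  # business logic with side effects
        self._ack = ack          # e.g., a bound ReliableMessageQueue.acknowledge
        self._processed: Set[str] = set()

    def on_message(self, msg_id: str, payload: Dict) -> bool:
        """Run the handler at most once per ID; return False for duplicates."""
        if msg_id in self._processed:
            # The sender retried because our earlier ack was lost: re-ack, skip side effects
            self._ack(msg_id)
            return False
        self._handler(payload)
        self._processed.add(msg_id)  # record only after the handler succeeded
        self._ack(msg_id)
        return True


effects, acks = [], []
receiver = IdempotentReceiver(handler=effects.append, ack=acks.append)
receiver.on_message("m1", {"amount": 10})
receiver.on_message("m1", {"amount": 10})  # duplicate caused by a retry
print(effects)  # [{'amount': 10}]
print(acks)     # ['m1', 'm1']
```

Re-acknowledging a duplicate matters: if the receiver stayed silent, the sender would keep retrying a message that has already been handled.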

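### 2. Ordering Guarantees

Some scenarios require messages to be processed in order, for example successive state updates for the same Task. The queue below keeps a separate FIFO per key, so order holds within a key but not across keys: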

```python
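from __future__ import annotations  # "Message" is the class from the message-format example above

from collections import deque
from typing import Deque, Dict, Optional


class OrderedMessageQueue:
    """Queue that preserves order per key (e.g., per Task or per agent)."""

    def __init__(self):
        self.queues: Dict[str, Deque[Message]] = {}  # key -> FIFO of messages

    def enqueue(self, key: str, message: Message) -> None:
        """Enqueue; messages sharing a key keep their arrival order."""
        self.queues.setdefault(key, deque()).append(message)

    def dequeue(self, key: str) -> Optional[Message]:
        """Dequeue the oldest message for the key (FIFO), or None if empty."""
        queue = self.queues.get(key)
        if queue:
            return queue.popleft()  # O(1), unlike list.pop(0)
        return None

    def get_queue_length(self, key: str) -> int:
        """Number of messages waiting for the key."""
        return len(self.queues.get(key, ()))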
```
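`OrderedMessageQueue` preserves the order in which messages were enqueued, which helps only if they arrive in order. When the transport can reorder or redeliver messages, a common remedy is to stamp each message with a per-key sequence number and let the receiver hold back early arrivals. A minimal sketch; `ReorderBuffer` and the zero-based sequence scheme are assumptions for illustration.

```python
from typing import Any, Dict, List


class ReorderBuffer:
    """Releases messages for each key strictly in sequence-number order."""

    def __init__(self):
        self._next_seq: Dict[str, int] = {}         # key -> next expected sequence number
        self._held: Dict[str, Dict[int, Any]] = {}  # key -> {seq: message} waiting for a gap

    def receive(self, key: str, seq: int, message: Any) -> List[Any]:
        """Accept one message; return every message that is now deliverable in order."""
        expected = self._next_seq.setdefault(key, 0)
        if seq < expected:
            return []  # already delivered: a duplicate from a retry
        held = self._held.setdefault(key, {})
        held[seq] = message
        ready = []
        # Drain the contiguous run starting at the expected sequence number
        while expected in held:
            ready.append(held.pop(expected))
            expected += 1
        self._next_seq[key] = expected
        return ready


buf = ReorderBuffer()
print(buf.receive("task-1", 1, "b"))  # []  (still waiting for seq 0)
print(buf.receive("task-1", 0, "a"))  # ['a', 'b']
print(buf.receive("task-1", 0, "a"))  # []  (duplicate)
print(buf.receive("task-1", 2, "c"))  # ['c']
```

Dropping anything below the expected sequence number also makes the buffer tolerate at-least-once redelivery.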

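### 3. Backpressure Control

When messages pile up, the backlog should be signaled back to senders so they can slow down instead of letting queues grow without bound: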

```python
from typing import Dict


class BackpressureController:
    """Tracks in-flight messages per receiving agent (not thread-safe; guard with a lock if shared)."""

    def __init__(self, max_queue_size: int = 1000):
        self.max_queue_size = max_queue_size
        self.queue_sizes: Dict[str, int] = {}  # agent_id -> messages sent but not yet delivered

    def can_send(self, agent_id: str) -> bool:
        """Check whether the agent's queue still has room."""
        current_size = self.queue_sizes.get(agent_id, 0)
        return current_size < self.max_queue_size

    def record_send(self, agent_id: str) -> None:
        """Record that a message was sent to the agent."""
        self.queue_sizes[agent_id] = self.queue_sizes.get(agent_id, 0) + 1

    def record_delivery(self, agent_id: str) -> None:
        """Record a delivery; the count never drops below zero."""
        if self.queue_sizes.get(agent_id, 0) > 0:
            self.queue_sizes[agent_id] -= 1

    def get_backpressure_status(self, agent_id: str) -> float:
        """Backpressure level in [0, 1]; 1.0 means the queue is full."""
        return min(1.0, self.queue_sizes.get(agent_id, 0) / self.max_queue_size)
```
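`BackpressureController` only reports pressure; the sender still has to react to it. With the standard library, the simplest way to push back is a bounded `queue.Queue`: `put` with a timeout blocks the sender while the queue is full and raises `queue.Full` if it stays full. The `send_with_backpressure` helper and its timeout value are assumptions for illustration.

```python
import queue


def send_with_backpressure(q: queue.Queue, item, timeout: float = 0.1) -> bool:
    """Try to enqueue; return False instead of growing without bound when the consumer lags."""
    try:
        q.put(item, timeout=timeout)  # blocks up to `timeout` seconds while the queue is full
        return True
    except queue.Full:
        return False  # caller can slow down, drop, or reroute the message


inbox = queue.Queue(maxsize=2)  # capacity plays the role of max_queue_size
results = [send_with_backpressure(inbox, i) for i in range(3)]
print(results)  # [True, True, False]: the third send is rejected while nobody consumes
inbox.get()     # the consumer drains one message...
print(send_with_backpressure(inbox, 3))  # True: ...so capacity is available again
```

Blocking briefly before giving up smooths over short bursts, while the `False` return forces the sender to make an explicit decision once the receiver is genuinely saturated.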

## 8.4.5 Summary

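Inter-agent communication is the circulatory system of a multi-agent system. Claude Code's hybrid approach (task-notification XML plus the Scratchpad) keeps the flow of messages explicit while using shared memory for efficient, synchronized data exchange between Workers. OpenClaw's multi-channel routing provides flexible message distribution. The key is a well-designed communication protocol that ensures reliability, ordering, and backpressure control. The next section implements a complete orchestration engine in MiniHarness.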

