# 8.3 Multi-Agent Orchestration in Harness

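The theoretical foundations of multi-agent coordination and the mainstream patterns (network, supervisor, hierarchical, and parallel) are covered in detail in Chapter 5 of *The Definitive Guide to Agentic AI* (《智能体 AI 权威指南》).

> 💡 **Theory reference**: For the core theory of multi-agent collaboration patterns, architecture comparisons, and design trade-offs, see Chapter 5 (5.1-5.3) of *The Definitive Guide to Agentic AI*. This section focuses on **implementation details** and **engineering practice** in the Harness framework.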

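## 8.3.1 State-Machine-Driven Orchestration in Harness

Unlike general-purpose multi-agent frameworks, Harness orchestrates multiple agents with a **state machine** plus **message routing**. This design is particularly well suited to long-running workflows that branch.

Harness maps each agent's lifecycle and the task flow onto state-machine nodes, which makes orchestration and coordination explicit. The following subsections show how sub-agents are created and managed inside the state machine and how they communicate with each other.

### Creating Sub-Agents in the State Machine

In Harness, every agent (the main agent and each sub-agent) corresponds to one or more state nodes: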

```yaml
# orchestration.yaml
states:
  research:
    type: agent_invoke
    agent: ResearchAgent
    config:
      system_prompt: "You are a research specialist..."
      max_tokens: 4000
      tools: [search_web, fetch_document]
    timeout: 300
    on_success: synthesis
    on_failure: retry_research

  synthesis:
    type: agent_invoke
    agent: SynthesisAgent
    config:
      system_prompt: "You are a synthesis expert..."
      input_from: research  # research's output is passed in automatically as input
    timeout: 200
    on_success: implementation

  implementation:
    type: agent_invoke
    agent: ImplementationAgent
    config:
      system_prompt: "Execute the plan step by step..."
      parallel_workers: 3  # may spawn up to 3 sub-workers concurrently
    timeout: 600
    on_success: verification

  verification:
    type: agent_invoke
    agent: VerificationAgent
    config:
      system_prompt: "Verify the implementation results..."
    on_success: done
    on_failure: request_revision
```

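**State-machine features in Harness**:

1. **Automatic context passing**: The output of the `research` state automatically becomes the input of the `synthesis` state (declared with `input_from`), with no explicit glue code.
2. **Concurrent sub-agents**: `parallel_workers: 3` means this stage may spawn up to 3 sub-agents concurrently, each handling one shard of the task.
3. **Failure recovery**: If an agent fails, the state can be configured to retry or to transition to an error-handling state (`on_failure`).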
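To make these three features concrete, here is a minimal interpreter for a state table shaped like `orchestration.yaml`. It is a sketch under stated assumptions, not Harness's engine. Agents are plain async callables keyed by the `agent` name. A missing `on_success` is treated as `done`. A missing `input_from` falls back to the previous state's output. `run_state_machine`, `AgentFn`, and `max_steps` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical agent signature: takes the input dict, returns an output dict
AgentFn = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def run_state_machine(
    states: Dict[str, dict],
    agents: Dict[str, AgentFn],
    start: str,
    max_steps: int = 20,
) -> Dict[str, Any]:
    """Follow on_success / on_failure transitions from `start` until reaching 'done'."""
    outputs: Dict[str, Any] = {}
    current, previous = start, None

    for _ in range(max_steps):
        if current == "done":
            return outputs
        spec = states[current]

        # Automatic context passing: read the output(s) named by input_from,
        # or (assumption) the previous state's output when input_from is absent
        source = spec.get("config", {}).get("input_from", previous)
        if isinstance(source, list):
            agent_input = {name: outputs.get(name, {}) for name in source}
        else:
            agent_input = outputs.get(source, {})

        try:
            # timeout=None means "no limit", matching states without a timeout key
            outputs[current] = await asyncio.wait_for(
                agents[spec["agent"]](agent_input), timeout=spec.get("timeout")
            )
            next_state = spec.get("on_success", "done")
        except Exception as exc:
            outputs[current] = {"error": str(exc)}
            next_state = spec.get("on_failure")
            if next_state is None:
                raise  # no recovery state configured: surface the error

        previous, current = current, next_state

    raise RuntimeError("max_steps exceeded; check the transitions for cycles")
```

A failing state with an `on_failure` target records `{"error": ...}` as its output and hands control to the recovery state. A failing state without a recovery target re-raises the error.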

### Message Routing in the State Machine

Communication among agents is managed through **message queues** and **routing rules**:

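```python
import asyncio
from typing import Dict, Optional


class OrchestrationContext:
    """Orchestration context: routes messages between agents and stores per-state outputs."""

    def __init__(self, predecessors: Optional[Dict[str, str]] = None):
        # Per-agent inboxes: agent_id -> asyncio.Queue
        self.agent_channels: Dict[str, asyncio.Queue] = {}
        # Outputs of completed states: state_name -> result
        self.state_outputs: Dict[str, dict] = {}
        # DAG edges: state_name -> predecessor state_name
        self.predecessors: Dict[str, str] = predecessors or {}

    def register_agent(self, agent_id: str) -> asyncio.Queue:
        """Create (or return) an agent's inbox so it also receives broadcasts."""
        return self.agent_channels.setdefault(agent_id, asyncio.Queue())

    async def send_message(self, from_agent: str, to_agent: str, msg: dict):
        """Send a message to one specific agent (point-to-point)."""
        await self.register_agent(to_agent).put(msg)

    async def broadcast_message(self, from_agent: str, msg: dict):
        """Broadcast to every registered agent except the sender (used in discussion mode)."""
        for agent_id, queue in self.agent_channels.items():
            if agent_id != from_agent:  # never deliver to the sender itself
                await queue.put(msg)

    def store_state_output(self, state_name: str, result: dict):
        """Store a state's output so later states can read it."""
        self.state_outputs[state_name] = result

    def _get_predecessor(self, state_name: str) -> Optional[str]:
        """Look up the predecessor state in the DAG definition."""
        return self.predecessors.get(state_name)

    def get_state_input(self, state_name: str) -> dict:
        """Fetch a state's input from its predecessor's output ({} if there is none)."""
        return self.state_outputs.get(self._get_predecessor(state_name), {})
```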
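`get_state_input` resolves a state's input through a predecessor lookup. One hypothetical way to build the state-to-predecessor map is to invert the `on_success` edges of the state definitions and let an explicit `input_from` (or `depends_on`) override them. `build_predecessors` is an illustrative helper, not a Harness API. List-valued `input_from` entries (several inputs) are deliberately skipped, because a single-predecessor map cannot represent them.

```python
from typing import Dict


def build_predecessors(states: Dict[str, dict]) -> Dict[str, str]:
    """Derive a state -> predecessor map from a state table."""
    preds: Dict[str, str] = {}

    # 1. Invert on_success edges; the first state that points at a node wins
    for name, spec in states.items():
        nxt = spec.get("on_success")
        if nxt and nxt != "done":
            preds.setdefault(nxt, name)

    # 2. An explicit single-valued input_from / depends_on overrides the inferred edge
    for name, spec in states.items():
        explicit = spec.get("config", {}).get("input_from") or spec.get("depends_on")
        if isinstance(explicit, str):
            preds[name] = explicit

    return preds
```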

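## 8.3.2 Message Routing and Error Propagation

The key engineering challenge in a complex multi-agent workflow is this: **when one agent fails, how do the other agents find out, and how does the workflow decide whether to continue, retry, or roll back?**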

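```python
import asyncio
import time


class WorkflowAbortedError(Exception):
    """Raised when an error handler decides to abort the whole workflow."""


class ErrorPropagation:
    """Propagates an agent's error to the other agents and applies the recovery decision."""

    def __init__(self, orchestration_context: "OrchestrationContext"):
        # The context must provide broadcast_message(); execute_state(state),
        # used for retries, is an assumed hook that reruns a single state.
        self.ctx = orchestration_context
        self.error_handlers = {}

    def register_error_handler(self, source_agent: str, handler_fn):
        """Register an async error handler for a specific agent.

        The handler receives the error event and returns an object with an
        `action` attribute ("retry", "fallback" or "abort") and, for
        fallback, a `fallback_result` attribute.
        """
        self.error_handlers[source_agent] = handler_fn

    async def propagate_error(self, source_agent: str, error: Exception, state: str):
        """Broadcast the error downstream, then let the registered handler decide."""
        error_event = {
            "type": "agent_error",
            "source": source_agent,
            "state": state,
            "error_type": error.__class__.__name__,
            "error_message": str(error),
            "timestamp": time.time(),
        }

        # 1. Broadcast the error event so the other agents know about it
        await self.ctx.broadcast_message(source_agent, error_event)

        # 2. Without a handler the error cannot be recovered: re-raise it
        handler = self.error_handlers.get(source_agent)
        if handler is None:
            raise error

        # 3. Apply the handler's decision
        handler_result = await handler(error_event)
        if handler_result.action == "retry":
            return await self._retry_state(state)        # rerun this stage
        if handler_result.action == "fallback":
            return handler_result.fallback_result         # use the fallback result
        if handler_result.action == "abort":
            raise WorkflowAbortedError(error_event)        # stop the whole workflow
        raise ValueError(f"Unknown recovery action: {handler_result.action!r}")

    async def _retry_state(self, state: str, max_retries: int = 3):
        """Retry a state up to max_retries times with exponential backoff."""
        for attempt in range(max_retries):
            # Wait before each retry: 2s, 4s, 8s, ...
            await asyncio.sleep(2 ** (attempt + 1))
            try:
                return await self.ctx.execute_state(state)
            except Exception:
                if attempt == max_retries - 1:
                    raise
```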
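The handler passed to `register_error_handler` is expected to return an object with an `action` attribute (`"retry"`, `"fallback"`, or `"abort"`) and, for fallbacks, a `fallback_result` attribute. Below is a minimal sketch of such a handler. `HandlerResult`, `TRANSIENT_ERRORS`, and the mapping of error types to actions are assumptions chosen for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class HandlerResult:
    """Hypothetical return type for an error handler."""
    action: str                          # "retry", "fallback" or "abort"
    fallback_result: Optional[Any] = None


# Error types treated as transient; this set is an illustrative assumption
TRANSIENT_ERRORS = {"TimeoutError", "ConnectionError"}


async def research_error_handler(error_event: dict) -> HandlerResult:
    """Retry transient failures, fall back to cached data on ValueError, abort otherwise."""
    if error_event["error_type"] in TRANSIENT_ERRORS:
        return HandlerResult(action="retry")
    if error_event["error_type"] == "ValueError":
        return HandlerResult(action="fallback", fallback_result={"source": "cache"})
    return HandlerResult(action="abort")


if __name__ == "__main__":
    event = {"error_type": "TimeoutError"}
    print(asyncio.run(research_error_handler(event)))  # HandlerResult(action='retry', fallback_result=None)
```

It would be registered with `propagation.register_error_handler("ResearchAgent", research_error_handler)`, where `propagation` is an `ErrorPropagation` instance.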

**Usage example**:

```yaml
states:
  research:
    type: agent_invoke
    agent: ResearchAgent
    on_failure:
      - action: retry
        max_retries: 3
        delay: exponential  # 2s, 4s, 8s

  synthesis:
    type: agent_invoke
    agent: SynthesisAgent
    depends_on: research
    on_failure:
      - action: fallback
        fallback_state: use_cached_synthesis  # use the cached synthesis result
      - action: notify
        channel: slack
        message: "Synthesis failed, manual review needed"

  implementation:
    type: agent_invoke
    agent: ImplementationAgent
    depends_on: synthesis
    error_handling:
      research_failed: abort  # if research failed, abort immediately
      synthesis_failed: retry  # if synthesis failed, retry implementation
```
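The YAML does not spell out how an `on_failure` list is evaluated. Below is a minimal sketch under two assumptions. First, the actions are tried in order and the first one that yields a result wins, so the `notify` step under `synthesis` fires only if the fallback also fails. Second, `delay: exponential` means waits of 2s, 4s, and 8s before each retry. `apply_on_failure`, `backoff_delays`, and the injected `run_state`, `notify`, and `sleep` callables are hypothetical. `sleep` is injectable so the policy can be tested without real waiting.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


def backoff_delays(max_retries: int, base: float = 2.0) -> List[float]:
    """Delays for `delay: exponential`: base**1, base**2, ... (2s, 4s, 8s for 3 retries)."""
    return [base ** (i + 1) for i in range(max_retries)]


async def apply_on_failure(
    actions: List[Dict[str, Any]],
    run_state: Callable[[str], Awaitable[Any]],
    state: str,
    notify: Callable[[str, str], None],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Walk an on_failure action list in order until one action produces a result."""
    for action in actions:
        kind = action["action"]
        if kind == "retry":
            for delay in backoff_delays(action.get("max_retries", 3)):
                await sleep(delay)
                try:
                    return await run_state(state)
                except Exception:
                    continue  # this retry failed; wait longer and try again
        elif kind == "fallback":
            try:
                return await run_state(action["fallback_state"])
            except Exception:
                continue  # fallback failed too; move on to the next action
        elif kind == "notify":
            notify(action["channel"], action["message"])  # side effect only
    raise RuntimeError(f"All on_failure actions exhausted for state {state!r}")
```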

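## 8.3.3 Context Isolation in the State Machine

The key question: when multiple agents run concurrently, how do you keep their contexts from contaminating each other?

Harness uses a **hierarchical context isolation** mechanism: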

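```python
from typing import Any, Dict, List, Optional


class ExecutionContext:
    """Layered execution context."""

    def __init__(self, workflow_id: str, parent_context: Optional["ExecutionContext"] = None):
        self.workflow_id = workflow_id
        self.parent = parent_context

        # Variables private to this layer
        self.local_vars: Dict[str, Any] = {}

        # Read-only snapshot of everything the parent can see (its inherited and
        # local variables), taken when this child is created. The copy is shallow:
        # mutable values are still shared with the parent.
        self.inherited_vars: Dict[str, Any] = (
            {**parent_context.inherited_vars, **parent_context.local_vars}
            if parent_context is not None else {}
        )

    def get(self, key: str, default=None):
        """Look up a variable: local scope first, then the inherited snapshot."""
        if key in self.local_vars:
            return self.local_vars[key]
        return self.inherited_vars.get(key, default)

    def set(self, key: str, value: Any):
        """Set a variable in the local scope only."""
        self.local_vars[key] = value

    def commit_to_parent(self, keys: List[str]):
        """Explicitly push selected variables up to the parent (to pass data between stages)."""
        if self.parent:
            for key in keys:
                if key in self.local_vars:
                    self.parent.set(key, self.local_vars[key])
```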
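The same layering can also be sketched with the standard library's `collections.ChainMap`, where `new_child()` adds a writable layer in front of the parent's mappings. This is an alternative illustration, not how Harness implements `ExecutionContext`. It differs in one respect: a `ChainMap` child sees later writes to its parent live, whereas `ExecutionContext` copies the parent's variables when the child is created. The variable names below are illustrative.

```python
from collections import ChainMap
from typing import Iterable

# Root (workflow-level) scope
root = ChainMap({"task": "build report", "model": "large"})

# Each parallel agent gets its own writable layer on top of the root
child_a = root.new_child()
child_b = root.new_child()

child_a["draft"] = "A's draft"   # writes land in child_a.maps[0] only
child_b["model"] = "small"       # shadows the root value for child_b only


def commit_to_parent(child: ChainMap, keys: Iterable[str]) -> None:
    """Copy selected keys from the child's own layer into its parent's layer."""
    for key in keys:
        if key in child.maps[0]:
            # child.parents is a ChainMap over maps[1:]; writes go to its first map
            child.parents[key] = child.maps[0][key]
```

Until `commit_to_parent(child_a, ["draft"])` runs, neither `root` nor `child_b` can see `"draft"`. After it runs, both can, because `child_b` reads through to the shared root layer.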

**Usage scenario**:

When several sub-agents run concurrently, each gets its own independent context:

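```python
import asyncio
from typing import Any, Dict, List, Protocol


class Agent(Protocol):
    """Minimal agent interface assumed by this example."""
    name: str

    async def execute(self, ctx: "ExecutionContext") -> Any: ...


async def execute_parallel_agents(ctx: "ExecutionContext", agents: List[Agent]) -> Dict[str, Any]:
    """Run several agents concurrently, each in its own isolated child context."""
    tasks = []

    for agent in agents:
        # Create an independent child context for each agent (ExecutionContext from above)
        child_ctx = ExecutionContext(f"{ctx.workflow_id}::{agent.name}", parent_context=ctx)
        tasks.append((agent.name, agent.execute(child_ctx), child_ctx))

    # Run concurrently; return_exceptions=True keeps one failure from cancelling the rest
    results: Dict[str, Any] = {}
    gathered = await asyncio.gather(
        *(coro for _, coro, _ in tasks),
        return_exceptions=True,
    )
    for (agent_name, _, _child_ctx), result in zip(tasks, gathered):
        if isinstance(result, BaseException):
            results[agent_name] = {"error": str(result)}
            continue

        results[agent_name] = result

        # Once done, the parent context selectively picks up each result
        ctx.set(f"{agent_name}_result", result)

    return results
```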
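The state definitions in `orchestration.yaml` also carry a per-state `timeout` (in seconds). Below is a minimal sketch of enforcing such a limit on concurrently running agents with `asyncio.wait_for`. It assumes each job is a zero-argument callable that returns a coroutine. `run_with_timeouts` is a hypothetical helper, and a timed-out job is recorded as an error instead of failing its siblings.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_with_timeouts(
    jobs: Dict[str, Callable[[], Awaitable[Any]]],
    timeout_s: float,
) -> Dict[str, Any]:
    """Run named jobs concurrently; a job exceeding timeout_s is recorded as an error."""

    async def guarded(name: str, make_coro: Callable[[], Awaitable[Any]]):
        try:
            # wait_for cancels the job if it runs past the deadline
            return name, await asyncio.wait_for(make_coro(), timeout=timeout_s)
        except asyncio.TimeoutError:
            return name, {"error": f"timed out after {timeout_s}s"}
        except Exception as exc:
            return name, {"error": str(exc)}

    pairs = await asyncio.gather(*(guarded(n, f) for n, f in jobs.items()))
    return dict(pairs)
```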

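## 8.3.4 Specialized Multi-Agent Architecture and GAN-Style Feedback Loops

In long-running, complex tasks, a single agent faces two core problems:

1. **Context degradation**: As the context window fills up, model performance degrades. Some models show "context anxiety" and wrap up the task prematurely.
2. **Self-evaluation bias**: Agents tend to overrate the quality of their own work instead of giving critical feedback.

The key to solving these problems is **separation of duties** combined with an **adversarial feedback loop**. Harness supports a GAN-inspired multi-agent architecture in which different agents take on specialized roles.

### The Three-Role Specialization Pattern

**Planner Agent**: understands the requirements and produces a detailed plan

* Input: the user's high-level requirements
* Output: a step-by-step implementation plan with a clearly defined goal for each step
* Focus: understanding, decomposition, and specification

**Generator Agent**: carries out the actual implementation work

* Input: the planner's detailed plan
* Output: working code, documentation, or designs
* Focus: execution efficiency, but prone to overrating the quality of its own output

**Evaluator Agent**: assesses quality and finds problems

* Input: the generator's output plus the evaluation criteria
* Output: detailed quality scores, an issue list, and improvement suggestions
* Focus: critical analysis using multi-dimensional evaluation criteria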
```yaml
states:
  planning:
    type: agent_invoke
    agent: PlannerAgent
    config:
      system_prompt: |
        You are an expert planner specializing in breaking down complex requirements
        into concrete, measurable implementation steps. Your output must include:
        1. Refined requirements understanding
        2. Detailed step-by-step plan
        3. Success criteria for each step
        4. Potential risk factors
      max_tokens: 2000
    on_success: generation

  generation:
    type: agent_invoke
    agent: GeneratorAgent
    config:
      system_prompt: |
        You are an implementation specialist tasked with executing the detailed plan
        from the previous step. Focus on:
        1. Code quality and correctness
        2. Following the exact plan structure
        3. Documenting your implementation choices
      input_from: planning
      max_tokens: 4000
      parallel_workers: 1
    on_success: evaluation

  evaluation:
    type: agent_invoke
    agent: EvaluatorAgent
    config:
      system_prompt: |
        You are a critical evaluator. Assess the generated output across four dimensions:
        1. Design Quality: Does it follow best practices?
        2. Originality: Is it distinctive and well-crafted?
        3. Technical Craft: Is the implementation sound?
        4. Functional Completeness: Does it meet all requirements?

        For each dimension, provide:
        - 1-5 rating
        - Specific issues found
        - Recommended improvements
      input_from: generation
      max_tokens: 2000
    on_success: refinement_decision

  refinement_decision:
    type: evaluate_quality
    quality_threshold: 4.0  # the mean score must reach 4.0
    on_pass: completion
    on_fail: refinement

  refinement:
    type: agent_invoke
    agent: GeneratorAgent
    config:
      system_prompt: |
        Based on the detailed evaluation feedback, refine and improve the implementation.
        Address each issue mentioned in the evaluation report.
      input_from: [generation, evaluation]
      max_iterations: 2  # at most 2 more iterations
    on_success: evaluation  # evaluate again
```
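The `refinement_decision` state compares the mean evaluator score with `quality_threshold`. Below is a minimal sketch of that gate. It assumes the `max_iterations` budget is tracked as a count of refinements already done. It also assumes that an exhausted budget ends in a terminal state named `max_iterations_reached`, a hypothetical name borrowed from the status that `GANStyleEvaluationLoop` reports. Note the `>=`: scores of 4, 4, 5, and 3 average exactly 4.0 and pass.

```python
from typing import Dict


def refinement_decision(
    scores: Dict[str, float],
    quality_threshold: float = 4.0,
    refinements_done: int = 0,
    max_iterations: int = 2,
) -> str:
    """Return the next state name for the quality gate."""
    if not scores:
        raise ValueError("evaluation produced no scores")
    avg = sum(scores.values()) / len(scores)
    if avg >= quality_threshold:
        return "completion"            # on_pass
    if refinements_done < max_iterations:
        return "refinement"            # on_fail, budget left
    return "max_iterations_reached"    # hypothetical terminal state
```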

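### Quality Assurance Through the GAN-Style Feedback Loop

The key effect of this architecture is **adversarial iteration**. The generator produces output and the evaluator critiques it against fixed criteria. The generator then revises until the score clears the threshold or the iteration budget runs out: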

```python
from typing import Any, Dict, List, Optional


class GANStyleEvaluationLoop:
    """GAN-inspired iterative refinement: the generator produces, the evaluator critiques."""

    def __init__(self, generator_agent, evaluator_agent, quality_threshold: float = 4.0):
        self.generator = generator_agent
        self.evaluator = evaluator_agent
        self.quality_threshold = quality_threshold
        self.iteration_history: List[Dict[str, Any]] = []

    async def iterate(self, plan: str, max_iterations: int = 3) -> dict:
        """Run the GAN-style generate -> evaluate -> refine loop."""
        self.iteration_history = []  # reset so repeated calls don't mix histories
        current_output: Optional[str] = None
        previous_feedback: Optional[str] = None
        avg_score = 0.0

        for iteration in range(max_iterations):
            # Step 1: generate (first round) or refine using last round's feedback
            if iteration == 0:
                current_output = await self.generator.execute(
                    input_text=plan,
                    prompt_variant="initial_generation",
                )
            else:
                current_output = await self.generator.execute(
                    input_text=f"{plan}\n\nPrevious feedback: {previous_feedback}",
                    prompt_variant="refinement",
                )

            # Step 2: evaluate
            evaluation_result = await self.evaluator.execute(
                input_text=current_output,
                evaluation_criteria={
                    "design_quality": "Does it follow best practices?",
                    "originality": "Is it distinctive and creative?",
                    "craft": "Is the implementation sound?",
                    "completeness": "Does it meet all requirements?",
                },
            )

            # Step 3: judge quality by the mean of the per-dimension scores
            scores = evaluation_result["scores"]
            avg_score = sum(scores.values()) / len(scores)

            self.iteration_history.append({
                "iteration": iteration,
                "output_length": len(current_output),
                "scores": scores,
                "avg_score": avg_score,
                "issues": evaluation_result["issues"],
            })

            # Step 4: stop once the bar is met; otherwise feed suggestions back
            if avg_score >= self.quality_threshold:
                return self._result("completed", current_output, avg_score, iteration + 1)
            previous_feedback = evaluation_result["improvement_suggestions"]

        # Iteration budget exhausted (or max_iterations == 0) without passing
        status = "max_iterations_reached" if current_output is not None else "failed"
        return self._result(status, current_output, avg_score, max_iterations)

    def _result(self, status: str, output: Optional[str], score: float, iterations: int) -> dict:
        return {
            "status": status,
            "final_output": output,
            "final_score": score,
            "iterations": iterations,
            "history": self.iteration_history,
        }
```
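`GANStyleEvaluationLoop` expects the evaluator to return a structured `scores` dict, while the evaluator prompt asks for free-text 1-5 ratings. Below is a minimal parsing sketch. It assumes the report contains one `Dimension: N` or `Dimension: N/5` line per dimension; that format is an assumption. `parse_scores` is hypothetical, and asking the evaluator for structured (for example JSON) output is a common alternative.

```python
import re
from typing import Dict

# Assumed line format: optional "1." prefix, a dimension name, then a 1-5 rating
SCORE_LINE = re.compile(
    r"^\s*(?:\d+\.\s*)?([A-Za-z][A-Za-z ]*?)\s*:\s*([1-5])(?:\s*/\s*5)?\s*$",
    re.MULTILINE,
)


def parse_scores(report: str) -> Dict[str, int]:
    """Extract {dimension_key: rating} from an evaluator's text report."""
    scores = {
        name.strip().lower().replace(" ", "_"): int(value)
        for name, value in SCORE_LINE.findall(report)
    }
    if not scores:
        raise ValueError("no 1-5 ratings found in evaluator report")
    return scores
```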

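### Measured Effect of Specialized Roles

In one real-world comparison, this architecture delivered substantial gains over a single-agent approach:

| Metric | Single agent (20 min, $9) | Multi-agent (6 h, $200) | Change |
| --- | --- | --- | --- |
| Functional completeness | Core features broken | Fully implemented | +100% |
| Code quality | Throwaway script | Production grade | Major (qualitative) |
| Iterations | 0 | 3-4 | N/A |
| Overall usability | <20% | >90% | +70 pp |

Key insight: **high-quality output needs multiple perspectives**. The planner provides structure, the generator executes the details, and the evaluator finds the problems. This division of labor lets each agent work within its specialty and avoids the cognitive limits of a single agent.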
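The time and cost figures in the table imply the following ratios (simple arithmetic on the reported numbers):

$$
\frac{6 \times 60\ \text{min}}{20\ \text{min}} = 18, \qquad \frac{\$200}{\$9} \approx 22.2
$$

$$
\frac{\$9}{(20/60)\ \text{h}} = \$27/\text{h}, \qquad \frac{\$200}{6\ \text{h}} \approx \$33.3/\text{h}
$$

So the multi-agent run costs about 22 times as much, but its hourly spend is only slightly higher. Most of the extra cost comes from running about 18 times longer.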

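## 8.3.5 Concurrent Execution and Resource Management in Workflows

In Harness, large workflows often spawn many subtasks that can run concurrently to improve throughput.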

```yaml
states:
  batch_processing:
    type: agent_invoke
    agent: DataProcessor
    concurrency:
      max_parallel: 5          # at most 5 tasks in flight
      batch_size: 100          # 100 records per batch
      timeout_per_task: 60s
```

**Harness's concurrency executor**:

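```python
import asyncio
from typing import Any, Callable, Coroutine, Dict, List, Optional


class ConcurrencyManager:
    """Manages concurrent task execution under a concurrency limit."""

    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_tasks: Dict[str, asyncio.Task] = {}

    async def execute_task(self, task_id: str, coro: Coroutine[Any, Any, Any],
                           semaphore: Optional[asyncio.Semaphore] = None) -> dict:
        """Run one task, waiting for a free slot when the limit is reached."""
        sem = semaphore if semaphore is not None else self.semaphore
        async with sem:
            try:
                task = asyncio.create_task(coro)
                self.active_tasks[task_id] = task
                result = await task
                return {"status": "success", "result": result}
            except Exception as e:
                return {"status": "failed", "error": str(e)}
            finally:
                # pop() instead of del: the entry is missing if create_task failed
                self.active_tasks.pop(task_id, None)

    async def execute_batch(self, items: List[Any],
                            executor_fn: Callable[[Any], Coroutine[Any, Any, Any]],
                            max_concurrent: Optional[int] = None) -> List[dict]:
        """Run executor_fn over all items with bounded concurrency; results keep input order."""
        # Use a per-batch semaphore rather than replacing self.semaphore, which
        # would break the limit for tasks already waiting on the old one
        sem = asyncio.Semaphore(max_concurrent) if max_concurrent else self.semaphore

        tasks = [
            self.execute_task(f"task_{i}", executor_fn(item), sem)
            for i, item in enumerate(items)
        ]
        return await asyncio.gather(*tasks)
```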
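`ConcurrencyManager` enforces only a concurrency limit, so the YAML's `batch_size` and `timeout_per_task` go unused. Below is a minimal sketch of how all three knobs might combine. It assumes batches run one after another and that each item is bounded by `asyncio.wait_for`. `run_batches` is a hypothetical helper, not part of Harness.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_batches(
    items: List[Any],
    process: Callable[[Any], Awaitable[Any]],
    max_parallel: int = 5,
    batch_size: int = 100,
    timeout_per_task: float = 60.0,
) -> List[Dict[str, Any]]:
    """Process items batch by batch, at most max_parallel at a time, each with a timeout."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def one(item: Any) -> Dict[str, Any]:
        async with semaphore:
            try:
                # The coroutine is created only after a slot is acquired
                value = await asyncio.wait_for(process(item), timeout=timeout_per_task)
                return {"status": "success", "result": value}
            except asyncio.TimeoutError:
                return {"status": "timeout"}
            except Exception as exc:
                return {"status": "failed", "error": str(exc)}

    results: List[Dict[str, Any]] = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # Finish one batch before starting the next; gather keeps input order
        results.extend(await asyncio.gather(*(one(item) for item in batch)))
    return results
```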

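## 8.3.6 Section Summary

Harness supports complex multi-agent orchestration through the following mechanisms:

1. **State-machine driven**: each agent and workflow stage maps to a node in the state machine
2. **Message routing**: agents communicate through queues and routing rules
3. **Error propagation**: a failing agent's error can propagate downstream, with recovery strategies such as retry and fallback
4. **Context isolation**: layered contexts prevent data contamination during concurrent execution
5. **Concurrency control**: semaphores and concurrency limits keep resources from being overloaded
6. **Specialized architecture**: the plan-generate-evaluate three-role pattern improves quality through an adversarial feedback loop
7. **Adaptive design**: architectural complexity is adjusted to model capability, avoiding unnecessary overhead

For the general theory of multi-agent collaboration and comparisons of the patterns, see Chapter 5 of *The Definitive Guide to Agentic AI*.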

