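# 8.5 Hands-On: Adding an Orchestration Engine to MiniHarness

This section walks through the design principles and key concepts behind the orchestration engine. The complete code is in `lab/mini_harness/orchestration/engine.py`.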

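## 8.5.1 Architectural Requirements

The orchestration engine has to solve four core problems:

1. **Task lifecycle management**: tracking each task from registration through execution to completion or failure
2. **Dependency handling**: ensuring tasks run in the correct order
3. **State-machine-driven control**: using a finite state machine (FSM) to define workflow stages and transition conditions
4. **Sub-agent isolation**: giving each sub-agent its own execution context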

## 8.5.2 Data Model Design

The orchestration engine uses four core enums and dataclasses; the excerpt below shows three of them:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List

class TaskType(Enum):
    LOCAL_BASH = "local_bash"
    LOCAL_AGENT = "local_agent"
    IN_PROCESS_TEAMMATE = "in_process_teammate"
    WORKFLOW = "workflow"

class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    KILLED = "killed"

@dataclass
class TaskDefinition:
    task_id: str
    task_type: TaskType
    description: str
    dependencies: List[str] = field(default_factory=list)
    timeout_seconds: int = 300
    max_retries: int = 1
```

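**Design decision**: dataclasses separate the *definition* (configuration) from the *execution* (state). `TaskDefinition` is treated as a read-only description of the task (the excerpt does not mark it `frozen`, so this is a convention rather than something Python enforces), while `TaskExecution` records state changes at runtime. This separation lets the same task be re-executed without conflicts.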
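To make the definition/execution split concrete, here is a minimal sketch. The `TaskExecution` fields (`attempt`, `result`, `error`) and the use of `frozen=True` are assumptions made for illustration, not the layout used in `engine.py`.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class TaskState(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass(frozen=True)  # assumption: frozen=True enforces the read-only convention
class TaskDefinition:
    task_id: str
    description: str


@dataclass
class TaskExecution:  # hypothetical field layout, for illustration only
    task_id: str
    attempt: int = 1
    state: TaskState = TaskState.PENDING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


definition = TaskDefinition("task_0", "collect data")

# The first attempt fails; its record is kept as history.
first = TaskExecution(definition.task_id, attempt=1)
first.state = TaskState.FAILED
first.error = "timeout"

# A retry gets a fresh record, while the definition is reused untouched.
second = TaskExecution(definition.task_id, attempt=2)
second.state = TaskState.COMPLETED
second.result = {"rows": 10}
```

Because each attempt owns its own record, a retry never overwrites the outcome of an earlier run, and the definition can be shared freely.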

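## 8.5.3 Task Manager

### Dependency Checking

Dependency management is at the core of `TaskManager`. The following method checks whether a task can be executed: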

```python
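def can_execute(self, task_id: str) -> Tuple[bool, Optional[str]]:
    """Return (True, None) if every dependency of task_id has completed."""
    task = self.get_task(task_id)
    if task is None:
        # Defensive guard, in case get_task returns None for unknown ids.
        return False, f"Task {task_id} not registered"
    for dep_id in task.dependencies:
        dep_exec = self.get_execution(dep_id)
        # A missing execution record counts as "not completed".
        if not dep_exec or dep_exec.state != TaskState.COMPLETED:
            return False, f"Dependency {dep_id} not completed"
    return True, None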

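The main execution loop calls this method frequently, so a task runs only once all of its dependencies have completed. Note that it checks dependencies only, not whether the task itself has already run.

### Notification Queue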
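The dependency check can be exercised in isolation. The sketch below uses a hypothetical `MiniTaskManager` that tracks only dependency lists and states; it stands in for the real `TaskManager` and is not its actual interface.

```python
from enum import Enum
from typing import Dict, List, Optional, Tuple


class TaskState(Enum):
    PENDING = "pending"
    COMPLETED = "completed"


class MiniTaskManager:
    """Hypothetical stand-in that tracks only dependencies and states."""

    def __init__(self) -> None:
        self.dependencies: Dict[str, List[str]] = {}
        self.states: Dict[str, TaskState] = {}

    def register(self, task_id: str,
                 dependencies: Optional[List[str]] = None) -> None:
        self.dependencies[task_id] = list(dependencies or [])
        self.states[task_id] = TaskState.PENDING

    def can_execute(self, task_id: str) -> Tuple[bool, Optional[str]]:
        for dep_id in self.dependencies[task_id]:
            # An unregistered dependency counts as "not completed".
            if self.states.get(dep_id) != TaskState.COMPLETED:
                return False, f"Dependency {dep_id} not completed"
        return True, None


manager = MiniTaskManager()
manager.register("task_0")
manager.register("task_1", dependencies=["task_0"])

before = manager.can_execute("task_1")   # task_0 is still pending
manager.states["task_0"] = TaskState.COMPLETED
after = manager.can_execute("task_1")    # the dependency is now satisfied
```

Returning a reason string alongside the boolean gives the caller something to log when a task is blocked.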

When a task completes or fails, `TaskManager` emits a notification, which lets the workflow react to changes in task state:

```python
def mark_completed(self, task_id: str, result: Dict[str, Any]) -> None:
    exec_record = self.executions[task_id]
    exec_record.state = TaskState.COMPLETED
    exec_record.result = result
    self._emit_notification(task_id, TaskState.COMPLETED, result)

def _emit_notification(self, task_id: str, state: TaskState,
                       result: Optional[Dict[str, Any]]) -> None:
    next_tasks = self._find_dependent_tasks(task_id)
    notification = TaskNotification(
        task_id=task_id,
        state=state,
        next_tasks=next_tasks,
        timestamp=datetime.now(),
    )
    self.notification_queue.append(notification)
```

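**Key design**: each notification carries a `next_tasks` list, the tasks that depend on the one that just changed state. These are candidates to run next; a task with other unfinished dependencies still has to pass `can_execute`. This lets the scheduler react to events rather than rescan every task.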
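The difference between "depends on the finished task" and "ready to run" is easy to see in a small sketch. The two helper functions below are hypothetical; the source does not show how `_find_dependent_tasks` is implemented.

```python
from typing import Dict, List, Set


def find_dependent_tasks(dependencies: Dict[str, List[str]],
                         task_id: str) -> List[str]:
    """Return the tasks that list task_id as a direct dependency."""
    return [tid for tid, deps in dependencies.items() if task_id in deps]


def ready_tasks(dependencies: Dict[str, List[str]],
                completed: Set[str],
                candidates: List[str]) -> List[str]:
    """Keep only the candidates whose dependencies have all completed."""
    return [tid for tid in candidates
            if all(dep in completed for dep in dependencies[tid])]


deps = {"fetch": [], "parse": ["fetch"], "report": ["fetch", "parse"]}
completed = {"fetch"}

dependents = find_dependent_tasks(deps, "fetch")  # both parse and report
ready = ready_tasks(deps, completed, dependents)  # report still waits on parse
```

Here `report` appears among the dependents of `fetch` but is filtered out of `ready`, because `parse` has not finished yet.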

## 8.5.4 Workflow State Machine

### FSM Foundation

The workflow uses the classic finite state machine pattern:

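```python
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

class WorkflowStateMachine:
    def __init__(self):
        self.states: Dict[str, StateDefinition] = {}
        self.transitions: List[TransitionDefinition] = []
        self.current_state: Optional[str] = None
        self.context: Dict[str, Any] = {}
        # Appended to by _log_state_entry.
        self.execution_log: List[Dict[str, Any]] = []

    def find_next_state(self) -> Optional[str]:
        # Transitions are checked in order; the first match wins.
        for transition in self.transitions:
            if transition.from_state != self.current_state:
                continue
            if transition.condition is None:
                return transition.to_state  # unconditional transition
            try:
                if transition.condition(self.context):
                    return transition.to_state
            except Exception as e:
                # Record the failure, then try the remaining transitions.
                logger.warning("Condition %s -> %s raised: %s",
                               transition.from_state, transition.to_state, e)
                continue
        return None
```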

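**Design considerations**: transition conditions are optional. An unconditional transition fires as soon as it is reached; a conditional one is evaluated against the current context. Transitions are checked in registration order and the first match wins, so an unconditional transition should come after the conditional ones leaving the same state. If a condition raises an exception, the state machine logs it and moves on to the next transition.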
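A standalone sketch of the same lookup shows how ordering and failing conditions interact. `Transition` is a hypothetical stand-in for `TransitionDefinition`, and the `review` workflow is made up for this example.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

Condition = Callable[[Dict[str, Any]], bool]


@dataclass
class Transition:  # hypothetical stand-in for TransitionDefinition
    from_state: str
    to_state: str
    condition: Optional[Condition] = None


def find_next_state(current: str, transitions: List[Transition],
                    context: Dict[str, Any]) -> Optional[str]:
    for t in transitions:
        if t.from_state != current:
            continue
        if t.condition is None:
            return t.to_state
        try:
            if t.condition(context):
                return t.to_state
        except Exception:
            continue  # a failing condition is treated as "not satisfied"
    return None


transitions = [
    # Raises KeyError when the context has no "score" key.
    Transition("review", "approved", lambda ctx: ctx["score"] >= 0.8),
    Transition("review", "rework", lambda ctx: ctx.get("attempts", 0) < 3),
    # Unconditional fallback, deliberately listed last.
    Transition("review", "rejected"),
]

high = find_next_state("review", transitions, {"score": 0.9})
no_score = find_next_state("review", transitions, {"attempts": 1})
exhausted = find_next_state("review", transitions,
                            {"score": 0.1, "attempts": 3})
```

If the unconditional `rejected` transition were listed first, it would shadow both conditional transitions, which is why fallbacks belong at the end.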

### Execution Log

The state machine records every state change together with a snapshot of the context:

```python
def _log_state_entry(self, state_id: str) -> None:
    self.execution_log.append({
        "timestamp": datetime.now().isoformat(),
        "event": "state_entry",
        "state": state_id,
        "context_snapshot": dict(self.context),
    })
```

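This supports debugging and observability; the complete log is saved in the execution result. Because `dict(self.context)` is a shallow copy, nested mutable values in the snapshot are still shared with the live context.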
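The snapshot taken by `dict(self.context)` copies only the top level of the dictionary. The short sketch below, using made-up context keys, contrasts it with `copy.deepcopy`, which is one possible alternative when snapshots must stay frozen.

```python
import copy

context = {"stage": "research", "findings": ["a"]}

shallow = dict(context)         # same technique as dict(self.context)
deep = copy.deepcopy(context)   # alternative: fully independent, but costlier

context["stage"] = "complete"     # rebinding a top-level key
context["findings"].append("b")   # mutating a nested list in place
```

After these two changes, `shallow["stage"]` still reads `"research"`, but `shallow["findings"]` has grown to `["a", "b"]` because the list object is shared; `deep["findings"]` remains `["a"]`.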

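## 8.5.5 Agent Context Isolation

### Context Variable Design

A sub-agent needs to be isolated from its parent agent while still being able to read inherited values. Python's `ContextVar` provides this isolation per thread and per asyncio task: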

```python
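from contextvars import ContextVar
from typing import Any, Dict, Optional

_agent_context: ContextVar[Optional["AgentContext"]] = ContextVar(
    'agent_context', default=None
)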

class AgentContext:
    def __init__(self, agent_id: str,
                 parent_context: Optional[Dict] = None):
        self.agent_id = agent_id
        self.variables: Dict[str, Any] = {}
        self.parent_context = parent_context or {}

    def get(self, key: str, default: Any = None) -> Any:
        if key in self.variables:
            return self.variables[key]
        return self.parent_context.get(key, default)

    def __enter__(self):
        self.token = _agent_context.set(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        _agent_context.reset(self.token)
```

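**Key design**: the two-level lookup (local first, then the parent) implements variable shadowing. The context is used as a context manager, so the previous value is restored on exit, including in asynchronous code and when an exception is raised.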

## 8.5.6 Orchestration Engine Main Loop

### Execution Loop

The heart of the orchestration engine is the main execution loop, which drives both tasks and state transitions:

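```python
import asyncio
from typing import Any, Dict

async def execute_workflow(self, initial_state: str,
                           context: Dict[str, Any],
                           max_iterations: int = 100) -> Dict[str, Any]:
    self.state_machine.initialize(initial_state, context)
    iterations = 0

    try:
        while iterations < max_iterations:
            # Stop once the workflow reaches a final or error state.
            if (self.state_machine.is_final_state()
                    or self.state_machine.is_error_state()):
                break

            # Run every pending task whose dependencies are satisfied.
            for task_id, task in self.task_manager.tasks.items():
                record = self.task_manager.get_execution(task_id)
                if record and record.state != TaskState.PENDING:
                    # Already ran; retry handling is not shown in this excerpt.
                    continue
                can_exec, _ = self.task_manager.can_execute(task_id)
                if can_exec:
                    result = await self._execute_task(task_id, task)
                    if result["success"]:
                        self.task_manager.mark_completed(task_id, result)
                    else:
                        self.task_manager.mark_failed(task_id, result["error"])

            # Drain the notification queue, then attempt a state transition.
            while (notification := self.task_manager.get_notification()):
                pass  # notification handling is elided in this excerpt

            next_state = self.state_machine.find_next_state()
            if next_state:
                self.state_machine.transition(next_state)

            iterations += 1
    except asyncio.CancelledError:
        # Cancellation surfaces at the await above: drop pending
        # notifications, then re-raise so the caller sees the cancellation.
        self.task_manager.clear_notifications()
        raise
    # Assembly of the returned result dict is omitted in this excerpt.
```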

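In this task-first loop, each iteration runs every task that is ready, then drains the notification queue, and finally attempts at most one state transition. `max_iterations` bounds the loop so that a workflow which never reaches a final state still terminates.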

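### Task Type Handling

The engine dispatches execution by task type. The excerpt handles `LOCAL_BASH` (as a stub) and `IN_PROCESS_TEAMMATE`; any other type is reported as a failure: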

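```python
async def _execute_task(self, task_id: str,
                        task: TaskDefinition) -> Dict[str, Any]:
    try:
        if task.task_type == TaskType.LOCAL_BASH:
            # Stub: the excerpt reports success without running a command.
            return {"success": True, "output": f"Executed {task_id}"}
        elif task.task_type == TaskType.IN_PROCESS_TEAMMATE:
            if not self.subagent_factory:
                # Without this branch the method would fall through and return None.
                return {"success": False,
                        "error": "Subagent factory not initialized"}
            subagent = self.subagent_factory.create_subagent(
                f"subagent_{task_id}", task.task_type
            )
            # Pass a copy so the sub-agent cannot rebind keys in the workflow context.
            result = await subagent.execute(task,
                self.state_machine.context.copy())
            return {"success": True, "result": result}
        else:
            return {"success": False,
                    "error": f"Unsupported task type: {task.task_type.value}"}
    except Exception as e:
        return {"success": False, "error": str(e)}
```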
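As the number of task types grows, an `if`/`elif` chain can be replaced by a lookup table. The following is a sketch of that alternative, not the approach used in `engine.py`; `HANDLERS`, `run_bash`, and `run_broken` are hypothetical names.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Dict


class TaskType(Enum):
    LOCAL_BASH = "local_bash"
    LOCAL_AGENT = "local_agent"


Handler = Callable[[str], Awaitable[Dict[str, Any]]]


async def run_bash(task_id: str) -> Dict[str, Any]:
    # Placeholder handler; a real one would launch a subprocess.
    return {"success": True, "output": f"Executed {task_id}"}


async def run_broken(task_id: str) -> Dict[str, Any]:
    raise RuntimeError("handler crashed")


HANDLERS: Dict[TaskType, Handler] = {TaskType.LOCAL_BASH: run_bash}


async def execute_task(task_id: str, task_type: TaskType,
                       handlers: Dict[TaskType, Handler] = HANDLERS
                       ) -> Dict[str, Any]:
    handler = handlers.get(task_type)
    if handler is None:
        return {"success": False,
                "error": f"Unsupported task type: {task_type.value}"}
    try:
        return await handler(task_id)
    except Exception as exc:
        # Every path returns a dict, so callers can index "success" safely.
        return {"success": False, "error": str(exc)}


ok = asyncio.run(execute_task("t0", TaskType.LOCAL_BASH))
unsupported = asyncio.run(execute_task("t1", TaskType.LOCAL_AGENT))
crashed = asyncio.run(execute_task(
    "t2", TaskType.LOCAL_AGENT, {TaskType.LOCAL_AGENT: run_broken}))
```

Registering a new task type then means adding one dictionary entry, and the error-handling path stays in a single place.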

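With the core components and design patterns covered, the example below shows how they fit together into a complete workflow.

## 8.5.7 Usage Example

### Workflow Definition

The following example defines a simple workflow with three states (start, research, and complete) and the transitions between them.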

```python
engine = OrchestrationEngine()

states = [
    StateDefinition("start", StateType.INITIAL, "Start"),
    StateDefinition("research", StateType.NORMAL, "Research"),
    StateDefinition("complete", StateType.FINAL, "Complete"),
]

transitions = [
    TransitionDefinition("start", "research"),
    TransitionDefinition("research", "complete"),
]

engine.setup_workflow(states, transitions)
```

### Task Dependencies

The following example defines tasks with dependencies; the engine schedules them in dependency order.

```python
tasks = [
    TaskDefinition("task_0", TaskType.LOCAL_BASH, "Data collection"),
    TaskDefinition("task_1", TaskType.LOCAL_AGENT, "Data analysis",
                   dependencies=["task_0"]),
    TaskDefinition("task_2", TaskType.IN_PROCESS_TEAMMATE, "Model training",
                   dependencies=["task_1"]),
]

engine.register_tasks(tasks)
engine.initialize_subagent_factory("main_agent")

# execute_workflow is a coroutine: await it inside an async function,
# or drive it with asyncio.run(...) from synchronous code.
result = await engine.execute_workflow("start", {"data": "sample"})
```
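The order implied by these dependencies can be checked independently of the engine. This sketch uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+), which is an assumption of the example and not necessarily what `engine.py` uses; it also shows how a dependency cycle would be detected up front.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps to the tasks it depends on (its predecessors).
dependencies = {
    "task_0": [],
    "task_1": ["task_0"],
    "task_2": ["task_1"],
}
order = list(TopologicalSorter(dependencies).static_order())

# A cycle makes the graph unschedulable; graphlib reports it as CycleError.
try:
    list(TopologicalSorter({"a": ["b"], "b": ["a"]}).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
```

Validating the graph at registration time is a cheap way to catch a cycle, which would otherwise leave the affected tasks blocked until `max_iterations` is exhausted.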

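## 8.5.8 Section Summary

The orchestration engine achieves efficient workflows through the following design choices:

* **Clear layering**: `TaskManager` handles dependencies, while the state machine handles workflow stages
* **Event-driven scheduling**: the notification queue lets the scheduler react to task completion
* **Context isolation**: `ContextVar` protects sub-agents from state pollution
* **Observability**: complete logging and metrics collection

The full implementation is in `lab/mini_harness/orchestration/engine.py`.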

