8.5 实战:为MiniHarness添加编排引擎

本节展示编排引擎的设计原理和关键概念。完整代码见 lab/mini_harness/orchestration/engine.py

8.5.1 架构需求

编排引擎需要解决四个核心问题:

  1. 任务生命周期管理:从注册到执行、完成或失败

  2. 依赖关系处理:确保任务按正确的顺序执行

  3. 状态机驱动:使用FSM定义工作流的阶段和转移条件

  4. 子Agent隔离:为每个子Agent创建独立的执行上下文

8.5.2 数据模型设计

编排引擎使用四个核心枚举和数据类:

class TaskType(Enum):
    LOCAL_BASH = "local_bash"
    LOCAL_AGENT = "local_agent"
    IN_PROCESS_TEAMMATE = "in_process_teammate"
    WORKFLOW = "workflow"

class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    KILLED = "killed"

@dataclass
class TaskDefinition:
    task_id: str
    task_type: TaskType
    description: str
    dependencies: List[str] = field(default_factory=list)
    timeout_seconds: int = 300
    max_retries: int = 1

设计决策:使用数据类分离「定义」(配置)和「执行」(状态)。TaskDefinition是只读的任务描述,TaskExecution记录运行时的状态变化。这种分离使得同一个任务可以重新执行而不产生冲突。

8.5.3 任务管理器

依赖检查机制

TaskManager的核心是依赖管理。检查任务是否可执行:

此方法在执行主循环中被频繁调用,确保任务只在依赖完全满足时才运行。

通知队列

完整代码见 lab/mini_harness/orchestration/engine.py

当任务完成或失败时,TaskManager发出通知。这允许工作流响应Task状态变化:

关键设计:通知中包含 next_tasks 列表,这是现在可以执行的后续任务。这支持高效的事件驱动调度。

8.5.4 工作流状态机

FSM基础架构

工作流使用经典的有限状态机模式:

设计考虑:转移条件是可选的。无条件转移自动执行,有条件转移基于当前上下文计算。如果条件抛出异常,状态机会记录但继续尝试其他转移。

执行日志

状态机记录所有状态变化及其上下文快照:

这支持调试和可观测性,完整日志保存在执行结果中。

8.5.5 智能体上下文隔离

上下文变量设计

子Agent需要与父Agent隔离,但仍可访问继承的值。使用Python的ContextVar实现线程安全的隔离:

关键设计:双层查找(本地优先,回退到父)实现变量遮蔽。上下文作为context manager使用,确保异步代码中的正确清理。

8.5.6 编排引擎主循环

执行循环

完整代码见 lab/mini_harness/orchestration/engine.py

编排引擎的核心是主执行循环,它驱动Task和状态转移:

这种「任务优先」循环模式优先执行所有准备好的Task,然后在没有Task可执行时尝试状态转移。

任务类型处理

编排引擎根据任务类型分发执行:

上面我们详细介绍了编排引擎的各个核心组件和设计模式。现在通过完整的实际示例,展示如何将这些组件组合在一起,构建一个完整的工作流系统。

8.5.7 使用示例

工作流定义

以下示例展示了如何使用编排引擎定义一个简单的状态工作流,包括起点、研究和完成三个状态及其转换。

任务依赖

以下示例展示了如何定义带有依赖关系的任务,编排引擎会按照依赖顺序自动调度执行。

8.5.8 本小节小结

编排引擎通过以下设计实现高效的工作流:

  • 清晰的分层:TaskManager处理依赖,StateMachine处理阶段

  • 事件驱动:通知队列让调度器对完成做出反应

  • 上下文隔离:ContextVar保护子Agent免受状态污染

  • 可观测性:完整的日志和指标收集

完整实现见 lab/mini_harness/orchestration/engine.py

最后更新于