# 8.2 State Machines and Workflow Engines

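State machines are the mathematical foundation of workflow orchestration: they describe system behavior with a finite set of states and transition rules. This section covers FSM principles, how Claude Code executes workflows, OpenClaw's Lobster engine, and a complete state machine implementation in Python.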

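## 8.2.1 Core Principles of Finite State Machines

A finite state machine (FSM) is the formal model underlying workflow engines. It describes how a system's behavior evolves through states and the transitions between them.

In agent orchestration, an FSM defines:

1. **A finite set of states**: the stages of the workflow (such as "pending validation", "running", "completed")
2. **Input symbols**: the events that trigger transitions (such as "approve", "error", "timeout")
3. **A transition function**: δ(state, event) → new_state
4. **An initial state** and a set of **accepting states**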
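The four elements above can be sketched as a plain lookup table. This is a minimal illustration rather than part of any engine; the state names (`pending`, `running`, `done`, `failed`) and the `success` event are hypothetical.

```python
from typing import Dict, List, Set, Tuple

# Transition function δ as a lookup table: (state, event) -> new_state.
DELTA: Dict[Tuple[str, str], str] = {
    ("pending", "approve"): "running",
    ("pending", "error"): "failed",
    ("running", "success"): "done",
    ("running", "error"): "failed",
    ("running", "timeout"): "failed",
}
INITIAL_STATE = "pending"
ACCEPTING_STATES: Set[str] = {"done"}


def run_fsm(events: List[str]) -> Tuple[str, bool]:
    """Feed events through δ; return the final state and whether it is accepting."""
    state = INITIAL_STATE
    for event in events:
        key = (state, event)
        if key not in DELTA:
            raise ValueError(f"no transition from {state!r} on {event!r}")
        state = DELTA[key]
    return state, state in ACCEPTING_STATES


print(run_fsm(["approve", "success"]))  # ('done', True)
print(run_fsm(["approve", "timeout"]))  # ('failed', False)
```

Because every `(state, event)` pair maps to at most one target, the machine is deterministic: the same event sequence always ends in the same state.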

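## 8.2.2 How Claude Code Executes Workflows

In Claude Code, the FSM is realized through the agent's Task system, which supports defining a state machine via a Task of the Workflow type. Key characteristics:

* **Code-first**: states and transitions are defined in Python code
* **Flexible conditions**: any Python expression can serve as a transition condition
* **Built-in context**: context variables and execution history are maintained automatically
* **Error handling**: deeply integrated with the agent's error recovery mechanism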

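## 8.2.3 Overview of the OpenClaw Lobster Engine

**Lobster** is OpenClaw's deterministic workflow engine. Its core characteristics:

* **Declarative definition**: workflows are defined in YAML, with no coding required
* **Deterministic execution**: the same input is guaranteed to produce the same execution path and output
* **Pause on side effects**: execution pauses before a side effect and waits for human approval
* **Automatic recovery**: interrupted executions can resume from a checkpoint
* **Auditable**: a complete execution history and decision log

The Lobster engine executes a workflow as follows: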

```
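1. Parse YAML → build the internal FSM representation
2. Initialize the execution context
3. Execution loop:
   - Evaluate the conditions on the current state's outgoing edges
   - Select a transition whose condition holds
   - Flag side effects (approval required) or run pure computation
   - Transition to the next state
   - Check whether a terminal state has been reached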
```

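The core loop of the state machine provides a general execution framework. So that non-technical users can also define and configure workflows, the system provides a standard YAML syntax for describing a workflow's structure and behavior declaratively.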

## 8.2.4 YAML Workflow Definition Syntax

### Basic Structure

A YAML workflow definition has the following basic structure:

```yaml
version: "1.0"
name: "workflow name"
description: "workflow description"

# Global variables
variables:
  max_retries: 3
  timeout: 300

# State definitions
states:
  start:
    type: initial

  validate:
    type: normal
    actions:
      - id: validation_check
        type: tool_call
        tool: validator
        params:
          data: "{{ context.input }}"

  execute:
    type: normal
    actions:
      - id: main_execution
        type: tool_call
        tool: executor
        side_effect: true  # requires approval

  complete:
    type: final

  error:
    type: error

# Transition definitions
transitions:
  - from: start
    to: validate

  - from: validate
    to: execute
    condition: "{{ result.validation_check.success }}"

  - from: validate
    to: error
    condition: "{{ not result.validation_check.success }}"

  - from: execute
    to: complete
    condition: "{{ result.main_execution.success }}"

  # Retry by looping back to execute while attempts remain
  - from: execute
    to: execute
    condition: "{{ not result.main_execution.success and attempt < variables.max_retries }}"

  - from: execute
    to: error
    condition: "{{ not result.main_execution.success and attempt >= variables.max_retries }}"

# Error handling
error_handlers:
  - on_state: "*"
    action: log_error
    fallback_state: error
```

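### State Types

| State type | Meaning | Notes |
| -------- | ---- | ---------------- |
| initial  | Initial state | The state the workflow starts in; exactly one must exist |
| normal   | Normal state | Runs its actions, then evaluates transition conditions |
| final    | Final state | The workflow completed successfully; several may exist |
| error    | Error state | The workflow failed or was aborted abnormally |
| wait     | Wait state | Waits for external input or an asynchronous result |
| parallel | Parallel state | Runs multiple branches concurrently |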
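The constraints in the table can be checked before a workflow runs. The sketch below assumes the YAML has already been parsed into a dictionary (for example by a YAML loader); `validate_workflow` is a hypothetical helper for illustration, not a Lobster API.

```python
from typing import Any, Dict, List


def validate_workflow(definition: Dict[str, Any]) -> List[str]:
    """Return a list of structural problems; an empty list means the checks passed."""
    problems: List[str] = []
    states = definition.get("states", {})
    transitions = definition.get("transitions", [])

    # The table requires exactly one initial state.
    initial = [name for name, spec in states.items() if spec.get("type") == "initial"]
    if len(initial) != 1:
        problems.append(f"expected exactly one initial state, found {len(initial)}")

    # Without a final state the workflow could never complete successfully.
    if not any(spec.get("type") == "final" for spec in states.values()):
        problems.append("no final state defined")

    # Every transition endpoint must name a defined state.
    for transition in transitions:
        for key in ("from", "to"):
            if transition.get(key) not in states:
                problems.append(
                    f"transition references undefined state: {transition.get(key)}"
                )
    return problems


workflow = {
    "states": {
        "start": {"type": "initial"},
        "validate": {"type": "normal"},
        "complete": {"type": "final"},
    },
    "transitions": [
        {"from": "start", "to": "validate"},
        {"from": "validate", "to": "complete"},
        {"from": "validate", "to": "error"},
    ],
}
print(validate_workflow(workflow))
# ['transition references undefined state: error']
```

Running a check like this at load time surfaces dangling state references before any action executes.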

### Conditional Branching Example

The following YAML defines a conditional branch:

```yaml
states:
  classify:
    type: normal
    actions:
      - id: classify_task
        type: tool_call
        tool: classifier
        params:
          input: "{{ context.data }}"

transitions:
  # Conditional branches based on the classification result
  - from: classify
    to: path_a
    condition: "{{ result.classify_task.category == 'typeA' }}"

  - from: classify
    to: path_b
    condition: "{{ result.classify_task.category == 'typeB' }}"

  - from: classify
    to: path_c
    condition: "{{ result.classify_task.category == 'typeC' }}"

  # Default branch (listed last)
  - from: classify
    to: unknown
    condition: "{{ true }}"
```
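The default branch only works as a fallback if transitions are evaluated in declaration order and the first match wins. The Python executor in 8.2.5 behaves this way; whether Lobster does is an assumption here. A minimal sketch of that selection rule, with `BRANCHES` and `pick_branch` as hypothetical names:

```python
from typing import Callable, Dict, List, Tuple

Condition = Callable[[Dict[str, str]], bool]

# Ordered (condition, target) pairs mirroring the transitions out of `classify`.
BRANCHES: List[Tuple[Condition, str]] = [
    (lambda r: r.get("category") == "typeA", "path_a"),
    (lambda r: r.get("category") == "typeB", "path_b"),
    (lambda r: r.get("category") == "typeC", "path_c"),
    (lambda r: True, "unknown"),  # default branch: must come last
]


def pick_branch(result: Dict[str, str]) -> str:
    """Return the target of the first branch whose condition holds."""
    for condition, target in BRANCHES:
        if condition(result):
            return target
    raise RuntimeError("no branch matched")


print(pick_branch({"category": "typeB"}))  # path_b
print(pick_branch({"category": "other"}))  # unknown
```

If the always-true branch were listed first, it would shadow every other branch, so its position carries meaning.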

### Loops and Retries

The following YAML defines a loop with retries:

```yaml
states:
  retry_loop:
    type: normal
    variables:
      attempt: 0
    actions:
      - id: attempt_operation
        type: tool_call
        tool: operation
        params:
          data: "{{ context.data }}"

transitions:
  # On success, continue
  - from: retry_loop
    to: process_result
    condition: "{{ result.attempt_operation.success }}"

  # On failure with retries remaining, retry
  - from: retry_loop
    to: retry_loop
    condition: "{{ not result.attempt_operation.success and state.attempt < variables.max_retries }}"
    on_transition:
      - action: increment_variable
        variable: state.attempt

  # On failure with no retries left, go to error handling
  - from: retry_loop
    to: handle_error
    condition: "{{ not result.attempt_operation.success and state.attempt >= variables.max_retries }}"
```
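The three transitions out of `retry_loop` correspond to an ordinary retry loop. The sketch below is one reading of those semantics, not engine code; `run_with_retries` and `flaky` are hypothetical names, and the operation is assumed to return a dictionary with a `success` key.

```python
from typing import Callable, Dict


def run_with_retries(operation: Callable[[], Dict[str, bool]], max_retries: int) -> str:
    """Mirror the transitions out of `retry_loop`; return the next state's name."""
    attempt = 0  # plays the role of state.attempt
    while True:
        result = operation()
        if result["success"]:
            return "process_result"
        if attempt < max_retries:
            attempt += 1  # the on_transition increment
            continue      # self-transition back to retry_loop
        return "handle_error"


calls = {"n": 0}


def flaky() -> Dict[str, bool]:
    """Fail twice, then succeed."""
    calls["n"] += 1
    return {"success": calls["n"] >= 3}


print(run_with_retries(flaky, max_retries=3))  # process_result
print(calls["n"])  # 3
```

Under this reading, `max_retries: 3` allows up to four executions in total: the initial attempt plus three retries.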

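## 8.2.5 Python State Machine Implementation

The Python implementation consists of data structure definitions, the execution engine logic, and a concrete usage example. It is presented in four parts:

### Part 1: Data Structures and Enumerations

First, define the data structures that represent states, actions, and transitions: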

```python
from typing import Dict, List, Callable, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import json
from datetime import datetime

class ActionType(Enum):
    """Action types."""
    TOOL_CALL = "tool_call"
    CONDITIONAL = "conditional"
    SET_VARIABLE = "set_variable"
    LOG = "log"
    PARALLEL = "parallel"

class StateType(Enum):
    """State types."""
    INITIAL = "initial"
    NORMAL = "normal"
    FINAL = "final"
    ERROR = "error"
    WAIT = "wait"
    PARALLEL = "parallel"

@dataclass
class Action:
    """A single action within a state."""
    action_id: str
    action_type: ActionType
    side_effect: bool = False
    params: Dict[str, Any] = field(default_factory=dict)
    result: Optional[Dict[str, Any]] = None
    executed: bool = False

@dataclass
class State:
    """A workflow state and the actions it runs."""
    state_id: str
    state_type: StateType
    actions: List[Action] = field(default_factory=list)
    variables: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Transition:
    """A transition between two states, with an optional condition."""
    from_state: str
    to_state: str
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None
    on_transition: List[Callable] = field(default_factory=list)
```

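**Design note**: keeping `ActionType` and `StateType` as separate enums makes the code more type-safe. Every `Action` carries a `side_effect` flag, which is essential for supporting operations that require human approval.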

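### Part 2: The Workflow Execution Engine

The execution engine is the core of the state machine; it handles the logic for state transitions and action execution: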

```python
class WorkflowExecutor:
    """Workflow execution engine."""

    def __init__(self):
        self.states: Dict[str, State] = {}
        self.transitions: List[Transition] = []
        self.current_state: Optional[str] = None
        self.execution_history: List[Dict] = []
        self.context: Dict[str, Any] = {}
        self.execution_paused = False
        self.pending_approvals: Dict[str, Action] = {}

    def add_state(self, state: State) -> None:
        """Register a state."""
        self.states[state.state_id] = state

    def add_transition(self, transition: Transition) -> None:
        """Register a transition."""
        self.transitions.append(transition)

    def initialize(self, initial_state: str, context: Dict[str, Any]) -> None:
        """Initialize the workflow."""
        if initial_state not in self.states:
            raise ValueError(f"initial state {initial_state!r} does not exist")
        self.current_state = initial_state
        self.context = context
        self.execution_history = []
        self.pending_approvals = {}
        self.execution_paused = False

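    def evaluate_condition(self, condition: Callable) -> bool:
        """Evaluate a transition condition against the current context."""
        try:
            return condition(self.context)
        except Exception as e:
            # A failing condition is treated as "not satisfied".
            print(f"condition evaluation failed: {e}")
            return False

    def find_next_state(self) -> Optional[str]:
        """Find the next state from the current state and its transition conditions."""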
        applicable_transitions = [
            t for t in self.transitions if t.from_state == self.current_state
        ]
        for transition in applicable_transitions:
            if transition.condition is None or self.evaluate_condition(transition.condition):
                return transition.to_state
        return None
```

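**Design note**: `find_next_state` takes the first transition whose condition holds, so the order in which transitions are added matters. A real application could add priorities or a more sophisticated selection strategy.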
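One possible shape for the priority-based strategy mentioned above is sketched here. The `priority` field, `PrioritizedTransition`, and `find_next_state_by_priority` are assumptions for illustration and are not part of the `Transition` class defined earlier.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class PrioritizedTransition:
    from_state: str
    to_state: str
    priority: int = 0  # higher value wins; hypothetical field
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None


def find_next_state_by_priority(
    transitions: List[PrioritizedTransition],
    current_state: str,
    context: Dict[str, Any],
) -> Optional[str]:
    """Pick the highest-priority transition out of current_state whose condition holds."""
    candidates = [t for t in transitions if t.from_state == current_state]
    # sorted() is stable, so transitions with equal priority keep declaration order.
    for t in sorted(candidates, key=lambda t: t.priority, reverse=True):
        if t.condition is None or t.condition(context):
            return t.to_state
    return None


transitions = [
    PrioritizedTransition("review", "fallback"),  # unconditional, priority 0
    PrioritizedTransition(
        "review", "approved", priority=10,
        condition=lambda ctx: ctx.get("approved", False),
    ),
]
print(find_next_state_by_priority(transitions, "review", {"approved": True}))  # approved
print(find_next_state_by_priority(transitions, "review", {}))                  # fallback
```

With explicit priorities, an unconditional fallback no longer has to be registered last, which makes the definition less sensitive to ordering mistakes.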

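### Part 3: Action Execution and Approval Handling

This part handles the execution of individual actions, including pausing on side effects and human approval: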

```python
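# The methods below continue the WorkflowExecutor class from Part 2.
# Subclassing under the same name lets this block run when pasted after Part 2;
# in a real module, put these methods directly in the original class body.
class WorkflowExecutor(WorkflowExecutor):

    def execute_action(self, action: Action) -> bool:
        """Execute one action; return False if it awaits approval or is unsupported."""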
        if action.executed:
            return True

        if action.action_type == ActionType.TOOL_CALL:
            if action.side_effect:
                self.pending_approvals[action.action_id] = action
                self.execution_paused = True
                print(f"[approval] Action {action.action_id} requires approval")
                return False
            else:
                action.result = self._simulate_tool_call(action.params)
                action.executed = True
                return True

        elif action.action_type == ActionType.CONDITIONAL:
            result = self.evaluate_condition(action.params.get("condition"))
            action.result = {"value": result}
            action.executed = True
            return True

        elif action.action_type == ActionType.SET_VARIABLE:
            var_name = action.params.get("name")
            var_value = action.params.get("value")
            self.context[var_name] = var_value
            action.executed = True
            return True

        elif action.action_type == ActionType.LOG:
            message = action.params.get("message", "")
            print(f"[log] {message}")
            action.executed = True
            return True
        return False

    def _simulate_tool_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate a tool call."""
        return {"success": True, "output": f"Executed with {params}"}

    def execute_state(self) -> bool:
        """Run all actions of the current state; return False if paused for approval."""
        if self.current_state is None:
            return False
        state = self.states[self.current_state]
        print(f"\n[enter state] {self.current_state}")
        for action in state.actions:
            if not self.execute_action(action):
                if self.execution_paused:
                    return False
        return True

    def approve_action(self, action_id: str) -> bool:
        """Approve a pending side-effect action and execute it."""
        if action_id not in self.pending_approvals:
            return False
        action = self.pending_approvals[action_id]
        action.result = self._simulate_tool_call(action.params)
        action.executed = True
        del self.pending_approvals[action_id]
        if not self.pending_approvals:
            self.execution_paused = False
        return True

    def step(self) -> bool:
        """Run one step: execute the current state, then move to the next state."""
        if self.current_state is None:
            return False
        if not self.execute_state():
            return False
        self.execution_history.append({
            "timestamp": datetime.now().isoformat(),
            "state": self.current_state,
            "context": dict(self.context),
        })
        next_state = self.find_next_state()
        if next_state is None:
            return False
        self.current_state = next_state
        if self.states[self.current_state].state_type == StateType.FINAL:
            return False
        return True

    def run(self) -> Dict[str, Any]:
        """Run the workflow until it completes, pauses, or cannot proceed."""
        max_iterations = 100
        iterations = 0
        while iterations < max_iterations:
            if self.execution_paused:
                break
            if not self.step():
                break
            iterations += 1
        return {
            "final_state": self.current_state,
            "context": self.context,
            "history": self.execution_history,
            "paused": self.execution_paused,
            "pending_approvals": list(self.pending_approvals.keys()),
        }

    def get_status(self) -> Dict[str, Any]:
        """Return the current workflow status."""
        return {
            "current_state": self.current_state,
            "is_paused": self.execution_paused,
            "pending_approvals": list(self.pending_approvals.keys()),
            "context": self.context,
            "history_length": len(self.execution_history),
        }
```

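**Design note**: `approve_action` lets an external party (such as a user or administrator) approve a pending operation while the workflow is paused. This implementation has no corresponding reject method; a rejection has to be modeled separately, for example by setting a context flag that routes to a `rejected` state. `execute_action` must skip actions that have already executed; otherwise, calling `run()` again after approval would put the same side-effect action back into the pending queue, and the workflow could never move on to the next state.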
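A rejection path could mirror `approve_action` without running the tool call. The sketch below is a hypothetical helper, not part of the executor above; it is written as a standalone function over any object exposing `pending_approvals`, `context`, and `execution_paused`, and the `rejected` context key is an assumed convention.

```python
from types import SimpleNamespace
from typing import Any


def reject_action(executor: Any, action_id: str, reason: str = "") -> bool:
    """Reject a pending side-effect action without executing it."""
    action = executor.pending_approvals.pop(action_id, None)
    if action is None:
        return False
    action.result = {"success": False, "rejected": True, "reason": reason}
    action.executed = True  # prevents a later run() from re-queuing the action
    executor.context["rejected"] = True  # lets a transition route to a rejection state
    if not executor.pending_approvals:
        executor.execution_paused = False
    return True


# Stand-ins carrying only the attributes the helper touches.
action = SimpleNamespace(result=None, executed=False)
executor = SimpleNamespace(
    pending_approvals={"apply_changes": action},
    context={},
    execution_paused=True,
)
print(reject_action(executor, "apply_changes", reason="change freeze"))  # True
print(executor.execution_paused, executor.context)  # False {'rejected': True}
```

For this to reach a `rejected` state, the workflow also needs a transition out of the paused state whose condition reads the `rejected` flag.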

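### Part 4: Usage Example

The following is a complete approval workflow example showing how to build and run the state machine. It defines the states first, then the transitions, and finally runs the workflow: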

```python
# Build the approval workflow
executor = WorkflowExecutor()

# Define states
executor.add_state(State(
    state_id="submitted",
    state_type=StateType.INITIAL,
    actions=[Action(action_id="validate_input", action_type=ActionType.TOOL_CALL)],
))
executor.add_state(State(
    state_id="reviewing",
    state_type=StateType.NORMAL,
    actions=[Action(
        action_id="apply_changes",
        action_type=ActionType.TOOL_CALL,
        side_effect=True,  # requires human approval
        params={"target": "production"},
    )],
))
executor.add_state(State(state_id="approved", state_type=StateType.FINAL))
executor.add_state(State(state_id="rejected", state_type=StateType.FINAL))

# Define transitions
executor.add_transition(Transition(
    from_state="submitted",
    to_state="reviewing",
    condition=lambda ctx: ctx.get("valid", False),
))
executor.add_transition(Transition(
    from_state="submitted",
    to_state="rejected",
    condition=lambda ctx: not ctx.get("valid", False),
))
executor.add_transition(Transition(
    from_state="reviewing",
    to_state="approved",
    condition=lambda ctx: ctx.get("approved", False),
))

# Initialize and run
executor.initialize("submitted", context={"valid": True})
result = executor.run()
print(result)
# {'final_state': 'reviewing', ..., 'paused': True, 'pending_approvals': ['apply_changes']}

# Simulate human approval
executor.context["approved"] = True
executor.approve_action("apply_changes")
result = executor.run()
print(result)
# {'final_state': 'approved', ..., 'paused': False, 'pending_approvals': []}
```

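## 8.2.6 Error Handling and Recovery

### Error Handling Strategies

A workflow can fail at several stages:

1. **Validation errors**: the input data is invalid
2. **Execution errors**: a tool call fails
3. **Timeout errors**: a task exceeds its time limit
4. **Approval rejections**: a human reviewer rejects the request

Handlers for these cases can be declared per state and error type: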

```yaml
error_handlers:
  - on_state: validate
    error_type: validation_error
    action: notify_requester
    fallback_state: rejected

  - on_state: execute
    error_type: tool_failure
    action: retry_with_backoff
    max_retries: 3
    fallback_state: manual_intervention

  - on_state: "*"
    error_type: timeout
    action: kill_workflow
    fallback_state: error_state
```
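How an engine might resolve which handler applies can be sketched as a first-match lookup over the list above, treating `"*"` as a wildcard. This matching rule and the `find_handler` name are assumptions for illustration; the handler entries are copied from the YAML.

```python
from typing import Any, Dict, List, Optional

ERROR_HANDLERS: List[Dict[str, Any]] = [
    {"on_state": "validate", "error_type": "validation_error",
     "action": "notify_requester", "fallback_state": "rejected"},
    {"on_state": "execute", "error_type": "tool_failure",
     "action": "retry_with_backoff", "max_retries": 3,
     "fallback_state": "manual_intervention"},
    {"on_state": "*", "error_type": "timeout",
     "action": "kill_workflow", "fallback_state": "error_state"},
]


def find_handler(state: str, error_type: str) -> Optional[Dict[str, Any]]:
    """Return the first handler matching both state and error type, or None."""
    for handler in ERROR_HANDLERS:
        state_matches = handler["on_state"] in ("*", state)
        if state_matches and handler["error_type"] == error_type:
            return handler
    return None


print(find_handler("execute", "timeout")["action"])  # kill_workflow
print(find_handler("validate", "tool_failure"))      # None
```

A `None` result means no handler covers the error, so the caller needs its own default, such as moving the workflow to an error state.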

### Checkpoints and Recovery

By saving its execution state, a workflow can resume from the point where it was interrupted:

```python
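import hashlib
import json
from datetime import datetime

# Illustrative in-memory store; a real engine would write to durable storage.
_CHECKPOINTS: dict = {}

# Both functions are methods of WorkflowExecutor (note the `self` parameter).

def save_checkpoint(self) -> str:
    """Save a checkpoint and return its ID (a hash of the serialized content)."""
    checkpoint = {
        "current_state": self.current_state,
        "context": self.context,
        "execution_history": self.execution_history,
        "timestamp": datetime.now().isoformat(),
    }
    # Assumes context and history are JSON-serializable.
    payload = json.dumps(checkpoint, sort_keys=True)
    checkpoint_id = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    _CHECKPOINTS[checkpoint_id] = payload
    return checkpoint_id

def restore_from_checkpoint(self, checkpoint_id: str) -> bool:
    """Restore state from a checkpoint; return False if the ID is unknown."""
    payload = _CHECKPOINTS.get(checkpoint_id)
    if payload is None:
        return False

    checkpoint = json.loads(payload)
    self.current_state = checkpoint["current_state"]
    self.context = checkpoint["context"]
    self.execution_history = checkpoint["execution_history"]
    # Pending approvals and per-action `executed` flags are not captured here.
    return True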
```

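## 8.2.7 Summary

State machines offer an elegant way to describe workflow behavior. OpenClaw's Lobster engine uses declarative YAML definitions and deterministic execution to keep complex workflows concise to write and reliable to maintain. Together with the Python execution engine, this covers complex scenarios such as conditional branching, loops, retries, and error recovery. The next section explores how multiple agents coordinate to carry out more complex tasks.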


