# Build the approval workflow.
executor = WorkflowExecutor()

# Actions are created up front so each state definition below reads as a
# single line.
validate_input_action = Action(
    action_id="validate_input",
    action_type=ActionType.TOOL_CALL,
)
apply_changes_action = Action(
    action_id="apply_changes",
    action_type=ActionType.TOOL_CALL,
    side_effect=True,  # requires human approval
    params={"target": "production"},
)

# Define the states. They are registered in list order: the initial state,
# the intermediate review state, then the two final states.
workflow_states = [
    State(
        state_id="submitted",
        state_type=StateType.INITIAL,
        actions=[validate_input_action],
    ),
    State(
        state_id="reviewing",
        state_type=StateType.NORMAL,
        actions=[apply_changes_action],
    ),
    State(state_id="approved", state_type=StateType.FINAL),
    State(state_id="rejected", state_type=StateType.FINAL),
]
for workflow_state in workflow_states:
    executor.add_state(workflow_state)
# Define the transitions as (from_state, to_state, condition) rows and
# register them in order. Each condition receives the workflow context and
# reads a flag from it, treating a missing flag as False.
# NOTE(review): the only transition out of "reviewing" leads to "approved";
# nothing here routes a denied review to "rejected" -- confirm that this is
# intentional.
transition_rows = (
    ("submitted", "reviewing", lambda ctx: ctx.get("valid", False)),
    ("submitted", "rejected", lambda ctx: not ctx.get("valid", False)),
    ("reviewing", "approved", lambda ctx: ctx.get("approved", False)),
)
for source_id, target_id, guard in transition_rows:
    executor.add_transition(
        Transition(from_state=source_id, to_state=target_id, condition=guard)
    )
# Initialize the workflow at the "submitted" state with a context that marks
# the input as valid, then execute it and print the result.
executor.initialize("submitted", context={"valid": True})
result = executor.run()
print(result)
# Expected output, per the original author's note:
# {"final_state": "reviewing", "paused": True, "pending_approvals": ["apply_changes"], ...}
# Simulate the human approval: set the "approved" context flag that the
# reviewing -> approved transition condition reads, approve the pending
# "apply_changes" action, then run again and print the new result.
executor.context["approved"] = True
executor.approve_action("apply_changes")
result = executor.run()
print(result)
# Expected output, per the original author's note:
# {"final_state": "approved", "paused": False, ...}