# 12.3 工具调用护栏

本节介绍在工具执行前对调用参数的多层检查机制，包括危险命令黑名单、约束护栏以及护栏框架的集成设计。

## 12.3.1 危险操作检测概览

工具调用护栏(Tool Calling Guardrails)是在执行工具前对调用参数进行检查，阻止明显危险的操作。护栏分为两类：

1. **黑名单护栏**：禁止已知危险的命令或参数
2. **约束护栏**：强制参数满足特定条件（如只读访问、路径限制等）

## 12.3.2 危险命令黑名单

### Claude Code 的 dangerousPatterns.ts

Claude Code 包含多种危险命令的黑名单。这些命令单独使用或组合使用都可能造成严重后果（以下列举部分典型模式）：

```python
# 危险命令黑名单
DANGEROUS_PATTERNS = [
    "rm",           # 删除文件
    "dd",           # 低级磁盘操作
    "mkfs",         # 创建文件系统(格式化)
    "shred",        # 安全删除
    "sysctl",       # 修改内核参数
    "iptables",     # 防火墙规则修改
    "insmod",       # 加载内核模块
    "rmmod",        # 卸载内核模块
    "kill -9",      # 强制杀进程
    "reboot",       # 重启系统
    "shutdown",     # 关闭系统
    "chown",        # 修改文件所有者(权限提升)
    "chmod",        # 修改文件权限(权限提升)
    "sudo",         # 提升权限(if not in allowed_list)
    "passwd",       # 修改密码
    "useradd",      # 添加用户
    "userdel",      # 删除用户
    "groupadd",     # 添加组
    "crontab",      # 计划任务编辑(权限提升风险)
    "visudo",       # 编辑sudoers(权限提升风险)
    "at",           # 延迟任务执行(权限提升风险)
    "cryptsetup",   # 磁盘加密
    "gdisk",        # 分区表修改
    "mdadm",        # RAID配置
    "lvcreate",     # 逻辑卷创建
    "lvremove",     # 逻辑卷删除
    "zypper",       # 包管理器(权限提升风险)
    "apt",          # Debian包管理器
    "yum",          # RHEL包管理器
]
```

### 实现方式：命令AST分析

简单的前缀匹配(startswith)容易产生误报。更好的方式是解析命令为AST（抽象语法树），判断 **执行的实际命令** 而非参数中的字符串：

```python
import shlex
import subprocess

class DangerousCommandDetector:
    """危险命令检测器：通过AST分析而非字符串匹配"""

    DANGEROUS_COMMANDS = {
        "rm", "dd", "mkfs", "shred", "sysctl", "iptables",
        "insmod", "rmmod", "reboot", "shutdown", "chown", "chmod",
        "sudo", "passwd", "useradd", "userdel", "cryptsetup", "gdisk",
        "crontab", "visudo", "at", "mount", "umount"
    }

    # 某些命令有安全的子命令
    SAFE_SUBCOMMANDS = {
        "apt": ["list", "search", "show"],  # 仅允许查询
        "yum": ["list", "search", "info"],
    }

    def detect(self, command: str) -> tuple[bool, str]:
        """
        检测危险操作
        返回： (is_dangerous, reason)
        """
        try:
            # 分词(处理引号、转义)
            tokens = shlex.split(command)
            if not tokens:
                return False, ""

            # 提取主命令(第一个非管道/重定向符号)
            main_cmd = self._extract_main_command(tokens)

            if not main_cmd:
                return False, ""

            # 检查黑名单
            if main_cmd in self.DANGEROUS_COMMANDS:
                return True, f"禁止的命令: {main_cmd}"

            # 检查子命令(如apt list是否安全)
            if main_cmd in self.SAFE_SUBCOMMANDS:
                if len(tokens) < 2:
                    return True, f"命令 {main_cmd} 必须有子命令"
                subcommand = tokens[1]
                if subcommand not in self.SAFE_SUBCOMMANDS[main_cmd]:
                    return True, f"不安全的子命令: {main_cmd} {subcommand}"

            # 检查管道链中的所有命令
            for cmd in self._extract_piped_commands(tokens):
                if cmd in self.DANGEROUS_COMMANDS:
                    return True, f"管道中包含危险命令: {cmd}"

            return False, ""

        except ValueError as e:
            # shlex.split失败(如引号不匹配)
            return True, f"命令解析失败: {e}"

    def _extract_main_command(self, tokens: list) -> str:
        """提取主命令(忽略路径)"""
        if not tokens:
            return ""
        cmd = tokens[0]
        # 移除路径： /bin/rm -> rm
        return cmd.split("/")[-1]

    def _extract_piped_commands(self, tokens: list) -> list:
        """提取管道链中的所有命令"""
        commands = []
        current_cmd = []

        for token in tokens:
            if token in ["|", "||", "&", "&&"]:
                if current_cmd:
                    commands.append(self._extract_main_command(current_cmd))
                current_cmd = []
            else:
                current_cmd.append(token)

        if current_cmd:
            commands.append(self._extract_main_command(current_cmd))

        return commands

# 危险命令检测使用示例
detector = DangerousCommandDetector()

test_cases = [
    "ls -la /tmp",                  # 安全
    "rm -rf /",                     # 危险
    "cat file.txt | grep pattern",  # 安全
    "ls | rm -rf /",                # 危险
    "apt list",                     # 安全(白名单子命令)
    "apt install curl",             # 危险(非白名单子命令)
]

for cmd in test_cases:
    is_dangerous, reason = detector.detect(cmd)
    print(f"{cmd}: {'危险' if is_dangerous else '安全'} - {reason}")
```

## 12.3.3 约束护栏

### 1. 只读约束

某些工具应限制为 **只读操作**，不允许修改系统状态：

```python
class ReadOnlyConstraint:
    """只读约束：防止write操作"""

    WRITE_PATTERNS = [
        r"^(rm|touch|mkdir|mv|cp|tee|sed -i|awk -i)",  # 直接修改文件
        r">",                                            # 输出重定向
        r"dd of=",                                       # 磁盘写
        r"write\(",                                      # API write调用
    ]

    def enforce(self, command: str, tool_name: str) -> bool:
        """检查命令是否违反只读约束"""
        if "read_only" not in tool_name:
            return True  # 无约束

        import re
        for pattern in self.WRITE_PATTERNS:
            if re.search(pattern, command):
                raise PermissionDenied(f"只读工具不允许: {pattern}")

        return True

# 使用
constraint = ReadOnlyConstraint()

# 这应该成功
constraint.enforce("cat /etc/passwd", tool_name="read_file_read_only")

# 这应该失败
try:
    constraint.enforce("echo malware > /etc/passwd", tool_name="read_file_read_only")
except PermissionDenied as e:
    print(f"约束违反: {e}")
```

### 2. 参数范围约束

某些参数应限制在特定范围内：

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class ParameterConstraint:
    name: str
    validator: Callable[[Any], bool]
    error_message: str

class ParameterConstraintEnforcer:
    """参数范围约束"""

    def __init__(self):
        self.constraints = {
            "timeout_seconds": ParameterConstraint(
                name="timeout_seconds",
                validator=lambda x: 0 < x <= 300,  # 0-300秒
                error_message="timeout必须在0-300秒之间"
            ),
            "memory_mb": ParameterConstraint(
                name="memory_mb",
                validator=lambda x: 0 < x <= 2048,  # 0-2GB
                error_message="内存限制必须在0-2GB之间"
            ),
            "file_size_mb": ParameterConstraint(
                name="file_size_mb",
                validator=lambda x: 0 < x <= 1024,  # 0-1GB
                error_message="文件大小必须在0-1GB之间"
            ),
        }

    def validate(self, params: dict) -> None:
        """验证所有参数"""
        for param_name, constraint in self.constraints.items():
            if param_name in params:
                value = params[param_name]
                if not constraint.validator(value):
                    raise ValueError(constraint.error_message)

# 使用
enforcer = ParameterConstraintEnforcer()

# 正常参数
try:
    enforcer.validate({"timeout_seconds": 60, "memory_mb": 512})
    print("参数有效")
except ValueError as e:
    print(f"参数错误: {e}")

# 超出范围的参数
try:
    enforcer.validate({"timeout_seconds": 3600})  # 超过300秒
except ValueError as e:
    print(f"参数错误: {e}")
```

### 3. 超时强制

防止工具调用导致的资源耗尽：

```python
import signal
import subprocess
from contextlib import contextmanager

class TimeoutEnforcer:
    """超时强制：确保工具调用在规定时间内完成"""

    @contextmanager
    def enforce_timeout(self, timeout_seconds: int):
        """
        上下文管理器：强制执行超时
        """
        def timeout_handler(signum, frame):
            raise TimeoutError(f"执行超时({timeout_seconds}秒)")

        # 设置信号处理器
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout_seconds)

        try:
            yield
        finally:
            # 取消警报
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)

    def run_with_timeout(self,
                        command: list[str],
                        timeout_seconds: int = 30) -> str:
        """运行命令并强制超时"""
        try:
            with self.enforce_timeout(timeout_seconds):
                result = subprocess.run(
                    command,
                    shell=False,
                    capture_output=True,
                    text=True,
                    timeout=timeout_seconds
                )
                return result.stdout
        except subprocess.TimeoutExpired:
            raise TimeoutError(f"命令在{timeout_seconds}秒内未完成")
        except TimeoutError as e:
            raise e

# 使用
enforcer = TimeoutEnforcer()

# 这会在30秒后被中断
try:
    result = enforcer.run_with_timeout(["sleep", "60"], timeout_seconds=5)
except TimeoutError as e:
    print(f"超时: {e}")
```

## 12.3.4 护栏框架集成

将所有护栏整合为一个统一的框架：

```python
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class GuardrailFramework:
    """统一的护栏框架"""

    def __init__(self):
        self.dangerous_detector = DangerousCommandDetector()
        self.readonly_constraint = ReadOnlyConstraint()
        self.param_constraint = ParameterConstraintEnforcer()
        self.timeout_enforcer = TimeoutEnforcer()

    async def check_tool_call(self,
                             tool_call: "ToolCall",
                             user_id: str) -> "GuardrailCheckResult":
        """
        执行完整的护栏检查

        返回GuardrailCheckResult,包含：
        - is_safe: bool
        - violations: List[str]
        - recommendation: str (ALLOW / ASK / DENY)
        """

        violations = []

        # 1. 危险命令检测
        if hasattr(tool_call, 'command'):
            is_dangerous, reason = self.dangerous_detector.detect(tool_call.command)
            if is_dangerous:
                violations.append(f"危险命令: {reason}")

        # 2. 只读约束检查
        try:
            self.readonly_constraint.enforce(
                tool_call.command if hasattr(tool_call, 'command') else "",
                tool_call.tool_name
            )
        except PermissionDenied as e:
            violations.append(str(e))

        # 3. 参数范围检查
        try:
            self.param_constraint.validate(tool_call.args)
        except ValueError as e:
            violations.append(str(e))

        # 4. 审计记录
        logger.info(f"护栏检查: user={user_id}, tool={tool_call.tool_name}, "
                   f"violations={len(violations)}")

        # 5. 生成建议
        if violations:
            return GuardrailCheckResult(
                is_safe=False,
                violations=violations,
                recommendation="DENY"
            )
        else:
            return GuardrailCheckResult(
                is_safe=True,
                violations=[],
                recommendation="ALLOW"
            )

    async def execute_with_guardrails(self,
                                     tool_call: "ToolCall",
                                     user_id: str) -> Any:
        """带护栏的工具执行"""

        # 检查护栏
        check_result = await self.check_tool_call(tool_call, user_id)

        if not check_result.is_safe:
            logger.warning(f"护栏拒绝: {check_result.violations}")
            raise PermissionDenied(
                f"工具调用被护栏阻止: {check_result.violations[0]}"
            )

        # 执行(在超时保护下)
        try:
            result = self.timeout_enforcer.run_with_timeout(
                command=tool_call.command if hasattr(tool_call, 'command') else "",
                timeout_seconds=tool_call.args.get('timeout_seconds', 30)
            )
            logger.info(f"工具成功执行: {tool_call.tool_name}")
            return result
        except TimeoutError as e:
            logger.error(f"工具超时: {tool_call.tool_name}")
            raise

class GuardrailCheckResult:
    def __init__(self, is_safe: bool, violations: List[str], recommendation: str):
        self.is_safe = is_safe
        self.violations = violations
        self.recommendation = recommendation
```

## 12.3.5 护栏配置示例

护栏配置文件的完整示例：

```yaml
## guardrails.yaml - 护栏配置文件

dangerous_commands:
  enabled: true
  action: "deny"  # deny / ask / allow
  blacklist:
    - rm
    - dd
    - mkfs
    - reboot

readonly_tools:
  enabled: true
  tools:
    - name: "read_file_read_only"
      mode: "readonly"
    - name: "list_directory_read_only"
      mode: "readonly"

parameter_constraints:
  enabled: true
  timeout_seconds:
    min: 1
    max: 300
    default: 30
  memory_mb:
    min: 1
    max: 2048
    default: 512

timeout_enforcement:
  enabled: true
  default_timeout_seconds: 30
  max_timeout_seconds: 300
```

## 12.3.6 新一代护栏模式

传统的输入/输出检查已不足以应对复杂攻击。2026 年出现了更全面的护栏架构：

**Guardrail Sandwich 模式**：将防护分为三层——输入清洗（剥离已知注入模式）→ 有界推理（工具白名单 + 步数限制 + Token预算）→ 输出验证（检查敏感信息泄露和格式合规）。这种“三明治”结构确保即使某一层被绕过，其他层仍能拦截。

**执行流 Webhook 钩子**：Arcade 提出的 Contextual Access 模式在工具执行流中插入三个 webhook 钩子——access（调用前权限检查）、pre-execution（参数验证和风险评估）、post-execution（结果审计和敏感信息过滤）。这种细粒度的控制使得安全策略可以根据工具类型和上下文动态调整。

```python
class GuardrailSandwich:
    """三明治护栏模式"""

    async def execute_with_guardrails(self, action: ToolCall) -> ToolResult:
        # 第一层：输入清洗
        sanitized = await self.input_sanitizer.clean(action)

        # 第二层：有界推理
        if not self.boundary_checker.within_bounds(sanitized):
            raise BoundaryViolation(f"Action exceeds bounds: {sanitized}")

        result = await self.tool_executor.execute(sanitized)

        # 第三层：输出验证
        validated = await self.output_validator.validate(result)
        return validated
```

***

**本节总结**：护栏通过多维度的检查（危险命令、约束、超时）在执行前阻止危险操作。框架设计应易于扩展和配置。下一节介绍最复杂的防护机制：路径校验。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yeasy.gitbook.io/harness_engineering_guide/di-si-bu-fen-an-quan-ping-gu-yu-yan-jin/12_security/12.3_guardrails.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
