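# 2.3 Security and Observability Layers

This section covers the design principles, core components, implementation strategies, and best practices of the security layer and the observability layer. Both layers cut across the entire Harness system and provide baseline guarantees for every subsystem.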

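## 2.3.1 Why Baseline Guarantees Matter

The security and observability layers are not independent execution layers; instead, they reach into every part of the Harness system. Their design quality directly determines the system's security, maintainability, and production readiness.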

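## 2.3.2 Security Layer

The core idea of the security layer is: **assume that any decision made by an agent may carry risk and must pass multiple layers of verification.**

### A Layered Security Model

Based on the risk of an operation, we define six trust levels, ordered from most to least restrictive. Each level maps to a different permission configuration: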

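| Trust level            | Permission configuration                          | Typical use                    |
| ---------------------- | ------------------------------------------------- | ------------------------------ |
| Manual Only            | Fully manual operation                            | Extremely high-risk operations |
| Approve Always         | Approval required for every step                  | High-risk operations           |
| Approve Once           | Approved once before the whole workflow starts    | Production environments        |
| Ask First              | Approval requested before critical operations     | Development/test environments  |
| Auto with Notification | Executes automatically and sends a notification   | Low-risk routine operations    |
| Full Trust             | Fully autonomous execution, no notification       | Toy applications               |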
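
To make the table concrete, the following is a minimal sketch of how a single step might be gated under each trust level. The function name `gate`, its two flags, and the four outcome strings are assumptions introduced for illustration; they are not part of the Harness API.

```python
from enum import Enum


class TrustLevel(str, Enum):
    MANUAL_ONLY = "manual_only"
    APPROVE_ALWAYS = "approve_always"
    APPROVE_ONCE = "approve_once"
    ASK_FIRST = "ask_first"
    AUTO_WITH_NOTIFICATION = "auto_with_notification"
    FULL_TRUST = "full_trust"


def gate(level: TrustLevel, *, flow_approved: bool, is_critical: bool) -> str:
    """Decide what happens to one step: "block", "ask", "run_and_notify", or "run".

    flow_approved: whether the whole workflow was approved up front (hypothetical flag).
    is_critical:   whether this step counts as a critical operation (hypothetical flag).
    """
    if level is TrustLevel.MANUAL_ONLY:
        return "block"                       # a human performs the step, never the agent
    if level is TrustLevel.APPROVE_ALWAYS:
        return "ask"                         # every step needs approval
    if level is TrustLevel.APPROVE_ONCE:
        return "run" if flow_approved else "ask"
    if level is TrustLevel.ASK_FIRST:
        return "ask" if is_critical else "run"
    if level is TrustLevel.AUTO_WITH_NOTIFICATION:
        return "run_and_notify"
    return "run"                             # FULL_TRUST
```

Keeping the decision in one pure function makes the policy easy to unit-test independently of any approval UI or notification channel.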

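### Designing the Permission Management System

The permission management system achieves fine-grained access control by attaching a permission level and a set of allowed actions to each resource: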

```python
from enum import Enum
from typing import Protocol, Optional
from pydantic import BaseModel
from datetime import datetime

class PermissionLevel(str, Enum):
    """Permission levels (six-level trust model)."""
    MANUAL_ONLY = "manual_only"           # fully manual operation
    APPROVE_ALWAYS = "approve_always"     # approval for every step
    APPROVE_ONCE = "approve_once"         # one approval for the whole task
    ASK_FIRST = "ask_first"               # ask before acting
    AUTO_WITH_NOTIFICATION = "auto_with_notification"  # automatic, with notification
    FULL_TRUST = "full_trust"             # full trust

class ResourcePermission(BaseModel):
    """Permission definition for a single resource."""
    resource_id: str          # resource identifier, e.g. "file://config.yaml"
    resource_type: str        # resource type, e.g. "file", "api", "database"
    permission_level: PermissionLevel
    actions: list[str]        # allowed actions, e.g. "read", "write", "delete"
    conditions: dict = {}     # extra conditions, e.g. {"max_size_mb": 100}

class AgentPermissions(BaseModel):
    """Permission configuration for an agent."""
    agent_id: str
    permissions: list[ResourcePermission]
    default_level: PermissionLevel = PermissionLevel.ASK_FIRST  # ask by default
    created_at: datetime
    expires_at: Optional[datetime] = None  # when the permissions expire

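class PolicyStorage(Protocol):
    """Policy storage interface (assumed here: only `load` is required)."""

    async def load(self, agent_id: str) -> AgentPermissions: ...

class ApprovalRequest(BaseModel):
    """A pending approval request (fields inferred from `request_approval`)."""
    agent_id: str
    resource_id: str
    action: str
    reason: str
    created_at: datetime
    status: str

class PermissionManager:
    """Permission manager."""

    def __init__(self, policy_storage: PolicyStorage):
        self.policy_storage = policy_storage
        self.cache = {}  # permission cache to speed up lookups

    async def check_permission(
        self,
        agent_id: str,
        resource_id: str,
        action: str
    ) -> tuple[bool, Optional[str]]:
        """
        Check whether the agent may perform an action on a resource.

        Returns: (allowed, reason or None)
        """
        # Try the cache first
        permissions = self.cache.get(agent_id)
        if permissions is None:
            # Fall back to loading from storage
            permissions = await self.policy_storage.load(agent_id)
            self.cache[agent_id] = permissions

        # Reject if the permissions have expired
        if permissions.expires_at and datetime.now() > permissions.expires_at:
            return False, "Agent permissions have expired"

        # Look for a matching permission entry.
        # Note: perm.permission_level is not consulted here; callers that need
        # per-resource approval must check it separately.
        for perm in permissions.permissions:
            if self._resource_matches(perm.resource_id, resource_id):
                if action in perm.actions:
                    return True, None

        # No explicit match: fall back to the agent's default level
        if permissions.default_level == PermissionLevel.FULL_TRUST:
            return True, None
        elif permissions.default_level == PermissionLevel.ASK_FIRST:
            return False, "Request needs human approval"

        return False, "Permission denied"

    async def request_approval(
        self,
        agent_id: str,
        resource_id: str,
        action: str,
        reason: str
    ) -> ApprovalRequest:
        """Create an approval request in the "pending" state."""
        return ApprovalRequest(
            agent_id=agent_id,
            resource_id=resource_id,
            action=action,
            reason=reason,
            created_at=datetime.now(),
            status="pending"
        )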

    def _resource_matches(self, pattern: str, resource: str) -> bool:
        """Check whether a resource matches a permission pattern."""
        # Supports shell-style wildcards,
        # e.g. "file://*.log" matches every .log file.
        from fnmatch import fnmatch
        return fnmatch(resource, pattern)
```
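
The wildcard matching used for resource patterns has a few properties worth seeing in isolation. The sketch below is a simplified, standalone version of the lookup; the `RULES` table and `is_allowed` helper are hypothetical names for illustration. It uses `fnmatchcase` from the standard library, which is case-sensitive on every platform, and it denies by default when no rule matches.

```python
from fnmatch import fnmatchcase

# Hypothetical rule table: (resource pattern, allowed actions)
RULES = [
    ("file://logs/*.log", {"read"}),
    ("file://config.yaml", {"read", "write"}),
    ("tool://*", {"execute"}),
]


def is_allowed(resource_id: str, action: str) -> bool:
    """Allow only if some rule matches the resource AND lists the action."""
    return any(
        fnmatchcase(resource_id, pattern) and action in actions
        for pattern, actions in RULES
    )


print(is_allowed("file://logs/app.log", "read"))          # allowed by the first rule
print(is_allowed("file://logs/app.log", "delete"))        # action not listed
print(is_allowed("file://logs/nested/deep.log", "read"))  # "*" also matches "/"
print(is_allowed("file://secrets.env", "read"))           # no rule matches
```

Note that `*` in `fnmatch` patterns also matches path separators, so `file://logs/*.log` covers nested directories as well. If that is broader than intended, patterns need to be written more narrowly or matched with a stricter scheme.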

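### Designing Sandbox Isolation

High-risk operations, such as running system commands or modifying files, should be executed by the Harness in an isolated environment so that they cannot affect the rest of the system.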

```python
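class IsolationLevel(str, Enum):
    """Isolation levels (definition assumed from the docstring of execute())."""
    NONE = "none"
    PROCESS = "process"
    CONTAINER = "container"
    VM = "vm"

class ToolResult(BaseModel):
    """Result of a tool call (shape inferred from how it is used in this section)."""
    status: str
    output: Optional[str] = None
    error: Optional[str] = None

# ToolCall, _build_command, _execute_directly and _execute_in_vm are assumed
# to be defined elsewhere in the Harness.

class SandboxExecutor:
    """Executes operations inside a sandbox."""

    async def execute(
        self,
        tool_call: ToolCall,
        isolation_level: IsolationLevel = IsolationLevel.PROCESS
    ) -> ToolResult:
        """
        Execute a tool call in an isolated environment.

        isolation_level:
          - NONE: run directly, no isolation
          - PROCESS: process-level isolation
          - CONTAINER: container-level isolation (Docker)
          - VM: virtual-machine-level isolation
        """
        if isolation_level == IsolationLevel.NONE:
            return await self._execute_directly(tool_call)

        elif isolation_level == IsolationLevel.PROCESS:
            return await self._execute_in_subprocess(tool_call)

        elif isolation_level == IsolationLevel.CONTAINER:
            return await self._execute_in_container(tool_call)

        elif isolation_level == IsolationLevel.VM:
            return await self._execute_in_vm(tool_call)

        raise ValueError(f"Unknown isolation level: {isolation_level}")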

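    async def _execute_in_subprocess(self, tool_call: ToolCall) -> ToolResult:
        """Run in a child process: separate address space plus a timeout.

        A plain subprocess does not restrict system calls or resource usage.
        """
        import asyncio
        import subprocess

        try:
            # Build the command to run
            cmd = self._build_command(tool_call)

            # subprocess.run() blocks, so run it in a worker thread to keep
            # the event loop responsive. Memory and CPU limits are not set
            # here; they require cgroups configuration at the system level.
            result = await asyncio.to_thread(
                subprocess.run,
                cmd,
                timeout=30,
                capture_output=True,
                text=True,
            )

            if result.returncode == 0:
                return ToolResult(
                    status="success",
                    output=result.stdout
                )
            else:
                return ToolResult(
                    status="error",
                    error=result.stderr
                )

        except subprocess.TimeoutExpired:
            return ToolResult(
                status="timeout",
                error="Execution exceeded 30s timeout"
            )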

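    async def _execute_in_container(self, tool_call: ToolCall) -> ToolResult:
        """Run inside a throwaway Docker container with resource limits."""
        import asyncio
        import docker

        client = docker.from_env()

        try:
            # containers.run() blocks, so call it in a worker thread.
            # It accepts no `timeout` or `cpus` keyword: CPU is capped with
            # nano_cpus, and a time limit would need detach=True plus
            # container.wait(timeout=...), which is omitted here for brevity.
            output = await asyncio.to_thread(
                client.containers.run,
                image="python:3.11-slim",
                command=self._build_command(tool_call),
                remove=True,
                mem_limit="512m",        # 512 MB memory limit
                memswap_limit="1g",      # memory + swap combined
                nano_cpus=500_000_000,   # 0.5 CPU, in units of 1e-9 CPUs
                network_disabled=True    # no network access
            )

            return ToolResult(
                status="success",
                output=output.decode()
            )

        except docker.errors.ContainerError as e:
            return ToolResult(
                status="error",
                error=str(e)
            )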
```
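
As an alternative to running a blocking call in a worker thread, process-level execution can be written directly against `asyncio`'s subprocess API. The sketch below is a standalone illustration under that assumption; `run_with_timeout` is a hypothetical helper and returns a plain dict rather than the Harness's `ToolResult`. It enforces a timeout and kills the child process when the limit is exceeded.

```python
import asyncio
import sys


async def run_with_timeout(cmd: list[str], timeout_s: float) -> dict:
    """Run cmd in a child process without blocking the event loop."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout_s)
    except asyncio.TimeoutError:
        proc.kill()        # stop the runaway child
        await proc.wait()  # reap it so no zombie process is left behind
        return {"status": "timeout", "error": f"Execution exceeded {timeout_s}s timeout"}

    if proc.returncode == 0:
        return {"status": "success", "output": stdout.decode()}
    return {"status": "error", "error": stderr.decode()}


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout([sys.executable, "-c", "print('hi')"], 10)))
```

This still provides only a timeout and a separate process. Restricting memory, CPU, file system, or network access requires OS-level mechanisms such as cgroups or a container runtime.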

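### Designing the Audit Log

The audit log records every security-relevant event so that incidents can be traced and analyzed after the fact: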

```python
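class AuditLog(BaseModel):
    """A single audit log record."""
    timestamp: datetime
    agent_id: str
    action_type: str  # e.g. "permission_check", "tool_execution", "approval_request"
    resource: str
    operation: str
    status: str  # "allowed", "denied", "executed", "failed"
    details: dict = {}  # additional information
    ip_address: Optional[str] = None
    user_id: Optional[str] = None

class AuditLogStorage(Protocol):
    """Audit storage interface (assumed from the calls made by AuditLogger)."""

    async def batch_insert(self, logs: list[AuditLog]) -> None: ...

    async def query(
        self, filters: dict, time_range=None, limit: int = 100
    ) -> list[AuditLog]: ...

class AuditLogger:
    """Audit logging system."""

    def __init__(self, storage: AuditLogStorage):
        self.storage = storage
        self.buffer = []  # log buffer; batched writes improve throughput
        self.buffer_size = 100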

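    async def log(self, audit_log: AuditLog) -> None:
        """Record one audit log entry."""
        self.buffer.append(audit_log)

        if len(self.buffer) >= self.buffer_size:
            await self._flush()

    async def _flush(self) -> None:
        """Write buffered entries to storage."""
        if not self.buffer:
            return

        # Swap the buffer out before awaiting, so entries logged while the
        # write is in flight are kept instead of being cleared unwritten.
        pending, self.buffer = self.buffer, []
        await self.storage.batch_insert(pending)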

    async def query(
        self,
        agent_id: Optional[str] = None,
        action_type: Optional[str] = None,
        status: Optional[str] = None,
        time_range: Optional[tuple[datetime, datetime]] = None,
        limit: int = 100
    ) -> list[AuditLog]:
        """Query audit logs, optionally filtered by agent, action type, and status."""
        # Flush first so that entries still in the buffer are visible to the query
        await self._flush()

        filters = {}
        if agent_id:
            filters["agent_id"] = agent_id
        if action_type:
            filters["action_type"] = action_type
        if status:
            filters["status"] = status

        return await self.storage.query(
            filters=filters,
            time_range=time_range,
            limit=limit
        )

    async def export(
        self,
        file_path: str,
        format: str = "json"
    ) -> None:
        """Export audit logs to a JSON or CSV file."""
        logs = await self.storage.query(filters={})

        if format == "json":
            import json
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(
                    [log.model_dump() for log in logs],
                    f,
                    default=str  # serialize datetime values as strings
                )
        elif format == "csv":
            import csv
            with open(file_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=list(AuditLog.model_fields))
                writer.writeheader()
                for log in logs:
                    writer.writerow(log.model_dump())
        else:
            raise ValueError(f"Unsupported export format: {format}")
```
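
One consequence of buffered writes is that entries sitting in the buffer are lost if the process exits before a flush. A possible mitigation, sketched below under simplified assumptions, is to make the logger an async context manager that flushes on exit. `Entry`, `InMemoryStorage`, and `BufferedAuditLogger` are hypothetical stand-ins that use only the standard library; a real storage backend would write to a database or log service.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Entry:
    agent_id: str
    status: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class InMemoryStorage:
    """Hypothetical backend that keeps rows in a list."""

    def __init__(self) -> None:
        self.rows: list[Entry] = []

    async def batch_insert(self, entries: list[Entry]) -> None:
        self.rows.extend(entries)


class BufferedAuditLogger:
    def __init__(self, storage: InMemoryStorage, buffer_size: int = 100) -> None:
        self.storage = storage
        self.buffer: list[Entry] = []
        self.buffer_size = buffer_size

    async def log(self, entry: Entry) -> None:
        self.buffer.append(entry)
        if len(self.buffer) >= self.buffer_size:
            await self.flush()

    async def flush(self) -> None:
        if not self.buffer:
            return
        pending, self.buffer = self.buffer, []  # swap before awaiting the write
        await self.storage.batch_insert(pending)

    async def __aenter__(self) -> "BufferedAuditLogger":
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.flush()  # runs on normal exit and when an exception propagates


async def demo() -> tuple[int, int]:
    storage = InMemoryStorage()
    async with BufferedAuditLogger(storage, buffer_size=3) as audit:
        for _ in range(4):
            await audit.log(Entry(agent_id="agent-001", status="allowed"))
        written_inside = len(storage.rows)  # one full batch written, one entry buffered
    return written_inside, len(storage.rows)
```

Running `asyncio.run(demo())` shows that the fourth entry reaches storage only when the `async with` block exits. A hard crash can still lose buffered entries, so a smaller buffer or a periodic flush may be preferable for high-value audit data.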

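## 2.3.3 Observability Layer

The core goal of observability is: **when something goes wrong, the root cause can be located quickly.**

Observability rests on three pillars: logs, traces, and metrics.

### Structured Logging

Structured logging records events as JSON, which makes them easy for machines to parse and search: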

```python
from enum import Enum
import logging
import json
from datetime import datetime

class LogLevel(str, Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

class StructuredLogger:
    """Structured logging system."""

    # LogStorage is assumed to be any object with async save() and search() methods.
    def __init__(self, name: str, storage: "LogStorage"):
        self.name = name
        self.storage = storage
        self.context = {}  # context attached to every record

    def set_context(self, **kwargs) -> None:
        """Set or update the logging context."""
        self.context.update(kwargs)

    async def log(
        self,
        level: LogLevel,
        message: str,
        **kwargs
    ) -> None:
        """Record one log entry."""
        log_record = {
            "timestamp": datetime.now().isoformat(),
            "logger": self.name,
            "level": level.value,
            "message": message,
            # Copy the context so later set_context() calls do not alter this record
            "context": dict(self.context),
            "extra": kwargs
        }

        # Write to the console; default=str keeps non-JSON values from raising
        print(json.dumps(log_record, default=str))

        # Persist
        await self.storage.save(log_record)

    async def search(
        self,
        query: str,
        level: LogLevel = None,
        time_range: tuple[datetime, datetime] = None,
        limit: int = 100
    ) -> list[dict]:
        """Search stored logs."""
        return await self.storage.search(
            query=query,
            level=level,
            time_range=time_range,
            limit=limit
        )

# Structured logging usage example (run inside an async function;
# `storage` is any LogStorage implementation)
logger = StructuredLogger("agent.runtime", storage)
logger.set_context(agent_id="agent-001", task_id="task-123")

await logger.log(
    level=LogLevel.INFO,
    message="Tool execution started",
    tool_name="weather_api",
    params={"city": "Beijing"}
)
```
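
The same JSON-per-line output can also be produced with Python's standard `logging` module, which may be convenient when other libraries in the process already log through it. The following is a minimal sketch, not the Harness's implementation: `JsonFormatter`, `build_logger`, and the `fields` key passed through `extra` are names chosen here for illustration.

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each LogRecord as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            # Structured fields passed as extra={"fields": {...}}
            "extra": getattr(record, "fields", {}),
        }
        return json.dumps(payload, default=str)


def build_logger(name: str, stream=None) -> logging.Logger:
    """Create a logger that writes JSON lines to `stream` (stderr if None)."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate output through the root logger
    return logger


# Usage: capture output in a buffer so it can be inspected
buf = io.StringIO()
log = build_logger("agent.runtime.demo", buf)
log.info("Tool execution started", extra={"fields": {"tool_name": "weather_api"}})
print(buf.getvalue(), end="")
```

Because each line is a self-contained JSON object, the output can be shipped to a log aggregator and filtered by field without custom parsing.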

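### Distributed Tracing

Distributed tracing follows a request along its execution path across multiple system components, which helps diagnose performance problems: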

```python
from dataclasses import dataclass, field
import uuid
from datetime import datetime
from typing import Optional

@dataclass
class TraceSpan:
    """A single unit of work within a trace."""
    trace_id: str          # global ID of the trace
    span_id: str           # ID of this span
    parent_span_id: Optional[str]  # ID of the parent span
    operation_name: str    # name of the operation
    start_time: datetime
    end_time: Optional[datetime]
    duration_ms: Optional[float]
    status: str            # "success", "error", "timeout"
    error: Optional[str]
    # A dataclass rejects a mutable default such as {}, so use default_factory
    attributes: dict = field(default_factory=dict)  # e.g. parameters, results

from contextlib import asynccontextmanager

class Tracer:
    """Distributed tracing system.

    The span stack is a single list per Tracer, so one instance should not be
    shared by tasks that run concurrently.
    """

    # TraceStorage is assumed to be any object with an async save(span) method.
    def __init__(self, service_name: str, storage: "TraceStorage"):
        self.service_name = service_name
        self.storage = storage
        self.current_trace_id: Optional[str] = None
        self.span_stack: list[TraceSpan] = []

    def start_trace(self, trace_id: Optional[str] = None) -> str:
        """Start a new trace."""
        if trace_id is None:
            trace_id = str(uuid.uuid4())

        self.current_trace_id = trace_id
        self.span_stack = []
        return trace_id

    def start_span(
        self,
        operation_name: str,
        attributes: Optional[dict] = None
    ) -> TraceSpan:
        """Start a new span under the current trace."""
        if self.current_trace_id is None:
            self.start_trace()  # avoid spans with trace_id=None

        parent_span = self.span_stack[-1] if self.span_stack else None

        span = TraceSpan(
            trace_id=self.current_trace_id,
            span_id=str(uuid.uuid4()),
            parent_span_id=parent_span.span_id if parent_span else None,
            operation_name=operation_name,
            start_time=datetime.now(),
            end_time=None,
            duration_ms=None,
            status="pending",
            error=None,
            attributes=attributes or {}
        )

        self.span_stack.append(span)
        return span

    async def end_span(
        self,
        span: TraceSpan,
        status: str = "success",
        error: Optional[str] = None
    ) -> None:
        """Finish a span and persist it."""
        span.end_time = datetime.now()
        span.duration_ms = (
            span.end_time - span.start_time
        ).total_seconds() * 1000
        span.status = status
        span.error = error

        # Pop the span if it is on top of the stack
        if self.span_stack and self.span_stack[-1] is span:
            self.span_stack.pop()

        # Persist
        await self.storage.save(span)

    @asynccontextmanager
    async def context_manager(
        self,
        operation_name: str,
        attributes: Optional[dict] = None
    ):
        """Async context manager that simplifies span handling."""
        span = self.start_span(operation_name, attributes)
        try:
            yield span
        except Exception as e:
            await self.end_span(span, status="error", error=str(e))
            raise
        else:
            await self.end_span(span, status="success")

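# Distributed tracing usage example (run inside an async function;
# `tracer` is a Tracer instance and `weather_api` a hypothetical client)
tracer.start_trace()
async with tracer.context_manager("tool_execution", {"tool": "weather_api"}):
    # Execute the tool
    result = await weather_api.get(city="Beijing")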
```
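
When several tool calls run concurrently under one trace, a single shared span stack cannot tell which span is the parent of which. One common approach is to keep the current span in a `contextvars.ContextVar`, because each asyncio task gets its own copy of the context. The sketch below illustrates that idea in isolation; the `span` helper, the `finished` list, and the record layout are assumptions for this example rather than the Harness's tracer.

```python
import asyncio
import uuid
from contextlib import asynccontextmanager
from contextvars import ContextVar

# ID of the span that is currently active in this task (None at the root)
_current_span_id: ContextVar = ContextVar("current_span_id", default=None)
finished: list[dict] = []  # stands in for a trace storage backend


@asynccontextmanager
async def span(name: str):
    span_id = uuid.uuid4().hex
    record = {"name": name, "span_id": span_id, "parent": _current_span_id.get()}
    token = _current_span_id.set(span_id)  # children started from here see this span
    try:
        yield record
        record["status"] = "success"
    except Exception as exc:
        record["status"] = "error"
        record["error"] = str(exc)
        raise
    finally:
        _current_span_id.reset(token)      # restore the previous span
        finished.append(record)


async def call_tool(name: str) -> None:
    async with span(name):
        await asyncio.sleep(0.01)          # placeholder for real work


async def main() -> dict:
    async with span("task") as root:
        # gather() runs both calls as separate tasks, each with a copy of the context
        await asyncio.gather(call_tool("tool_a"), call_tool("tool_b"))
    return root
```

After `asyncio.run(main())`, both tool spans in `finished` carry the root span's ID as their parent, even though they overlapped in time.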

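### Collecting Performance Metrics

The metrics collector records and aggregates key performance indicators, which supports real-time monitoring and alerting: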

```python
from collections import defaultdict
from statistics import mean, median, stdev, quantiles
from datetime import datetime
import time

class MetricsCollector:
    """Collects performance metrics."""

    def __init__(self):
        self.metrics: dict[str, list[float]] = defaultdict(list)
        self.counters: dict[str, int] = defaultdict(int)

    def record_duration(
        self,
        metric_name: str,
        duration_ms: float
    ) -> None:
        """Record one duration sample, in milliseconds."""
        self.metrics[metric_name].append(duration_ms)

    def increment_counter(
        self,
        counter_name: str,
        value: int = 1
    ) -> None:
        """Increase a counter."""
        self.counters[counter_name] += value

    def get_statistics(self, metric_name: str) -> dict:
        """Return summary statistics for a metric."""
        if metric_name not in self.metrics or not self.metrics[metric_name]:
            return {}

        values = self.metrics[metric_name]
        many = len(values) > 1  # stdev() and quantiles() need at least two samples
        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "mean": mean(values),
            "stdev": stdev(values) if many else 0,
            "p50": median(values),  # 50th percentile
            "p99": quantiles(values, n=100)[-1] if many else values[0],  # 99th percentile
        }

    async def export_metrics(self, time_interval: int = 60) -> dict:
        """Export a snapshot of all metrics.

        `time_interval` is not used here; scheduling the periodic export is
        left to the caller.
        """
        return {
            "timestamp": datetime.now().isoformat(),
            "metrics": {
                name: self.get_statistics(name)
                for name in self.metrics
            },
            "counters": dict(self.counters)
        }

# Metrics usage example (run inside an async function; `tool` is any tool object)
metrics = MetricsCollector()

# Record the tool execution time; perf_counter() is a monotonic clock
# intended for measuring elapsed time
start = time.perf_counter()
result = await tool.execute()
duration = (time.perf_counter() - start) * 1000
metrics.record_duration("tool_execution_time", duration)

# Count errors
if result.status != "success":
    metrics.increment_counter("tool_execution_errors")

# Read back the statistics
stats = metrics.get_statistics("tool_execution_time")
print(f"Tool execution: mean={stats['mean']:.1f}ms, p99={stats['p99']:.1f}ms")
```
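
Timing code by hand around every call is easy to get wrong, especially when the call raises. A small context manager can record the duration in a `finally` block so that failed calls are measured too. The sketch below is a standalone illustration; `timed`, `durations_ms`, and `p99` are hypothetical names, and `p99` uses the `inclusive` method of `statistics.quantiles`, which keeps the result within the observed minimum and maximum.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from statistics import quantiles

durations_ms: dict[str, list[float]] = defaultdict(list)


@contextmanager
def timed(metric_name: str):
    """Record the elapsed time of the wrapped block, even if it raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        durations_ms[metric_name].append((time.perf_counter() - start) * 1000)


def p99(values: list[float]) -> float:
    """99th percentile of a non-empty list of samples."""
    if len(values) < 2:
        return values[0]  # quantiles() needs at least two data points
    return quantiles(values, n=100, method="inclusive")[-1]


# Usage
with timed("tool_execution_time"):
    time.sleep(0.01)  # placeholder for a real tool call

print(f"samples={len(durations_ms['tool_execution_time'])}")
```

For long-running processes, the sample lists grow without bound. Bounding them, for example with a fixed-size window, is a design decision this sketch leaves open.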

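### Integrating the Three Pillars

The three pillars of observability (logs, traces, and metrics) need to work together and be correlated through a shared context: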

```python
class ObservabilityManager:
    """Unified observability management."""

    def __init__(
        self,
        logger: StructuredLogger,
        tracer: Tracer,
        metrics: MetricsCollector
    ):
        self.logger = logger
        self.tracer = tracer
        self.metrics = metrics

    async def record_tool_execution(
        self,
        tool_name: str,
        params: dict,
        result: ToolResult,
        duration_ms: float
    ) -> None:
        """Record logs and metrics for one tool execution."""
        # 1. Write the log entry
        await self.logger.log(
            level=LogLevel.INFO if result.status == "success" else LogLevel.ERROR,
            message=f"Tool execution completed: {tool_name}",
            tool_name=tool_name,
            status=result.status,
            duration_ms=duration_ms
        )

        # 2. Record metrics
        self.metrics.record_duration(
            f"tool_execution_time:{tool_name}",
            duration_ms
        )
        if result.status != "success":
            self.metrics.increment_counter(f"tool_execution_errors:{tool_name}")

        # 3. Trace data is recorded separately through spans
```

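## 2.3.4 Security and Observability Working Together

Security checks and observability monitoring need to cooperate closely, tracing and recording every security-relevant event through a shared context: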

```python
import time

class SecureObservableHarness:
    """Harness that integrates security and observability."""

    def __init__(
        self,
        permission_manager: PermissionManager,
        audit_logger: AuditLogger,
        observability: ObservabilityManager,
        tools: dict  # tool name -> object with an async execute(); registry assumed here
    ):
        self.permission_manager = permission_manager
        self.audit_logger = audit_logger
        self.observability = observability
        self.tools = tools

    async def execute_tool_safely(
        self,
        agent_id: str,
        tool_name: str,
        params: dict
    ) -> ToolResult:
        """Execute a tool with permission checks, auditing, and observability."""
        # 1. Permission check
        allowed, reason = await self.permission_manager.check_permission(
            agent_id=agent_id,
            resource_id=f"tool://{tool_name}",
            action="execute"
        )

        if not allowed:
            # Record the rejected attempt
            await self.audit_logger.log(AuditLog(
                timestamp=datetime.now(),
                agent_id=agent_id,
                action_type="permission_check",
                resource=f"tool://{tool_name}",
                operation="execute",
                status="denied",
                details={"reason": reason}
            ))

            return ToolResult(
                status="permission_denied",
                error=reason
            )

        # 2. Record the attempt
        await self.audit_logger.log(AuditLog(
            timestamp=datetime.now(),
            agent_id=agent_id,
            action_type="tool_execution_start",
            resource=f"tool://{tool_name}",
            operation="execute",
            status="started"
        ))

        # 3. Execute the tool
        start_time = time.time()
        try:
            tool = self.tools[tool_name]  # raises KeyError for an unregistered tool
            result = await tool.execute(**params)
            duration_ms = (time.time() - start_time) * 1000

            # 4. Record success
            await self.audit_logger.log(AuditLog(
                timestamp=datetime.now(),
                agent_id=agent_id,
                action_type="tool_execution_complete",
                resource=f"tool://{tool_name}",
                operation="execute",
                status="success"
            ))

            # 5. Record observability data
            await self.observability.record_tool_execution(
                tool_name=tool_name,
                params=params,
                result=result,
                duration_ms=duration_ms
            )

            return result

        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000

            # Record the error
            await self.audit_logger.log(AuditLog(
                timestamp=datetime.now(),
                agent_id=agent_id,
                action_type="tool_execution_error",
                resource=f"tool://{tool_name}",
                operation="execute",
                status="error",
                details={"error": str(e)}
            ))

            await self.observability.record_tool_execution(
                tool_name=tool_name,
                params=params,
                result=ToolResult(status="error", error=str(e)),
                duration_ms=duration_ms
            )

            raise
```

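## 2.3.5 Summary

The security and observability layers cut across the entire system, and each is built from a small set of mechanisms:

* **Security layer**: permission management, sandbox isolation, and audit logging keep every operation within controlled bounds
* **Observability layer**: logs, traces, and metrics make it possible to locate and diagnose problems quickly when they occur

Good design of these two layers is a prerequisite for a production-grade Harness system.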

