# 9.5 智能体上下文管理的高级主题

本章节深化对智能体系统中复杂上下文问题的讨论，涵盖多智能体协作、长期任务、工具结果管理和记忆系统等实战主题。

## 9.5.1 多智能体上下文冲突与协调

### 问题场景

在多智能体系统中，不同的智能体可能有以下冲突：

```
场景：电商订单处理系统

Agent A (客服助手)的上下文：
  - 目标：快速回复用户
  - 信息：用户想要退货
  - 权限：只能提供政策信息

Agent B (订单管理)的上下文：
  - 目标：防止滥用退货
  - 信息：该用户频繁退货
  - 权限：可以拒绝退货请求

冲突：两个Agent看到的用户信息不同，可能导致：
  1. 重复处理（都试图处理同一个订单）
  2. 决策冲突（一个同意，一个拒绝）
  3. 循环依赖（Agent A 等待Agent B ，反之亦然）
```

### 冲突解决机制

```python
from enum import Enum
from dataclasses import dataclass
from typing import Any, Dict, List, Set, Optional
from datetime import datetime
import threading

class ConflictType(Enum):
    """多智能体冲突类型"""
    RESOURCE_CONFLICT = "resource_conflict"      # 竞争同一资源
    DECISION_CONFLICT = "decision_conflict"      # 相互矛盾的决策
    INFORMATION_CONFLICT = "information_conflict" # 掌握的信息不一致
    AUTHORITY_CONFLICT = "authority_conflict"     # 权限冲突
    ORDERING_CONFLICT = "ordering_conflict"       # 执行顺序冲突

@dataclass
class ConflictEvent:
    """冲突事件记录"""
    agent_a: str
    agent_b: str
    conflict_type: ConflictType
    resource: str  # 冲突涉及的资源
    timestamp: str
    resolved: bool = False
    resolution_strategy: Optional[str] = None
    resolution_details: Optional[Dict] = None

class ContextConflictResolver:
    """
    多智能体上下文冲突解决器。
    使用仲裁、顺序化和优先级来解决冲突。
    """

    def __init__(self):
        self.conflict_log: List[ConflictEvent] = []
        self.resource_locks: Dict[str, str] = {}  # 资源ID -> 持有者Agent
        self.priority_matrix: Dict[str, int] = {}  # Agent -> 优先级（1-10）
        self.conflict_mutex = threading.Lock()

    def detect_potential_conflict(
        self,
        agent_a_context: Dict,
        agent_b_context: Dict,
        shared_resource: str
    ) -> Optional[ConflictType]:
        """
        检测两个Agent的上下文是否可能产生冲突。

        检查：
        1. 是否竞争同一资源
        2. 信息视图是否矛盾
        3. 目标是否相反
        """

        # 检查资源冲突
        resources_a = set(agent_a_context.get('accessible_resources', []))
        resources_b = set(agent_b_context.get('accessible_resources', []))

        if shared_resource in (resources_a & resources_b):
            # 都能访问这个资源，可能产生竞争
            return ConflictType.RESOURCE_CONFLICT

        # 检查信息冲突
        info_a = agent_a_context.get('knowledge_base', {})
        info_b = agent_b_context.get('knowledge_base', {})

        # 简化：检查关于同一对象的信息是否矛盾
        for key in set(info_a.keys()) & set(info_b.keys()):
            if info_a[key] != info_b[key]:
                return ConflictType.INFORMATION_CONFLICT

        # 检查权限冲突
        perms_a = set(agent_a_context.get('permissions', []))
        perms_b = set(agent_b_context.get('permissions', []))

        if 'can_approve' in perms_a and 'can_reject' in perms_b:
            # 一个能批准，一个能拒绝同一事项
            return ConflictType.AUTHORITY_CONFLICT

        return None

    def resolve_resource_conflict(
        self,
        agent_a: str,
        agent_b: str,
        resource: str
    ) -> str:
        """
        解决资源竞争冲突。
        使用加锁机制确保同时只有一个Agent访问。
        """

        with self.conflict_mutex:
            # 检查资源是否已被锁定
            if resource in self.resource_locks:
                current_holder = self.resource_locks[resource]
                if current_holder != agent_a and current_holder != agent_b:
                    # 资源被第三方持有
                    return f"WAITING"

            # 根据优先级分配资源
            priority_a = self.priority_matrix.get(agent_a, 5)
            priority_b = self.priority_matrix.get(agent_b, 5)

            if priority_a > priority_b:
                winner = agent_a
            elif priority_b > priority_a:
                winner = agent_b
            else:
                # 优先级相同，按时间顺序（先请求先得）
                winner = agent_a

            # 锁定资源
            self.resource_locks[resource] = winner

            # 记录冲突
            conflict = ConflictEvent(
                agent_a=agent_a,
                agent_b=agent_b,
                conflict_type=ConflictType.RESOURCE_CONFLICT,
                resource=resource,
                timestamp=datetime.now().isoformat(),
                resolved=True,
                resolution_strategy="priority_based_locking",
                resolution_details={'winner': winner, 'priority_a': priority_a, 'priority_b': priority_b}
            )

            self.conflict_log.append(conflict)

            return winner

    def resolve_information_conflict(
        self,
        agent_a: str,
        agent_b: str,
        attribute: str,
        value_a: Any,
        value_b: Any
    ) -> Any:
        """
        解决信息冲突。
        优先使用更新的或更权威的信息源。
        """

        # 策略1：时间戳（更新的信息优先）
        if hasattr(value_a, 'timestamp') and hasattr(value_b, 'timestamp'):
            newer = value_a if value_a.timestamp > value_b.timestamp else value_b
            return newer

        # 策略2：权威性（某些Agent对某些信息更权威）
        authority_a = self._get_authority_score(agent_a, attribute)
        authority_b = self._get_authority_score(agent_b, attribute)

        if authority_a > authority_b:
            return value_a
        elif authority_b > authority_a:
            return value_b

        # 策略3：共识（需要询问两个Agent，取一致的答案）
        # 这里简化为取value_a，实际应该是复杂的协商过程
        return value_a

    def synchronize_contexts(
        self,
        agents: List[str],
        shared_attributes: Dict[str, str]  # {属性名: 真实值}
    ) -> Dict[str, Dict]:
        """
        同步多个Agent的上下文到一致状态。

        当冲突无法自动解决时，需要显式同步。
        """

        synchronized_contexts = {}

        for agent in agents:
            # 为每个Agent准备同步后的上下文
            sync_context = {
                'agent': agent,
                'synchronized_attributes': shared_attributes.copy(),
                'last_sync_time': datetime.now().isoformat(),
            }

            synchronized_contexts[agent] = sync_context

        return synchronized_contexts

    def _get_authority_score(self, agent: str, attribute: str) -> float:
        """
        计算Agent对某个属性的权威性评分。
        0 - 1 之间，1表示完全权威。
        """
        # 这是一个简化的实现，实际应该基于：
        # - Agent的特化程度
        # - 过往准确性记录
        # - 数据源的可信度

        authority_matrix = {
            ('customer_service', 'customer_preference'): 0.9,
            ('customer_service', 'fraud_detection'): 0.3,
            ('fraud_detection', 'fraud_detection'): 0.95,
            ('fraud_detection', 'customer_preference'): 0.1,
        }

        return authority_matrix.get((agent, attribute), 0.5)
```

## 9.5.2 长期任务中的上下文漂移问题

### 问题分析

在长期任务（如跨多天的项目管理或持续监控）中，上下文可能发生“漂移”：

```
初始任务："为CEO准备季度业绩报告"

第 1 小时: 上下文精准，聚焦于财务数据收集
第 4 小时: 开始包含细枝末节（某个部门的招聘计划）
第 8 小时: 出现明显偏离（讨论员工食堂菜单）
第 16 小时: 完全偏离主题（生成无关的营销文案）

原因：
1. 每一轮交互都添加新信息，久而久之上下文变得嘈杂
2. 早期重要的约束条件逐渐被遗忘
3. 中间步骤的输出可能包含错误，这些错误被纳入后续上下文
4. Token预算压力导致关键信息被压缩或删除
```

### 缓解策略

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import json

class ContextDriftDetector:
    """检测和防止上下文漂移"""

    def __init__(self, drift_threshold: float = 0.3):
        """
        drift_threshold: 漂移程度的阈值
        - 0: 完全不漂移
        - 1: 完全漂移
        """
        self.drift_threshold = drift_threshold
        self.initial_goal: Optional[str] = None
        self.context_history: List[Dict] = []

    def set_initial_goal(self, goal: str) -> None:
        """设置任务初始目标"""
        self.initial_goal = goal

    def measure_context_drift(
        self,
        current_context: str,
        step_number: int
    ) -> Tuple[float, str]:
        """
        测量当前上下文与初始目标的偏离程度。

        使用简化的关键词匹配：
        - 提取初始目标的关键词
        - 计算当前上下文中这些关键词的覆盖率
        """

        if not self.initial_goal:
            return 0.0, "No initial goal set"

        initial_keywords = set(self.initial_goal.lower().split())
        current_keywords = set(current_context.lower().split())

        # 计算漂移度：初始关键词在当前上下文中的缺失比例
        missing_keywords = initial_keywords - current_keywords
        drift_score = len(missing_keywords) / max(len(initial_keywords), 1)

        # 记录历史
        self.context_history.append({
            'step': step_number,
            'drift_score': drift_score,
            'timestamp': datetime.now().isoformat(),
        })

        return drift_score, f"Drift score: {drift_score:.2f}"

    def detect_drift_alert(
        self,
        current_context: str,
        step_number: int
    ) -> Tuple[bool, Optional[str]]:
        """
        检测是否需要发出漂移告警。
        """

        drift_score, _ = self.measure_context_drift(current_context, step_number)

        if drift_score > self.drift_threshold:
            alert_msg = (
                f"Warning: Context drift detected at step {step_number}. "
                f"Drift score: {drift_score:.2f}. "
                f"Consider refocusing the context."
            )
            return True, alert_msg

        return False, None

    def get_drift_trend(self) -> Dict:
        """获取漂移的趋势"""

        if len(self.context_history) < 2:
            return {'trend': 'insufficient_data'}

        recent_drifts = [h['drift_score'] for h in self.context_history[-5:]]

        # 简单的趋势判断
        if all(d < 0.2 for d in recent_drifts):
            trend = 'stable'
        elif recent_drifts[-1] > recent_drifts[0]:
            trend = 'drifting'
        else:
            trend = 'improving'

        return {
            'trend': trend,
            'current_drift': recent_drifts[-1],
            'max_drift': max(recent_drifts),
            'avg_drift': sum(recent_drifts) / len(recent_drifts),
        }


class ContextRefocuser:
    """当检测到漂移时，重新聚焦上下文"""

    def __init__(self):
        self.refocus_history = []

    def refocus_context(
        self,
        original_goal: str,
        current_context: str,
        step_number: int
    ) -> str:
        """
        重新生成一个聚焦于原始目标的上下文。

        策略：
        1. 提取原始目标的关键信息
        2. 从当前上下文中保留相关部分
        3. 删除无关信息
        4. 添加进度检查点
        """

        refocused = f"""
【任务重新聚焦 - 第{step_number}步】

【原始目标】
{original_goal}

【当前进度检查点】
- 当前步骤: {step_number}
- 近期完成的关键任务: [待填充]
- 下一步目标: [待填充]

【相关上下文】（已过滤无关信息）
"""

        # 简化：保留前500个字符（实际应该更智能的过滤）
        relevant_context = current_context[:500]
        refocused += relevant_context

        refocused += """

【对齐检查】
请确认：
1. 你仍在追求上述原始目标吗？
2. 你做的每一步都在帮助实现这个目标吗？
3. 是否有信息偏离了主题线？
"""

        # 记录重新聚焦
        self.refocus_history.append({
            'step': step_number,
            'timestamp': datetime.now().isoformat(),
        })

        return refocused
```

## 9.5.3 工具调用结果的上下文管理策略

```python
# 场景：智能体需要调用多个工具

agents_task = """
任务：查找用户John Doe在2024 年 3 月的所有订单

步骤：
1. 调用 search_user API -> 返回用户ID: U12345
2. 调用 get_orders API(U12345, 2024-03) -> 返回订单列表
3. 对每个订单，调用 get_order_details API
4. 生成总结报告
"""

# 挑战：
# - 中间结果可能很大（如1000个订单的列表）
# - 如果全部纳入上下文，会导致Token爆炸
# - 但删除某些结果可能导致后续步骤失败
# - 需要智能的结果摘要和聚合
```

### 解决方案

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
import json

class ToolResultProcessingStrategy(Enum):
    """工具结果处理策略"""
    KEEP_ALL = "keep_all"                           # 保留所有结果
    SUMMARIZE = "summarize"                         # 摘要
    SAMPLE = "sample"                               # 采样
    AGGREGATE = "aggregate"                         # 聚合
    FILTER_BY_RELEVANCE = "filter_by_relevance"    # 按相关性过滤

@dataclass
class ToolResult:
    """工具调用结果"""
    tool_name: str
    parameters: Dict
    result: Any
    execution_time_ms: float
    success: bool
    error: Optional[str] = None
    timestamp: str = None

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()

    def size_bytes(self) -> int:
        """估计结果的大小（字节）"""
        return len(json.dumps(self.result, default=str).encode())

class ToolResultContextManager:
    """管理工具结果在上下文中的大小和相关性"""

    def __init__(self, max_context_size_tokens: int = 4000):
        self.max_context_size_tokens = max_context_size_tokens
        self.tool_results: List[ToolResult] = []

    def process_tool_result(
        self,
        tool_result: ToolResult,
        strategy: ToolResultProcessingStrategy = ToolResultProcessingStrategy.SUMMARIZE,
        relevance_query: Optional[str] = None
    ) -> Dict:
        """
        处理工具结果，根据策略进行优化。
        """

        processed = {
            'tool_name': tool_result.tool_name,
            'original_size_bytes': tool_result.size_bytes(),
            'processed_size_bytes': 0,
            'strategy_used': strategy.value,
            'content': None,
        }

        if strategy == ToolResultProcessingStrategy.KEEP_ALL:
            processed['content'] = tool_result.result
            processed['processed_size_bytes'] = tool_result.size_bytes()

        elif strategy == ToolResultProcessingStrategy.SUMMARIZE:
            processed['content'] = self._summarize_result(tool_result.result)
            processed['processed_size_bytes'] = len(str(processed['content']).encode())

        elif strategy == ToolResultProcessingStrategy.SAMPLE:
            processed['content'] = self._sample_result(tool_result.result)
            processed['processed_size_bytes'] = len(str(processed['content']).encode())

        elif strategy == ToolResultProcessingStrategy.AGGREGATE:
            processed['content'] = self._aggregate_result(tool_result.result)
            processed['processed_size_bytes'] = len(str(processed['content']).encode())

        elif strategy == ToolResultProcessingStrategy.FILTER_BY_RELEVANCE:
            if relevance_query:
                processed['content'] = self._filter_by_relevance(
                    tool_result.result,
                    relevance_query
                )
            else:
                processed['content'] = tool_result.result

        # 计算压缩比
        if processed['original_size_bytes'] > 0:
            compression_ratio = (
                1 - processed['processed_size_bytes'] / processed['original_size_bytes']
            )
            processed['compression_ratio'] = compression_ratio

        self.tool_results.append(tool_result)

        return processed

    def _summarize_result(self, result: Any) -> Dict:
        """生成结果的摘要"""

        if isinstance(result, list):
            return {
                'type': 'list_summary',
                'total_count': len(result),
                'first_item': result[0] if result else None,
                'last_item': result[-1] if result else None,
                'sample_count': min(3, len(result)),
                'sample_items': result[:3],
            }
        elif isinstance(result, dict):
            return {
                'type': 'dict_summary',
                'keys': list(result.keys()),
                'value_types': {k: type(v).__name__ for k, v in result.items()},
            }
        else:
            return {'type': 'scalar', 'value': result}

    def _sample_result(self, result: Any, sample_size: int = 5) -> Any:
        """从结果中采样"""

        if isinstance(result, list):
            if len(result) <= sample_size:
                return result

            # 采样策略：头尾各取一些，中间均匀分布
            sampled = []
            sampled.append(result[0])  # 第一个

            if len(result) > 2:
                sampled.append(result[-1])  # 最后一个

            # 中间均匀分布
            if len(result) > sample_size:
                step = len(result) // (sample_size - 2)
                for i in range(1, len(result) - 1, step):
                    sampled.append(result[i])

            return sampled[:sample_size]

        return result

    def _aggregate_result(self, result: Any) -> Dict:
        """聚合结果统计信息"""

        if isinstance(result, list) and all(isinstance(item, (int, float)) for item in result):
            return {
                'type': 'numeric_aggregate',
                'count': len(result),
                'sum': sum(result),
                'mean': sum(result) / len(result) if result else 0,
                'min': min(result) if result else None,
                'max': max(result) if result else None,
            }

        elif isinstance(result, list) and all(isinstance(item, dict) for item in result):
            # 对于字典列表，汇总每个键的值
            aggregated = {}
            for key in result[0].keys() if result else []:
                values = [item[key] for item in result if key in item]

                if all(isinstance(v, (int, float)) for v in values):
                    aggregated[key] = {
                        'sum': sum(values),
                        'mean': sum(values) / len(values),
                        'count': len(values),
                    }
                else:
                    aggregated[key] = {
                        'unique_count': len(set(str(v) for v in values)),
                        'sample': str(values[0]) if values else None,
                    }

            return {'type': 'dict_list_aggregate', 'aggregates': aggregated}

        return {'type': 'non_aggregatable', 'message': 'Cannot aggregate this result type'}

    def _filter_by_relevance(self, result: Any, query: str) -> Any:
        """按相关性过滤结果"""

        if not isinstance(result, list):
            return result

        query_terms = set(query.lower().split())

        def relevance_score(item: Any) -> float:
            """计算项目与查询的相关性"""
            item_str = json.dumps(item, default=str).lower()
            matching_terms = sum(1 for term in query_terms if term in item_str)
            return matching_terms / len(query_terms) if query_terms else 0

        # 过滤：只保留相关性>0的项目
        filtered = [item for item in result if relevance_score(item) > 0]

        # 按相关性排序
        filtered.sort(key=relevance_score, reverse=True)

        return filtered

    def build_context_from_results(self) -> str:
        """从所有工具结果构建上下文"""

        context = "【工具执行结果】\n"

        current_size_tokens = 0

        for result in self.tool_results:
            result_text = f"\n【{result.tool_name}】\n"
            result_text += f"参数: {result.parameters}\n"
            result_text += f"结果: {str(result.result)[:200]}...\n"  # 限制长度
            result_text += f"执行时间: {result.execution_time_ms}ms\n"

            result_size_tokens = len(result_text) // 4

            if current_size_tokens + result_size_tokens <= self.max_context_size_tokens:
                context += result_text
                current_size_tokens += result_size_tokens
            else:
                break

        return context
```

## 9.5.4 智能体记忆的持久化与恢复

```python
import sqlite3
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

class AgentMemoryPersistence:
    """智能体记忆的持久化管理"""

    def __init__(self, db_path: str = "./agent_memory.db"):
        self.db_path = Path(db_path)
        self._init_db()

    def _init_db(self):
        """初始化数据库"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # 创建记忆表
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id TEXT PRIMARY KEY,
                    agent_id TEXT NOT NULL,
                    content TEXT NOT NULL,
                    memory_type TEXT NOT NULL,
                    importance REAL,
                    created_at TEXT NOT NULL,
                    last_updated TEXT NOT NULL,
                    access_count INTEGER DEFAULT 0,
                    data_json TEXT
                )
            """)

            # 创建会话表
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    agent_id TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    ended_at TEXT,
                    context_size_tokens INTEGER,
                    final_status TEXT
                )
            """)

            conn.commit()

    def save_memory(
        self,
        agent_id: str,
        memory_id: str,
        content: str,
        memory_type: str,
        importance: float = 0.5,
        data: Optional[Dict[str, Any]] = None
    ) -> None:
        """保存单条记忆"""

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            data_json = json.dumps(data, ensure_ascii=False) if data else None

            cursor.execute("""
                INSERT OR REPLACE INTO memories
                (id, agent_id, content, memory_type, importance, created_at, last_updated, data_json)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                memory_id,
                agent_id,
                content,
                memory_type,
                importance,
                datetime.now().isoformat(),
                datetime.now().isoformat(),
                data_json
            ))

            conn.commit()

    def load_memories(
        self,
        agent_id: str,
        memory_type: Optional[str] = None,
        min_importance: float = 0.0
    ) -> List[Dict]:
        """加载记忆"""

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            query = "SELECT * FROM memories WHERE agent_id = ? AND importance >= ?"
            params = [agent_id, min_importance]

            if memory_type:
                query += " AND memory_type = ?"
                params.append(memory_type)

            cursor.execute(query, params)
            rows = cursor.fetchall()

            memories = []
            for row in rows:
                memory = {
                    'id': row[0],
                    'agent_id': row[1],
                    'content': row[2],
                    'memory_type': row[3],
                    'importance': row[4],
                    'created_at': row[5],
                    'access_count': row[8],
                }

                if row[9]:  # JSON 扩展字段，禁止用 pickle 反序列化不可信记忆
                    try:
                        memory['data'] = json.loads(row[9])
                    except json.JSONDecodeError:
                        pass

                memories.append(memory)

            return memories

    def save_session_context(
        self,
        agent_id: str,
        session_id: str,
        context_snapshot: Dict
    ) -> None:
        """
        保存整个会话的上下文快照。
        用于长期任务的中断和恢复。
        """

        # 序列化快照
        snapshot_json = json.dumps(context_snapshot, default=str)

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            cursor.execute("""
                INSERT INTO sessions
                (session_id, agent_id, created_at, context_size_tokens, final_status)
                VALUES (?, ?, ?, ?, ?)
            """, (
                session_id,
                agent_id,
                datetime.now().isoformat(),
                len(snapshot_json) // 4,  # Token估计
                'in_progress'
            ))

            conn.commit()

            # 保存详细的上下文为文件（避免BLOB过大）
            safe_session_id = re.sub(r"[^A-Za-z0-9_.-]", "_", session_id)
            context_file = self.db_path.parent / f"context_{safe_session_id}.json"
            with open(context_file, 'w', encoding='utf-8') as f:
                f.write(snapshot_json)

    def load_session_context(self, session_id: str) -> Optional[Dict]:
        """加载保存的会话上下文"""

        safe_session_id = re.sub(r"[^A-Za-z0-9_.-]", "_", session_id)
        context_file = self.db_path.parent / f"context_{safe_session_id}.json"

        if context_file.exists():
            with open(context_file, 'r', encoding='utf-8') as f:
                return json.load(f)

        return None

    def cleanup_old_sessions(self, days_to_keep: int = 30) -> int:
        """清理过期的会话记录"""

        cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            cursor.execute(
                "DELETE FROM sessions WHERE created_at < ? AND final_status = 'completed'",
                (cutoff_date,)
            )

            deleted_count = cursor.rowcount
            conn.commit()

            return deleted_count
```

## 9.5.5 智能体推理中的上下文工程视角

Wei 等人在 2026 年的综述 *Agentic Reasoning for Large Language Models*（arXiv: 2601.12538）中提出了智能体推理的三层分类体系：基础推理、自我进化推理和集体多智能体推理。每一层对上下文工程都提出了不同的需求。

### 上下文内推理与训练后推理

论文区分了两种实现推理能力的路径，这一区分对上下文工程实践有直接影响：

* **上下文内推理**（In-Context Reasoning）：通过精心编排的提示词、工具描述和检索结果，在推理时激活模型的推理能力。ReAct、CoT、ToT 等技术均属此类。**上下文工程的核心价值正体现于此**——推理质量高度依赖上下文的信息密度、结构化程度和噪声水平。本书前面章节讨论的 [写入](/context_engineering_guide/di-er-bu-fen-he-xin-ji-shu-yu-ce-le/04_write.md)、[选择](/context_engineering_guide/di-er-bu-fen-he-xin-ji-shu-yu-ce-le/05_select.md)、[压缩](/context_engineering_guide/di-er-bu-fen-he-xin-ji-shu-yu-ce-le/06_compress.md) 和 [隔离](/context_engineering_guide/di-er-bu-fen-he-xin-ji-shu-yu-ce-le/07_isolate.md) 四大策略，本质上都是为上下文内推理服务的。
* **训练后推理**（Post-Training Reasoning）：通过强化学习或监督微调将推理模式固化到模型权重中。这类推理对上下文的依赖相对较低，但仍需要高质量的上下文来触发正确的推理路径。

**工程启示**：对于同一个智能体系统，应根据任务特征混合使用两种路径——高频标准任务使用训练后推理以降低上下文工程复杂度；长尾复杂任务使用上下文内推理以保持灵活性。

### 自我进化与上下文的动态演进

论文的“自我进化推理”层强调智能体通过反馈和记忆持续改进。从上下文工程角度看，这意味着上下文不再是静态的输入，而是一个随智能体经验积累而动态演进的系统：

1. **经验到上下文的转化**：成功的推理轨迹应被提炼为可复用的上下文模板（如 Few-Shot 示例），失败的轨迹则转化为约束规则注入系统提示词。这与 [9.4 节](/context_engineering_guide/di-san-bu-fen-jin-jie-ji-shu-yu-jia-gou/09_agents/9.4_agent_memory.md) 讨论的记忆机制形成闭环。
2. **上下文质量的反馈信号**：推理成功率本身就是衡量上下文质量的指标。当智能体在某类任务上频繁失败时，应触发上下文管道的自动审查——可能是检索召回率不足、压缩过度丢失了关键信息，或是工具描述不够精确。

### 多智能体推理的上下文协调

论文将多智能体协作纳入“推理”范畴，认为协调本身是一种分布式推理过程。这与 [9.5.1 节](#951-多智能体上下文冲突与协调) 讨论的冲突解决机制相呼应，并进一步引出：

* **共享上下文层**：参与协作推理的智能体需要一个共享的“事实基础”（Shared Grounding）。上下文工程的挑战在于——既要保证共享信息的一致性，又要为每个智能体保留其专属的角色上下文，防止角色混淆。
* **推理链的上下文开销**：多智能体推理中，每一轮协调都会产生新的中间状态需要注入上下文。随着推理轮次增加，上下文膨胀问题比单智能体场景更为严峻，[第六章](/context_engineering_guide/di-er-bu-fen-he-xin-ji-shu-yu-ce-le/06_compress.md) 讨论的压缩策略在此场景下尤为关键。

这一章节深化了对智能体系统中复杂上下文管理的理解，为生产环境中的多智能体协作提供了实用的解决方案。

## 9.5.6 案例研究：Claude Code 的上下文管理架构

Claude Code 是 Anthropic 官方的命令行工具，其公开文档展示了现代智能体系统中上下文工程的若干实践。以下分析只把公开文档明确描述的 CLAUDE.md、MCP、工具和记忆机制作为事实；涉及内部函数名、比例和成本数字的内容均为工程化推断，不代表官方实现细节。

![Claude Code 上下文架构图](/files/DJ6QXVEBrW4EU9RTi1Um)

图 9-7：Claude Code 上下文架构图

从工程视角看，Claude Code 的关键不是把所有信息一次性塞进上下文，而是把“可缓存前缀”“项目记忆层级”“按需工具细节”和“压缩恢复回路”拆成不同责任层。

一种可借鉴的系统提示词架构是模块化和可缓存性。可以把系统提示词分解为若干个可独立管理的片段，并用内部边界标记分为两部分：

* **静态部分**：包含通用的智能体行为准则、工具使用规范、安全边界等，这部分在整个组织内保持一致，可以跨会话复用。利用供应商提供的 prompt caching 机制，这部分的 cache 创建成本可被多个会话摊销。
* **动态部分**：包含用户特定的约束、会话参数、项目配置等。这部分不能缓存，但占比较小。

在自建系统中，可以把“不可缓存片段”封装成显式函数，并在代码审查中要求说明为什么必须破坏缓存。这样能迫使开发者意识到 cache read 与 cache write 的价格差异；具体倍率应按当前 Anthropic pricing page 和目标模型重算。

```python
# 伪代码：系统提示词架构
def build_system_prompt(user_context, session_config):
    static_part = SYSTEM_PROMPT_STATIC  # 可缓存的稳定前缀

    # 缓存标记
    prompt = static_part + "\n\n[SYSTEM_PROMPT_DYNAMIC_BOUNDARY]\n\n"

    # 动态部分，不缓存
    dynamic_part = assemble_dynamic_context(user_context, session_config)
    prompt += dynamic_part

    return prompt

def add_uncached_constraint(prompt, constraint):
    """添加会破坏缓存的内容"""
    DANGEROUS_uncachedSystemPromptSection()  # 显式警告
    return prompt + f"\nCRITICAL: {constraint}"
```

### 上下文组装的两层策略

公开文档中的项目说明、记忆和工具机制可以抽象为两层上下文组装策略：

**第一层：系统上下文**

* 收集 Git 仓库状态（当前分支、未提交改动、最近提交信息）
* 提取仓库元数据（语言、框架、构建工具、依赖项）
* 扫描项目结构和关键文件清单

这一层的结果在每个会话内进行 memoize，避免重复收集同样的静态元数据。

**第二层：用户上下文**

* 发现并加载 CLAUDE.md 文件（见下文）
* 加载记忆文件和日期上下文
* 整合用户提供的额外信息

### CLAUDE.md 文件的优先级链发现机制

Claude Code 文档显示，CLAUDE.md 相关说明支持组织级、用户级、项目级和本地级别的上下文管理：

1. **托管策略（Managed Policy）**：组织统一管理的指令
2. **用户级说明（\~/.claude/CLAUDE.md）**：用户主目录下的个人默认上下文
3. **项目级说明**：项目根目录或 `.claude/` 目录下的 CLAUDE.md
4. **本地级说明（CLAUDE.local.md）**：不提交到版本库的本地上下文

加载时需要区分“策略优先级”和“拼接顺序”：官方文档说明 Claude Code 会沿目录树读取 CLAUDE.md / CLAUDE.local.md，并按从外层目录到当前工作目录的顺序拼接；同一目录中本地说明会追加在项目说明之后。也就是说，靠近启动目录的说明更晚进入上下文，但不应假设它们会像配置字典一样覆盖父级内容。

```python
# 伪代码：CLAUDE.md 发现优先级链
def discover_claude_md(start_path):
    priority_files = [
        ("managed", get_org_mdm_memory()),  # 最高优先级
        ("user", Path.home() / ".claude" / "CLAUDE.md"),
        ("project", find_in_git_tree([
            "CLAUDE.md",
            ".claude/CLAUDE.md",
            ".claude/rules/*.md"
        ])),
        ("local", Path.cwd() / "CLAUDE.local.md"),  # 最低优先级
    ]

    loaded_context = []
    for level, path in reversed(priority_files):  # 低优先级先加载
        if path.exists():
            loaded_context.append(load_markdown(path))

    return "\n\n".join(loaded_context)
```

### @include 指令与可组合上下文

Claude Code 支持在 CLAUDE.md 文件中使用 `@path/to/import` 形式组合多个外部文件，支持相对路径、绝对路径和用户主目录路径：

* `@path/to/file`：相对于项目根目录的相对路径
* `@./relative`：相对于当前文件所在目录
* `@~/home`：相对于用户主目录
* `@/absolute`：绝对路径

该机制通过维护一个“已处理文件追踪集合”（processed-file tracking set）和最大包含深度来防止循环包含。当处理导入指令时，系统会：

1. 解析目标文件路径
2. 检查该文件是否已在当前包含链中处理过
3. 如果是，抛出循环依赖错误；否则加载并递归处理
4. 将处理结果合并到当前上下文中

这种设计允许团队构建可复用的上下文库（如通用的编码标准、API 规范等），各项目通过轻量级的 @include 语句组合使用。

### 令牌预算的精确管理

生产级智能体可以采用精确的令牌预算跟踪。对于并行工具调用（如同一步骤内多个文件操作），系统记录每个工具调用的标识，并在接收结果时根据实际消费的令牌调整预算。

**自动压缩阈值**应按模型窗口、估算误差、工具结果波动和成本预算配置，并保留足够安全余量，确保即使估计略有偏差也不会超出窗口限制。

系统可以实现一个**断路器（circuit breaker）机制**：当连续压缩失败超过预算时（通常表明上下文中存在不可压缩的大型工具结果），系统停止重试，转而采用更激进的策略（如删除较早的工具结果）。这样做的目的是避免每次失败都再次发送完整上下文，造成成本意外放大。

### 延迟工具加载（Deferred Tool Loading）

为了节省系统提示词中的令牌开销，Claude Code 文档描述了 MCP Tool Search 和 `alwaysLoad` 等机制：常用工具可以预先加载，不常用工具则通过搜索或按需展开减少上下文占用。

例如，代码分析工具可以在初始上下文中只保留短描述，当用户请求代码分析时再加载其完整的参数定义和用法示例。具体节省比例取决于工具数量、schema 大小和触发频率，需要通过线上 token 指标验证。

这类优化体现了现代智能体系统的一个重要方向：把能力、上下文成本和工具可见性作为同一个工程问题来管理，而不是把所有说明和工具 schema 一次性塞进提示词。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yeasy.gitbook.io/context_engineering_guide/di-san-bu-fen-jin-jie-ji-shu-yu-jia-gou/09_agents/9.5_advanced_agent_context.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
