14.7 可运行的代码示例与实现模式

本章节汇总第十四章中最常用的代码模式,覆盖上下文工程的核心场景。这些代码分为两类:

  • 可直接运行的最小示例:完整、自包含,适合先本地跑通关键链路。

  • 教学性实现模式:用于展示接口边界、系统拆分和核心算法思路,接入真实 SDK 或线上基础设施前通常需要补齐鉴权、模型调用层和错误处理。

建议在学习这些章节时,不仅仅阅读代码,更要动手运行、修改和扩展这些示例。通过实际操作来理解上下文工程的各个环节。

14.7.1 完整的 RAG 管道实现

本节先给出一个 可直接运行的本地最小示例。它不依赖外部模型 API,而是用标准库完成分词、检索、上下文组装和带来源标注的回答,便于先验证完整链路。

"""
minimal_rag_system.py
一个可直接运行的本地 RAG 最小示例。
依赖: Python 3.11+(仅标准库)
"""

from __future__ import annotations

from dataclasses import dataclass
from math import sqrt
import re
from typing import Dict, List


def tokenize(text: str) -> List[str]:
    """同时兼容英文单词和中文字符的简单分词。"""
    raw_tokens = re.findall(r"[\u4e00-\u9fff]+|[A-Za-z0-9_]+", text.lower())
    normalized_tokens: List[str] = []

    for token in raw_tokens:
        if re.fullmatch(r"[\u4e00-\u9fff]+", token):
            normalized_tokens.extend(list(token))
        else:
            normalized_tokens.append(token)

    return normalized_tokens


def term_frequency(text: str) -> Dict[str, float]:
    """构造简单的词频向量。"""
    frequencies: Dict[str, float] = {}
    for token in tokenize(text):
        frequencies[token] = frequencies.get(token, 0.0) + 1.0
    return frequencies


def cosine_similarity(left: Dict[str, float], right: Dict[str, float]) -> float:
    """计算两个稀疏词频向量的余弦相似度。"""
    shared_terms = set(left) & set(right)
    dot_product = sum(left[term] * right[term] for term in shared_terms)
    left_norm = sqrt(sum(value * value for value in left.values()))
    right_norm = sqrt(sum(value * value for value in right.values()))

    if left_norm == 0 or right_norm == 0:
        return 0.0

    return dot_product / (left_norm * right_norm)


def split_sentences(text: str) -> List[str]:
    """按中英文句号做简单切句。"""
    sentences = re.split(r"(?<=[。!?.!?])\s*", text.strip())
    return [sentence.strip() for sentence in sentences if sentence.strip()]


@dataclass
class Document:
    """文档结构"""
    id: str
    content: str
    metadata: Dict

@dataclass
class RetrievalResult:
    """检索结果"""
    document: Document
    score: float

class SimpleEmbeddingModel:
    """
    极简嵌入模型包装。
    这里直接使用词频向量,便于本地运行和理解检索原理。
    实际生产可替换为领域嵌入模型或向量服务。
    """

    def embed(self, text: str) -> Dict[str, float]:
        """获取文本的嵌入向量"""
        return term_frequency(text)


class SimpleVectorStore:
    """
    简化的向量存储。
    实际应用中应使用带持久化和过滤能力的向量数据库。
    """

    def __init__(self):
        self.documents: List[Document] = []
        self.vectors: List[Dict[str, float]] = []
        self.embedding_model = SimpleEmbeddingModel()

    def add_document(self, doc: Document) -> None:
        """添加文档到向量库"""
        vector = self.embedding_model.embed(doc.content)
        self.documents.append(doc)
        self.vectors.append(vector)

    def add_documents(self, docs: List[Document]) -> None:
        """批量添加文档"""
        for doc in docs:
            self.add_document(doc)

    def retrieve(self, query: str, top_k: int = 3) -> List[RetrievalResult]:
        """检索与查询最相似的文档"""
        if not self.vectors:
            return []

        # 获取查询的嵌入
        query_vector = self.embedding_model.embed(query)

        # 计算相似度
        similarities = [
            cosine_similarity(query_vector, document_vector)
            for document_vector in self.vectors
        ]

        # 获取 top-k 最相似的文档
        top_indices = sorted(
            range(len(similarities)),
            key=lambda index: similarities[index],
            reverse=True
        )[:top_k]

        results = [
            RetrievalResult(
                document=self.documents[i],
                score=float(similarities[i])
            )
            for i in top_indices
            if similarities[i] > 0.0  # 过滤负相似度
        ]

        return results


class ContextAssembler:
    """
    上下文组装器。
    将检索到的文档组织成提示词中的上下文。
    """

    def __init__(self, max_context_chars: int = 1200):
        self.max_context_chars = max_context_chars

    def assemble_context(
        self,
        retrieved_results: List[RetrievalResult],
        user_query: str
    ) -> str:
        """组装最终的上下文提示词"""

        current_chars = len(user_query)

        context_parts = []

        # 按相似度从高到低添加文档
        for result in retrieved_results:
            doc_chars = len(result.document.content)

            # 检查是否超过上下文预算
            if current_chars + doc_chars > self.max_context_chars:
                break

            context_parts.append(
                f"【文档 {result.document.id}】(相关度: {result.score:.2f}\n"
                f"{result.document.content}\n"
            )

            current_chars += doc_chars

        return f"""你是一个有帮助的问答助手。请只依据参考文档回答,并在结论后附上来源。

【参考文档】
{chr(10).join(context_parts)}

【用户问题】
{user_query}

请基于上述参考文档回答问题,并在答案中使用 [来源: 文档ID] 标注依据。"""


class LocalAnswerGenerator:
    """基于检索结果生成带来源标注的本地答案。"""

    def __init__(self, max_sentences: int = 2):
        self.max_sentences = max_sentences

    def generate(self, query: str, retrieved_results: List[RetrievalResult]) -> str:
        if not retrieved_results:
            return "未在参考文档中找到相关信息。"

        query_terms = set(tokenize(query))
        ranked_sentences = []

        for result in retrieved_results:
            for sentence in split_sentences(result.document.content):
                sentence_terms = set(tokenize(sentence))
                overlap = len(query_terms & sentence_terms)
                if overlap > 0:
                    ranked_sentences.append((overlap, result.score, sentence, result.document.id))

        if not ranked_sentences:
            return "未在参考文档中找到足够依据回答该问题。"

        ranked_sentences.sort(reverse=True)

        selected_sentences: List[str] = []
        selected_sources: List[str] = []
        seen_sentences = set()

        for _, _, sentence, source_id in ranked_sentences:
            if sentence in seen_sentences:
                continue
            selected_sentences.append(sentence)
            selected_sources.append(source_id)
            seen_sentences.add(sentence)
            if len(selected_sentences) >= self.max_sentences:
                break

        source_list = " ".join(f"[来源: {source_id}]" for source_id in dict.fromkeys(selected_sources))
        return f"{''.join(selected_sentences)} {source_list}".strip()


class RAGPipeline:
    """
    完整的RAG管道。
    管理从查询到生成的整个流程。
    """

    def __init__(self):
        self.vector_store = SimpleVectorStore()
        self.context_assembler = ContextAssembler(max_context_chars=1200)
        self.answer_generator = LocalAnswerGenerator(max_sentences=2)

    def add_documents(self, documents: List[Dict]) -> None:
        """添加文档到系统"""
        doc_objects = [
            Document(
                id=doc.get('id', str(i)),
                content=doc['content'],
                metadata=doc.get('metadata', {})
            )
            for i, doc in enumerate(documents)
        ]
        self.vector_store.add_documents(doc_objects)

    def retrieve(self, query: str, top_k: int = 3) -> List[RetrievalResult]:
        """检索相关文档"""
        return self.vector_store.retrieve(query, top_k=top_k)

    def query(self, user_query: str, top_k: int = 3) -> Dict:
        """执行完整的RAG查询"""
        # 1. 检索相关文档
        retrieved = self.retrieve(user_query, top_k=top_k)

        # 2. 组装上下文
        assembled_context = self.context_assembler.assemble_context(retrieved, user_query)

        # 3. 基于检索结果生成带引用的答案
        answer = self.answer_generator.generate(user_query, retrieved)

        # 4. 返回结果(包含中间步骤用于调试)
        return {
            'query': user_query,
            'retrieved_documents': [
                {
                    'id': r.document.id,
                    'content': r.document.content[:100] + '...',
                    'relevance_score': r.score
                }
                for r in retrieved
            ],
            'assembled_context': assembled_context,
            'generated_answer': answer,
            'context_size_chars': len(assembled_context),
        }


# 使用示例
if __name__ == "__main__":
    # 可直接运行的最小示例
    rag = RAGPipeline()

    # 添加示例文档
    documents = [
        {
            'id': 'doc1',
            'content': 'Python是一种高级编程语言,以其简洁易读的语法而闻名。Python广泛用于数据科学、机器学习和Web开发。',
            'metadata': {'source': 'wiki'}
        },
        {
            'id': 'doc2',
            'content': '机器学习是人工智能的一个分支,它使计算机能够从数据中学习而无需显式编程。常见的算法包括决策树、随机森林和神经网络。',
            'metadata': {'source': 'wiki'}
        },
        {
            'id': 'doc3',
            'content': '向量数据库是一种优化的数据库系统,专门用于存储和检索高维向量数据。它们在语义搜索和RAG系统中至关重要。',
            'metadata': {'source': 'tech_blog'}
        },
    ]

    rag.add_documents(documents)

    # 执行查询
    query = "Python有什么用途?"
    result = rag.query(query, top_k=2)

    print("=" * 50)
    print(f"查询: {result['query']}")
    print("\n检索到的文档:")
    for doc in result['retrieved_documents']:
        print(f"  - {doc['id']}: 相关度 {doc['relevance_score']:.2f}")
    print(f"\nContext大小: {result['context_size_chars']} 字符")
    print(f"\n生成的答案:\n{result['generated_answer']}")

14.7.2 上下文压缩与摘要生成

从本节开始,代码重点展示实现模式与接口边界。若接入真实模型服务,请按所选供应商的 SDK、鉴权方式和错误处理规范替换调用层。

14.7.3 动态上下文选择

14.7.4 简单的记忆系统实现

这些代码示例展示了上下文工程中的核心实现模式,可作为生产系统的基础模板。实际应用时,需要根据具体需求进行扩展和优化。

最后更新于