14.7 Runnable Code Examples and Implementation Patterns
14.7.1 A Complete RAG Pipeline Implementation
"""
minimal_rag_system.py
一个可直接运行的本地 RAG 最小示例。
依赖: Python 3.11+(仅标准库)
"""
from __future__ import annotations
from dataclasses import dataclass
from math import sqrt
import re
from typing import Dict, List
def tokenize(text: str) -> List[str]:
"""同时兼容英文单词和中文字符的简单分词。"""
raw_tokens = re.findall(r"[\u4e00-\u9fff]+|[A-Za-z0-9_]+", text.lower())
normalized_tokens: List[str] = []
for token in raw_tokens:
if re.fullmatch(r"[\u4e00-\u9fff]+", token):
normalized_tokens.extend(list(token))
else:
normalized_tokens.append(token)
return normalized_tokens
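
# Usage example (illustrative): mixed English/Chinese input becomes lowercased
# words plus individual CJK characters, e.g.
#   tokenize("Python很好用") -> ['python', '很', '好', '用']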


def term_frequency(text: str) -> Dict[str, float]:
    """Build a simple term-frequency vector."""
    frequencies: Dict[str, float] = {}
    for token in tokenize(text):
        frequencies[token] = frequencies.get(token, 0.0) + 1.0
    return frequencies


def cosine_similarity(left: Dict[str, float], right: Dict[str, float]) -> float:
    """Compute the cosine similarity of two sparse term-frequency vectors."""
    shared_terms = set(left) & set(right)
    dot_product = sum(left[term] * right[term] for term in shared_terms)
    left_norm = sqrt(sum(value * value for value in left.values()))
    right_norm = sqrt(sum(value * value for value in right.values()))
    if left_norm == 0 or right_norm == 0:
        return 0.0
    return dot_product / (left_norm * right_norm)
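
# Worked example (illustrative): two vectors that share only the term "a"
#   cosine_similarity({"a": 1.0, "b": 1.0}, {"a": 1.0}) == 1 / sqrt(2) ≈ 0.707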


def split_sentences(text: str) -> List[str]:
    """Naive sentence splitting on Chinese and English end-of-sentence punctuation."""
    sentences = re.split(r"(?<=[。!?.!?])\s*", text.strip())
    return [sentence.strip() for sentence in sentences if sentence.strip()]


@dataclass
class Document:
    """A document to be indexed."""
    id: str
    content: str
    metadata: Dict


@dataclass
class RetrievalResult:
    """A single retrieval hit."""
    document: Document
    score: float


class SimpleEmbeddingModel:
    """
    Minimal embedding-model wrapper.
    It uses plain term-frequency vectors so the example runs locally and the
    retrieval logic stays easy to follow. In production this would be replaced
    by a domain embedding model or an embedding service.
    """

    def embed(self, text: str) -> Dict[str, float]:
        """Return the embedding vector for a piece of text."""
        return term_frequency(text)
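

# Optional sketch (an assumption, not part of the standard-library example):
# a drop-in replacement that wraps the third-party sentence-transformers
# library. The model name is only an example; the dense vector is converted
# into the same Dict[str, float] shape so cosine_similarity above still works.
class DenseEmbeddingModel:
    """Hypothetical dense-embedding wrapper; requires `pip install sentence-transformers`."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        # Imported lazily so the rest of this file runs without the dependency.
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)

    def embed(self, text: str) -> Dict[str, float]:
        dense_vector = self.model.encode(text)
        return {str(index): float(value) for index, value in enumerate(dense_vector)}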


class SimpleVectorStore:
    """
    Simplified in-memory vector store.
    A real application should use a vector database with persistence and
    metadata filtering.
    """

    def __init__(self):
        self.documents: List[Document] = []
        self.vectors: List[Dict[str, float]] = []
        self.embedding_model = SimpleEmbeddingModel()

    def add_document(self, doc: Document) -> None:
        """Add a single document to the store."""
        vector = self.embedding_model.embed(doc.content)
        self.documents.append(doc)
        self.vectors.append(vector)

    def add_documents(self, docs: List[Document]) -> None:
        """Add documents in bulk."""
        for doc in docs:
            self.add_document(doc)

    def retrieve(self, query: str, top_k: int = 3) -> List[RetrievalResult]:
        """Return the documents most similar to the query."""
        if not self.vectors:
            return []
        # Embed the query.
        query_vector = self.embedding_model.embed(query)
        # Score every stored document.
        similarities = [
            cosine_similarity(query_vector, document_vector)
            for document_vector in self.vectors
        ]
        # Pick the indices of the top-k most similar documents.
        top_indices = sorted(
            range(len(similarities)),
            key=lambda index: similarities[index],
            reverse=True
        )[:top_k]
        results = [
            RetrievalResult(
                document=self.documents[i],
                score=float(similarities[i])
            )
            for i in top_indices
            if similarities[i] > 0.0  # skip documents with no term overlap
        ]
        return results


class ContextAssembler:
    """
    Context assembler.
    Turns the retrieved documents into the context section of a prompt.
    """

    def __init__(self, max_context_chars: int = 1200):
        self.max_context_chars = max_context_chars

    def assemble_context(
        self,
        retrieved_results: List[RetrievalResult],
        user_query: str
    ) -> str:
        """Assemble the final context prompt."""
        current_chars = len(user_query)
        context_parts = []
        # Add documents in descending order of similarity.
        for result in retrieved_results:
            doc_chars = len(result.document.content)
            # Stop once the character budget would be exceeded.
            if current_chars + doc_chars > self.max_context_chars:
                break
            context_parts.append(
                f"[Document {result.document.id}] (relevance: {result.score:.2f})\n"
                f"{result.document.content}\n"
            )
            current_chars += doc_chars
        return f"""You are a helpful question-answering assistant. Answer using only the reference documents and cite your sources after the conclusion.

[Reference documents]
{chr(10).join(context_parts)}

[User question]
{user_query}

Answer the question based on the reference documents above and mark the evidence with [source: document id]."""


class LocalAnswerGenerator:
    """Generate a locally computed, source-cited answer from the retrieval results."""

    def __init__(self, max_sentences: int = 2):
        self.max_sentences = max_sentences

    def generate(self, query: str, retrieved_results: List[RetrievalResult]) -> str:
        if not retrieved_results:
            return "No relevant information was found in the reference documents."
        query_terms = set(tokenize(query))
        ranked_sentences = []
        # Rank every sentence of every retrieved document by term overlap with
        # the query, breaking ties with the document's retrieval score.
        for result in retrieved_results:
            for sentence in split_sentences(result.document.content):
                sentence_terms = set(tokenize(sentence))
                overlap = len(query_terms & sentence_terms)
                if overlap > 0:
                    ranked_sentences.append((overlap, result.score, sentence, result.document.id))
        if not ranked_sentences:
            return "The reference documents do not contain enough evidence to answer this question."
        ranked_sentences.sort(reverse=True)
        selected_sentences: List[str] = []
        selected_sources: List[str] = []
        seen_sentences = set()
        for _, _, sentence, source_id in ranked_sentences:
            if sentence in seen_sentences:
                continue
            selected_sentences.append(sentence)
            selected_sources.append(source_id)
            seen_sentences.add(sentence)
            if len(selected_sentences) >= self.max_sentences:
                break
        # dict.fromkeys removes duplicate sources while keeping their order.
        source_list = " ".join(f"[source: {source_id}]" for source_id in dict.fromkeys(selected_sources))
        return f"{' '.join(selected_sentences)} {source_list}".strip()


class RAGPipeline:
    """
    The complete RAG pipeline.
    Manages the whole flow from query to generated answer.
    """

    def __init__(self):
        self.vector_store = SimpleVectorStore()
        self.context_assembler = ContextAssembler(max_context_chars=1200)
        self.answer_generator = LocalAnswerGenerator(max_sentences=2)

    def add_documents(self, documents: List[Dict]) -> None:
        """Add documents to the system."""
        doc_objects = [
            Document(
                id=doc.get('id', str(i)),
                content=doc['content'],
                metadata=doc.get('metadata', {})
            )
            for i, doc in enumerate(documents)
        ]
        self.vector_store.add_documents(doc_objects)

    def retrieve(self, query: str, top_k: int = 3) -> List[RetrievalResult]:
        """Retrieve relevant documents."""
        return self.vector_store.retrieve(query, top_k=top_k)

    def query(self, user_query: str, top_k: int = 3) -> Dict:
        """Run a full RAG query."""
        # 1. Retrieve relevant documents.
        retrieved = self.retrieve(user_query, top_k=top_k)
        # 2. Assemble the context prompt.
        assembled_context = self.context_assembler.assemble_context(retrieved, user_query)
        # 3. Generate a cited answer from the retrieval results.
        answer = self.answer_generator.generate(user_query, retrieved)
        # 4. Return the result, including intermediate steps for debugging.
        return {
            'query': user_query,
            'retrieved_documents': [
                {
                    'id': r.document.id,
                    'content': r.document.content[:100] + '...',
                    'relevance_score': r.score
                }
                for r in retrieved
            ],
            'assembled_context': assembled_context,
            'generated_answer': answer,
            'context_size_chars': len(assembled_context),
        }


# Usage example
if __name__ == "__main__":
    # Minimal demo that can be run directly.
    rag = RAGPipeline()

    # Add sample documents.
    documents = [
        {
            'id': 'doc1',
            'content': 'Python is a high-level programming language known for its concise, readable syntax. Python is widely used in data science, machine learning, and web development.',
            'metadata': {'source': 'wiki'}
        },
        {
            'id': 'doc2',
            'content': 'Machine learning is a branch of artificial intelligence that lets computers learn from data without being explicitly programmed. Common algorithms include decision trees, random forests, and neural networks.',
            'metadata': {'source': 'wiki'}
        },
        {
            'id': 'doc3',
            'content': 'A vector database is a database system optimized for storing and retrieving high-dimensional vector data. Vector databases are essential for semantic search and RAG systems.',
            'metadata': {'source': 'tech_blog'}
        },
    ]
    rag.add_documents(documents)

    # Run a query.
    query = "What is Python used for?"
    result = rag.query(query, top_k=2)

    print("=" * 50)
    print(f"Query: {result['query']}")
    print("\nRetrieved documents:")
    for doc in result['retrieved_documents']:
        print(f" - {doc['id']}: relevance {doc['relevance_score']:.2f}")
    print(f"\nContext size: {result['context_size_chars']} characters")
print(f"\n生成的答案:\n{result['generated_answer']}")14.7.2 上下文压缩与摘要生成
14.7.3 Dynamic Context Selection
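A minimal sketch of dynamic context selection on top of the SimpleVectorStore from 14.7.1: instead of a fixed top_k, documents are kept only while their score stays above a fraction of the best score, so unambiguous queries pull in fewer documents and ambiguous ones pull in more. The 0.5 ratio and the cap of five candidates are illustrative assumptions.

def select_context_dynamically(
    store: SimpleVectorStore,
    query: str,
    min_score_ratio: float = 0.5,
    max_documents: int = 5,
) -> List[RetrievalResult]:
    """Keep candidates whose score is at least min_score_ratio of the best score."""
    candidates = store.retrieve(query, top_k=max_documents)
    if not candidates:
        return []
    best_score = candidates[0].score  # retrieve() returns results sorted by score
    return [
        result for result in candidates
        if result.score >= best_score * min_score_ratio
    ]
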
14.7.4 A Simple Memory System Implementation
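A minimal sketch of a conversation memory built from the same term-frequency retrieval used in 14.7.1: every turn is stored as a small record, and the turns most similar to the current query are recalled so they can be prepended to the context. The window size and the recall rule are illustrative assumptions.

@dataclass
class MemoryRecord:
    role: str  # "user" or "assistant"
    text: str


class SimpleMemory:
    """In-memory conversation history with similarity-based recall."""

    def __init__(self, max_records: int = 50):
        self.max_records = max_records
        self.records: List[MemoryRecord] = []

    def add(self, role: str, text: str) -> None:
        self.records.append(MemoryRecord(role=role, text=text))
        # Drop the oldest turns once the window is full.
        self.records = self.records[-self.max_records:]

    def recall(self, query: str, top_k: int = 3) -> List[MemoryRecord]:
        """Return the stored turns most similar to the query."""
        query_vector = term_frequency(query)
        scored = [
            (cosine_similarity(query_vector, term_frequency(record.text)), record)
            for record in self.records
        ]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [record for score, record in scored[:top_k] if score > 0.0]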