import re
from datetime import datetime, timezone
from enum import Enum
from typing import List, Tuple, Dict
class PIICategory(Enum):
    """Categories of personally identifiable information (PII).

    Each member's value is a lowercase string label; the pipeline uses it
    when reporting which categories were found or redacted.
    """

    # Identity and contact details
    NAME = "name"
    EMAIL = "email"
    PHONE = "phone"
    ID_NUMBER = "id_number"

    # Financial and location details
    CREDIT_CARD = "credit_card"
    ADDRESS = "address"
    BANK_ACCOUNT = "bank_account"

    # Health information
    MEDICAL_RECORD = "medical_record"
class ContextPIIPipeline:
    """PII detection and redaction integrated into a RAG pipeline.

    The pipeline offers three entry points:

    * ``detect_pii_in_query`` -- regex-based detection on any text.
    * ``filter_retrieval_documents`` -- drop or redact retrieved documents
      depending on the caller's role and the configured detection mode.
    * ``sanitize_response`` -- redact the final answer for the caller's role.

    Detection is purely pattern based and the patterns overlap (for example
    a 16-digit card number also matches the ID_NUMBER pattern), so a piece
    of text may be reported or redacted under more than one category.
    ``PIICategory.NAME`` has no pattern and is therefore never detected.
    """

    # Role -> PII categories that role may see unredacted.
    # Roles not listed here are treated as having no access at all.
    _PII_ACCESS_MATRIX = {
        "admin": frozenset({
            PIICategory.NAME,
            PIICategory.EMAIL,
            PIICategory.PHONE,
            PIICategory.ID_NUMBER,
        }),
        "medical_staff": frozenset({
            PIICategory.MEDICAL_RECORD,
            PIICategory.NAME,
        }),
        "standard": frozenset(),  # standard users may not see any PII
    }

    def __init__(self, detection_mode: str = "strict"):
        """Create a pipeline.

        Args:
            detection_mode: ``"strict"`` drops any retrieved document that
                contains PII the caller may not see. Any other value
                (``"balanced"`` and ``"relaxed"`` are the intended ones)
                keeps such documents but redacts the offending categories.
        """
        self.detection_mode = detection_mode
        self.pii_patterns = self._build_patterns()
        self.pii_log = []  # audit log: one entry per filter_retrieval_documents call

    def _build_patterns(self) -> Dict[PIICategory, List[str]]:
        """Build the regex library used for PII detection.

        Returns:
            A mapping from category to the list of regex source strings
            that identify it.
        """
        patterns = {
            PIICategory.EMAIL: [
                r'[\w\.-]+@[\w\.-]+\.\w+',
            ],
            PIICategory.PHONE: [
                r'\+?1?\d{9,15}',
                r'(\d{3}[-.\s]?)(\d{3}[-.\s]?)(\d{4})',  # XXX-XXX-XXXX
            ],
            PIICategory.ID_NUMBER: [
                r'\b\d{6,18}\b',  # national ID numbers, student numbers
            ],
            PIICategory.CREDIT_CARD: [
                r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
            ],
            PIICategory.ADDRESS: [
                # US-style address: number, two words, state code, ZIP
                r'([0-9]{1,5})\s([A-Za-z]+)\s([A-Za-z]+)\s([A-Za-z]{2})\s([0-9]{5})',
            ],
            PIICategory.BANK_ACCOUNT: [
                # Accept both the ASCII colon and the full-width colon
                # (U+FF1A) after the label.
                r'(账户|账号|银行账户)[\s]*[:\uFF1A][\s]*[\d]{10,20}',
            ],
            PIICategory.MEDICAL_RECORD: [
                r'(病历|诊断|医嘱)[\s]*[:\uFF1A][\s]*[^\n]+',
            ],
        }
        return patterns

    def _allowed_categories(self, user_role: str):
        """Return the PII categories ``user_role`` may see (empty if unknown)."""
        return self._PII_ACCESS_MATRIX.get(user_role, frozenset())

    def _redact_text(self, text: str, categories) -> str:
        """Replace every match of the given categories with a placeholder."""
        for category in categories:
            replacement = f"[{category.value.upper()}已隐藏]"
            for pattern in self.pii_patterns.get(category, []):
                text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        return text

    def detect_pii_in_query(self, query: str) -> List[Tuple[PIICategory, List[str]]]:
        """Detect PII in a piece of text.

        Args:
            query: Text to scan. Empty or ``None`` yields no findings.

        Returns:
            One ``(category, matches)`` tuple per pattern that matched,
            where ``matches`` holds the full matched substrings in order of
            appearance. A category with several patterns can therefore
            appear more than once.
        """
        findings = []
        if not query:
            return findings
        for category, pattern_list in self.pii_patterns.items():
            for pattern in pattern_list:
                # Use the whole match rather than re.findall's group output:
                # for patterns with capture groups findall returns only the
                # groups, which loses the actual PII text.
                matches = [
                    m.group(0)
                    for m in re.finditer(pattern, query, re.IGNORECASE)
                ]
                if matches:
                    findings.append((category, matches))
        return findings

    def filter_retrieval_documents(
        self,
        query: str,
        documents: List[Dict],
        user_role: str = "standard"  # "admin", "medical_staff", "standard"
    ) -> Tuple[List[Dict], Dict]:
        """Filter retrieved documents according to the caller's PII access.

        Documents without unauthorized PII pass through unchanged. Documents
        with unauthorized PII are dropped in ``"strict"`` mode and redacted
        otherwise. One audit entry is appended to ``self.pii_log`` per call.

        Args:
            query: The user query; scanned for PII for reporting only.
            documents: Retrieved documents; the text is read from the
                ``'content'`` key (missing or ``None`` is treated as empty).
            user_role: Role used to look up the allowed PII categories;
                unknown roles get no access.

        Returns:
            ``(filtered_documents, filtering_log)`` where the log holds
            ``query_pii_found``, ``pii_categories``, ``docs_filtered`` and
            ``docs_retained``.
        """
        query_pii = self.detect_pii_in_query(query)
        allowed_categories = self._allowed_categories(user_role)

        filtered_documents = []
        filtering_log = {
            'query_pii_found': len(query_pii) > 0,
            # dict.fromkeys de-duplicates while keeping first-seen order
            'pii_categories': list(
                dict.fromkeys(cat.value for cat, _ in query_pii)
            ),
            'docs_filtered': 0,
            'docs_retained': 0,
        }

        for doc in documents:
            doc_pii = self.detect_pii_in_query(doc.get('content') or '')
            unauthorized_pii = list(dict.fromkeys(
                cat for cat, _ in doc_pii if cat not in allowed_categories
            ))

            if not unauthorized_pii:
                filtered_documents.append(doc)
                filtering_log['docs_retained'] += 1
            elif self.detection_mode == "strict":
                # Strict mode rejects the document outright.
                filtering_log['docs_filtered'] += 1
            else:
                filtered_documents.append(
                    self._redact_document(doc, unauthorized_pii)
                )
                filtering_log['docs_retained'] += 1

        # Audit log.
        # NOTE(review): the raw query is stored here and may itself contain
        # PII; confirm the audit store is allowed to hold it.
        self.pii_log.append({
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'query': query,
            'user_role': user_role,
            'filtering_result': filtering_log,
        })
        return filtered_documents, filtering_log

    def sanitize_response(self, response: str, user_role: str) -> str:
        """Redact the final response for the given role.

        Args:
            response: Generated answer text.
            user_role: Role used to look up the allowed PII categories;
                unknown roles get no access.

        Returns:
            ``response`` with every match of a disallowed category replaced
            by a ``[CATEGORY已隐藏]`` placeholder.
        """
        allowed_categories = self._allowed_categories(user_role)
        blocked = [cat for cat in PIICategory if cat not in allowed_categories]
        return self._redact_text(response, blocked)

    def _redact_document(self, doc: Dict, categories: List[PIICategory]) -> Dict:
        """Return a redacted shallow copy of ``doc``.

        Args:
            doc: Document dict; its ``'content'`` value is redacted.
            categories: Categories to redact; duplicates are ignored.

        Returns:
            A copy of ``doc`` with redacted ``'content'`` plus the keys
            ``'redaction_applied'`` and ``'redacted_categories'``. The input
            dict is not modified.
        """
        unique_categories = list(dict.fromkeys(categories))
        redacted_doc = doc.copy()
        redacted_doc['content'] = self._redact_text(
            redacted_doc.get('content') or '', unique_categories
        )
        redacted_doc['redaction_applied'] = True
        redacted_doc['redacted_categories'] = [
            cat.value for cat in unique_categories
        ]
        return redacted_doc
# Usage example
pipeline = ContextPIIPipeline(detection_mode="strict")

# Step 1: detect PII in the incoming user query
user_query = "用户张三的邮箱是 zhangsan@example.com,电话是+86-138-1234-5678"
pii_findings = pipeline.detect_pii_in_query(query=user_query)
print(f"发现 PII: {pii_findings}")

# Step 2: filter the retrieved documents for a standard (no-PII) user
documents = [
    dict(id='1', content='患者张三的诊断结果是...'),
    dict(id='2', content='通用的医疗知识...'),
]
filtered_docs, log = pipeline.filter_retrieval_documents(
    query=user_query,
    documents=documents,
    user_role="standard",
)
print(f"过滤结果: {log}")

# Step 3: redact the final response before returning it to the user
response = "根据信息,用户张三的账户..."
sanitized = pipeline.sanitize_response(response=response, user_role="standard")