def smart_sampling_defense(context, max_samples=1000):
"""
基于内容风险等级的智能采样防御
"""
# 步骤1:快速内容分类(O(n))
risk_scores = []
for chunk in context:
# 使用轻量级特征匹配
risk_score = quick_risk_assessment(chunk)
risk_scores.append(risk_score)
# 步骤2:优先级采样
# 高风险内容:100%检查
# 中风险内容:50%采样检查
# 低风险内容:10%采样检查
high_risk_indices = [i for i, s in enumerate(risk_scores) if s > 0.7]
mid_risk_indices = [i for i, s in enumerate(risk_scores) if 0.4 <= s <= 0.7]
low_risk_indices = [i for i, s in enumerate(risk_scores) if s < 0.4]
# 采样的块
sampled_indices = set(high_risk_indices)
sampled_indices.update(random.sample(mid_risk_indices, len(mid_risk_indices) // 2))
sampled_indices.update(random.sample(low_risk_indices, min(len(low_risk_indices) // 10, max_samples)))
# 步骤3:深度检查采样的块
threat_results = []
for idx in sampled_indices:
detailed_check = detailed_content_audit(context[idx])
if detailed_check["threat_detected"]:
threat_results.append({
"position": idx,
"threat": detailed_check
})
return threat_results
class StructuredContextManager:
"""
使用结构化方式管理长上下文,便于选择性检查
"""
def __init__(self):
self.context_blocks = []
self.metadata = []
def add_content(self, content, source_type, source_id, trust_level):
"""
添加内容块,并标记其元数据
"""
block = {
"content": content,
"source_type": source_type, # "system", "user", "retrieved", "history"
"source_id": source_id,
"trust_level": trust_level, # "trusted", "untrusted", "mixed"
"added_at": datetime.now()
}
self.context_blocks.append(block)
def get_audit_plan(self):
"""
根据源类型和信任等级生成审核计划
"""
audit_plan = []
for i, block in enumerate(self.context_blocks):
# 不同来源的审核策略
if block["source_type"] == "system":
# 系统提示通常可信,但需要定期验证
audit_plan.append({"index": i, "action": "sample_check", "rate": 0.1})
elif block["source_type"] == "user":
# 用户直接输入,高风险
audit_plan.append({"index": i, "action": "full_check", "rate": 1.0})
elif block["source_type"] == "retrieved":
# 检索结果,信任等级取决于来源可信度
if block["trust_level"] == "trusted":
audit_plan.append({"index": i, "action": "sample_check", "rate": 0.2})
else:
audit_plan.append({"index": i, "action": "full_check", "rate": 1.0})
elif block["source_type"] == "history":
# 历史对话,信任等级取决于是否被审核过
audit_plan.append({"index": i, "action": "sample_check", "rate": 0.3})
return audit_plan
def execute_audit(self):
"""
执行审核计划
"""
audit_plan = self.get_audit_plan()
threat_findings = []
for plan_item in audit_plan:
block = self.context_blocks[plan_item["index"]]
action = plan_item["action"]
rate = plan_item["rate"]
if action == "full_check":
result = detailed_security_audit(block["content"])
threat_findings.append((plan_item["index"], result))
elif action == "sample_check":
if random.random() < rate:
result = detailed_security_audit(block["content"])
threat_findings.append((plan_item["index"], result))
return threat_findings
def intelligent_context_compression(context, compression_ratio=0.5):
"""
使用摘要和去重来压缩长上下文,减少审核成本
"""
# 步骤1:检测重复和相似内容
unique_chunks = []
similarity_threshold = 0.9
for chunk in context:
is_duplicate = False
for unique_chunk in unique_chunks:
similarity = compute_semantic_similarity(chunk, unique_chunk)
if similarity > similarity_threshold:
is_duplicate = True
break
if not is_duplicate:
unique_chunks.append(chunk)
# 步骤2:对保留的块进行摘要
compressed_context = []
for chunk in unique_chunks:
if len(chunk) > 500: # 长块才摘要
summary = generate_summary(chunk)
compressed_context.append({
"type": "summary",
"original_length": len(chunk),
"summary": summary
})
else:
compressed_context.append({
"type": "original",
"content": chunk
})
# 步骤3:只审核压缩后的上下文
audit_results = security_audit(compressed_context)
return audit_results