class AutoCompactor:
    """Auto-compactor: triggers compaction once token usage crosses a threshold.

    The compactor can shrink a message history either by dropping messages
    from the front of the list (``compact_messages``) or by first summarizing
    long assistant messages and then dropping messages only if that was not
    enough (``compact_with_summarization``).
    """

    # Assistant messages whose text is longer than this many characters are
    # replaced by a summary during summarization.
    SUMMARIZE_MIN_CHARS = 500
    # Requested summary length, as a fraction of the original text length.
    SUMMARY_LENGTH_RATIO = 0.5

    def __init__(self, compression_threshold: float = 0.8):
        """Create a compactor.

        Args:
            compression_threshold: Fraction of the total budget at which
                compaction is triggered; e.g. 0.8 means compaction triggers
                once usage exceeds 80% of the total budget.
        """
        self.compression_threshold = compression_threshold
        self.token_counter = TokenCounter()

    def should_compact(self, budget_mgr: TokenBudgetManager,
                       state: AgentState) -> bool:
        """Return True when estimated usage is strictly above the threshold.

        Args:
            budget_mgr: Supplies ``total_budget`` and the usage estimate for
                ``state``.
            state: Agent state whose current token usage is estimated.
        """
        used_tokens = budget_mgr._estimate_current_usage(state)
        threshold_tokens = budget_mgr.total_budget * self.compression_threshold
        return used_tokens > threshold_tokens

    def compact_messages(self, messages: List[Message],
                         target_token_count: int,
                         summarizer=None) -> List[Message]:
        """Drop messages from the front of the history until it fits the target.

        Args:
            messages: Message history; the element at index 0 is treated as
                the earliest message. The input list is not mutated.
            target_token_count: Maximum token count for the returned history.
            summarizer: Accepted for interface compatibility; this method
                does not use it. Call ``compact_with_summarization`` to
                summarize before dropping.

        Returns:
            ``messages`` itself when it already fits; otherwise a new list
            with the earliest messages removed. The result is empty when even
            the last message on its own exceeds the target.
        """
        current_tokens = self.token_counter.count_messages_tokens(messages)
        if current_tokens <= target_token_count:
            return messages  # Nothing to compact.

        print(f"[Compacting] {current_tokens} tokens -> {target_token_count} tokens")

        # Strategy 1: remove the earliest messages, working on a copy.
        compacted = list(messages)
        remaining_tokens = current_tokens
        while compacted and remaining_tokens > target_token_count:
            removed = compacted.pop(0)
            print(f" Removed: {removed.get_text()[:80]}...")
            remaining_tokens = self.token_counter.count_messages_tokens(compacted)
        return compacted

    def compact_with_summarization(
        self,
        messages: List[Message],
        target_token_count: int,
        summarizer
    ) -> List[Message]:
        """Compact the history, preferring summaries over dropping messages.

        Long assistant messages are summarized first; messages are dropped
        from the front only if the summarized history is still over the
        target. Summarization has to run first: dropping messages always
        reaches the target by itself, so a summarization step placed after
        it would never execute.

        Args:
            messages: Message history; not mutated.
            target_token_count: Maximum token count for the returned history.
            summarizer: Callable invoked as ``summarizer(text, max_length=n)``
                that returns the summary text. When None, summarization is
                skipped and only message removal is applied.

        Returns:
            ``messages`` itself when it already fits, otherwise a new list.
        """
        if self.token_counter.count_messages_tokens(messages) <= target_token_count:
            return messages

        compacted = messages
        if summarizer is not None:
            compacted = self._apply_summarization(
                messages, summarizer, target_token_count
            )

        # Fallback: drop the earliest messages if summaries were not enough.
        # This is a no-op when the summarized history already fits.
        return self.compact_messages(compacted, target_token_count)

    def _apply_summarization(self, messages: List[Message],
                             summarizer, target_tokens: int) -> List[Message]:
        """Return a copy of the history with long assistant messages summarized.

        Args:
            messages: Message history; not mutated.
            summarizer: Callable invoked as ``summarizer(text, max_length=n)``.
            target_tokens: Currently unused; kept so the signature stays
                stable for existing callers.

        Returns:
            A new list in which every assistant message whose text exceeds
            ``SUMMARIZE_MIN_CHARS`` characters is replaced by a new assistant
            message holding only the summary text. All other messages are
            carried over unchanged.
        """
        result = []
        for message in messages:
            if message.role == "assistant":
                text = message.get_text()
                if len(text) > self.SUMMARIZE_MIN_CHARS:
                    # Summarize long responses to a fraction of their length.
                    summary = summarizer(
                        text,
                        max_length=int(len(text) * self.SUMMARY_LENGTH_RATIO)
                    )
                    result.append(Message.assistant([TextBlock(text=summary)]))
                    continue
            result.append(message)
        return result
# Auto-compaction usage example.
# Module-level instances used by agent_loop_with_auto_compaction below.
# Compaction triggers once usage exceeds 80% of the total budget.
compactor = AutoCompactor(compression_threshold=0.8)
# Budget manager built with its default settings; the loop reads its
# total_budget and usage estimates.
budget_mgr = TokenBudgetManager()
async def agent_loop_with_auto_compaction(engine: QueryEngine,
                                          state: AgentState,
                                          *,
                                          target_ratio: float = 0.5):
    """Run the agent loop with automatic history compaction.

    Each iteration compacts ``state.messages`` when the module-level
    ``compactor`` reports that usage is over its threshold, stops when the
    module-level ``budget_mgr`` reports that inference cannot continue, and
    otherwise runs one inference and appends the response to ``state``. The
    loop ends on the first response without tool calls.

    NOTE: this example does not execute tool calls itself; a response with
    tool calls simply leads to another inference pass.

    Args:
        engine: Object providing the awaitable ``infer(messages)``.
        state: Agent state. ``state.messages`` is replaced when compaction
            runs, and responses are added through ``state.add_message``.
        target_ratio: Fraction of ``budget_mgr.total_budget`` to compact the
            history down to. Defaults to 0.5 (50% of the budget).
    """
    while True:
        # Check whether compaction is needed before spending more tokens.
        if compactor.should_compact(budget_mgr, state):
            print(f"[Turn {state.current_turn}] Triggering auto-compaction...")

            # Compact down to the requested share of the total budget.
            target_tokens = int(budget_mgr.total_budget * target_ratio)
            state.messages = compactor.compact_messages(
                state.messages,
                target_tokens
            )
            print(f" Compacted to {budget_mgr._estimate_current_usage(state)} tokens")

        # Continue with inference only if the budget allows it.
        if not budget_mgr.can_continue_inference(state):
            print("Cannot continue: insufficient token budget")
            break

        response = await engine.infer(state.messages)
        state.add_message(response)

        if not response.has_tool_calls():
            break