# core/action_streaming.py
import asyncio
import json
from datetime import datetime
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional

# NOTE(review): ControlledAgent, CheckpointManager and AgentState are used
# below but are not imported in the visible part of this file — confirm
# which project module provides them and import them here.
class ActionStream:
    """In-memory stream of agent action events.

    Each emitted event is appended to ``history`` and pushed to every
    subscribed listener. ``stream_sse`` replays ``history`` as Server-Sent
    Events frames by polling it.
    """

    def __init__(self):
        # Callables invoked with each event dict, in subscription order.
        self.listeners: List[Callable] = []
        # Every event emitted so far, oldest first; never trimmed by this class.
        self.history: List[Dict[str, Any]] = []

    async def emit_action(
        self,
        action_type: str,
        content: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Record an action event and notify every subscribed listener.

        Args:
            action_type: Stored under the event's ``"type"`` key.
            content: Stored as-is under the event's ``"content"`` key.
            metadata: Stored under ``"metadata"``; a falsy value becomes ``{}``.

        An exception raised by a listener is caught and printed so that one
        failing listener does not prevent the remaining ones from running.
        """
        event = {
            "type": action_type,
            "timestamp": datetime.now().isoformat(),
            "content": content,
            "metadata": metadata or {},
        }
        self.history.append(event)

        # Notify a snapshot of the listeners: a listener may unsubscribe
        # itself (or others may subscribe while we await), and mutating the
        # list during iteration would silently skip entries.
        for listener in tuple(self.listeners):
            try:
                outcome = listener(event)
                # Await whatever coroutine the call produced. This covers
                # ``async def`` functions as well as async callables (e.g.
                # objects with ``async def __call__``) that
                # ``iscoroutinefunction`` does not recognise. Tasks/futures
                # returned by a sync listener are deliberately not awaited.
                if asyncio.iscoroutine(outcome):
                    await outcome
            except Exception as e:
                print(f"Error notifying listener: {e}")

    def subscribe(self, listener: Callable):
        """Register ``listener`` to be called with every future event."""
        self.listeners.append(listener)

    def unsubscribe(self, listener: Callable):
        """Remove ``listener`` if it is subscribed; otherwise do nothing."""
        if listener in self.listeners:
            self.listeners.remove(listener)

    async def stream_sse(
        self, poll_interval: float = 0.1
    ) -> AsyncGenerator[str, None]:
        """Yield every event in ``history`` as an SSE ``data:`` frame.

        Starts from the first recorded event and then polls ``history`` for
        new ones. The generator never finishes on its own; the consumer must
        close or cancel it.

        Args:
            poll_interval: Seconds to sleep between polls of ``history``.

        Yields:
            ``"data: <json>\\n\\n"`` strings, one per event, in emit order.
        """
        last_sent = 0
        while True:
            pending = self.history[last_sent:]
            # Advance by exactly what was sliced. Events appended while this
            # generator is suspended at ``yield`` must not be counted as sent,
            # otherwise they would never be delivered.
            last_sent += len(pending)
            for event in pending:
                yield f"data: {json.dumps(event)}\n\n"
            await asyncio.sleep(poll_interval)
class AgentWithStreaming(ControlledAgent):
    """Agent that publishes its actions to an ``ActionStream``.

    Relies on ``self.state``, ``self.iteration`` and
    ``self._check_pause_at_safe_point()``, none of which are defined in this
    class — presumably they come from ``ControlledAgent``; confirm there.
    """

    def __init__(self, model_client, checkpoint_manager: CheckpointManager):
        super().__init__(model_client, checkpoint_manager)
        # Receives every thinking / tool_call / tool_result / completion event.
        self.action_stream = ActionStream()

    async def _emit_thinking(self, reasoning: str):
        """Emit a ``"thinking"`` event carrying ``reasoning``."""
        await self.action_stream.emit_action(
            action_type="thinking",
            content={"reasoning": reasoning},
            metadata={"iteration": self.iteration},
        )

    async def _emit_tool_call(self, tool_name: str, tool_args: Dict[str, Any]):
        """Emit a ``"tool_call"`` event with the tool name and its arguments."""
        await self.action_stream.emit_action(
            action_type="tool_call",
            content={"tool": tool_name, "arguments": tool_args},
            metadata={"iteration": self.iteration},
        )

    async def _emit_tool_result(self, tool_name: str, result: Any, duration_ms: float):
        """Emit a ``"tool_result"`` event; ``result`` is stored as ``str(result)``."""
        await self.action_stream.emit_action(
            action_type="tool_result",
            content={"tool": tool_name, "result": str(result)},
            metadata={"iteration": self.iteration, "duration_ms": duration_ms},
        )

    async def run(self) -> Any:
        """Run the agent main loop, emitting an event for every step.

        Loops while ``self.state`` is ``AgentState.RUNNING``. Each pass emits
        a thinking, a tool_call and a tool_result event, and the state is set
        to ``AgentState.COMPLETED`` once ``self.iteration`` reaches 5. The
        loop exits early when ``_check_pause_at_safe_point()`` returns a
        truthy value.

        Returns:
            A dict with ``"status"`` (``self.state.value``) and
            ``"iterations"`` (``self.iteration``).
        """
        # Function-scope import, as in the original, but done once instead of
        # on every loop iteration.
        import time

        while self.state == AgentState.RUNNING:
            self.iteration += 1

            # Safe point: check for a pause request before doing any work.
            if await self._check_pause_at_safe_point():
                break

            # LLM reasoning (hard-coded placeholder; no model call is made).
            reasoning = "Analyzing current task..."
            await self._emit_thinking(reasoning)

            # Tool call.
            tool_name = "process_data"
            tool_args = {"step": self.iteration}
            await self._emit_tool_call(tool_name, tool_args)

            # Tool execution (simulated). perf_counter is monotonic and
            # high-resolution, so the duration cannot go negative or jump
            # when the wall clock is adjusted, unlike time.time().
            start = time.perf_counter()
            result = f"Iteration {self.iteration} completed"
            duration_ms = (time.perf_counter() - start) * 1000
            await self._emit_tool_result(tool_name, result, duration_ms)

            # Safe point: check for a pause request after the tool step.
            if await self._check_pause_at_safe_point():
                break

            # Simulated work.
            await asyncio.sleep(0.2)

            # Loop termination.
            if self.iteration >= 5:
                self.state = AgentState.COMPLETED

        # Announce completion only when the loop actually finished; leaving
        # the loop through a pause break must not report "Task completed".
        # NOTE(review): if consumers rely on a terminal event on pause as
        # well, emit a distinct event type for that case instead.
        if self.state == AgentState.COMPLETED:
            await self.action_stream.emit_action(
                action_type="completion",
                content={"result": "Task completed"},
                metadata={"total_iterations": self.iteration},
            )

        return {"status": self.state.value, "iterations": self.iteration}