class TrajectoryCollector:
    """Collect the steps of one task into a ``Trajectory`` and persist it.

    Intended call order: ``start_task`` once, ``log_step`` any number of
    times, then ``end_task``. Only one trajectory is tracked at a time;
    calling ``start_task`` again replaces the one in progress.
    """

    def __init__(self):
        # No trajectory exists until start_task() is called.
        self.current_trajectory: Optional[Trajectory] = None
        self.storage = TrajectoryStorage()

    def _require_trajectory(self):
        """Return the trajectory in progress, or raise if none was started."""
        trajectory = self.current_trajectory
        if trajectory is None:
            # Fail with an actionable message instead of an opaque
            # "'NoneType' object has no attribute ..." further down.
            raise RuntimeError(
                "No active trajectory: call start_task() before logging "
                "steps or ending the task."
            )
        return trajectory

    def start_task(self, task_id: str, user_query: str) -> None:
        """Begin a new, empty trajectory stamped with the current time.

        Any trajectory already held in ``current_trajectory`` is replaced
        without being saved.
        """
        self.current_trajectory = Trajectory(
            task_id=task_id,
            user_query=user_query,
            start_time=datetime.now(),
            end_time=None,
            steps=[],
            final_result=None,
            success=None,
        )

    def log_step(
        self,
        observation: str,
        thought: str,
        action_type: str,
        action_name: Optional[str] = None,
        action_input: Optional[Dict] = None,
        action_output: Any = None,
        **metadata
    ) -> None:
        """Add one step to the trajectory in progress.

        Recognised ``metadata`` keys and the fallback used when absent:
        ``context`` ({}), ``reasoning_type`` ("cot"), ``model`` ("unknown"),
        ``tokens`` (0), ``latency`` (0) and ``confidence`` (None). Other keys
        are ignored.

        Raises:
            RuntimeError: if ``start_task`` has not been called.
        """
        trajectory = self._require_trajectory()
        step = TrajectoryStep(
            timestamp=datetime.now(),
            # The number of steps recorded so far serves as this step's id.
            step_id=len(trajectory.steps),
            observation=observation,
            context=metadata.get("context", {}),
            thought=thought,
            reasoning_type=metadata.get("reasoning_type", "cot"),
            action_type=action_type,
            action_name=action_name,
            action_input=action_input,
            action_output=action_output,
            model=metadata.get("model", "unknown"),
            tokens_used=metadata.get("tokens", 0),
            latency_ms=metadata.get("latency", 0),
            confidence=metadata.get("confidence"),
        )
        trajectory.add_step(step)

    def end_task(self, result: str, success: bool) -> None:
        """Stamp the end time, record the outcome and save the trajectory.

        ``current_trajectory`` is left in place after saving.

        Raises:
            RuntimeError: if ``start_task`` has not been called.
        """
        trajectory = self._require_trajectory()
        trajectory.end_time = datetime.now()
        trajectory.final_result = result
        trajectory.success = success
        self.storage.save(trajectory)