# core/error_as_observation.py
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class ErrorObservationType(Enum):
    """Categories used to classify an error observation.

    The string values are what gets serialised (via ``.value``) when
    observations are injected into an agent context or summarised.
    """
    # Fallback category for errors that match no more specific category.
    TOOL_ERROR = "tool_error"
    # The operation timed out.
    TIMEOUT = "timeout"
    # The caller was rate limited.
    RATE_LIMIT = "rate_limit"
    # NOTE(review): VALIDATION_ERROR and NETWORK_ERROR are declared but not
    # produced by the classifier visible in this file - confirm other users.
    VALIDATION_ERROR = "validation_error"
    NETWORK_ERROR = "network_error"
@dataclass
class ErrorObservation:
    """A single error captured as a structured observation record."""
    # Category of the error.
    type: ErrorObservationType
    # Origin of the error; ErrorAsObservationHandler stores the tool name here.
    source: str
    # Human-readable description of the error.
    message: str
    # Short machine-readable code; the handler uses 'TIMEOUT', 'RATE_LIMIT',
    # or the exception class name.
    error_code: str
    # When the observation was created.
    timestamp: datetime
    # Severity on a 1-5 scale (the range is not enforced here).
    severity: int  # 1-5
    # Whether the failed operation can be attempted again.
    recoverable: bool
    # Hint for the consumer, e.g. 'retry', 'backoff_and_retry', 'report_to_user'.
    suggested_action: str
    # Delay before retrying; defaults to 0.
    # NOTE(review): unit is presumably seconds - confirm against consumers.
    retry_after: int = 0
class ErrorAsObservationHandler:
    """Turn tool exceptions into ErrorObservation records instead of raising.

    Every observation created through ``handle_tool_error`` is appended to
    ``self.observations`` so it can later be injected into an agent context
    (``inject_observations``) or summarised (``get_summary``).
    """

    def __init__(self):
        # Observations accumulate for the lifetime of the handler; nothing in
        # this class removes them.
        self.observations = []

    async def handle_tool_error(
        self,
        tool_name: str,
        error: Exception,
        context: dict
    ) -> ErrorObservation:
        """Convert a tool error into an observation rather than raising it.

        Args:
            tool_name: Name of the failing tool; stored as the observation source.
            error: The exception raised by the tool.
            context: Caller-supplied context, forwarded to the classifier.

        Returns:
            The newly created observation, which is also recorded on the handler.
        """
        observation = self._classify_error(tool_name, error, context)
        self.observations.append(observation)
        return observation

    @staticmethod
    def _match_error_kind(error: Exception) -> ErrorObservationType:
        """Choose the observation type from the exception's class hierarchy.

        The exception's own class is inspected first, then its base classes in
        MRO order, so a tool-specific subclass of a timeout or rate-limit
        exception is still recognised even when its own name gives no hint.
        Within one class name, 'timeout' takes precedence over 'rate'.
        """
        for klass in type(error).__mro__:
            name = klass.__name__.lower()
            if 'timeout' in name:
                return ErrorObservationType.TIMEOUT
            if 'rate' in name:
                return ErrorObservationType.RATE_LIMIT
        return ErrorObservationType.TOOL_ERROR

    def _classify_error(
        self,
        tool_name: str,
        error: Exception,
        context: dict
    ) -> ErrorObservation:
        """Build the observation that corresponds to the given exception.

        Args:
            tool_name: Name of the failing tool; stored as the observation source.
            error: The exception to classify.
            context: Accepted for interface compatibility; not used by the
                classification itself.

        Returns:
            A TIMEOUT observation (recoverable, retry_after=5), a RATE_LIMIT
            observation (recoverable, retry_after=60), or an unrecoverable
            TOOL_ERROR observation whose error_code is the exception class name.
        """
        error_type = type(error).__name__
        kind = self._match_error_kind(error)
        # Timestamps are naive local times, as produced by datetime.now().
        if kind is ErrorObservationType.TIMEOUT:
            return ErrorObservation(
                type=ErrorObservationType.TIMEOUT,
                source=tool_name,
                message=str(error),
                error_code='TIMEOUT',
                timestamp=datetime.now(),
                severity=3,
                recoverable=True,
                suggested_action='retry',
                retry_after=5
            )
        if kind is ErrorObservationType.RATE_LIMIT:
            return ErrorObservation(
                type=ErrorObservationType.RATE_LIMIT,
                source=tool_name,
                message=f"Rate limit exceeded for {tool_name}",
                error_code='RATE_LIMIT',
                timestamp=datetime.now(),
                severity=2,
                recoverable=True,
                suggested_action='backoff_and_retry',
                retry_after=60
            )
        # Anything unrecognised is reported as a non-recoverable tool error.
        return ErrorObservation(
            type=ErrorObservationType.TOOL_ERROR,
            source=tool_name,
            message=str(error),
            error_code=error_type,
            timestamp=datetime.now(),
            severity=4,
            recoverable=False,
            suggested_action='report_to_user'
        )

    async def inject_observations(self, agent_context: dict) -> dict:
        """Append the recorded observations to the agent context.

        Each observation is serialised to a plain dict and appended to
        ``agent_context['observations']``. A missing or ``None`` entry is
        treated as empty, and any existing sequence is copied into a new list.

        NOTE(review): every call injects the handler's full history, so calling
        this repeatedly with the same context appends the same observations
        again - confirm whether that is intended.

        Args:
            agent_context: Context dict to update in place.

        Returns:
            The same ``agent_context`` object, after the update.
        """
        observations_list = [
            {
                'type': obs.type.value,
                'source': obs.source,
                'message': obs.message,
                'recoverable': obs.recoverable,
                'suggested_action': obs.suggested_action,
                'retry_after': obs.retry_after
            }
            for obs in self.observations
        ]
        # Tolerate a None value or a non-list sequence (e.g. a tuple) under the
        # key instead of failing on the concatenation.
        existing = agent_context.get('observations') or []
        agent_context['observations'] = list(existing) + observations_list
        return agent_context

    def get_summary(self) -> dict:
        """Summarise the recorded observations.

        Returns:
            A dict with the total number of observations, a per-type count
            keyed by the enum value, and the number of unrecoverable ones.
        """
        by_type = {}
        for obs in self.observations:
            obs_type = obs.type.value
            by_type[obs_type] = by_type.get(obs_type, 0) + 1
        return {
            'total_observations': len(self.observations),
            'by_type': by_type,
            'unrecoverable_count': sum(1 for o in self.observations if not o.recoverable)
        }