7.1 模型抽象层设计
7.1.1 核心概念
7.1.2 设计权衡:多模型 vs 单模型绑定
7.1.3 配置管理系统
故障转移链路
Provider 接口设计
具体实现:Claude Provider
模型选择引擎
配置文件管理
模型抽象层架构图
总结
最后更新于
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterator, List, Optional, Protocol

import anthropic
class ModelProviderType(Enum):
    """Closed set of LLM vendors a model configuration can target.

    Each member's value is the lowercase vendor name, so a member can be
    looked up from a string with ``ModelProviderType("claude")``.
    """

    CLAUDE = "claude"
    OPENAI = "openai"
    DEEPSEEK = "deepseek"
    GEMINI = "gemini"
    OLLAMA = "ollama"
@dataclass
class ModelConfig:
    """Configuration for a single model.

    Attributes:
        provider: Vendor that serves the model.
        model_id: Vendor-specific model identifier.
        api_key: Credential for the vendor API, if one is needed.
        api_endpoint: Custom API endpoint, if any.
            NOTE(review): presumably a base-URL override -- confirm.
        timeout: Request timeout.
            NOTE(review): unit is not stated here; presumably seconds.
        max_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.
    """

    provider: ModelProviderType
    model_id: str
    api_key: Optional[str] = None
    api_endpoint: Optional[str] = None
    timeout: int = 30
    max_tokens: int = 4096
    temperature: float = 0.7
@dataclass
class ModelSelectionPolicy:
    """Model selection policy: one primary model plus optional fallbacks.

    Attributes:
        primary: Configuration of the preferred model.
        fallback_chain: Alternative models, in the order they should be
            tried. ``None`` (the default) means no fallbacks are configured.
        cost_threshold: Cost ceiling; intended to trigger a switch to
            another model when exceeded. ``None`` means no ceiling is set.
        latency_threshold: Latency ceiling; intended to trigger a switch to
            another model when exceeded. ``None`` means no ceiling is set.
            NOTE(review): unit is not stated here -- confirm.
    """

    primary: ModelConfig
    # The defaults are None, so the annotations are Optional rather than
    # relying on implicit-Optional typing.
    fallback_chain: Optional[List[ModelConfig]] = None
    cost_threshold: Optional[float] = None  # cost ceiling; switch when exceeded
    latency_threshold: Optional[int] = None  # latency ceiling; switch when exceeded
class CircuitBreaker:
    """Circuit breaker that tracks the health of one model.

    The breaker is a three-state machine stored as a string in ``state``:

    * ``"closed"``    -- normal operation; calls are allowed.
    * ``"open"``      -- ``failure_threshold`` failures were recorded without
      an intervening success; calls are refused until more than
      ``reset_timeout`` seconds have passed since the breaker opened.
    * ``"half-open"`` -- the timeout has elapsed; trial calls are allowed.
      A success closes the breaker, a failure re-opens it.
    """

    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Number of failures (since the last success)
                at which the breaker opens.
            reset_timeout: Seconds the breaker stays open before a trial
                call is permitted.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = "closed"  # one of: closed, open, half-open
        # time.time() stamp taken when the breaker last opened.
        self.last_failure_time = None

    def record_success(self):
        """Reset the failure counter and close the breaker."""
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        """Count one failure and open the breaker once the threshold is hit."""
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            self.last_failure_time = time.time()

    def is_available(self) -> bool:
        """Return whether a call may be attempted right now.

        As a side effect, an open breaker whose timeout has elapsed is moved
        to ``"half-open"``.
        """
        if self.state != "open":
            # "closed" and "half-open" both let calls through.
            return True
        if time.time() - self.last_failure_time > self.reset_timeout:
            self.state = "half-open"
            return True
        return False
class Message:
    """One chat message exchanged with a model."""

    def __init__(self, role: str, content: str):
        """Store the speaker role and the message text.

        Args:
            role: Who produced the message: "user" or "assistant".
            content: The message text.
        """
        self.role = role  # "user", "assistant"
        self.content = content

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            f"{type(self).__name__}"
            f"(role={self.role!r}, content={self.content!r})"
        )
class ProviderResponse:
    """Provider-independent result of one model call."""

    def __init__(self, content: str, tokens_used: int, model: str):
        """Store the generated text and its accounting data.

        Args:
            content: Text produced by the model.
            tokens_used: Token count reported for the call.
            model: Identifier of the model that produced the response.
        """
        self.content = content
        self.tokens_used = tokens_used
        self.model = model

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            f"{type(self).__name__}(content={self.content!r}, "
            f"tokens_used={self.tokens_used!r}, model={self.model!r})"
        )
class ModelProvider(Protocol):
    """Unified interface every LLM vendor adapter must satisfy.

    This is a structural (duck-typed) protocol: any class that defines these
    four methods with compatible signatures counts as a provider.
    """

    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> ProviderResponse:
        """Run a complete (non-streaming) model call.

        Args:
            messages: Conversation to send to the model.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.

        Returns:
            The model's full response.
        """
        ...

    def stream(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> Iterator[str]:
        """Run a streaming model call, yielding the output chunk by chunk.

        Args:
            messages: Conversation to send to the model.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.
        """
        ...

    def estimate_tokens(self, text: str) -> int:
        """Estimate the number of tokens in ``text``."""
        ...

    def validate_config(self) -> bool:
        """Check that the configuration (API key, etc.) is valid."""
        ...
from typing import Generator
class ClaudeProvider:
    """``ModelProvider`` implementation backed by the Anthropic Messages API."""

    def __init__(self, config: ModelConfig):
        """Build an Anthropic client from ``config``.

        Args:
            config: Model configuration. ``api_endpoint`` and ``timeout`` are
                forwarded to the client so they are no longer silently
                ignored; a ``None`` endpoint keeps the SDK default.
        """
        self.config = config
        self.client = anthropic.Anthropic(
            api_key=config.api_key,
            base_url=config.api_endpoint,
            timeout=config.timeout,
        )

    @staticmethod
    def _to_api_messages(messages: List[Message]) -> list:
        """Convert ``Message`` objects to the dict form the API expects."""
        return [{"role": msg.role, "content": msg.content} for msg in messages]

    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> ProviderResponse:
        """Run a complete (non-streaming) Claude call.

        Args:
            messages: Conversation to send.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.

        Returns:
            A ``ProviderResponse`` whose ``tokens_used`` is the sum of the
            reported input and output tokens.
        """
        response = self.client.messages.create(
            model=self.config.model_id,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=self._to_api_messages(messages),
        )
        # Join every text block instead of indexing content[0]: the response
        # may be empty or may start with a non-text block.
        text = "".join(
            block.text for block in response.content if block.type == "text"
        )
        usage = response.usage
        return ProviderResponse(
            content=text,
            tokens_used=usage.input_tokens + usage.output_tokens,
            model=self.config.model_id,
        )

    def stream(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> Generator[str, None, None]:
        """Run a streaming Claude call, yielding text chunks as they arrive.

        Args:
            messages: Conversation to send.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.

        Yields:
            Each text fragment produced by the stream, in order.
        """
        with self.client.messages.stream(
            model=self.config.model_id,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=self._to_api_messages(messages),
        ) as stream:
            yield from stream.text_stream

    def estimate_tokens(self, text: str) -> int:
        """Count the input tokens of ``text`` sent as a single user message."""
        # The token-counting endpoint needs the model as well as the
        # messages; the call fails without it.
        response = self.client.messages.count_tokens(
            model=self.config.model_id,
            messages=[{"role": "user", "content": text}],
        )
        return response.input_tokens

    def validate_config(self) -> bool:
        """Return True if a trivial token-count request succeeds."""
        try:
            self.estimate_tokens("test")
        except Exception:
            # Any API or configuration error means the config is unusable.
            # KeyboardInterrupt / SystemExit are deliberately not swallowed.
            return False
        return True


class ModelSelectionEngine:
    """Pick a usable model according to a policy and per-model breakers."""

    def __init__(self, policy: ModelSelectionPolicy):
        """Create one circuit breaker for every model named in ``policy``."""
        self.policy = policy
        self.breakers = {}
        self._init_breakers()

    def _candidates(self) -> List[ModelConfig]:
        """Return the primary model followed by the fallback chain."""
        return [self.policy.primary] + (self.policy.fallback_chain or [])

    def _init_breakers(self):
        """Register a fresh breaker per candidate, keyed by ``model_id``."""
        # Configs that share a model_id also share a single breaker.
        for config in self._candidates():
            self.breakers[config.model_id] = CircuitBreaker()

    def select_model(self) -> ModelProvider:
        """Return a provider for the first candidate whose breaker allows calls.

        Raises:
            RuntimeError: If every candidate's breaker refuses calls.
            NotImplementedError: If the chosen candidate's provider type has
                no implementation.
        """
        for config in self._candidates():
            if self.breakers[config.model_id].is_available():
                return self._create_provider(config)
        raise RuntimeError("所有模型不可用")

    def mark_failure(self, model_id: str):
        """Record a failure for ``model_id``; unknown ids are ignored."""
        breaker = self.breakers.get(model_id)
        if breaker is not None:
            breaker.record_failure()

    def mark_success(self, model_id: str):
        """Record a success for ``model_id``; unknown ids are ignored."""
        breaker = self.breakers.get(model_id)
        if breaker is not None:
            breaker.record_success()

    def _create_provider(self, config: ModelConfig):
        """Instantiate the provider class that matches ``config.provider``.

        Raises:
            NotImplementedError: If no provider class exists for the type.
        """
        if config.provider == ModelProviderType.CLAUDE:
            return ClaudeProvider(config)
        # Fail loudly rather than return None, which would only surface
        # later as an AttributeError in the caller.
        raise NotImplementedError(
            f"No provider implementation for {config.provider!r}"
        )
# ... 其他 provider 实现{
"model_selection": {
"primary": {
"provider": "claude",
"model_id": "claude-sonnet-4-6",
"timeout": 30,
"max_tokens": 4096
},
"fallback_chain": [
{
"provider": "openai",
"model_id": "gpt-5.4",
"timeout": 30,
"max_tokens": 4096
},
{
"provider": "deepseek",
"model_id": "deepseek-chat",
"timeout": 30,
"max_tokens": 4096
}
],
"cost_threshold": 0.05,
"latency_threshold": 5000
}
}