import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Tuple
class AgentPriority(Enum):
    """Priority level assigned to an agent; a larger value means a higher priority."""

    # Critical work; intended never to be rate-limited.
    CRITICAL = 10
    # High priority; intended to be served first when resources are allocated.
    HIGH = 7
    # Ordinary priority.
    NORMAL = 5
    # Low priority; intended to be deferrable.
    LOW = 2
@dataclass
class AgentTokenQuota:
    """Token budget configured for a single agent.

    Attributes:
        agent_id: Identifier of the agent this quota belongs to.
        daily_limit: Total number of tokens the agent may use per day.
        hourly_limit: Nominal number of tokens the agent may use per hour.
        concurrent_request_limit: Maximum number of simultaneous requests.
        priority: Priority level of the agent.
        burst_multiplier: Burst tolerance factor; hourly usage may temporarily
            exceed ``hourly_limit`` up to ``hourly_limit * burst_multiplier``.
    """

    agent_id: str
    daily_limit: int
    hourly_limit: int
    concurrent_request_limit: int
    priority: AgentPriority
    burst_multiplier: float
class MultiAgentTokenCoordinator:
    """Coordinate token budgets across multiple registered agents.

    Each agent has three limits:

    * a daily hard limit;
    * an hourly limit that may be exceeded up to
      ``hourly_limit * burst_multiplier``;
    * a cap on concurrent in-flight requests.

    Usage counters live in ``usage_tracking``. They are cleared lazily, on the
    next allocation request or status read, once their day or hourly window
    has elapsed.

    NOTE(review): ``quota.priority`` is only reported, never enforced, and
    ``request_queue`` is created but not used by any method of this class.
    """

    # Length of the hourly accounting window, in seconds.
    _HOUR_SECONDS = 3600

    def __init__(self):
        # agent_id -> configured quota
        self.quotas: Dict[str, AgentTokenQuota] = {}
        # agent_id -> mutable usage counters (keys are set in register_agent)
        self.usage_tracking: Dict[str, Dict] = {}
        # NOTE(review): not read or written anywhere in this class.
        self.request_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()

    def register_agent(self, quota: "AgentTokenQuota") -> None:
        """Register an agent with its quota and initialise its usage counters.

        Re-registering an existing ``agent_id`` replaces its quota and resets
        all of its counters to zero.
        """
        now = datetime.now()
        self.quotas[quota.agent_id] = quota
        self.usage_tracking[quota.agent_id] = {
            "today_tokens": 0,
            "current_hour_tokens": 0,
            "concurrent_requests": 0,
            "last_hour_reset": now,
            # Local calendar date that today_tokens belongs to; once the date
            # changes, the daily counter is cleared.
            "last_day_reset": now.date(),
        }

    async def request_token_allocation(self, agent_id: str,
                                       tokens_needed: int,
                                       timeout: float = 5.0) -> Tuple[bool, str]:
        """Try to reserve ``tokens_needed`` tokens for ``agent_id``.

        Checks run in this order:

        1. the agent is registered;
        2. the request is non-negative;
        3. the concurrency limit;
        4. the daily hard limit;
        5. the hourly burst limit.

        On success the daily and hourly token counters and the
        concurrent-request counter are incremented. The caller should later
        call ``release_token_allocation`` to free the concurrency slot; tokens
        themselves are never refunded.

        Args:
            agent_id: Identifier of a registered agent.
            tokens_needed: Number of tokens to reserve; must be >= 0.
            timeout: Currently unused; kept for API compatibility.

        Returns:
            ``(True, "Allocation successful")`` on success, otherwise
            ``(False, reason)``.
        """
        if agent_id not in self.quotas:
            return False, f"Agent {agent_id} not registered"
        # A negative request would decrement the counters and grant extra budget.
        if tokens_needed < 0:
            return False, "tokens_needed must be non-negative"

        quota = self.quotas[agent_id]
        tracking = self.usage_tracking[agent_id]

        # Concurrency limit
        if tracking["concurrent_requests"] >= quota.concurrent_request_limit:
            return False, "Concurrent request limit reached"

        # Daily limit (hard). Clear yesterday's usage before checking.
        self._reset_daily_if_needed(agent_id)
        if tracking["today_tokens"] + tokens_needed > quota.daily_limit:
            if tokens_needed > quota.daily_limit:
                # Waiting for the reset would not help, so do not suggest it.
                return False, (f"Daily limit exceeded. Request of {tokens_needed} "
                               f"tokens can never fit (limit {quota.daily_limit})")
            hours_until_reset = self._hours_until_midnight()
            return False, f"Daily limit exceeded. Reset in {hours_until_reset:.1f}h"

        # Hourly limit, which may be exceeded up to the burst multiplier.
        self._reset_hourly_if_needed(agent_id)
        hourly_hard_limit = quota.hourly_limit * quota.burst_multiplier
        if tracking["current_hour_tokens"] + tokens_needed > hourly_hard_limit:
            if tokens_needed > hourly_hard_limit:
                # Even an empty hourly window is too small for this request.
                return False, (f"Hourly burst limit exceeded. Request of {tokens_needed} "
                               f"tokens can never fit (limit {hourly_hard_limit:g})")
            wait_time = self._calculate_wait_time(agent_id, tokens_needed)
            return False, f"Hourly burst limit exceeded. Wait {wait_time:.1f}s"

        # All checks passed. There is no `await` between the checks and these
        # updates, so other coroutines on the same event loop cannot interleave.
        tracking["today_tokens"] += tokens_needed
        tracking["current_hour_tokens"] += tokens_needed
        tracking["concurrent_requests"] += 1
        return True, "Allocation successful"

    async def release_token_allocation(self, agent_id: str) -> None:
        """Release one concurrency slot for ``agent_id``.

        Token counters are left unchanged. Unknown agents are ignored, and the
        count never drops below zero.
        """
        if agent_id in self.usage_tracking:
            tracking = self.usage_tracking[agent_id]
            tracking["concurrent_requests"] = max(0, tracking["concurrent_requests"] - 1)

    def get_agent_quota_status(self, agent_id: str) -> Dict:
        """Return a snapshot of an agent's daily, hourly and concurrency usage.

        Counters whose day or hour window has elapsed are cleared first, so
        the report is not stale. Utilisation percentages are 0 when the
        corresponding limit is not positive. Hourly utilisation is measured
        against ``hourly_limit``, not the burst limit. It can therefore exceed
        100, and ``remaining`` can go negative, while the agent is bursting.

        Returns:
            A status dict, or ``{}`` if ``agent_id`` is not registered.
        """
        if agent_id not in self.quotas:
            return {}
        self._reset_daily_if_needed(agent_id)
        self._reset_hourly_if_needed(agent_id)

        quota = self.quotas[agent_id]
        tracking = self.usage_tracking[agent_id]
        daily_used = tracking["today_tokens"]
        hourly_used = tracking["current_hour_tokens"]
        concurrent = tracking["concurrent_requests"]

        return {
            "agent_id": agent_id,
            "priority": quota.priority.name,
            "daily_quota": {
                "limit": quota.daily_limit,
                "used": daily_used,
                "remaining": quota.daily_limit - daily_used,
                "utilization_percent": self._percent(daily_used, quota.daily_limit),
            },
            "hourly_quota": {
                "limit": quota.hourly_limit,
                "burst_limit": quota.hourly_limit * quota.burst_multiplier,
                "used": hourly_used,
                "remaining": quota.hourly_limit - hourly_used,
                "utilization_percent": self._percent(hourly_used, quota.hourly_limit),
            },
            "concurrent": {
                "limit": quota.concurrent_request_limit,
                "current": concurrent,
                "available": quota.concurrent_request_limit - concurrent,
            },
        }

    def get_system_quota_summary(self) -> Dict:
        """Aggregate daily capacity and usage across all agents.

        Per-agent statuses are included under ``"agents"``.
        """
        summary = {
            "timestamp": datetime.now().isoformat(),
            "total_daily_capacity": 0,
            "total_daily_used": 0,
            "agents": {},
        }
        for agent_id, quota in self.quotas.items():
            status = self.get_agent_quota_status(agent_id)
            summary["agents"][agent_id] = status
            summary["total_daily_capacity"] += quota.daily_limit
            summary["total_daily_used"] += status["daily_quota"]["used"]
        summary["total_daily_remaining"] = (
            summary["total_daily_capacity"] - summary["total_daily_used"]
        )
        summary["system_utilization_percent"] = self._percent(
            summary["total_daily_used"], summary["total_daily_capacity"]
        )
        return summary

    @staticmethod
    def _percent(used: float, limit: float) -> float:
        """Return ``used`` as a percentage of ``limit``; 0 when ``limit`` is not positive."""
        return used / limit * 100 if limit > 0 else 0.0

    def _reset_daily_if_needed(self, agent_id: str) -> None:
        """Clear the daily counter once the local calendar date has changed."""
        tracking = self.usage_tracking[agent_id]
        today = datetime.now().date()
        # setdefault: treat a tracking dict without the key as belonging to
        # today, rather than wiping its usage.
        if tracking.setdefault("last_day_reset", today) != today:
            tracking["today_tokens"] = 0
            tracking["last_day_reset"] = today

    def _reset_hourly_if_needed(self, agent_id: str) -> None:
        """Clear the hourly counter once a full window has elapsed since the last reset.

        The new window starts at the moment of the reset; windows are not
        aligned to clock hours.
        """
        tracking = self.usage_tracking[agent_id]
        now = datetime.now()
        if (now - tracking["last_hour_reset"]).total_seconds() >= self._HOUR_SECONDS:
            tracking["current_hour_tokens"] = 0
            tracking["last_hour_reset"] = now

    def _hours_until_midnight(self) -> float:
        """Return the number of hours until the next local midnight."""
        now = datetime.now()
        midnight = (now + timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        return (midnight - now).total_seconds() / 3600

    def _calculate_wait_time(self, agent_id: str, tokens_needed: int) -> float:
        """Return the seconds until the agent's hourly counter is next cleared.

        The hourly counter is a fixed window: it drops to zero only when the
        window elapses, so retrying any earlier cannot succeed.
        ``tokens_needed`` is kept for API compatibility. Requests that could
        never fit in a window are rejected before this is called.
        """
        tracking = self.usage_tracking[agent_id]
        elapsed = (datetime.now() - tracking["last_hour_reset"]).total_seconds()
        return max(0.0, self._HOUR_SECONDS - elapsed)