from enum import Enum
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class PermissionDecision(Enum):
ALLOW = "allow"
DENY = "deny"
ASK_USER = "ask_user"
class PermissionDecisionEngine:
def __init__(self, policy_store: dict, audit_log: "AuditLog"):
self.policies = policy_store # tool_name -> ToolPermissionPolicy
self.audit_log = audit_log
self.user_approvals = {} # (user_id, tool_name) -> set of approved_params
async def decide(self,
tool_call: "ToolCall",
user_id: str,
context: "ExecutionContext") -> PermissionDecision:
"""
决策流程:
1. 参数验证
2. 权限等级评估
3. 用户历史记录查询
4. 最终决策
"""
# 步骤1: 参数Schema验证
if not self._validate_schema(tool_call):
logger.warning(f"Schema验证失败: {tool_call}")
self.audit_log.log("schema_validation_failed", tool_call, user_id)
return PermissionDecision.DENY
# 步骤2: 获取工具权限策略
policy = self.policies.get(tool_call.tool_name)
if not policy:
logger.error(f"未知工具: {tool_call.tool_name}")
return PermissionDecision.DENY
# 步骤3: 评估权限等级
permission_level = policy.evaluate(tool_call, context)
# 步骤4: 基于等级决策
if permission_level == PermissionLevel.DENY:
logger.warning(f"权限拒绝: {tool_call.tool_name}")
self.audit_log.log("permission_denied", tool_call, user_id)
return PermissionDecision.DENY
elif permission_level == PermissionLevel.OVERRIDE:
logger.info(f"管理员覆盖: {tool_call.tool_name}")
self.audit_log.log("override_allowed", tool_call, user_id)
return PermissionDecision.ALLOW
elif permission_level == PermissionLevel.FULL_TRUST:
logger.info(f"完全信任,自动批准: {tool_call.tool_name}")
self.audit_log.log("full_trust_approved", tool_call, user_id)
return PermissionDecision.ALLOW
elif permission_level == PermissionLevel.AUTO_WITH_NOTIFICATION:
logger.info(f"自动执行并通知: {tool_call.tool_name}")
self.audit_log.log("auto_with_notification", tool_call, user_id)
return PermissionDecision.ALLOW
elif permission_level == PermissionLevel.ASK_FIRST:
# 检查用户历史批准记录
approval_key = (user_id, tool_call.tool_name)
if self._is_approved_before(approval_key, tool_call.args):
logger.info(f"自动批准 (历史): {tool_call.tool_name}")
self.audit_log.log("ask_first_cached", tool_call, user_id)
return PermissionDecision.ALLOW
else:
logger.info(f"首次调用需询问: {tool_call.tool_name}")
return PermissionDecision.ASK_USER
else: # APPROVE_ONCE, APPROVE_ALWAYS, MANUAL_ONLY
return PermissionDecision.ASK_USER
def _validate_schema(self, tool_call: "ToolCall") -> bool:
"""验证工具调用的参数Schema"""
# 实现略
return True
def _is_approved_before(self, key: tuple, args: dict) -> bool:
"""检查用户是否曾批准过类似调用"""
# 简化实现:仅检查完全相同的调用
if key not in self.user_approvals:
return False
return str(args) in self.user_approvals[key]
def record_approval(self, user_id: str, tool_name: str, args: dict):
"""记录用户批准"""
key = (user_id, tool_name)
if key not in self.user_approvals:
self.user_approvals[key] = set()
self.user_approvals[key].add(str(args))