# 审批工作流管理器
class ApprovalWorkflow:
"""管理审批请求的生命周期"""
def __init__(self):
self.requests: dict[str, ApprovalRequest] = {}
self.callbacks: dict[str, List[Callable]] = {}
def create_request(
self,
action_type: str,
action_params: dict,
requester_id: str,
required_approvals: int = 1,
timeout_minutes: int = 60
) -> ApprovalRequest:
"""创建新的审批请求"""
request_id = str(uuid.uuid4())
request = ApprovalRequest(
id=request_id,
action_type=action_type,
action_params=action_params,
requester_id=requester_id,
request_time=datetime.now(),
required_approvals=required_approvals,
approval_timeout_minutes=timeout_minutes
)
self.requests[request_id] = request
self._trigger_callbacks('on_request_created', request)
return request
def approve(self, request_id: str, approver_id: str, comments: str = "") -> bool:
"""批准请求"""
if request_id not in self.requests:
return False
request = self.requests[request_id]
if request.is_expired():
request.status = ApprovalStatus.EXPIRED
self._trigger_callbacks('on_request_expired', request)
return False
request.approvals.append({'approver_id': approver_id, 'time': datetime.now(), 'comments': comments})
if request.is_approved():
request.status = ApprovalStatus.APPROVED
self._trigger_callbacks('on_request_approved', request)
return True
def reject(self, request_id: str, reviewer_id: str, reason: str) -> bool:
"""拒绝请求"""
if request_id not in self.requests:
return False
request = self.requests[request_id]
request.status = ApprovalStatus.REJECTED
request.rejections.append({'reviewer_id': reviewer_id, 'time': datetime.now(), 'reason': reason})
self._trigger_callbacks('on_request_rejected', request)
return True
def request_clarification(self, request_id: str, clarifier_id: str, question: str) -> bool:
"""请求澄清"""
if request_id not in self.requests:
return False
request = self.requests[request_id]
request.status = ApprovalStatus.NEEDS_CLARIFICATION
request.notes = question
self._trigger_callbacks('on_clarification_requested', request)
return True
def get_request(self, request_id: str) -> Optional[ApprovalRequest]:
"""获取单个请求"""
return self.requests.get(request_id)
def get_pending_requests(self) -> List[ApprovalRequest]:
"""获取所有待审批的请求"""
return [r for r in self.requests.values() if r.status == ApprovalStatus.PENDING and not r.is_expired()]
def register_callback(self, event: str, callback: Callable):
"""为事件注册回调处理器"""
if event not in self.callbacks:
self.callbacks[event] = []
self.callbacks[event].append(callback)
def _trigger_callbacks(self, event: str, request: ApprovalRequest):
"""触发事件回调"""
if event in self.callbacks:
for callback in self.callbacks[event]:
callback(request)
# core/permission_interpreter.py
from anthropic import Anthropic
from dataclasses import dataclass
from typing import List
import json
@dataclass
class PermissionDeclaration:
"""权限声明的解释结果"""
user_input: str
interpreted_permissions: List[str]
confidence: float
explanation: str
class PermissionInterpreter:
"""自然语言权限解释器"""
def __init__(self):
self.client = Anthropic()
def interpret(self, user_declaration: str) -> PermissionDeclaration:
"""将用户的自然语言声明转换为权限列表"""
system_prompt = """You are a permission interpreter for AI agents.
Analyze the user's declaration and extract the implied permissions.
Return a JSON object with:
{
"permissions": ["permission1", "permission2"],
"confidence": 0.95,
"explanation": "The user is granting permission to X because Y"
}
Common patterns: "You can X" → grant X; "Don't X" → deny X; "Ask before Y" → conditional Y
"""
response = self.client.messages.create(
model="claude-sonnet-4-6",
max_tokens=500,
system=system_prompt,
messages=[{"role": "user", "content": f"Interpret: {user_declaration}"}]
)
result = json.loads(response.content[0].text)
return PermissionDeclaration(
user_input=user_declaration,
interpreted_permissions=result['permissions'],
confidence=result['confidence'],
explanation=result['explanation']
)
def ask_for_clarification(self, declaration: str, context: str) -> str:
"""在权限解释不清晰时请求澄清"""
response = self.client.messages.create(
model="claude-sonnet-4-6",
max_tokens=300,
messages=[{"role": "user", "content": f"""User said: "{declaration}"\nContext: {context}\nSuggest clarifying questions."""}]
)
return response.content[0].text
# 权限解释器使用示例
if __name__ == "__main__":
interpreter = PermissionInterpreter()
# 简单授权:删除缓存文件
decl1 = interpreter.interpret("You can delete old cache files")
print(f"Permissions: {decl1.interpreted_permissions}")
print(f"Confidence: {decl1.confidence}")
# 条件授权:数据库迁移但要求在删除表前确认
decl2 = interpreter.interpret("Go ahead with migration, but confirm before dropping tables")
print(f"Permissions: {decl2.interpreted_permissions}")
# 请求澄清:对于含糊的权限声明
clarification = interpreter.ask_for_clarification(
"You can handle my emails",
"Agent considering: send, delete, move"
)
print(f"Clarification questions:\n{clarification}")
# core/ask_first_permission.py
from enum import Enum
from typing import Optional
from datetime import datetime, timedelta
class PermissionLevel(Enum):
"""6 级信任模型"""
MANUAL_ONLY = "Manual-only"
APPROVE_ALWAYS = "Approve-always"
APPROVE_ONCE = "Approve-once"
ASK_FIRST = "Ask-first"
AUTO_WITH_NOTIFICATION = "Auto-with-notification"
FULL_TRUST = "Full-trust"
class AskFirstManager:
"""Ask-First 权限管理器"""
def __init__(self):
self.user_decisions = {} # {user_id: {action_type: (decision, timestamp)}}
def should_ask(
self,
user_id: str,
action_type: str,
permission_level: PermissionLevel,
context_changed: bool = False
) -> bool:
"""判断是否需要询问用户"""
if permission_level == PermissionLevel.FULL_TRUST:
return False
if permission_level in (PermissionLevel.MANUAL_ONLY, PermissionLevel.APPROVE_ALWAYS, PermissionLevel.APPROVE_ONCE):
return True
if permission_level == PermissionLevel.ASK_FIRST:
# 检查是否已有用户决策
if user_id in self.user_decisions:
action_key = self._normalize_action(action_type)
if action_key in self.user_decisions[user_id]:
decision, timestamp = self.user_decisions[user_id][action_key]
# 24 小时后或上下文变化时重新询问
if context_changed or (datetime.now() - timestamp > timedelta(hours=24)):
return True
return not decision
return True # 首次询问
if permission_level == PermissionLevel.AUTO_WITH_NOTIFICATION:
return False
return False
def record_decision(
self,
user_id: str,
action_type: str,
approved: bool,
remember_for_hours: int = 24
):
"""记录用户决定以便下次记忆"""
if user_id not in self.user_decisions:
self.user_decisions[user_id] = {}
action_key = self._normalize_action(action_type)
self.user_decisions[user_id][action_key] = (approved, datetime.now())
def clear_decisions(self, user_id: str):
"""清除用户的决定缓存"""
if user_id in self.user_decisions:
del self.user_decisions[user_id]
@staticmethod
def _normalize_action(action_type: str) -> str:
"""规范化操作类型为小写"""
return action_type.lower().strip()
# Ask-First 对话生成和解析
class AskFirstDialog:
"""Ask-First 对话生成和响应解析"""
@staticmethod
def generate_prompt(action_type: str, action_description: str, impact: str) -> str:
"""生成询问用户的提示"""
return f"""The AI agent wants to perform the following action:
Action: {action_type}
Description: {action_description}
Impact: {impact}
Should I proceed? Please reply with:
- "yes" or "approve" to allow this action
- "no" or "deny" to block this action
- "ask next time" if you want to be asked again next time
- Or provide modified parameters"""
@staticmethod
def parse_response(user_response: str) -> dict:
"""解析用户的响应"""
response_lower = user_response.lower().strip()
if response_lower in ('yes', 'approve', 'ok', 'proceed'):
return {'approved': True, 'remember': True}
elif response_lower in ('no', 'deny', 'block', 'cancel'):
return {'approved': False, 'remember': True}
elif 'ask next time' in response_lower or 'ask again' in response_lower:
return {'approved': None, 'remember': False}
else:
return {'approved': None, 'modified_params': user_response}
# core/user_feedback.py
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
from typing import List, Optional
import uuid
class FeedbackType(Enum):
"""反馈类型"""
POSITIVE = "positive"
NEGATIVE = "negative"
CORRECTION = "correction"
PARTIAL = "partial"
IRRELEVANT = "irrelevant"
@dataclass
class UserFeedback:
"""用户反馈对象"""
feedback_id: str
action_id: str
feedback_type: FeedbackType
rating: int # 1-5
comment: str
user_id: str
timestamp: datetime
category: Optional[str] = None
class FeedbackCollector:
"""反馈采集器"""
def __init__(self):
self.feedbacks: List[UserFeedback] = []
def collect(
self,
action_id: str,
feedback_type: FeedbackType,
rating: int,
comment: str,
user_id: str,
category: str = None
) -> UserFeedback:
"""采集单条反馈"""
feedback = UserFeedback(
feedback_id=str(uuid.uuid4()),
action_id=action_id,
feedback_type=feedback_type,
rating=rating,
comment=comment,
user_id=user_id,
timestamp=datetime.now(),
category=category
)
self.feedbacks.append(feedback)
return feedback
def get_action_feedback(self, action_id: str) -> List[UserFeedback]:
"""获取特定操作的所有反馈"""
return [f for f in self.feedbacks if f.action_id == action_id]
def get_stats(self) -> dict:
"""获取反馈的聚合统计"""
total = len(self.feedbacks)
if total == 0:
return {'total': 0}
types = {}
for feedback in self.feedbacks:
types[feedback.feedback_type.value] = types.get(feedback.feedback_type.value, 0) + 1
avg_rating = sum(f.rating for f in self.feedbacks) / total
return {
'total': total,
'types': types,
'average_rating': avg_rating,
'negative_percentage': types.get('negative', 0) / total * 100
}
def get_improvement_areas(self) -> List[str]:
"""识别需要改进的领域(按负面反馈频率排序)"""
negative_feedbacks = [f for f in self.feedbacks if f.feedback_type == FeedbackType.NEGATIVE]
categories = {}
for feedback in negative_feedbacks:
category = feedback.category or 'general'
categories[category] = categories.get(category, 0) + 1
return sorted(categories.items(), key=lambda x: x[1], reverse=True)