复制 from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
from threading import RLock
@dataclass
class ScratchpadEntry:
"""Scratchpad中的一个条目"""
key: str
value: Any
created_at: datetime
updated_at: datetime
writer_id: str # 最后的写者ID
version: int = 0
read_only: bool = False
class Scratchpad:
"""跨Worker的共享内存"""
def __init__(self):
self._data: Dict[str, ScratchpadEntry] = {}
self._lock = RLock()
self._access_log: list = []
def put(self, key: str, value: Any, writer_id: str, read_only: bool = False) -> None:
"""写入值"""
with self._lock:
now = datetime.now()
new_version = 0
created_at = now
if key in self._data:
if self._data[key].read_only:
raise ValueError(f"Key {key} is read-only")
new_version = self._data[key].version + 1
created_at = self._data[key].created_at
entry = ScratchpadEntry(
key=key,
value=value,
created_at=created_at,
updated_at=now,
writer_id=writer_id,
version=new_version,
read_only=read_only,
)
self._data[key] = entry
self._access_log.append({
"timestamp": now,
"operation": "put",
"key": key,
"writer_id": writer_id,
})
def get(self, key: str, reader_id: str, default: Optional[Any] = None) -> Any:
"""读取值"""
with self._lock:
if key not in self._data:
return default
entry = self._data[key]
self._access_log.append({
"timestamp": datetime.now(),
"operation": "get",
"key": key,
"reader_id": reader_id,
"version": entry.version,
})
return entry.value
def get_all(self, reader_id: str) -> Dict[str, Any]:
"""获取所有键值对"""
with self._lock:
self._access_log.append({
"timestamp": datetime.now(),
"operation": "get_all",
"reader_id": reader_id,
})
return {entry.key: entry.value for entry in self._data.values()}
def delete(self, key: str, writer_id: str) -> bool:
"""删除键"""
with self._lock:
if key not in self._data:
return False
if self._data[key].read_only:
raise ValueError(f"Key {key} is read-only")
del self._data[key]
self._access_log.append({
"timestamp": datetime.now(),
"operation": "delete",
"key": key,
"writer_id": writer_id,
})
return True
def batch_get(self, keys: list, reader_id: str) -> Dict[str, Any]:
"""批量获取"""
with self._lock:
result = {}
for key in keys:
if key in self._data:
result[key] = self._data[key].value
self._access_log.append({
"timestamp": datetime.now(),
"operation": "batch_get",
"keys": keys,
"reader_id": reader_id,
"found": len(result),
})
return result
def exists(self, key: str) -> bool:
"""检查键是否存在"""
with self._lock:
return key in self._data
def get_version(self, key: str) -> int:
"""获取键的版本号"""
with self._lock:
return self._data[key].version if key in self._data else -1
def watch(self, key: str, callback, reader_id: str) -> None:
"""监视键的变化(简化实现)"""
# 这是一个简化版本,实际实现应该使用观察者模式
pass
def get_access_log(self, limit: int = 100) -> list:
"""获取访问日志"""
with self._lock:
return self._access_log[-limit:]
def clear(self) -> None:
"""清空所有数据"""
with self._lock:
self._data.clear()
self._access_log.clear()
# 使用示例:多个Worker共享数据
class Worker:
"""Worker进程"""
def __init__(self, worker_id: str, scratchpad: Scratchpad):
self.worker_id = worker_id
self.scratchpad = scratchpad
def research_phase(self):
"""Research阶段"""
print(f"[{self.worker_id}] 执行Research阶段")
findings = {
"key_insights": ["洞察1", "洞察2"],
"requirements": ["需求1", "需求2"],
}
self.scratchpad.put("research_findings", findings, self.worker_id)
print(f"[{self.worker_id}] Research完成,写入Scratchpad")
def synthesis_phase(self):
"""Synthesis阶段"""
print(f"[{self.worker_id}] 执行Synthesis阶段")
# 读取Research的输出
findings = self.scratchpad.get("research_findings", self.worker_id)
print(f"[{self.worker_id}] 读取Research结果: {findings}")
plan = {
"steps": ["步骤1", "步骤2", "步骤3"],
"resources": ["资源1", "资源2"],
}
self.scratchpad.put("execution_plan", plan, self.worker_id)
print(f"[{self.worker_id}] Synthesis完成,写入Scratchpad")
def implementation_phase(self):
"""Implementation阶段"""
print(f"[{self.worker_id}] 执行Implementation阶段")
# 读取Synthesis的输出
plan = self.scratchpad.get("execution_plan", self.worker_id)
print(f"[{self.worker_id}] 读取执行计划: {plan}")
# 执行并写入结果
results = {"artifacts": ["工件1", "工件2"]}
self.scratchpad.put("implementation_results", results, self.worker_id)
def run(self):
"""执行所有阶段"""
self.research_phase()
self.synthesis_phase()
self.implementation_phase()
if __name__ == "__main__":
# 创建共享Scratchpad
scratchpad = Scratchpad()
# 创建Workers
worker1 = Worker("Worker-1", scratchpad)
worker2 = Worker("Worker-2", scratchpad)
# 执行(演示单线程执行,实际应为多线程)
worker1.run()
print("\n=== 最终Scratchpad内容 ===")
all_data = scratchpad.get_all("demo")
for key, value in all_data.items():
print(f"{key}: {value}")
print("\n=== 访问日志 ===")
logs = scratchpad.get_access_log()
for log in logs:
print(log)