# kb_indexer.py
# 知识库向量化与索引构建脚本
import json
import os
from qdrant_client import QdrantClient
from sentence_transformers import SentenceTransformer
from typing import List, Dict
class KnowledgeBaseIndexer:
    """Build and query a Qdrant vector index over knowledge-base articles.

    Articles are embedded with a SentenceTransformer model and stored in a
    single Qdrant collection together with their metadata payload.
    """

    # Embedding model and the dimensionality of the vectors it produces.
    MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2'
    VECTOR_SIZE = 384
    # Number of points sent to Qdrant per upsert request.
    UPSERT_BATCH_SIZE = 100
    # Article fields copied verbatim into each point's payload.
    PAYLOAD_KEYS = (
        'title',
        'content',
        'category',
        'language',
        'tags',
        'priority',
        'lastUpdated',
    )

    def __init__(self, qdrant_url: str = "http://localhost:6333"):
        """Connect to Qdrant at *qdrant_url* and load the embedding model."""
        self.client = QdrantClient(url=qdrant_url)
        self.encoder = SentenceTransformer(self.MODEL_NAME)
        self.collection_name = "customer_kb"

    def create_collection(self):
        """Create the vector collection, replacing any existing one.

        ``recreate_collection`` drops a collection of the same name first,
        so every previously indexed point is lost.
        """
        from qdrant_client.http import models

        self.client.recreate_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=self.VECTOR_SIZE,
                distance=models.Distance.COSINE,
            ),
        )
        print(f"✓ Collection '{self.collection_name}' created")

    def load_kb_from_file(self, kb_file: str) -> List[Dict]:
        """Load knowledge-base articles from a JSON file.

        The file must contain a JSON object mapping each category name to a
        list of article objects. ``title`` and ``content`` are required on
        every article; the remaining fields fall back to defaults.

        Returns:
            A flat list of article dicts, each tagged with its category.

        Raises:
            KeyError: if an article lacks ``title`` or ``content``.
        """
        with open(kb_file, 'r', encoding='utf-8') as f:
            kb = json.load(f)

        articles = []
        for category, items in kb.items():
            for item in items:
                articles.append({
                    'title': item['title'],
                    'content': item['content'],
                    'category': category,
                    'language': item.get('language', 'en'),
                    'lastUpdated': item.get('lastUpdated', ''),
                    'tags': item.get('tags', []),
                    'priority': item.get('priority', 'normal'),
                })
        return articles

    def index_articles(self, articles: List[Dict]):
        """Embed *articles* and upsert them into the collection.

        Each article is embedded from its title and content joined by a
        newline. Point ids are the positions of the articles in the list, so
        indexing another list overwrites points that share an index.
        """
        from qdrant_client.http import models

        texts = [f"{a['title']}\n{a['content']}" for a in articles]
        # One batched encode call instead of one model invocation per article.
        vectors = self.encoder.encode(texts) if texts else []

        points = [
            models.PointStruct(
                id=idx,
                vector=vector.tolist(),
                payload={key: article[key] for key in self.PAYLOAD_KEYS},
            )
            for idx, (article, vector) in enumerate(zip(articles, vectors))
        ]

        # Upsert in fixed-size batches to keep each request bounded.
        for start in range(0, len(points), self.UPSERT_BATCH_SIZE):
            self.client.upsert(
                collection_name=self.collection_name,
                points=points[start:start + self.UPSERT_BATCH_SIZE],
            )
        print(f"✓ {len(articles)} articles indexed")

    def search(self, query: str, limit: int = 5, language: str = None) -> List[Dict]:
        """Return the *limit* articles most similar to *query*.

        Args:
            query: Free-text search query.
            limit: Maximum number of hits to return.
            language: When given, only articles whose ``language`` payload
                field equals this value are considered.

        Returns:
            A list of dicts holding the similarity ``score`` plus the stored
            payload fields of each hit.
        """
        query_vector = self.encoder.encode(query).tolist()

        query_filter = None
        if language:
            from qdrant_client.http import models

            # Restrict the search to points whose payload language matches.
            query_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key='language',
                        match=models.MatchValue(value=language),
                    )
                ]
            )

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=query_filter,
        )
        # A point may carry no payload; treat that as an empty dict.
        return [{'score': hit.score, **(hit.payload or {})} for hit in results]

    def update_article(self, article_id: int, updated_content: Dict):
        """Merge *updated_content* into an article and re-embed it.

        *updated_content* may be partial: the vector is recomputed from the
        merged payload, so unchanged ``title``/``content`` values are reused.

        Raises:
            IndexError: if no point with *article_id* exists.
        """
        from qdrant_client.http import models

        found = self.client.retrieve(
            collection_name=self.collection_name,
            ids=[article_id],
        )
        if not found:
            raise IndexError(
                f"Article {article_id} not found in collection "
                f"'{self.collection_name}'"
            )

        updated_payload = {**(found[0].payload or {}), **updated_content}
        # Re-encode from the merged payload so partial updates work.
        text = f"{updated_payload['title']}\n{updated_payload['content']}"
        vector = self.encoder.encode(text).tolist()

        self.client.upsert(
            collection_name=self.collection_name,
            points=[
                models.PointStruct(
                    id=article_id,
                    vector=vector,
                    payload=updated_payload,
                )
            ],
        )
        print(f"✓ Article {article_id} updated")
# Usage example: rebuild the index from the bundled knowledge base, then run
# a sample query against it.
if __name__ == '__main__':
    kb_indexer = KnowledgeBaseIndexer()
    kb_indexer.create_collection()

    # Load the knowledge base and index every article.
    kb_articles = kb_indexer.load_kb_from_file('data/customer_kb.json')
    kb_indexer.index_articles(kb_articles)

    # Sample search: print the title and similarity score of each hit.
    for hit in kb_indexer.search("How do I reset my password?", limit=3):
        print(f"- {hit['title']} (score: {hit['score']:.2f})")
{
"account": [
{
"title": "How to reset my password",
"content": "To reset your password:\n1. Click 'Forgot Password' on the login page\n2. Enter your email address\n3. Check your email for the reset link\n4. Click the link and create a new password\n5. Use your new password to log in",
"language": "en",
"tags": ["password", "account", "login"],
"priority": "high",
"lastUpdated": "2024-01-15"
},
{
"title": "忘记密码",
"content": "重置密码步骤:\n1. 点击登录页面的\"忘记密码\"\n2. 输入您的邮箱地址\n3. 查看邮箱中的重置链接\n4. 点击链接并创建新密码\n5. 使用新密码登录",
"language": "zh",
"tags": ["密码", "账户", "登录"],
"priority": "high",
"lastUpdated": "2024-01-15"
}
],
"billing": [
{
"title": "How to update billing information",
"content": "To update your billing information:\n1. Go to Settings > Billing\n2. Click 'Edit Payment Method'\n3. Enter your new card details\n4. Click 'Save'\n5. A confirmation email will be sent",
"language": "en",
"tags": ["billing", "payment", "card"],
"priority": "normal",
"lastUpdated": "2024-01-10"
}
],
"technical": [
{
"title": "What to do when encountering API errors",
"content": "Common API error solutions:\n- 401 Unauthorized: Check your API key\n- 429 Too Many Requests: Implement rate limiting (max 100 req/min)\n- 500 Internal Server Error: Our team is investigating, contact support",
"language": "en",
"tags": ["api", "error", "troubleshooting"],
"priority": "high",
"lastUpdated": "2024-01-12"
}
]
}
#!/bin/bash
# tests/load_test.sh
# Load test for the OpenClaw customer-support agent gateway.
#
# -e           abort on the first failing command
# -u           treat references to unset variables as errors
# -o pipefail  make a pipeline fail when any stage fails, so a failing load
#              tool is not masked by the `tee` its output is piped into
set -euo pipefail

GATEWAY_URL="http://localhost:18789"
CONCURRENT_USERS=50
DURATION=300   # seconds
# NOTE(review): RESULTS_FILE is not referenced by the visible part of this
# script, which appends its output to load_test_results.txt -- confirm whether
# it is still needed.
RESULTS_FILE="load_test_results.json"

echo "=== OpenClaw 客户支持智能体压力测试 ==="
echo "并发用户数: $CONCURRENT_USERS"
echo "测试时长: ${DURATION}s"
echo ""
# Print a JSON webhook payload for the given user id on stdout.
#   $1  user id, used to build the sender id and e-mail address
# The message text is picked at random from a fixed set of sample questions.
generate_test_message() {
    local uid=$1
    local -a templates=(
        "How do I reset my password?"
        "我需要修改账单信息"
        "API返回401错误"
        "Can you help with billing?"
    )
    local chosen="${templates[RANDOM % ${#templates[@]}]}"

    # The heredoc body stays at column 0 so the emitted text carries no
    # leading whitespace.
    cat <<EOF
{
"sender": {
"id": "user_${uid}",
"email": "user${uid}@example.com"
},
"channel": "email",
"text": "${chosen}"
}
EOF
}
# Stage 1: HTTP load test with Apache Bench (ab).
echo "[1/2] 使用Apache Bench进行HTTP压力测试..."

# Scratch directory for the POST payload and the wrk script. It is removed on
# exit, so repeated or concurrent runs never collide on a fixed /tmp path.
work_dir="$(mktemp -d)"
trap 'rm -rf "$work_dir"' EXIT

results_log="load_test_results.txt"
target_url="$GATEWAY_URL/webhook/support"

# ab takes the POST body from the file named by -p. Write the payload to a
# regular file rather than a process-substitution pipe so the body has a
# definite size when ab reads it.
payload_file="$work_dir/payload.json"
generate_test_message 1 > "$payload_file"

# -T sets the Content-Type of the POST body; passing it through -H would
# leave ab's own default content type in the request as well.
ab -n 5000 \
    -c "$CONCURRENT_USERS" \
    -T "application/json" \
    -p "$payload_file" \
    "$target_url" | tee -a "$results_log"

# Stage 2: sustained load test with wrk, driven by a Lua script.
echo "[2/2] 使用Wrk进行持续压力测试..."
wrk_script="$work_dir/wrk_script.lua"
cat > "$wrk_script" << 'EOF'
request = function()
  wrk.method = "POST"
  wrk.headers["Content-Type"] = "application/json"
  wrk.body = '{"sender":{"id":"user1","email":"test@example.com"},"channel":"email","text":"How to reset password?"}'
  return wrk.format(nil)
end
response = function(status, headers, body)
  if status == 200 then
    io.write("✓")
  else
    io.write("✗")
  end
end
EOF

wrk -t 8 -c "$CONCURRENT_USERS" -d "${DURATION}s" \
    -s "$wrk_script" \
    "$target_url" | tee -a "$results_log"

echo ""
echo "✓ 压力测试完成"
echo "结果已保存至: $results_log"