lightweight_qdrant_setup.py•8.77 kB
#!/usr/bin/env python3
"""
轻量级Qdrant服务器模式设置
适合个人单机本地使用场景
"""
import os
import sys
import time
import json
import subprocess
from pathlib import Path
import requests
class LightweightQdrantSetup:
"""轻量级Qdrant设置"""
def __init__(self, data_dir="./memos_data"):
self.data_dir = Path(data_dir)
self.qdrant_port = 6334
self.qdrant_storage = self.data_dir / "qdrant_storage"
self.config_file = self.data_dir / "qdrant_config.json"
def check_docker(self):
"""检查Docker是否可用"""
try:
result = subprocess.run(["docker", "--version"],
capture_output=True, text=True)
if result.returncode == 0:
print("✅ Docker已安装")
return True
else:
print("❌ Docker未安装")
return False
except FileNotFoundError:
print("❌ Docker未找到")
return False
def create_simple_compose(self):
"""创建简化的docker-compose配置"""
compose_content = f"""version: "3.8"
services:
qdrant:
image: qdrant/qdrant:latest
restart: unless-stopped
ports:
- "{self.qdrant_port}:6333"
volumes:
- {self.qdrant_storage.absolute()}:/qdrant/storage
environment:
- QDRANT__SERVICE__MAX_REQUEST_SIZE_MB=16
- QDRANT__LOG_LEVEL=INFO
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:6333/health"]
interval: 30s
timeout: 10s
retries: 3
"""
compose_file = self.data_dir / "docker-compose.yml"
with open(compose_file, 'w') as f:
f.write(compose_content)
print(f"✅ 创建Docker Compose配置: {compose_file}")
return compose_file
def start_qdrant_server(self):
"""启动Qdrant服务器"""
# 确保存储目录存在
self.qdrant_storage.mkdir(parents=True, exist_ok=True)
# 检查是否已经运行
if self.is_qdrant_running():
print("✅ Qdrant服务器已在运行")
return True
# 创建compose文件
compose_file = self.create_simple_compose()
# 启动服务
print("🚀 启动Qdrant服务器...")
try:
result = subprocess.run([
"docker", "compose", "-f", str(compose_file),
"up", "-d"
], capture_output=True, text=True)
if result.returncode == 0:
print("✅ Qdrant容器启动成功")
# 等待服务就绪
if self.wait_for_qdrant():
print("✅ Qdrant服务器已就绪")
return True
else:
print("⚠️ Qdrant服务器启动超时")
return False
else:
print(f"❌ 启动失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 启动异常: {e}")
return False
def is_qdrant_running(self):
"""检查Qdrant是否运行"""
try:
response = requests.get(f"http://localhost:{self.qdrant_port}/health",
timeout=2)
return response.status_code == 200
except:
return False
def wait_for_qdrant(self, max_wait=60):
"""等待Qdrant服务就绪"""
print("⏳ 等待Qdrant服务启动...")
for i in range(max_wait):
if self.is_qdrant_running():
return True
time.sleep(1)
if i % 10 == 0:
print(f" 等待中... ({i}s)")
return False
def create_unified_config(self):
"""创建统一的配置文件"""
config = {
"qdrant": {
"mode": "server",
"url": f"http://localhost:{self.qdrant_port}",
"collection_name": "memos_unified",
"prefer_grpc": False # 个人使用HTTP足够
},
"memory_policy": {
"use_namespace": True,
"namespace_field": "source", # cli/mcp/角色标识
"enable_public_memory": True,
"public_field": "is_public"
},
"performance": {
"max_concurrent_requests": 5, # 个人使用限制
"request_timeout": 30,
"batch_size": 100
}
}
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
print(f"✅ 创建统一配置: {self.config_file}")
return config
def test_connection(self):
"""测试连接"""
print("🧪 测试Qdrant连接...")
try:
# 测试健康检查
health_response = requests.get(f"http://localhost:{self.qdrant_port}/health")
if health_response.status_code == 200:
print("✅ 健康检查通过")
# 测试集合列表
collections_response = requests.get(f"http://localhost:{self.qdrant_port}/collections")
if collections_response.status_code == 200:
collections = collections_response.json()
print(f"✅ 连接成功,当前集合数: {len(collections.get('result', {}).get('collections', []))}")
return True
else:
print(f"❌ 连接失败: {collections_response.status_code}")
return False
except Exception as e:
print(f"❌ 连接异常: {e}")
return False
def migrate_existing_data(self):
"""迁移现有数据的建议"""
print("\n📋 数据迁移建议:")
print("1. 备份现有数据:")
print(f" cp -r ./memos_data ./memos_data_backup_$(date +%Y%m%d)")
print(f" cp -r ./mcp_memos_data ./mcp_memos_data_backup_$(date +%Y%m%d)")
print("\n2. 更新MemOS配置:")
print(" - 修改 advanced_examples.py 中的 QdrantClient 初始化")
print(" - 使用 url='http://localhost:6333' 替代 path 参数")
print("\n3. 数据合并:")
print(" - CLI数据标记为 source='cli'")
print(" - MCP数据标记为 source='mcp'")
print(" - 公共数据标记为 is_public=True")
def stop_qdrant_server(self):
"""停止Qdrant服务器"""
compose_file = self.data_dir / "docker-compose.yml"
if compose_file.exists():
print("🛑 停止Qdrant服务器...")
subprocess.run([
"docker", "compose", "-f", str(compose_file), "down"
])
print("✅ Qdrant服务器已停止")
else:
print("⚠️ 未找到compose文件")
def main():
"""主函数"""
print("🔧 MemOS轻量级Qdrant服务器模式设置")
print("适合个人单机本地使用场景")
print("=" * 50)
setup = LightweightQdrantSetup()
try:
# 1. 检查Docker
if not setup.check_docker():
print("\n❌ 需要Docker支持")
print("安装命令: sudo apt update && sudo apt install docker.io docker-compose")
return False
# 2. 启动Qdrant服务器
if not setup.start_qdrant_server():
print("❌ Qdrant服务器启动失败")
return False
# 3. 创建配置
config = setup.create_unified_config()
# 4. 测试连接
if not setup.test_connection():
print("❌ 连接测试失败")
return False
# 5. 数据迁移建议
setup.migrate_existing_data()
print("\n🎉 轻量级Qdrant服务器模式设置完成!")
print("\n📋 下一步:")
print("1. 更新MemOS代码使用服务器模式")
print("2. 迁移现有数据到新服务器")
print("3. 测试CLI和MCP同时运行")
print(f"\n🔗 Qdrant管理界面: http://localhost:{setup.qdrant_port}/dashboard")
print(f"📊 健康检查: http://localhost:{setup.qdrant_port}/health")
return True
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
return False
except Exception as e:
print(f"\n❌ 设置失败: {e}")
return False
if __name__ == "__main__":
success = main()
if not success:
sys.exit(1)