setup_concurrent_mode.py•5.32 kB
#!/usr/bin/env python3
"""
MemOS 并发模式设置脚本
配置Qdrant服务器模式以支持多进程访问
"""
import json
import subprocess
import time
import requests
from pathlib import Path
from concurrent_manager import MemOSConcurrentManager
def check_docker():
"""检查Docker是否可用"""
try:
result = subprocess.run(["docker", "--version"], capture_output=True, text=True)
if result.returncode == 0:
print("✅ Docker已安装")
return True
else:
print("❌ Docker未安装")
return False
except FileNotFoundError:
print("❌ Docker未找到")
return False
def start_qdrant_server(port=6333, data_dir="./memos_data"):
"""启动Qdrant服务器"""
data_path = Path(data_dir)
qdrant_storage = data_path / "qdrant_storage"
qdrant_storage.mkdir(parents=True, exist_ok=True)
print(f"🚀 启动Qdrant服务器 (端口: {port})...")
# 检查是否已有容器运行
try:
result = subprocess.run(
["docker", "ps", "-q", "-f", "name=qdrant-memos"],
capture_output=True, text=True
)
if result.stdout.strip():
print("⚠️ Qdrant容器已存在,停止旧容器...")
subprocess.run(["docker", "stop", "qdrant-memos"], capture_output=True)
subprocess.run(["docker", "rm", "qdrant-memos"], capture_output=True)
except Exception:
pass
# 启动新容器
cmd = [
"docker", "run", "-d",
"--name", "qdrant-memos",
"-p", f"{port}:6333",
"-v", f"{qdrant_storage.absolute()}:/qdrant/storage",
"qdrant/qdrant"
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("✅ Qdrant容器启动成功")
# 等待服务器启动
print("⏳ 等待Qdrant服务器启动...")
for i in range(30): # 最多等待30秒
try:
response = requests.get(f"http://localhost:{port}/health", timeout=2)
if response.status_code == 200:
print("✅ Qdrant服务器已就绪")
return True
except Exception:
pass
time.sleep(1)
print("⚠️ Qdrant服务器启动超时")
return False
else:
print(f"❌ Qdrant容器启动失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 启动Qdrant失败: {e}")
return False
def configure_concurrent_mode(data_dir="./memos_data"):
"""配置并发模式"""
manager = MemOSConcurrentManager(data_dir)
# 更新配置为服务器模式
config = manager.config.copy()
config.update({
"max_concurrent_processes": 5, # 增加并发限制
"qdrant_mode": "server",
"qdrant_port": 6333,
"lock_timeout": 60,
"resource_sharing": {
"vectors": "shared",
"memory": "shared",
"config": "shared"
}
})
manager._save_config(config)
print("✅ 并发模式配置已更新")
return True
def test_concurrent_access(data_dir="./memos_data"):
"""测试并发访问"""
print("🧪 测试并发访问...")
# 创建多个管理器实例
managers = []
for i in range(3):
manager = MemOSConcurrentManager(data_dir)
if manager.register_process(f"test_{i}"):
managers.append(manager)
print(f"✅ 测试进程 {i} 注册成功")
else:
print(f"❌ 测试进程 {i} 注册失败")
# 清理测试进程
for manager in managers:
manager.cleanup()
print(f"✅ 并发测试完成,成功注册 {len(managers)} 个进程")
return len(managers) > 1
def main():
"""主函数"""
print("🔧 MemOS 并发模式设置")
print("=" * 50)
# 1. 检查Docker
if not check_docker():
print("\n❌ 需要Docker支持,请先安装Docker")
print("安装指南: https://docs.docker.com/get-docker/")
return False
# 2. 配置并发模式
if not configure_concurrent_mode():
print("❌ 配置并发模式失败")
return False
# 3. 启动Qdrant服务器
if not start_qdrant_server():
print("❌ Qdrant服务器启动失败")
return False
# 4. 测试并发访问
if not test_concurrent_access():
print("❌ 并发访问测试失败")
return False
print("\n🎉 MemOS 并发模式设置完成!")
print("\n📋 使用说明:")
print("1. 现在可以同时运行多个MemOS实例")
print("2. CLI和MCP服务器可以并存")
print("3. 最大并发进程数: 5")
print("4. Qdrant服务器地址: http://localhost:6333")
print("\n🚀 启动命令:")
print("- CLI: python3 memos_cli.py")
print("- MCP: python3 memos_mcp_server.py")
print("\n🛑 停止Qdrant服务器:")
print("docker stop qdrant-memos")
return True
if __name__ == "__main__":
success = main()
if not success:
exit(1)