Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
concurrent_manager.py12 kB
#!/usr/bin/env python3 """ MemOS 并发访问管理器 支持多进程安全访问,解决Qdrant单实例限制 """ import os import sys import time import threading import multiprocessing from pathlib import Path from typing import Optional, Dict, Any import json import fcntl from contextlib import contextmanager class MemOSConcurrentManager: """MemOS并发访问管理器""" def __init__(self, data_dir: str = "./memos_data"): # 专家建议:断言检查防止退回embedded默认 assert data_dir, "必须显式传入 data_dir,防止退回 embedded 默认" self.data_dir = Path(data_dir) self.lock_dir = self.data_dir / "locks" self.config_file = self.data_dir / "concurrent_config.json" self.current_pid = os.getpid() # 确保目录存在 self.data_dir.mkdir(exist_ok=True) self.lock_dir.mkdir(exist_ok=True) # 加载配置 self.config = self._load_config() # 专家建议:打印关键并发参数用于调试 print(f"[CONCURRENT_MGR] cfg_path={self.config_file} mode={self.config['qdrant_mode']} port={self.config['qdrant_port']}") # 初始化锁 self._init_locks() def _load_config(self) -> Dict[str, Any]: """加载并发配置""" default_config = { "max_concurrent_processes": 5, # 最大并发进程数 "qdrant_mode": "server", # 专家建议:强制使用server模式避免并发冲突 "qdrant_port": 6335, # 专家建议:统一使用6335端口 "lock_timeout": 60, # 锁超时时间(秒) "allowed_process_types": ["cli", "mcp", "api", "test", "analysis"], "resource_sharing": { "vectors": "shared", # shared/exclusive "memory": "shared", "config": "shared" }, "concurrent_access": True, # 允许并发访问 "process_isolation": False # 不强制进程隔离 } if self.config_file.exists(): try: with open(self.config_file, 'r') as f: config = json.load(f) # 合并默认配置 for key, value in default_config.items(): if key not in config: config[key] = value print(f"[DEBUG] loaded config from {self.config_file} → qdrant_port={config.get('qdrant_port')}") return config except Exception as e: print(f"配置文件加载失败,使用默认配置: {e}") # 保存默认配置 self._save_config(default_config) print(f"[DEBUG] using default config → qdrant_port={default_config.get('qdrant_port')}") return default_config def _save_config(self, config: Dict[str, Any]): """保存配置""" try: with open(self.config_file, 'w') as f: json.dump(config, f, indent=2, ensure_ascii=False) except Exception as e: print(f"配置保存失败: {e}") def _init_locks(self): """初始化锁文件""" self.process_lock_file = self.lock_dir / "process.lock" self.qdrant_lock_file = self.lock_dir / "qdrant.lock" self.resource_lock_file = self.lock_dir / "resource.lock" @contextmanager def acquire_resource_lock(self, resource_name: str, mode: str = "shared"): """获取资源锁的上下文管理器""" lock_file = self.lock_dir / f"{resource_name}.lock" lock_fd = None try: # 创建锁文件 lock_fd = open(lock_file, 'w') # 根据模式选择锁类型 if mode == "exclusive": lock_type = fcntl.LOCK_EX else: # shared lock_type = fcntl.LOCK_SH # 尝试获取锁(非阻塞) try: fcntl.flock(lock_fd.fileno(), lock_type | fcntl.LOCK_NB) except IOError: # 如果是共享模式,尝试等待 if mode == "shared": print(f"等待 {resource_name} 资源锁...") fcntl.flock(lock_fd.fileno(), lock_type) else: raise # 写入进程信息 lock_fd.write(json.dumps({ "pid": self.current_pid, "mode": mode, "timestamp": time.time(), "resource": resource_name })) lock_fd.flush() yield lock_fd finally: if lock_fd: try: fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN) lock_fd.close() except Exception: pass # 清理锁文件 try: if lock_file.exists(): lock_file.unlink() except Exception: pass def register_process(self, process_type: str) -> bool: """注册进程""" if process_type not in self.config["allowed_process_types"]: print(f"不支持的进程类型: {process_type}") return False # 如果允许并发访问,简化注册流程 if self.config.get("concurrent_access", False): try: with self.acquire_resource_lock("process_registry", "shared"): registry_file = self.lock_dir / "process_registry.json" # 读取现有注册信息 registry = {} if registry_file.exists(): try: with open(registry_file, 'r') as f: registry = json.load(f) except Exception: registry = {} # 简单清理过期进程 current_time = time.time() active_processes = {} for pid_str, info in registry.items(): pid = int(pid_str) try: os.kill(pid, 0) # 检查进程存在 active_processes[pid_str] = info except OSError: pass # 进程不存在,跳过 # 宽松的并发限制检查 if len(active_processes) >= self.config["max_concurrent_processes"]: print(f"⚠️ 已达到建议的最大并发进程数: {self.config['max_concurrent_processes']}") print("继续启动可能影响性能,但仍然允许...") # 注册当前进程 active_processes[str(self.current_pid)] = { "type": process_type, "timestamp": current_time, "data_dir": str(self.data_dir) } # 保存注册信息 with open(registry_file, 'w') as f: json.dump(active_processes, f, indent=2) print(f"✅ 进程注册成功: PID {self.current_pid}, 类型: {process_type}") print(f"📊 当前活跃进程数: {len(active_processes)}") return True except Exception as e: print(f"⚠️ 进程注册失败,但允许继续: {e}") return True # 宽松模式,即使注册失败也允许继续 # 原有的严格模式逻辑保持不变 return self._register_process_strict(process_type) def _register_process_strict(self, process_type: str) -> bool: """严格模式的进程注册""" try: with self.acquire_resource_lock("process_registry", "exclusive"): # ... 原有的严格检查逻辑 return True except Exception as e: print(f"进程注册失败: {e}") return False def unregister_process(self): """注销进程""" try: with self.acquire_resource_lock("process_registry", "exclusive"): registry_file = self.lock_dir / "process_registry.json" if not registry_file.exists(): return # 读取注册信息 with open(registry_file, 'r') as f: registry = json.load(f) # 移除当前进程 if str(self.current_pid) in registry: del registry[str(self.current_pid)] # 保存更新后的注册信息 with open(registry_file, 'w') as f: json.dump(registry, f, indent=2) print(f"✅ 进程注销成功: PID {self.current_pid}") except Exception as e: print(f"进程注销失败: {e}") def get_qdrant_config(self) -> Dict[str, Any]: """获取Qdrant配置""" if self.config["qdrant_mode"] == "server": config = { "mode": "server", "host": "localhost", "port": self.config["qdrant_port"], "url": f"http://localhost:{self.config['qdrant_port']}" } print(f"[DEBUG] get_qdrant_config returning server mode: {config}") return config else: config = { "mode": "embedded", "path": str(self.data_dir / "vectors") } print(f"[DEBUG] get_qdrant_config returning embedded mode: {config}") return config def start_qdrant_server_if_needed(self) -> bool: """如果需要,启动Qdrant服务器""" if self.config["qdrant_mode"] != "server": return True try: import requests # 检查Qdrant服务器是否已运行 response = requests.get(f"http://localhost:{self.config['qdrant_port']}/health", timeout=2) if response.status_code == 200: print("✅ Qdrant服务器已运行") return True except Exception: pass # 尝试启动Qdrant服务器 print("🚀 启动Qdrant服务器...") try: import subprocess # 使用Docker启动Qdrant cmd = [ "docker", "run", "-d", "--name", "qdrant-memos", "-p", f"{self.config['qdrant_port']}:6333", "-v", f"{self.data_dir}/qdrant_storage:/qdrant/storage", "qdrant/qdrant" ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: print("✅ Qdrant服务器启动成功") time.sleep(3) # 等待服务器启动 return True else: print(f"Qdrant服务器启动失败: {result.stderr}") return False except Exception as e: print(f"启动Qdrant服务器失败: {e}") return False def cleanup(self): """清理资源""" print("🧹 清理并发管理器资源...") self.unregister_process() def main(): """测试函数""" manager = MemOSConcurrentManager() if manager.register_process("test"): print("进程注册成功,按回车键注销...") input() manager.cleanup() else: print("进程注册失败") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server