Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
process_manager.py8.02 kB
#!/usr/bin/env python3 """ MemOS 进程管理器 解决多进程冲突和Qdrant锁定问题 """ import os import sys import time import signal import psutil import fcntl from pathlib import Path from typing import Optional, List, Dict class MemOSProcessManager: """MemOS进程管理器""" def __init__(self, data_dir: str = "./memos_data"): self.data_dir = Path(data_dir) self.pid_file = self.data_dir / "memos.pid" self.lock_file = self.data_dir / "memos.lock" self.current_pid = os.getpid() # 确保数据目录存在 self.data_dir.mkdir(exist_ok=True) def acquire_lock(self) -> bool: """获取进程锁""" try: # 创建锁文件 self.lock_fd = open(self.lock_file, 'w') # 尝试获取文件锁(非阻塞) fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) # 写入当前进程ID self.lock_fd.write(str(self.current_pid)) self.lock_fd.flush() return True except (IOError, OSError) as e: if hasattr(self, 'lock_fd'): self.lock_fd.close() return False def release_lock(self): """释放进程锁""" try: if hasattr(self, 'lock_fd'): fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_UN) self.lock_fd.close() # 删除锁文件 if self.lock_file.exists(): self.lock_file.unlink() except Exception as e: print(f"释放锁失败: {e}", file=sys.stderr) def write_pid_file(self): """写入PID文件""" try: with open(self.pid_file, 'w') as f: f.write(str(self.current_pid)) except Exception as e: print(f"写入PID文件失败: {e}", file=sys.stderr) def read_pid_file(self) -> Optional[int]: """读取PID文件""" try: if self.pid_file.exists(): with open(self.pid_file, 'r') as f: return int(f.read().strip()) except Exception: pass return None def is_process_running(self, pid: int) -> bool: """检查进程是否在运行""" try: return psutil.pid_exists(pid) except Exception: return False def find_memos_processes(self) -> List[Dict]: """查找所有MemOS相关进程""" processes = [] for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: cmdline = ' '.join(proc.info['cmdline'] or []) # 检查是否是MemOS相关进程 if any(keyword in cmdline.lower() for keyword in [ 'memos_cli.py', 'memos_mcp_server.py', 'advanced_examples.py', 'usage_examples.py', 'simple_test.py' ]): processes.append({ 'pid': proc.info['pid'], 'name': proc.info['name'], 'cmdline': cmdline }) except (psutil.NoSuchProcess, psutil.AccessDenied): continue return processes def kill_conflicting_processes(self, exclude_current: bool = True) -> int: """终止冲突的MemOS进程""" killed_count = 0 processes = self.find_memos_processes() for proc_info in processes: pid = proc_info['pid'] # 跳过当前进程 if exclude_current and pid == self.current_pid: continue try: proc = psutil.Process(pid) proc.terminate() # 等待进程结束 try: proc.wait(timeout=5) except psutil.TimeoutExpired: # 强制杀死 proc.kill() proc.wait(timeout=2) print(f"✅ 已终止进程: {pid} ({proc_info['cmdline'][:50]}...)") killed_count += 1 except (psutil.NoSuchProcess, psutil.AccessDenied) as e: print(f"⚠️ 无法终止进程 {pid}: {e}") return killed_count def cleanup_qdrant_locks(self): """清理Qdrant锁文件""" try: vectors_dir = self.data_dir / "vectors" if vectors_dir.exists(): # 查找并删除可能的锁文件 for lock_pattern in ["*.lock", ".lock*", "lock*"]: for lock_file in vectors_dir.rglob(lock_pattern): try: lock_file.unlink() print(f"✅ 已删除锁文件: {lock_file}") except Exception as e: print(f"⚠️ 无法删除锁文件 {lock_file}: {e}") except Exception as e: print(f"清理Qdrant锁失败: {e}", file=sys.stderr) def setup_signal_handlers(self): """设置信号处理器""" def signal_handler(signum, frame): print(f"\n🛑 收到信号 {signum},正在清理...") self.cleanup() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def cleanup(self): """清理资源""" print("🧹 清理进程资源...") self.release_lock() # 删除PID文件 try: if self.pid_file.exists(): self.pid_file.unlink() except Exception: pass def start_exclusive(self) -> bool: """以独占模式启动""" print("🔍 检查现有MemOS进程...") # 1. 查找现有进程(排除当前进程) existing_processes = [p for p in self.find_memos_processes() if p['pid'] != self.current_pid] if existing_processes: print(f"⚠️ 发现 {len(existing_processes)} 个MemOS进程正在运行") for proc in existing_processes: print(f" - PID {proc['pid']}: {proc['cmdline'][:60]}...") # 询问是否终止 response = input("\n是否终止这些进程? (y/N): ").strip().lower() if response in ['y', 'yes']: killed = self.kill_conflicting_processes(exclude_current=True) print(f"✅ 已终止 {killed} 个进程") time.sleep(2) # 等待进程完全退出 else: print("❌ 用户取消,退出启动") return False # 2. 清理Qdrant锁 print("🧹 清理Qdrant锁文件...") self.cleanup_qdrant_locks() # 3. 尝试获取进程锁 print("🔒 获取进程锁...") if not self.acquire_lock(): print("❌ 无法获取进程锁,可能有其他MemOS实例正在运行") return False # 4. 写入PID文件 self.write_pid_file() # 5. 设置信号处理器 self.setup_signal_handlers() print("✅ 进程管理器初始化完成") return True def main(): """主函数 - 用于测试""" manager = MemOSProcessManager() if manager.start_exclusive(): print("🎉 MemOS进程管理器启动成功") print("按 Ctrl+C 退出...") try: while True: time.sleep(1) except KeyboardInterrupt: print("\n👋 退出中...") else: print("❌ 启动失败") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server