Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
capacity_manager.py14.8 kB
""" 智能容量管理器 基于MemOS官方文档的容量规划建议,实现智能的内存容量管理和性能监控 """ import time import json import threading from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from datetime import datetime, timedelta from dataclasses import dataclass, asdict from enum import Enum class MemoryType(Enum): """内存类型枚举""" WORKING = "working" USER = "user" LONG_TERM = "long_term" @dataclass class CapacityConfig: """容量配置""" working_memory_capacity: int = 20 user_memory_capacity: int = 500 long_term_memory_capacity: int = 2000 # 触发阈值(百分比) warning_threshold: float = 0.8 # 80%时警告 critical_threshold: float = 0.9 # 90%时触发压缩 # 批处理配置 batch_size: int = 32 max_concurrent_requests: int = 4 # 性能监控配置 monitoring_interval: int = 60 # 监控间隔(秒) performance_history_size: int = 100 # 保留的性能历史记录数 @dataclass class MemoryStats: """内存统计信息""" memory_type: str current_count: int capacity: int usage_percentage: float last_updated: str def is_warning(self, threshold: float = 0.8) -> bool: return self.usage_percentage >= threshold def is_critical(self, threshold: float = 0.9) -> bool: return self.usage_percentage >= threshold @dataclass class PerformanceMetrics: """性能指标""" timestamp: str memory_add_avg_time: float memory_search_avg_time: float memory_total_count: int memory_usage_by_type: Dict[str, MemoryStats] system_health: str # 资源使用情况 cpu_usage: float = 0.0 memory_usage_mb: float = 0.0 disk_usage_mb: float = 0.0 class IntelligentCapacityManager: """智能容量管理器""" def __init__(self, data_dir: str = "./memos_data", config: Optional[CapacityConfig] = None): self.data_dir = Path(data_dir) self.data_dir.mkdir(exist_ok=True) # 配置 self.config = config or CapacityConfig() # 状态管理 self.memory_stats: Dict[str, MemoryStats] = {} self.performance_history: List[PerformanceMetrics] = [] self.is_monitoring = False self.monitor_thread: Optional[threading.Thread] = None # 性能计时器 self.operation_times: Dict[str, List[float]] = { "memory_add": [], "memory_search": [] } # 文件路径 self.stats_file = self.data_dir / "capacity_stats.json" self.performance_file = self.data_dir / "performance_metrics.json" # 加载历史数据 self._load_historical_data() print(f"✅ 智能容量管理器初始化完成") print(f"📊 容量配置: working={self.config.working_memory_capacity}, " f"user={self.config.user_memory_capacity}, " f"long_term={self.config.long_term_memory_capacity}") def update_memory_stats(self, memory_type: MemoryType, current_count: int) -> MemoryStats: """更新内存统计信息""" capacity_map = { MemoryType.WORKING: self.config.working_memory_capacity, MemoryType.USER: self.config.user_memory_capacity, MemoryType.LONG_TERM: self.config.long_term_memory_capacity } capacity = capacity_map[memory_type] usage_percentage = (current_count / capacity) * 100 if capacity > 0 else 0 stats = MemoryStats( memory_type=memory_type.value, current_count=current_count, capacity=capacity, usage_percentage=usage_percentage, last_updated=datetime.now().isoformat() ) self.memory_stats[memory_type.value] = stats # 检查是否需要触发压缩 if stats.is_critical(self.config.critical_threshold): self._trigger_compression(memory_type, stats) elif stats.is_warning(self.config.warning_threshold): self._trigger_warning(memory_type, stats) return stats def record_operation_time(self, operation: str, duration: float): """记录操作时间""" if operation in self.operation_times: self.operation_times[operation].append(duration) # 保持历史记录在合理范围内 if len(self.operation_times[operation]) > 100: self.operation_times[operation] = self.operation_times[operation][-50:] def get_performance_metrics(self) -> PerformanceMetrics: """获取当前性能指标""" # 计算平均操作时间 add_times = self.operation_times.get("memory_add", []) search_times = self.operation_times.get("memory_search", []) add_avg = sum(add_times) / len(add_times) if add_times else 0 search_avg = sum(search_times) / len(search_times) if search_times else 0 # 计算总内存数量 total_count = sum(stats.current_count for stats in self.memory_stats.values()) # 评估系统健康状态 health = self._evaluate_system_health() # 获取系统资源使用情况 cpu_usage, memory_usage, disk_usage = self._get_system_resources() metrics = PerformanceMetrics( timestamp=datetime.now().isoformat(), memory_add_avg_time=add_avg, memory_search_avg_time=search_avg, memory_total_count=total_count, memory_usage_by_type=self.memory_stats.copy(), system_health=health, cpu_usage=cpu_usage, memory_usage_mb=memory_usage, disk_usage_mb=disk_usage ) return metrics def start_monitoring(self): """启动性能监控""" if self.is_monitoring: print("⚠️ 监控已在运行中") return self.is_monitoring = True self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True) self.monitor_thread.start() print(f"🔍 性能监控已启动,间隔: {self.config.monitoring_interval}秒") def stop_monitoring(self): """停止性能监控""" self.is_monitoring = False if self.monitor_thread: self.monitor_thread.join(timeout=5) print("⏹️ 性能监控已停止") def _monitoring_loop(self): """监控循环""" while self.is_monitoring: try: # 收集性能指标 metrics = self.get_performance_metrics() # 添加到历史记录 self.performance_history.append(metrics) # 保持历史记录在合理范围内 if len(self.performance_history) > self.config.performance_history_size: self.performance_history = self.performance_history[-self.config.performance_history_size//2:] # 保存数据 self._save_performance_data() # 检查是否需要自动优化 self._check_auto_optimization(metrics) except Exception as e: print(f"❌ 监控循环错误: {e}") time.sleep(self.config.monitoring_interval) def _trigger_compression(self, memory_type: MemoryType, stats: MemoryStats): """触发压缩操作""" print(f"🗜️ 触发{memory_type.value}内存压缩: {stats.usage_percentage:.1f}% > {self.config.critical_threshold*100}%") # 这里可以集成自动摘要压缩管线 # 暂时记录压缩事件 compression_event = { "timestamp": datetime.now().isoformat(), "memory_type": memory_type.value, "usage_percentage": stats.usage_percentage, "action": "compression_triggered" } self._log_capacity_event(compression_event) def _trigger_warning(self, memory_type: MemoryType, stats: MemoryStats): """触发警告""" print(f"⚠️ {memory_type.value}内存使用警告: {stats.usage_percentage:.1f}% > {self.config.warning_threshold*100}%") warning_event = { "timestamp": datetime.now().isoformat(), "memory_type": memory_type.value, "usage_percentage": stats.usage_percentage, "action": "warning_triggered" } self._log_capacity_event(warning_event) def _evaluate_system_health(self) -> str: """评估系统健康状态""" if not self.memory_stats: return "unknown" critical_count = sum(1 for stats in self.memory_stats.values() if stats.is_critical(self.config.critical_threshold)) warning_count = sum(1 for stats in self.memory_stats.values() if stats.is_warning(self.config.warning_threshold)) if critical_count > 0: return "critical" elif warning_count > 0: return "warning" else: return "healthy" def _get_system_resources(self) -> Tuple[float, float, float]: """获取系统资源使用情况""" try: import psutil # CPU使用率 cpu_usage = psutil.cpu_percent(interval=1) # 内存使用情况 memory_info = psutil.virtual_memory() memory_usage_mb = memory_info.used / (1024 * 1024) # 磁盘使用情况 disk_info = psutil.disk_usage(str(self.data_dir)) disk_usage_mb = disk_info.used / (1024 * 1024) return cpu_usage, memory_usage_mb, disk_usage_mb except ImportError: # 如果psutil不可用,返回默认值 return 0.0, 0.0, 0.0 except Exception as e: print(f"⚠️ 获取系统资源失败: {e}") return 0.0, 0.0, 0.0 def _check_auto_optimization(self, metrics: PerformanceMetrics): """检查是否需要自动优化""" # 检查响应时间是否过慢 if metrics.memory_add_avg_time > 2.0: # 2秒阈值 print(f"⚠️ 记忆添加响应时间过慢: {metrics.memory_add_avg_time:.2f}s") if metrics.memory_search_avg_time > 1.0: # 1秒阈值 print(f"⚠️ 记忆检索响应时间过慢: {metrics.memory_search_avg_time:.2f}s") # 检查系统资源使用 if metrics.cpu_usage > 80: print(f"⚠️ CPU使用率过高: {metrics.cpu_usage:.1f}%") if metrics.memory_usage_mb > 1024: # 1GB阈值 print(f"⚠️ 内存使用过高: {metrics.memory_usage_mb:.1f}MB") def _log_capacity_event(self, event: Dict[str, Any]): """记录容量事件""" events_file = self.data_dir / "capacity_events.jsonl" try: with open(events_file, 'a', encoding='utf-8') as f: f.write(json.dumps(event, ensure_ascii=False) + '\n') except Exception as e: print(f"❌ 记录容量事件失败: {e}") def _load_historical_data(self): """加载历史数据""" try: if self.performance_file.exists(): with open(self.performance_file, 'r', encoding='utf-8') as f: data = json.load(f) # 只加载最近的数据 self.performance_history = data[-self.config.performance_history_size:] print(f"📊 加载了 {len(self.performance_history)} 条历史性能数据") except Exception as e: print(f"⚠️ 加载历史数据失败: {e}") def _save_performance_data(self): """保存性能数据""" try: # 转换为可序列化的格式 serializable_data = [] for metrics in self.performance_history: data = asdict(metrics) # 转换MemoryStats对象 data['memory_usage_by_type'] = { k: asdict(v) for k, v in data['memory_usage_by_type'].items() } serializable_data.append(data) with open(self.performance_file, 'w', encoding='utf-8') as f: json.dump(serializable_data, f, indent=2, ensure_ascii=False) except Exception as e: print(f"❌ 保存性能数据失败: {e}") def get_capacity_report(self) -> Dict[str, Any]: """获取容量报告""" current_metrics = self.get_performance_metrics() return { "timestamp": datetime.now().isoformat(), "capacity_config": asdict(self.config), "current_metrics": asdict(current_metrics), "memory_stats": {k: asdict(v) for k, v in self.memory_stats.items()}, "system_health": current_metrics.system_health, "monitoring_status": "active" if self.is_monitoring else "inactive", "performance_history_count": len(self.performance_history) } def create_capacity_manager(data_dir: str = "./memos_data", config: Optional[CapacityConfig] = None) -> IntelligentCapacityManager: """创建容量管理器的工厂函数""" return IntelligentCapacityManager(data_dir, config) if __name__ == "__main__": # 测试容量管理器 print("🚀 测试智能容量管理器") print("=" * 50) # 创建管理器 manager = create_capacity_manager() # 模拟内存使用 manager.update_memory_stats(MemoryType.WORKING, 15) # 75% manager.update_memory_stats(MemoryType.USER, 450) # 90% manager.update_memory_stats(MemoryType.LONG_TERM, 1800) # 90% # 模拟操作时间 manager.record_operation_time("memory_add", 0.5) manager.record_operation_time("memory_search", 0.3) # 获取性能指标 metrics = manager.get_performance_metrics() print(f"\n📊 性能指标:") print(f" 系统健康: {metrics.system_health}") print(f" 总记忆数: {metrics.memory_total_count}") print(f" 添加平均时间: {metrics.memory_add_avg_time:.3f}s") print(f" 检索平均时间: {metrics.memory_search_avg_time:.3f}s") # 获取容量报告 report = manager.get_capacity_report() print(f"\n📋 容量报告已生成") print("\n✅ 智能容量管理器测试完成!")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server