MemOS-MCP

by qinshu1109
Apache 2.0
compression_pipeline.py (23.8 kB)
""" 自动摘要压缩管线 当记忆数量超过阈值时,自动将长文本通过LLM摘要后重新嵌入,原文归档为JSONL """ import json import time import asyncio from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from dataclasses import dataclass import threading import schedule from mvp_memory import MVPMemoryManager from capacity_manager import MemoryType @dataclass class CompressionConfig: """压缩配置""" # 触发阈值 token_threshold: int = 1000 # 1k token阈值 memory_count_threshold: int = 100 # 记忆数量阈值 # 压缩策略 compression_ratio: float = 0.3 # 压缩比例(保留30%的原始长度) min_content_length: int = 200 # 最小内容长度(低于此长度不压缩) max_content_length: int = 5000 # 最大内容长度(超过此长度强制压缩) # 调度配置 schedule_time: str = "02:00" # 夜间2点执行 enable_auto_schedule: bool = True # 启用自动调度 # 归档配置 archive_dir: str = "memory_archives" # 归档目录 keep_original_days: int = 30 # 原文保留天数 @dataclass class CompressionResult: """压缩结果""" original_id: str original_content: str compressed_content: str compression_ratio: float token_saved: int processing_time: float timestamp: str class LLMSummaryService: """LLM摘要服务""" def __init__(self, mvp_manager: MVPMemoryManager): self.mvp_manager = mvp_manager # 获取LLM配置(从官方配置中获取) if hasattr(mvp_manager, 'official_config') and mvp_manager.official_config: llm_config = mvp_manager.official_config.get("memory", {}).get("config", {}).get("extractor_llm", {}) self.model_name = llm_config.get("config", {}).get("model_name_or_path", "deepseek-ai/DeepSeek-V3") self.api_base = llm_config.get("config", {}).get("api_base", "https://api.siliconflow.cn/v1") self.api_key = llm_config.get("config", {}).get("api_key", "") else: # 使用默认配置 self.model_name = "deepseek-ai/DeepSeek-V3" self.api_base = "https://api.siliconflow.cn/v1" self.api_key = "" print(f"[摘要服务] 初始化LLM摘要服务: {self.model_name}") def estimate_tokens(self, text: str) -> int: """估算文本token数量(简化版)""" # 简化的token估算:中文按字符数,英文按单词数*1.3 chinese_chars = sum(1 for char in text if '\u4e00' <= char <= '\u9fff') english_words = len([word for word in text.split() if word.isalpha()]) return int(chinese_chars + english_words * 1.3) def generate_summary(self, content: str, target_ratio: float = 0.3) -> str: """生成内容摘要""" try: # 计算目标长度 original_tokens = self.estimate_tokens(content) target_tokens = int(original_tokens * target_ratio) target_length = max(100, min(target_tokens, 500)) # 限制在100-500字符之间 # 构建摘要提示 summary_prompt = f"""请将以下内容压缩为大约{target_length}字符的摘要,保留核心信息和关键细节: 原文内容: {content} 要求: 1. 保留最重要的信息和关键词 2. 保持原文的主要观点和结论 3. 使用简洁明了的语言 4. 
长度控制在{target_length}字符左右 摘要:""" # 使用SiliconFlow API生成摘要 if hasattr(self.mvp_manager, 'memory') and hasattr(self.mvp_manager.memory, 'llm_client'): try: # 使用现有的LLM客户端 response = self.mvp_manager.memory.llm_client.chat.completions.create( model=self.model_name, messages=[ {"role": "system", "content": "你是一个专业的文本摘要助手,擅长提取核心信息并生成简洁的摘要。"}, {"role": "user", "content": summary_prompt} ], temperature=0.1, max_tokens=target_tokens + 100 ) summary = response.choices[0].message.content.strip() # 验证摘要质量 if len(summary) < 50: # 摘要太短,使用备用方案 summary = self._fallback_summary(content, target_length) return summary except Exception as e: print(f"[摘要服务] LLM摘要失败: {e}") return self._fallback_summary(content, target_length) else: # 没有LLM客户端,使用备用方案 return self._fallback_summary(content, target_length) except Exception as e: print(f"[摘要服务] 摘要生成异常: {e}") return self._fallback_summary(content, target_length) def _fallback_summary(self, content: str, target_length: int) -> str: """备用摘要方案(基于规则的简单摘要)""" try: # 简单的摘要策略:取前面部分+关键句子 sentences = content.replace('。', '。\n').replace('!', '!\n').replace('?', '?\n').split('\n') sentences = [s.strip() for s in sentences if s.strip()] # 选择重要句子 important_sentences = [] current_length = 0 # 优先选择包含关键词的句子 keywords = ['重要', '关键', '核心', '主要', '总结', '结论', '发现', '问题', '解决', '方法'] # 先添加包含关键词的句子 for sentence in sentences: if any(keyword in sentence for keyword in keywords): if current_length + len(sentence) <= target_length: important_sentences.append(sentence) current_length += len(sentence) # 如果还有空间,添加前面的句子 for sentence in sentences: if sentence not in important_sentences: if current_length + len(sentence) <= target_length: important_sentences.append(sentence) current_length += len(sentence) else: break # 如果没有选中任何句子,直接截取前面部分 if not important_sentences: summary = content[:target_length] + "..." else: summary = "。".join(important_sentences) if len(summary) > target_length: summary = summary[:target_length] + "..." return summary except Exception as e: print(f"[摘要服务] 备用摘要失败: {e}") # 最后的备用方案:简单截取 return content[:target_length] + "..." 
if len(content) > target_length else content class MemoryCompressionPipeline: """记忆压缩管线""" def __init__(self, mvp_manager: MVPMemoryManager, config: Optional[CompressionConfig] = None): self.mvp_manager = mvp_manager self.config = config or CompressionConfig() self.summary_service = LLMSummaryService(mvp_manager) # 创建归档目录 self.archive_dir = Path(self.config.archive_dir) self.archive_dir.mkdir(exist_ok=True) # 压缩统计 self.compression_stats = { "total_compressed": 0, "total_tokens_saved": 0, "last_compression_time": None, "compression_history": [] } # 调度器 self.scheduler_thread = None self.is_running = False print(f"[压缩管线] 初始化完成,归档目录: {self.archive_dir}") def should_compress_memory(self, memory: Dict[str, Any], test_mode: bool = False) -> bool: """判断记忆是否需要压缩""" content = memory.get("content", "") # 检查内容长度 if len(content) < self.config.min_content_length: return False # 检查token数量 token_count = self.summary_service.estimate_tokens(content) if token_count < self.config.token_threshold: return False # 检查是否已经是压缩过的内容 metadata = memory.get("metadata", {}) if metadata.get("compressed", False): return False # 测试模式下跳过时间检查 if not test_mode: # 检查创建时间(避免压缩太新的记忆) created_time = metadata.get("timestamp", "") if created_time: try: created_dt = datetime.fromisoformat(created_time.replace('Z', '+00:00')) if datetime.now() - created_dt < timedelta(hours=24): return False except: pass return True def get_memories_for_compression(self, test_mode: bool = False) -> List[Dict[str, Any]]: """获取需要压缩的记忆列表""" try: # 获取所有记忆 all_memories = [] # 从MVP管理器获取记忆(这里需要实现获取所有记忆的方法) if hasattr(self.mvp_manager.memory, 'get_all_memories'): all_memories = self.mvp_manager.memory.get_all_memories() else: # 备用方案:通过搜索获取记忆 search_results = self.mvp_manager.recall("测试", top_k=100) # 获取测试记忆 all_memories = search_results # 筛选需要压缩的记忆 memories_to_compress = [] for memory in all_memories: if self.should_compress_memory(memory, test_mode=test_mode): memories_to_compress.append(memory) # 按token数量排序,优先压缩最大的 memories_to_compress.sort( key=lambda m: self.summary_service.estimate_tokens(m.get("content", "")), reverse=True ) return memories_to_compress except Exception as e: print(f"[压缩管线] 获取压缩候选记忆失败: {e}") return [] def archive_original_memory(self, memory: Dict[str, Any]) -> str: """归档原始记忆到JSONL文件""" try: # 生成归档文件名(按日期分组) today = datetime.now().strftime("%Y-%m-%d") archive_file = self.archive_dir / f"memories_{today}.jsonl" # 准备归档数据 archive_data = { "id": memory.get("id", ""), "content": memory.get("content", ""), "tags": memory.get("tags", []), "metadata": memory.get("metadata", {}), "archived_at": datetime.now().isoformat(), "archive_reason": "auto_compression" } # 写入JSONL文件 with open(archive_file, 'a', encoding='utf-8') as f: f.write(json.dumps(archive_data, ensure_ascii=False) + '\n') return str(archive_file) except Exception as e: print(f"[压缩管线] 归档记忆失败: {e}") return "" def compress_memory(self, memory: Dict[str, Any]) -> Optional[CompressionResult]: """压缩单个记忆""" start_time = time.time() try: original_content = memory.get("content", "") original_tokens = self.summary_service.estimate_tokens(original_content) print(f"[压缩管线] 开始压缩记忆 #{memory.get('id', 'unknown')} ({original_tokens} tokens)") # 生成摘要 compressed_content = self.summary_service.generate_summary( original_content, self.config.compression_ratio ) compressed_tokens = self.summary_service.estimate_tokens(compressed_content) actual_ratio = compressed_tokens / original_tokens if original_tokens > 0 else 0 tokens_saved = original_tokens - compressed_tokens # 归档原始记忆 archive_path = 
self.archive_original_memory(memory) # 更新记忆内容和元数据 new_metadata = memory.get("metadata", {}).copy() new_metadata.update({ "compressed": True, "compression_timestamp": datetime.now().isoformat(), "original_tokens": original_tokens, "compressed_tokens": compressed_tokens, "compression_ratio": actual_ratio, "archive_path": archive_path }) # 这里需要实现更新记忆的方法 # 由于MVP管理器可能没有直接的更新方法,我们先记录压缩结果 processing_time = time.time() - start_time result = CompressionResult( original_id=str(memory.get("id", "")), original_content=original_content, compressed_content=compressed_content, compression_ratio=actual_ratio, token_saved=tokens_saved, processing_time=processing_time, timestamp=datetime.now().isoformat() ) print(f"[压缩管线] 压缩完成: {original_tokens} → {compressed_tokens} tokens " f"(节省 {tokens_saved} tokens, {actual_ratio:.1%})") return result except Exception as e: print(f"[压缩管线] 压缩记忆失败: {e}") return None def run_compression_batch(self, max_memories: int = 50, test_mode: bool = False) -> Dict[str, Any]: """运行批量压缩""" print(f"[压缩管线] 开始批量压缩,最大处理数量: {max_memories}{'(测试模式)' if test_mode else ''}") start_time = time.time() # 获取需要压缩的记忆 memories_to_compress = self.get_memories_for_compression(test_mode=test_mode) if not memories_to_compress: print("[压缩管线] 没有需要压缩的记忆") return { "processed": 0, "compressed": 0, "tokens_saved": 0, "duration": time.time() - start_time, "message": "没有需要压缩的记忆" } # 限制处理数量 memories_to_process = memories_to_compress[:max_memories] print(f"[压缩管线] 找到 {len(memories_to_compress)} 条候选记忆,将处理 {len(memories_to_process)} 条") # 批量压缩 compression_results = [] total_tokens_saved = 0 for i, memory in enumerate(memories_to_process, 1): print(f"[压缩管线] 处理进度: {i}/{len(memories_to_process)}") result = self.compress_memory(memory) if result: compression_results.append(result) total_tokens_saved += result.token_saved # 更新统计 self.compression_stats["total_compressed"] += 1 self.compression_stats["total_tokens_saved"] += result.token_saved self.compression_stats["compression_history"].append(result) # 更新统计信息 self.compression_stats["last_compression_time"] = datetime.now().isoformat() # 保存压缩报告 self._save_compression_report(compression_results) duration = time.time() - start_time summary = { "processed": len(memories_to_process), "compressed": len(compression_results), "tokens_saved": total_tokens_saved, "duration": duration, "compression_results": [ { "id": r.original_id, "tokens_saved": r.token_saved, "compression_ratio": r.compression_ratio } for r in compression_results ] } print(f"[压缩管线] 批量压缩完成: 处理 {len(memories_to_process)} 条," f"成功压缩 {len(compression_results)} 条,节省 {total_tokens_saved} tokens," f"耗时 {duration:.2f}s") return summary def _save_compression_report(self, results: List[CompressionResult]): """保存压缩报告""" try: report_file = self.archive_dir / f"compression_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" report_data = { "timestamp": datetime.now().isoformat(), "total_processed": len(results), "total_tokens_saved": sum(r.token_saved for r in results), "average_compression_ratio": sum(r.compression_ratio for r in results) / len(results) if results else 0, "results": [ { "original_id": r.original_id, "compression_ratio": r.compression_ratio, "token_saved": r.token_saved, "processing_time": r.processing_time, "timestamp": r.timestamp } for r in results ] } with open(report_file, 'w', encoding='utf-8') as f: json.dump(report_data, f, indent=2, ensure_ascii=False) print(f"[压缩管线] 压缩报告已保存: {report_file}") except Exception as e: print(f"[压缩管线] 保存压缩报告失败: {e}") def start_scheduler(self): """启动定时调度器""" if not 
self.config.enable_auto_schedule: print("[压缩管线] 自动调度已禁用") return if self.is_running: print("[压缩管线] 调度器已在运行") return # 设置定时任务 schedule.clear() schedule.every().day.at(self.config.schedule_time).do(self._scheduled_compression) self.is_running = True def run_scheduler(): while self.is_running: schedule.run_pending() time.sleep(60) # 每分钟检查一次 self.scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) self.scheduler_thread.start() print(f"[压缩管线] 定时调度器已启动,每天 {self.config.schedule_time} 执行压缩") def stop_scheduler(self): """停止定时调度器""" self.is_running = False if self.scheduler_thread: self.scheduler_thread.join(timeout=5) schedule.clear() print("[压缩管线] 定时调度器已停止") def _scheduled_compression(self): """定时压缩任务""" print("[压缩管线] 执行定时压缩任务") try: result = self.run_compression_batch() print(f"[压缩管线] 定时压缩完成: {result}") except Exception as e: print(f"[压缩管线] 定时压缩失败: {e}") def get_compression_stats(self) -> Dict[str, Any]: """获取压缩统计信息""" return self.compression_stats.copy() def cleanup_old_archives(self, days: int = None): """清理旧的归档文件""" days = days or self.config.keep_original_days cutoff_date = datetime.now() - timedelta(days=days) try: cleaned_files = 0 for archive_file in self.archive_dir.glob("*.jsonl"): # 从文件名提取日期 try: date_str = archive_file.stem.split('_')[1] # memories_2025-01-15.jsonl file_date = datetime.strptime(date_str, "%Y-%m-%d") if file_date < cutoff_date: archive_file.unlink() cleaned_files += 1 print(f"[压缩管线] 清理旧归档文件: {archive_file}") except (ValueError, IndexError): # 文件名格式不匹配,跳过 continue print(f"[压缩管线] 清理完成,删除了 {cleaned_files} 个旧归档文件") except Exception as e: print(f"[压缩管线] 清理归档文件失败: {e}") def create_compression_pipeline(mvp_manager: MVPMemoryManager, config: Optional[CompressionConfig] = None) -> MemoryCompressionPipeline: """创建压缩管线的工厂函数""" return MemoryCompressionPipeline(mvp_manager, config) if __name__ == "__main__": # 测试压缩管线 print("🚀 测试自动摘要压缩管线") print("=" * 50) try: from mvp_memory import create_mvp_memory_manager # 创建MVP管理器 mvp_manager = create_mvp_memory_manager(use_official_config=True) # 创建压缩管线 config = CompressionConfig( token_threshold=100, # 降低阈值用于测试 enable_auto_schedule=False # 测试时禁用自动调度 ) pipeline = create_compression_pipeline(mvp_manager, config) # 添加一些测试记忆 test_memories = [ "这是一个很长的测试记忆内容,包含了大量的详细信息和描述。" * 10, "另一个长文本记忆,用于测试自动摘要压缩功能的效果。" * 8, "第三个测试记忆,验证压缩管线是否能够正确处理不同长度的文本内容。" * 12 ] for i, content in enumerate(test_memories): success = mvp_manager.remember(content, tags=[f"测试{i+1}", "压缩测试"]) print(f"测试记忆 {i+1}: {'✅' if success else '❌'}") # 运行压缩测试 print("\n🗜️ 运行压缩测试:") result = pipeline.run_compression_batch(max_memories=10, test_mode=True) print(f"\n📊 压缩结果:") print(f" 处理记忆数: {result['processed']}") print(f" 成功压缩: {result['compressed']}") print(f" 节省tokens: {result['tokens_saved']}") print(f" 处理时间: {result['duration']:.2f}s") # 获取统计信息 stats = pipeline.get_compression_stats() print(f"\n📈 压缩统计:") print(f" 总压缩数: {stats['total_compressed']}") print(f" 总节省tokens: {stats['total_tokens_saved']}") print("\n✅ 自动摘要压缩管线测试完成!") except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc()
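
A minimal usage sketch of the pipeline above (illustrative only: the create_mvp_memory_manager factory and use_official_config flag are taken from the file's own test block, and the thresholds, schedule time, and batch size here are example values, not project defaults):

# usage_example.py -- illustrative wiring of the pipeline and its nightly scheduler
from mvp_memory import create_mvp_memory_manager
from compression_pipeline import CompressionConfig, create_compression_pipeline

manager = create_mvp_memory_manager(use_official_config=True)
config = CompressionConfig(
    token_threshold=1000,         # compress memories estimated above ~1k tokens
    schedule_time="02:00",        # nightly run at 2 a.m.
    archive_dir="memory_archives" # JSONL archives of the original texts
)
pipeline = create_compression_pipeline(manager, config)

pipeline.run_compression_batch(max_memories=20)  # one manual pass
pipeline.start_scheduler()                       # then let the daily job take over
pipeline.cleanup_old_archives()                  # prune archives older than keep_original_days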
