Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
import_feishu_docs.py9.96 kB
#!/usr/bin/env python3
"""Import a collection of Feishu (Lark) documents into MemOS.

Reads every Markdown file from a fixed directory, extracts per-document
metadata (title, category, tags, section count), splits each document into
section/paragraph chunks, and stores each chunk as a tagged MemOS memory.
"""
import os
import sys
import re
from pathlib import Path
from typing import Any, Dict, List, Tuple

# Make the sibling "src" directory importable before project imports below.
sys.path.insert(0, str(Path(__file__).parent / "src"))

from memos.configs.embedder import EmbedderConfigFactory
from memos.embedders.factory import EmbedderFactory
from usage_examples import load_env_file


class FeishuDocImporter:
    """Imports Feishu Markdown documents into a local MemOS store."""

    def __init__(self, docs_path: str, memos_data_path: str = "./feishu_memos_data"):
        """
        Args:
            docs_path: Directory containing the ``*.md`` documents to import.
            memos_data_path: Directory where MemOS persists its data.
        """
        self.docs_path = Path(docs_path)
        self.memos_data_path = memos_data_path

        # Load environment variables (API keys etc.) before touching MemOS.
        load_env_file()

        # Imported lazily so the module can be loaded without MemOS running.
        from advanced_examples import AdvancedMemOS
        self.memos = AdvancedMemOS(self.memos_data_path)

        print(f"📁 文档路径: {self.docs_path}")
        print(f"💾 MemOS数据路径: {self.memos_data_path}")

    def extract_metadata(self, content: str, filename: str) -> Dict[str, Any]:
        """Derive title, category, tags and section count for one document.

        Args:
            content: Full Markdown text of the document.
            filename: Base filename, used for category/tag heuristics.

        Returns:
            Dict with keys ``filename``, ``title``, ``category``, ``tags``
            (list, order unspecified), ``sections`` and ``length``.
        """
        metadata = {
            "filename": filename,
            "title": "",
            "category": "",
            "tags": [],
            "sections": 0,
            "length": len(content),
        }

        # Title: first ATX level-1 heading, else fall back to the filename.
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if title_match:
            metadata["title"] = title_match.group(1).strip()
        else:
            metadata["title"] = filename.replace('.md', '').replace('-', ' ')

        # Category: keyword heuristics on the filename (first match wins).
        if "MCP" in filename:
            metadata["category"] = "MCP技术"
        elif "知识片段库" in filename:
            metadata["category"] = "知识库"
        elif "配置" in filename or "指南" in filename:
            metadata["category"] = "配置指南"
        elif "问题" in filename or "解决" in filename:
            metadata["category"] = "问题解决"
        else:
            metadata["category"] = "技术文档"

        # Tags: keyword hits from the filename ...
        tags = set()
        if "飞书" in filename:
            tags.add("飞书")
        if "MCP" in filename:
            tags.add("MCP")
        if "OAuth" in filename:
            tags.add("OAuth")
        if "配置" in filename:
            tags.add("配置")
        if "问题" in filename:
            tags.add("问题解决")
        if "指南" in filename:
            tags.add("指南")

        # ... and from the document body.
        if "token" in content.lower():
            tags.add("token")
        if "权限" in content:
            tags.add("权限")
        if "api" in content.lower():
            tags.add("API")
        if "工具" in content:
            tags.add("工具")

        metadata["tags"] = list(tags)

        # Section count: every ATX heading (levels 1-6).
        metadata["sections"] = len(re.findall(r'^#{1,6}\s+', content, re.MULTILINE))

        return metadata

    def smart_chunk_document(self, content: str, metadata: Dict) -> List[Dict]:
        """Split a document into chunks suitable for individual memories.

        Splits on ATX headings; sections longer than 2000 chars are further
        grouped into paragraph batches of at most ~1500 chars.

        Args:
            content: Full Markdown text.
            metadata: Metadata dict from :meth:`extract_metadata`, attached
                to every chunk.

        Returns:
            List of chunk dicts with keys ``content``, ``title``,
            ``section_level``, ``chunk_type`` and ``metadata``.
        """
        chunks = []

        # Split immediately before each heading so the heading stays with
        # its section body.
        sections = re.split(r'\n(?=#{1,6}\s+)', content)

        for i, section in enumerate(sections):
            if not section.strip():
                continue

            # Heading text/level for this section, with fallbacks when the
            # leading fragment has no heading.
            title_match = re.search(r'^(#{1,6})\s+(.+)$', section, re.MULTILINE)
            section_title = title_match.group(2).strip() if title_match else f"Section {i+1}"
            section_level = len(title_match.group(1)) if title_match else 1

            if len(section) > 2000:
                # Oversized section: accumulate paragraphs until the next
                # one would push the chunk past ~1500 chars.
                paragraphs = section.split('\n\n')
                current_chunk = ""
                for para in paragraphs:
                    if len(current_chunk + para) > 1500:
                        if current_chunk:
                            chunks.append({
                                "content": current_chunk.strip(),
                                "title": section_title,
                                "section_level": section_level,
                                "chunk_type": "paragraph_group",
                                "metadata": metadata,
                            })
                        current_chunk = para
                    else:
                        current_chunk += "\n\n" + para if current_chunk else para

                # Flush the trailing partial chunk.
                if current_chunk:
                    chunks.append({
                        "content": current_chunk.strip(),
                        "title": section_title,
                        "section_level": section_level,
                        "chunk_type": "paragraph_group",
                        "metadata": metadata,
                    })
            else:
                chunks.append({
                    "content": section.strip(),
                    "title": section_title,
                    "section_level": section_level,
                    "chunk_type": "section",
                    "metadata": metadata,
                })

        return chunks

    def process_single_document(self, file_path: Path) -> List[str]:
        """Read, chunk and store one Markdown document.

        Args:
            file_path: Path to the ``.md`` file.

        Returns:
            List of created memory IDs; empty list when processing failed
            (best-effort: errors are printed, not raised).
        """
        print(f"\n📄 处理文档: {file_path.name}")

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            metadata = self.extract_metadata(content, file_path.name)
            print(f"   📊 元数据: {metadata['category']} | {len(metadata['tags'])}个标签 | {metadata['sections']}个章节")

            chunks = self.smart_chunk_document(content, metadata)
            print(f"   🔪 分块结果: {len(chunks)}个块")

            memory_ids = []
            for i, chunk in enumerate(chunks):
                # Prefix the chunk with its document/section titles so the
                # stored memory is self-describing.
                memory_content = f"【{metadata['title']} - {chunk['title']}】\n\n{chunk['content']}"

                # Per-chunk tags: document tags + category + truncated
                # document/section title tags.
                chunk_tags = metadata['tags'].copy()
                chunk_tags.extend([
                    metadata['category'],
                    f"文档:{metadata['title'][:20]}",
                    f"章节:{chunk['title'][:20]}",
                ])

                memory_id = self.memos.add_memory(
                    content=memory_content,
                    tags=chunk_tags
                )
                memory_ids.append(memory_id)

                # Progress heartbeat every 5 chunks.
                if (i + 1) % 5 == 0:
                    print(f"   ✅ 已添加 {i+1}/{len(chunks)} 个记忆块")

            print(f"   🎉 完成! 添加了 {len(memory_ids)} 个记忆")
            return memory_ids

        except Exception as e:
            # Best-effort: a single bad file must not abort the whole import.
            print(f"   ❌ 处理失败: {e}")
            return []

    def import_all_documents(self) -> Dict[str, Any]:
        """Import every ``*.md`` file found in ``self.docs_path``.

        Returns:
            Summary dict: ``total_files``, ``processed_files``,
            ``total_memories``, ``failed_files`` (names) and
            ``file_details`` (per-file memory counts and IDs).
        """
        print("🚀 开始导入飞书文档合集到MemOS")
        print("=" * 60)

        md_files = list(self.docs_path.glob("*.md"))
        print(f"📚 发现 {len(md_files)} 个Markdown文档")

        results = {
            "total_files": len(md_files),
            "processed_files": 0,
            "total_memories": 0,
            "failed_files": [],
            "file_details": {},
        }

        for file_path in md_files:
            memory_ids = self.process_single_document(file_path)

            if memory_ids:
                results["processed_files"] += 1
                results["total_memories"] += len(memory_ids)
                results["file_details"][file_path.name] = {
                    "memory_count": len(memory_ids),
                    "memory_ids": memory_ids,
                }
            else:
                results["failed_files"].append(file_path.name)

        return results

    def print_summary(self, results: Dict) -> None:
        """Print a human-readable summary of an import run.

        Args:
            results: Summary dict from :meth:`import_all_documents`.
        """
        print("\n" + "=" * 60)
        print("📊 导入总结")
        print("=" * 60)
        print(f"📁 总文档数: {results['total_files']}")
        print(f"✅ 成功处理: {results['processed_files']}")
        print(f"❌ 失败文档: {len(results['failed_files'])}")
        print(f"💾 总记忆数: {results['total_memories']}")

        if results['failed_files']:
            print(f"\n❌ 失败文档列表:")
            for filename in results['failed_files']:
                # BUGFIX: original printed a placeholder instead of the name.
                print(f"   - {filename}")

        # max(..., 1) guards against division by zero when nothing succeeded.
        print(f"\n📈 平均每文档记忆数: {results['total_memories'] / max(results['processed_files'], 1):.1f}")

        print(f"\n📄 文档详情 (前5个):")
        for i, (filename, details) in enumerate(list(results['file_details'].items())[:5]):
            # BUGFIX: original printed a placeholder instead of the name.
            print(f"   {i+1}. {filename}: {details['memory_count']}个记忆")


def main():
    """Entry point: import the fixed Feishu document collection."""
    docs_path = "/home/qqinshu/视频/飞书文档合集"

    if not Path(docs_path).exists():
        print(f"❌ 文档路径不存在: {docs_path}")
        return

    importer = FeishuDocImporter(docs_path)
    results = importer.import_all_documents()
    importer.print_summary(results)

    print(f"\n🎉 导入完成! 现在可以通过MemOS搜索飞书MCP相关知识了!")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server