#!/usr/bin/env python3
"""
飞书文档合集导入MemOS脚本
智能处理Markdown文档,自动分块和标签化
"""
import os
import sys
import re
from pathlib import Path
from typing import Any, Dict, List
# Make the local src/ directory importable.
sys.path.insert(0, str(Path(__file__).parent / "src"))
from usage_examples import load_env_file
class FeishuDocImporter:
"""飞书文档导入器"""
def __init__(self, docs_path: str, memos_data_path: str = "./feishu_memos_data"):
self.docs_path = Path(docs_path)
self.memos_data_path = memos_data_path
        # Load environment variables from the env file.
        load_env_file()
        # Initialize MemOS through the simplified AdvancedMemOS wrapper.
from advanced_examples import AdvancedMemOS
self.memos = AdvancedMemOS(self.memos_data_path)
print(f"📁 文档路径: {self.docs_path}")
print(f"💾 MemOS数据路径: {self.memos_data_path}")
    def extract_metadata(self, content: str, filename: str) -> Dict[str, Any]:
        """Extract document metadata: title, category, tags, and section count."""
metadata = {
"filename": filename,
"title": "",
"category": "",
"tags": [],
"sections": 0,
"length": len(content)
}
        # Extract the title from the first level-1 heading, falling back to the filename.
title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
if title_match:
metadata["title"] = title_match.group(1).strip()
else:
metadata["title"] = filename.replace('.md', '').replace('-', ' ')
        # Categorize by keywords in the filename (the source docs are named in
        # Chinese, so the match literals must stay Chinese).
        if "MCP" in filename:
            metadata["category"] = "MCP"
        elif "知识片段库" in filename:  # "knowledge snippet library"
            metadata["category"] = "knowledge base"
        elif "配置" in filename or "指南" in filename:  # "configuration" / "guide"
            metadata["category"] = "configuration guide"
        elif "问题" in filename or "解决" in filename:  # "problem" / "solution"
            metadata["category"] = "troubleshooting"
        else:
            metadata["category"] = "technical documentation"
        # Collect tags.
        tags = set()
        # Tags derived from the filename.
        if "飞书" in filename:  # "Feishu"
            tags.add("Feishu")
        if "MCP" in filename:
            tags.add("MCP")
        if "OAuth" in filename:
            tags.add("OAuth")
        if "配置" in filename:  # "configuration"
            tags.add("config")
        if "问题" in filename:  # "problem"
            tags.add("troubleshooting")
        if "指南" in filename:  # "guide"
            tags.add("guide")
        # Tags derived from the content.
        if "token" in content.lower():
            tags.add("token")
        if "权限" in content:  # "permissions"
            tags.add("permissions")
        if "api" in content.lower():
            tags.add("API")
        if "工具" in content:  # "tools"
            tags.add("tools")
        metadata["tags"] = list(tags)
        # Count Markdown headings (levels 1-6) as sections.
metadata["sections"] = len(re.findall(r'^#{1,6}\s+', content, re.MULTILINE))
return metadata
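    # Example (hypothetical filename): extract_metadata(text, "飞书OAuth配置指南.md")
    # yields category "configuration guide" and tags including "Feishu",
    # "OAuth", "config", and "guide".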
def smart_chunk_document(self, content: str, metadata: Dict) -> List[Dict]:
"""智能分块文档"""
chunks = []
        # Split immediately before every Markdown heading (levels 1-6).
        sections = re.split(r'\n(?=#{1,6}\s+)', content)
for i, section in enumerate(sections):
if not section.strip():
continue
            # Extract the section heading and its level.
title_match = re.search(r'^(#{1,6})\s+(.+)$', section, re.MULTILINE)
section_title = title_match.group(2).strip() if title_match else f"Section {i+1}"
section_level = len(title_match.group(1)) if title_match else 1
            # Sections over 2000 characters are split further by paragraph,
            # grouped into chunks of roughly 1500 characters.
            if len(section) > 2000:
                paragraphs = section.split('\n\n')
current_chunk = ""
for para in paragraphs:
                    if len(current_chunk) + len(para) > 1500:
if current_chunk:
chunks.append({
"content": current_chunk.strip(),
"title": section_title,
"section_level": section_level,
"chunk_type": "paragraph_group",
"metadata": metadata
})
current_chunk = para
                    else:
                        # Append the paragraph, separated by a blank line.
                        current_chunk = f"{current_chunk}\n\n{para}" if current_chunk else para
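                # Flush whatever remains as the section's final chunk.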
if current_chunk:
chunks.append({
"content": current_chunk.strip(),
"title": section_title,
"section_level": section_level,
"chunk_type": "paragraph_group",
"metadata": metadata
})
else:
chunks.append({
"content": section.strip(),
"title": section_title,
"section_level": section_level,
"chunk_type": "section",
"metadata": metadata
})
return chunks
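    # Example (hypothetical): a 3000-character "## 安装" section is split into
    # several "paragraph_group" chunks of roughly 1500 characters each, all
    # sharing title "安装" and section_level 2; shorter sections become a
    # single "section" chunk.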
def process_single_document(self, file_path: Path) -> List[str]:
"""处理单个文档"""
print(f"\n📄 处理文档: {file_path.name}")
try:
            # Read the document.
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # Extract metadata.
            metadata = self.extract_metadata(content, file_path.name)
            print(f"   📊 Metadata: {metadata['category']} | {len(metadata['tags'])} tags | {metadata['sections']} sections")
            # Chunk the document.
            chunks = self.smart_chunk_document(content, metadata)
            print(f"   🔪 Chunking result: {len(chunks)} chunks")
            # Store each chunk in MemOS.
            memory_ids = []
            for i, chunk in enumerate(chunks):
                # Memory content: document title and section title, then the chunk body.
                memory_content = f"【{metadata['title']} - {chunk['title']}】\n\n{chunk['content']}"
                # Tags: the document's tags plus category, document, and section markers.
                chunk_tags = metadata['tags'].copy()
                chunk_tags.extend([
                    metadata['category'],
                    f"doc:{metadata['title'][:20]}",
                    f"section:{chunk['title'][:20]}"
                ])
                # Store the chunk (add_memory returns the new memory's ID).
memory_id = self.memos.add_memory(
content=memory_content,
tags=chunk_tags
)
memory_ids.append(memory_id)
                if (i + 1) % 5 == 0:
                    print(f"   ✅ Added {i+1}/{len(chunks)} memory chunks")
            print(f"   🎉 Done! Added {len(memory_ids)} memories")
return memory_ids
except Exception as e:
print(f" ❌ 处理失败: {e}")
return []
    def import_all_documents(self) -> Dict[str, Any]:
        """Import every Markdown document under docs_path."""
        print("🚀 Importing the Feishu document collection into MemOS")
        print("=" * 60)
        # Collect all Markdown files.
        md_files = list(self.docs_path.glob("*.md"))
        print(f"📚 Found {len(md_files)} Markdown documents")
results = {
"total_files": len(md_files),
"processed_files": 0,
"total_memories": 0,
"failed_files": [],
"file_details": {}
}
        # Process each document.
for file_path in md_files:
memory_ids = self.process_single_document(file_path)
if memory_ids:
results["processed_files"] += 1
results["total_memories"] += len(memory_ids)
results["file_details"][file_path.name] = {
"memory_count": len(memory_ids),
"memory_ids": memory_ids
}
else:
results["failed_files"].append(file_path.name)
return results
    def print_summary(self, results: Dict):
        """Print an import summary."""
        print("\n" + "=" * 60)
        print("📊 Import Summary")
        print("=" * 60)
        print(f"📁 Total documents: {results['total_files']}")
        print(f"✅ Processed: {results['processed_files']}")
        print(f"❌ Failed: {len(results['failed_files'])}")
        print(f"💾 Total memories: {results['total_memories']}")
        if results['failed_files']:
            print("\n❌ Failed documents:")
            for filename in results['failed_files']:
                print(f"  - {filename}")
        print(f"\n📈 Average memories per document: {results['total_memories'] / max(results['processed_files'], 1):.1f}")
        # Show details for the first five documents.
        print("\n📄 Document details (first 5):")
        for i, (filename, details) in enumerate(list(results['file_details'].items())[:5]):
            print(f"  {i+1}. {filename}: {details['memory_count']} memories")
def main():
    """Entry point."""
    # Default docs directory (hardcoded for the original environment); an
    # optional first CLI argument overrides it.
    docs_path = sys.argv[1] if len(sys.argv) > 1 else "/home/qqinshu/视频/飞书文档合集"
    if not Path(docs_path).exists():
        print(f"❌ Docs path does not exist: {docs_path}")
        return
    # Create the importer.
    importer = FeishuDocImporter(docs_path)
    # Import every document.
    results = importer.import_all_documents()
    # Print the summary.
    importer.print_summary(results)
    print("\n🎉 Import complete! Feishu MCP knowledge is now searchable through MemOS.")
if __name__ == "__main__":
main()