Skip to main content
Glama

Chimera MCP Server

by Hank-coder
manual_sync.py11.8 kB
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Chimera 手动同步脚本 专门用于手动全量同步和维护操作 """ import asyncio import sys import argparse from pathlib import Path # 添加项目根目录到路径 project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from config.logging import setup_logging from config.settings import get_settings from sync_service.sync_service import SyncService from loguru import logger async def generate_cache(): """生成JSON缓存文件""" import json from pathlib import Path from datetime import datetime from core.graphiti_client import GraphitiClient try: logger.info("📄 生成JSON缓存...") graph_client = GraphitiClient() await graph_client.initialize() # 生成缓存数据 cache_data = await _build_cache_data(graph_client) # 写入缓存文件 cache_dir = Path("llm_cache") cache_dir.mkdir(exist_ok=True) cache_file = cache_dir / "chimera_cache.json" _write_cache_file(cache_file, cache_data) await graph_client.close() logger.info(f"✅ 缓存完成:{cache_data['metadata']['total_pages']} 页面,{cache_data['metadata']['total_paths']} 路径") return True except Exception as e: logger.error(f"❌ 缓存生成失败: {e}") return False async def _build_cache_data(graph_client): """构建缓存数据""" from datetime import datetime # 查询所有页面和关系 query = """ MATCH (p:NotionPage) OPTIONAL MATCH (p)-[:CHILD_OF]->(parent:NotionPage) OPTIONAL MATCH (child:NotionPage)-[:CHILD_OF]->(p) RETURN p { .notionId, .title, .type, .tags, .lastEditedTime, .url, .level } as page, parent.notionId as parent_id, collect(DISTINCT child.notionId) as children_ids """ cache_data = { "generated_at": datetime.now().isoformat(), "pages": {}, "paths": [], "metadata": {"total_pages": 0, "total_paths": 0} } pages_map = {} async with graph_client._driver.session() as session: result = await session.run(query) async for record in result: page = record["page"] parent_id = record["parent_id"] children_ids = record["children_ids"] or [] # 处理DateTime序列化 last_edited = page["lastEditedTime"] if hasattr(last_edited, 'isoformat'): last_edited = last_edited.isoformat() elif last_edited: last_edited = str(last_edited) cache_data["pages"][page["notionId"]] = { "title": page["title"], "type": page["type"], "tags": page["tags"] or [], "lastEditedTime": last_edited, "url": page["url"], "level": page.get("level", 0), "parent_id": parent_id, "children_ids": children_ids } pages_map[page["notionId"]] = cache_data["pages"][page["notionId"]] # 构建路径 cache_data["paths"] = _build_paths(pages_map) cache_data["metadata"]["total_pages"] = len(cache_data["pages"]) cache_data["metadata"]["total_paths"] = len(cache_data["paths"]) return cache_data def _build_paths(pages_map): """构建完整路径""" paths = [] # 找到所有叶子节点 leaf_nodes = [pid for pid, page in pages_map.items() if not page["children_ids"]] for leaf_id in leaf_nodes: # 从叶子节点向上构建路径 path_ids = [] path_titles = [] current_id = leaf_id while current_id and current_id in pages_map: page = pages_map[current_id] path_ids.insert(0, current_id) path_titles.insert(0, page["title"]) current_id = page["parent_id"] if len(path_ids) > 0: path_string = " -> ".join(path_titles) actual_path_length = len(path_ids) - 1 paths.append({ "path_string": path_string, "path_titles": path_titles, "path_ids": path_ids, "leaf_id": leaf_id, "leaf_title": pages_map[leaf_id]["title"], "leaf_last_edited_time": pages_map[leaf_id]["lastEditedTime"], "path_length": actual_path_length }) return paths def _write_cache_file(cache_file, cache_data): """写入缓存文件""" import json def json_encoder(obj): if hasattr(obj, 'isoformat'): return obj.isoformat() elif hasattr(obj, '__dict__'): return str(obj) return obj # 原子写入 temp_file = cache_file.with_suffix('.tmp') with open(temp_file, 'w', encoding='utf-8') as f: json.dump(cache_data, f, ensure_ascii=False, indent=2, default=json_encoder) temp_file.replace(cache_file) async def run_manual_sync(): """运行一次性手动同步""" logger.info("⚡ 执行手动同步...") sync_service = SyncService() try: await sync_service.initialize() success = await sync_service.run_manual_sync() if success: logger.info("✅ 手动同步完成") # 生成JSON缓存 await generate_cache() else: logger.error("❌ 手动同步失败") return False return True finally: await sync_service.stop() async def run_force_full_sync(): """强制执行全量同步(清空Neo4j数据后重新同步)""" logger.info("🧹 强制全量同步:清空Neo4j数据...") from core.graphiti_client import GraphitiClient # 清空Neo4j数据 graph_client = GraphitiClient() try: await graph_client.initialize() # 删除所有NotionPage节点和SyncMetadata clear_queries = [ "MATCH (n:NotionPage) DETACH DELETE n", "MATCH (m:SyncMetadata) DELETE m" ] async with graph_client._driver.session() as session: for query in clear_queries: result = await session.run(query) summary = await result.consume() logger.info(f"清理完成:删除了 {summary.counters.nodes_deleted} 个节点") await graph_client.close() logger.info("🧹 Neo4j数据已清空") except Exception as e: logger.error(f"❌ 清空Neo4j数据失败: {e}") return False # 现在运行同步(将触发全量同步) logger.info("🔄 开始全量同步...") sync_service = SyncService() try: await sync_service.initialize() success = await sync_service.run_manual_sync() if success: logger.info("✅ 强制全量同步完成") # 生成JSON缓存 await generate_cache() return True else: logger.error("❌ 强制全量同步失败") return False finally: await sync_service.stop() async def show_stats(): """显示Neo4j数据库统计信息""" logger.info("📊 检查Neo4j数据库统计...") try: from core.graphiti_client import GraphitiClient graph_client = GraphitiClient() await graph_client.initialize() stats_query = """ CALL { MATCH (p:NotionPage) RETURN count(p) as pages } CALL { MATCH ()-[r:CHILD_OF]->() RETURN count(r) as child_relations } CALL { MATCH ()-[r:HAS_TAG]->() RETURN count(r) as tag_relations } RETURN pages, child_relations, tag_relations """ async with graph_client._driver.session() as session: result = await session.run(stats_query) record = await result.single() if record: logger.info(f"📄 总页面数: {record['pages']}") logger.info(f"🔗 父子关系数: {record['child_relations']}") logger.info(f"🏷️ 标签关系数: {record['tag_relations']}") else: logger.info("📊 未找到统计数据") await graph_client.close() return True except Exception as e: logger.error(f"无法获取统计信息: {e}") return False def main(): """主函数""" parser = argparse.ArgumentParser( description="Chimera 手动同步和维护工具", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: python manual_sync.py # 默认:强制全量同步(清空后重建) python manual_sync.py --sync # 执行一次手动同步 python manual_sync.py --full-sync # 强制全量同步(清空后重建) python manual_sync.py --generate-cache # 仅生成JSON缓存 python manual_sync.py --stats # 显示数据库统计信息 注意: - 不带参数运行时,默认执行强制全量同步 - 手动同步用于测试或维护 - 生产环境应使用 webhook_server.py 进行实时同步 - MCP服务器请单独运行:python fastmcp_server.py """ ) parser.add_argument( "--sync", action="store_true", help="执行一次手动同步" ) parser.add_argument( "--full-sync", action="store_true", help="强制执行全量同步(清空Neo4j数据后重新同步)" ) parser.add_argument( "--generate-cache", action="store_true", help="生成JSON缓存文件" ) parser.add_argument( "--stats", action="store_true", help="显示Neo4j数据库统计信息(包含embedding覆盖率)" ) # 🆕 Embedding相关命令 parser.add_argument( "--generate-embeddings", action="store_true", help="为没有embedding的页面批量生成embedding" ) parser.add_argument( "--test-embedding-search", action="store_true", help="测试embedding搜索功能" ) parser.add_argument( "--debug", action="store_true", help="启用调试模式" ) args = parser.parse_args() # 设置日志 setup_logging() # 显示标题 logger.info("=" * 50) logger.info("🔧 Chimera 手动同步工具") logger.info("=" * 50) # 检查参数 - 如果没有指定参数,默认执行全量同步 if not any([args.sync, args.full_sync, args.generate_cache, args.stats]): logger.info("📄 未指定参数,默认执行强制全量同步") args.full_sync = True # 检查配置 settings = get_settings() if args.debug: logger.info(f"配置: Neo4j URI: {settings.neo4j_uri}") # 根据参数运行相应的功能 success = True try: if args.sync: success = asyncio.run(run_manual_sync()) elif args.full_sync: success = asyncio.run(run_force_full_sync()) elif args.generate_cache: success = asyncio.run(generate_cache()) elif args.stats: success = asyncio.run(show_stats()) except Exception as e: logger.exception(f"程序异常退出: {e}") success = False if success: logger.info("✅ 操作完成") sys.exit(0) else: logger.error("❌ 操作失败") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Hank-coder/chimera_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server