summarize_all_communities
Generate summaries for academic communities in bulk, with parallel processing. Specify community levels, control concurrency, and optionally target specific communities for automated literature-review generation.
Instructions
Generate community summaries in batch / in parallel.

Args:
- level: community level, "macro" or "micro" (or integer 1/2)
- concurrency: number of concurrent summarization tasks, default 5
- comm_ids: explicit list of community IDs to process
- force: whether to force regeneration of existing summaries
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| level | No | Community level: "macro" or "micro" (or integer 1/2). If omitted, communities at all levels are targeted. | |
| concurrency | No | Maximum number of communities summarized concurrently. | 5 |
| comm_ids | No | Explicit list of community IDs to summarize; bypasses level-based selection. | |
| force | No | Regenerate summaries even for communities that already have one. | false |
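
As an illustration of the schema above, a hypothetical argument payload an MCP client might send (values are made up for the example):

```python
# Hypothetical arguments for a summarize_all_communities call (values are illustrative).
arguments = {
    "level": "micro",     # only micro-level communities
    "concurrency": 8,     # summarize up to 8 communities at once
    "force": True,        # regenerate even if a summary already exists
}
# comm_ids is omitted, so target communities are selected by level instead.
```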
Implementation Reference
- Core implementation of the summarize_all_communities tool. Normalizes the input level, queries the target community IDs (filtering by level and, unless force is set, by whether a summary already exists), bounds concurrency with a semaphore, summarizes each community by calling summarize_community_v1_run in parallel via asyncio.gather, and returns summary statistics. A usage sketch follows this reference list.

  ```python
  async def summarize_all_communities_run(
      level: str | int | None = None,
      concurrency: int = 5,
      comm_ids: list[int] | None = None,
      force: bool = False,
  ) -> dict[str, Any]:
      """Generate community summaries in batch / in parallel (core implementation)."""
      # Normalize level to string "macro" or "micro" to match DB column type (TEXT)
      level_str: str | None = None
      if level is not None:
          l_lower = str(level).lower()
          if l_lower in ["macro", "1"]:
              level_str = "macro"
          elif l_lower in ["micro", "2"]:
              level_str = "micro"
          else:
              level_str = l_lower

      # 1. Determine which communities to process
      if comm_ids:
          target_ids = comm_ids
      else:
          query = "SELECT comm_id FROM communities WHERE 1=1"
          params = []
          if level_str is not None:
              query += " AND level = %s"
              params.append(level_str)
          if not force:
              query += (
                  " AND NOT EXISTS (SELECT 1 FROM community_summaries s"
                  " WHERE s.comm_id = communities.comm_id)"
              )
          rows = query_all(query, tuple(params))
          target_ids = [r["comm_id"] for r in rows]

      if not target_ids:
          return {"message": "No communities to summarize", "count": 0}

      # 2. Run summarizations in parallel, bounded by a semaphore
      sem = asyncio.Semaphore(concurrency)

      async def process_one(cid):
          async with sem:
              res = await summarize_community_v1_run(comm_id=cid)
              return {
                  "comm_id": cid,
                  "success": not res.get("error"),
                  "error": res.get("error"),
              }

      tasks = [process_one(cid) for cid in target_ids]
      results = await asyncio.gather(*tasks)

      success_count = sum(1 for r in results if r["success"])
      failed_count = len(results) - success_count

      return {
          "total": len(target_ids),
          "success": success_count,
          "failed": failed_count,
          "details": results[:20],  # return only the first 20 per-community results
      }
  ```
- src/paperlib_mcp/tools/graph_summarize.py:510-530 (registration)

  MCP tool registration via the @mcp.tool() decorator. Defines the tool interface with type-hinted parameters and a docstring, acting as a thin wrapper that delegates execution to summarize_all_communities_run.

  ```python
  @mcp.tool()
  async def summarize_all_communities(
      level: str | None = None,
      concurrency: int = 5,
      comm_ids: list[int] | None = None,
      force: bool = False,
  ) -> dict[str, Any]:
      """Generate community summaries in batch / in parallel.

      Args:
          level: community level, "macro" or "micro" (or integer 1/2)
          concurrency: number of concurrent summarization tasks, default 5
          comm_ids: explicit list of community IDs to process
          force: whether to force regeneration of existing summaries
      """
      return await summarize_all_communities_run(
          level=level, concurrency=concurrency, comm_ids=comm_ids, force=force
      )
  ```
- src/paperlib_mcp/server.py:43 (registration)

  Top-level registration call in the main MCP server setup that invokes the module's register function to add the summarize_all_communities tool (among others) to the FastMCP instance.

  ```python
  register_graph_summarize_tools(mcp)
  ```
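
For orientation, a minimal usage sketch of the core function. The import path is inferred from the file references above and is not confirmed by the source; only the function signature and return shape come from the implementation shown here.

```python
import asyncio

# Import path inferred from the file references above (assumption).
from paperlib_mcp.tools.graph_summarize import summarize_all_communities_run


async def main() -> None:
    # Summarize all macro-level communities that do not yet have a summary,
    # running at most three summarizations concurrently.
    stats = await summarize_all_communities_run(level="macro", concurrency=3)
    # The returned dict carries bulk statistics plus up to 20 per-community results.
    print(stats["total"], stats["success"], stats["failed"])


if __name__ == "__main__":
    asyncio.run(main())
```

On the server side, the registration call above follows the usual FastMCP pattern; a sketch of the wiring, with the server name as a placeholder and import paths inferred from the references above:

```python
from mcp.server.fastmcp import FastMCP

from paperlib_mcp.tools.graph_summarize import register_graph_summarize_tools

mcp = FastMCP("paperlib")  # server name is illustrative
register_graph_summarize_tools(mcp)  # adds summarize_all_communities, among other tools
```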