build_communities_v1

Analyzes paper-entity relationships to identify research topic communities using graph clustering algorithms, helping researchers discover thematic connections in academic literature.

Instructions

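Build topic communities

Builds a co-occurrence graph from Paper->Entity relations and clusters it with the Leiden algorithm.

Args:
    level: Community level, "macro" or "micro".
    min_df: Minimum number of papers a node must appear in; default 3.
    resolution: Leiden resolution parameter; default 1.0.
    max_nodes: Maximum number of nodes; default 20000.
    rebuild: Whether to rebuild (clears earlier results for the same level); default False.

Returns: A list of communities, each with its comm_id, size, and top entities.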
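
The document-frequency filter and co-occurrence counting described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the server's code: build_cooccurrence and the sample entity names are hypothetical, and every co-occurrence counts as 1.0 here, whereas the tool applies type-based edge weights before running Leiden.

```python
from collections import defaultdict
from itertools import combinations


def build_cooccurrence(paper_to_entities, min_df=3):
    """Return (df, edges) where edges only link entities with df >= min_df.

    Hypothetical helper for illustration; names are not from the server code.
    """
    # Document frequency: the number of papers each entity appears in.
    df = defaultdict(int)
    for entities in paper_to_entities.values():
        for entity in set(entities):
            df[entity] += 1

    # Keep only entities that appear in at least min_df papers.
    keep = {entity for entity, count in df.items() if count >= min_df}

    # Every pair of kept entities in the same paper adds 1.0 to that edge.
    # Sorting gives each undirected edge a single canonical key.
    edges = defaultdict(float)
    for entities in paper_to_entities.values():
        kept = sorted(set(entities) & keep)
        for a, b in combinations(kept, 2):
            edges[(a, b)] += 1.0

    return dict(df), dict(edges)


papers = {
    "p1": ["topic_a", "measure_x", "setting_s"],
    "p2": ["topic_a", "measure_x"],
    "p3": ["topic_a", "measure_x", "data_d"],
}
df, edges = build_cooccurrence(papers, min_df=3)
print(edges)
```

With min_df=3, only topic_a and measure_x survive the filter, so the graph has a single edge whose weight equals the number of papers they share.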

Input Schema

Name        Required  Description  Default
level       No                     macro
min_df      No
resolution  No
max_nodes   No
rebuild     No
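
As a rough illustration of an arguments payload for this schema, the sketch below mirrors the five parameters and the defaults listed in the instructions using a standard-library dataclass. BuildCommunitiesArgs is a hypothetical name; the only check it performs is the "macro"/"micro" restriction on level, and how the payload is sent depends on the MCP client in use.

```python
from dataclasses import asdict, dataclass


@dataclass
class BuildCommunitiesArgs:
    """Hypothetical client-side mirror of the tool's input schema."""

    level: str = "macro"
    min_df: int = 3
    resolution: float = 1.0
    max_nodes: int = 20000
    rebuild: bool = False

    def __post_init__(self):
        # The tool documents exactly two community levels.
        if self.level not in ("macro", "micro"):
            raise ValueError(f"level must be 'macro' or 'micro', got {self.level!r}")


# Only the overridden fields need to be named; the rest keep their defaults.
args = asdict(BuildCommunitiesArgs(level="micro", resolution=0.5))
print(args)
```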

Implementation Reference

  • The @mcp.tool()-decorated function build_communities_v1 that implements the core logic: builds co-occurrence graph from paper-entity relations, filters nodes by document frequency, applies Leiden clustering with igraph/leidenalg, and stores communities/members in the database.
    @mcp.tool()
    def build_communities_v1(
        level: str = "macro",
        min_df: int = 3,
        resolution: float = 1.0,
        max_nodes: int = 20000,
        rebuild: bool = False,
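    ) -> dict[str, Any]:
        """Build topic communities.

        Builds a co-occurrence graph from Paper->Entity relations and clusters it with the Leiden algorithm.

        Args:
            level: Community level, "macro" or "micro".
            min_df: Minimum number of papers a node must appear in; default 3.
            resolution: Leiden resolution parameter; default 1.0.
            max_nodes: Maximum number of nodes; default 20000.
            rebuild: Whether to rebuild (clears earlier results for the same level); default False.

        Returns:
            A list of communities, each with its comm_id, size, and top entities.
        """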
        try:
            # Try to import the community-detection libraries
            try:
                import igraph as ig
                import leidenalg
            except ImportError:
                return BuildCommunitiesOut(
                    error=MCPErrorModel(
                        code="DEPENDENCY_ERROR",
                        message="igraph and leidenalg are required. Install with: pip install igraph leidenalg"
                    ),
                ).model_dump()
            
            with get_db() as conn:
                # If rebuild is set, clear the earlier results for this level first
                if rebuild:
                    with conn.cursor() as cur:
                        cur.execute(
                            """
                            DELETE FROM community_members 
                            WHERE comm_id IN (SELECT comm_id FROM communities WHERE level = %s)
                            """,
                            (level,)
                        )
                        cur.execute(
                            """
                            DELETE FROM community_summaries 
                            WHERE comm_id IN (SELECT comm_id FROM communities WHERE level = %s)
                            """,
                            (level,)
                        )
                        cur.execute(
                            "DELETE FROM communities WHERE level = %s",
                            (level,)
                        )
                
                # 1. Fetch Paper->Entity relations
                relations = query_all(
                    """
                    SELECT 
                        p.entity_id AS paper_eid,
                        p.canonical_key AS doc_id,
                        x.entity_id AS node_eid,
                        x.type AS node_type,
                        x.canonical_name
                    FROM relations r
                    JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
                    JOIN entities x ON x.entity_id = r.obj_entity_id
                    WHERE r.predicate IN (
                        'PAPER_HAS_TOPIC', 'PAPER_USES_MEASURE', 'PAPER_IDENTIFIES_WITH',
                        'PAPER_IN_SETTING', 'PAPER_USES_DATA'
                    )
                    AND x.type = ANY(%s)
                    """,
                    (COMMUNITY_ENTITY_TYPES,)
                )
                
                if not relations:
                    return BuildCommunitiesOut(
                        communities=[],
                        error=MCPErrorModel(code="NOT_FOUND", message="No Paper->Entity relations found"),
                    ).model_dump()
                
                # 2. Compute each node's document frequency
                paper_to_nodes: dict[str, set[int]] = defaultdict(set)
                node_info: dict[int, dict] = {}
                
                for r in relations:
                    paper_to_nodes[r["doc_id"]].add(r["node_eid"])
                    if r["node_eid"] not in node_info:
                        node_info[r["node_eid"]] = {
                            "entity_id": r["node_eid"],
                            "type": r["node_type"],
                            "canonical_name": r["canonical_name"],
                        }
                
                # Compute df: the number of papers each node appears in
                node_df: dict[int, int] = defaultdict(int)
                for nodes in paper_to_nodes.values():
                    for node_id in nodes:
                        node_df[node_id] += 1
                
                # 3. Filter out low-frequency nodes
                valid_nodes = {nid for nid, df in node_df.items() if df >= min_df}
                
                if not valid_nodes:
                    return BuildCommunitiesOut(
                        communities=[],
                        error=MCPErrorModel(
                            code="NOT_FOUND",
                            message=f"No nodes with df >= {min_df}"
                        ),
                    ).model_dump()
                
                # Cap the number of nodes
                if len(valid_nodes) > max_nodes:
                    # Keep the nodes with the highest df
                    sorted_nodes = sorted(valid_nodes, key=lambda x: node_df[x], reverse=True)
                    valid_nodes = set(sorted_nodes[:max_nodes])
                
                # 4. Build co-occurrence edges
                edge_counts: dict[tuple[int, int], float] = defaultdict(float)
                
                for doc_id, nodes in paper_to_nodes.items():
                    valid_doc_nodes = [n for n in nodes if n in valid_nodes]
                    # Pair every two nodes that appear in the same paper
                    for i, n1 in enumerate(valid_doc_nodes):
                        for n2 in valid_doc_nodes[i+1:]:
                            if n1 < n2:
                                key = (n1, n2)
                            else:
                                key = (n2, n1)
                            
                            # Look up the edge weight for this pair of entity types
                            weight = get_edge_weight(
                                node_info[n1]["type"],
                                node_info[n2]["type"]
                            )
                            edge_counts[key] += weight
                
                if not edge_counts:
                    return BuildCommunitiesOut(
                        communities=[],
                        error=MCPErrorModel(code="NOT_FOUND", message="No edges found"),
                    ).model_dump()
                
                # 5. Build the igraph graph
                # Map entity IDs to contiguous vertex indices
                node_list = sorted(valid_nodes)
                node_to_idx = {nid: idx for idx, nid in enumerate(node_list)}
                
                edges = []
                weights = []
                for (n1, n2), w in edge_counts.items():
                    edges.append((node_to_idx[n1], node_to_idx[n2]))
                    weights.append(w)
                
                g = ig.Graph(n=len(node_list), edges=edges, directed=False)
                g.es["weight"] = weights
                
                # 6. Leiden clustering
                partition = leidenalg.find_partition(
                    g,
                    leidenalg.RBConfigurationVertexPartition,
                    weights="weight",
                    resolution_parameter=resolution,
                )
                
                # 7. Write to the database
                communities_result: list[CommunityBrief] = []

                # Collect the members of each community
                community_members_map: dict[int, list[tuple[int, float]]] = defaultdict(list)
                
                for node_idx, comm_idx in enumerate(partition.membership):
                    node_id = node_list[node_idx]
                    # Use df as the member weight
                    weight = float(node_df[node_id])
                    community_members_map[comm_idx].append((node_id, weight))
                
                # Write the communities
                with conn.cursor() as cur:
                    for comm_idx, members in community_members_map.items():
                        if len(members) < 2:  # Skip communities that are too small
                            continue

                        # Create the community row
                        cur.execute(
                            """
                            INSERT INTO communities(level, method, params)
                            VALUES (%s, 'leiden', %s::jsonb)
                            RETURNING comm_id
                            """,
                            (level, json.dumps({
                                "resolution": resolution,
                                "min_df": min_df,
                                "original_community_idx": comm_idx,
                            }))
                        )
                        result = cur.fetchone()
                        comm_id = result["comm_id"]
                        
                        # Write the members
                        for node_id, weight in members:
                            cur.execute(
                                """
                                INSERT INTO community_members(comm_id, entity_id, role, weight)
                                VALUES (%s, %s, 'member', %s)
                                """,
                                (comm_id, node_id, weight)
                            )
                        
                        # Sort by weight to get the top entities
                        sorted_members = sorted(members, key=lambda x: x[1], reverse=True)
                        top_entities = []
                        for node_id, weight in sorted_members[:20]:
                            info = node_info.get(node_id, {})
                            top_entities.append({
                                "entity_id": node_id,
                                "type": info.get("type", ""),
                                "canonical_name": info.get("canonical_name", ""),
                                "weight": weight,
                            })
                        
                        communities_result.append(CommunityBrief(
                            comm_id=comm_id,
                            size=len(members),
                            top_entities=top_entities,
                        ))
                
                # Sort by size, largest first
                communities_result.sort(key=lambda x: x.size, reverse=True)
                
                return BuildCommunitiesOut(
                    communities=communities_result,
                ).model_dump()
                
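        except Exception as e:
            # Note: every exception raised above, not only connection failures,
            # is reported under the DB_CONN_ERROR code.
            return BuildCommunitiesOut(
                error=MCPErrorModel(code="DB_CONN_ERROR", message=str(e)),
            ).model_dump()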
  • Pydantic models defining input schema (BuildCommunitiesIn with params like level, min_df, resolution), supporting CommunityBrief, and output schema BuildCommunitiesOut.
    # build_communities_v1 tool models
    # ============================================================
    
    
    class BuildCommunitiesIn(BaseModel):
        """Input for build_communities_v1"""
        level: Literal["macro", "micro"] = "macro"
        min_df: int = 3                         # Minimum number of papers a node must appear in
        resolution: float = 1.0                 # Leiden resolution
        max_nodes: int = 20000
        rebuild: bool = False                   # Whether to clear earlier results for the same level
    
    
    class CommunityBrief(BaseModel):
        """Brief community information"""
        comm_id: int
        size: int
        top_entities: list[dict[str, Any]]      # [{entity_id, type, canonical_name, weight}, ...]
    
    
    class BuildCommunitiesOut(BaseModel):
        """Output of build_communities_v1"""
        communities: list[CommunityBrief] = Field(default_factory=list)
        error: Optional[MCPErrorModel] = None
  • Top-level registration call register_graph_community_tools(mcp) in the MCP server entrypoint, which defines and registers the tool.
    register_graph_community_tools(mcp)
  • Helper constants (COMMUNITY_ENTITY_TYPES, EDGE_WEIGHTS) and get_edge_weight function used for graph construction and weighting.
    # Entity types that take part in community building in v1
    COMMUNITY_ENTITY_TYPES = [
        EntityType.Topic.value,
        EntityType.MeasureProxy.value,
        EntityType.IdentificationStrategy.value,
        EntityType.Setting.value,
        EntityType.DataSource.value,
    ]
    
    # Edge weight configuration
    EDGE_WEIGHTS = {
        (EntityType.Topic.value, EntityType.MeasureProxy.value): 2.0,
        (EntityType.Topic.value, EntityType.IdentificationStrategy.value): 2.0,
        (EntityType.MeasureProxy.value, EntityType.IdentificationStrategy.value): 1.5,
        (EntityType.Topic.value, EntityType.Setting.value): 1.0,
        (EntityType.IdentificationStrategy.value, EntityType.Setting.value): 1.0,
    }
    
    
    def get_edge_weight(type1: str, type2: str) -> float:
        """Return the edge weight between two entity types (order-insensitive, default 1.0)."""
        key = (type1, type2)
        if key in EDGE_WEIGHTS:
            return EDGE_WEIGHTS[key]
        key = (type2, type1)
        if key in EDGE_WEIGHTS:
            return EDGE_WEIGHTS[key]
        return 1.0
  • The register_graph_community_tools function definition that uses @mcp.tool() decorator to register build_communities_v1.
    def register_graph_community_tools(mcp: FastMCP) -> None:
        """Register the GraphRAG community-building tools"""

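The last steps of build_communities_v1 (grouping the Leiden membership into communities, skipping singletons, and ranking members by document frequency) can be sketched without a database or igraph. This is a minimal illustration rather than the server's code: summarize_partition and its parameters are hypothetical names, and the membership list stands in for partition.membership as returned by leidenalg.

```python
from collections import defaultdict


def summarize_partition(membership, node_ids, node_df, top_k=20, min_size=2):
    """Group vertices by community index and rank members by df.

    Hypothetical helper; membership[i] is the community of vertex i,
    and node_ids[i] is the entity ID behind vertex i.
    """
    groups = defaultdict(list)
    for vertex_idx, comm_idx in enumerate(membership):
        entity_id = node_ids[vertex_idx]
        groups[comm_idx].append((entity_id, float(node_df[entity_id])))

    summary = []
    for comm_idx, members in groups.items():
        # Communities with fewer than min_size members are dropped.
        if len(members) < min_size:
            continue
        ranked = sorted(members, key=lambda m: m[1], reverse=True)
        summary.append(
            {"community": comm_idx, "size": len(members), "top": ranked[:top_k]}
        )

    # Largest communities first, as in the tool's return value.
    summary.sort(key=lambda c: c["size"], reverse=True)
    return summary


membership = [0, 0, 1, 0, 2]        # stand-in for partition.membership
node_ids = [10, 11, 12, 13, 14]     # vertex index -> entity ID
node_df = {10: 5, 11: 9, 12: 4, 13: 3, 14: 7}
summary = summarize_partition(membership, node_ids, node_df)
print(summary)
```

Communities 1 and 2 each contain a single entity, so only community 0 is reported, with its members ordered by descending document frequency.
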
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paperlib-mcp'
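
The same request can be built from Python with the standard library. This is a sketch under assumptions: build_request and fetch_server are hypothetical helper names, the Accept header is an addition not present in the curl command, and the response is assumed to be JSON, which the page does not state explicitly.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def build_request(owner: str, name: str) -> urllib.request.Request:
    """Build the GET request for one server entry (hypothetical helper)."""
    url = f"{API_BASE}/{owner}/{name}"
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server(owner: str, name: str, timeout: float = 10.0):
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(owner, name), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Building the request does not touch the network; fetch_server does.
req = build_request("h-lu", "paperlib-mcp")
print(req.full_url)
```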
