cluster_sync_check
Assess cluster sync health by reviewing capabilities, sharing tags, and topology. Answer whether replication is healthy across all nodes in a single call.
Instructions
Report cluster sync health: capabilities, sharing tags, topology.
Combines clusterer_list, clusterer_list_cap and
clusterer_list_shtags into one view so an operator can answer
"is replication healthy across all nodes?" from a single call.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| cluster_id | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- The main handler function for the 'cluster_sync_check' tool. It's an async function decorated with @mcp.tool() and @require_permission('mi.read'). It queries four MI commands (clusterer_list, clusterer_list_cap, clusterer_list_shtags, clusterer_list_topology) and derives a health signal by checking node statuses.
@mcp.tool() @require_permission("mi.read") async def cluster_sync_check(ctx: Context, cluster_id: int) -> dict[str, Any]: """Report cluster sync health: capabilities, sharing tags, topology. Combines ``clusterer_list``, ``clusterer_list_cap`` and ``clusterer_list_shtags`` into one view so an operator can answer "is replication healthy across all nodes?" from a single call. """ app = ctx.request_context.lifespan_context out: dict[str, Any] = {"cluster_id": cluster_id} for field, cmd in ( ("nodes", "clusterer_list"), ("capabilities", "clusterer_list_cap"), ("sharing_tags", "clusterer_list_shtags"), ("topology", "clusterer_list_topology"), ): try: out[field] = await app.mi_client.execute(cmd) except Exception as exc: out[field] = {"error": str(exc)} # Derive a simple health signal: any non-OK node status is a red flag. unhealthy_nodes: list[dict[str, Any]] = [] nodes = out.get("nodes") if isinstance(nodes, dict): for v in nodes.values(): if isinstance(v, list): for n in v: if isinstance(n, dict): status = n.get("Status") or n.get("status") if status not in (None, "OK", "Active", _STATUS_OK, "1", 1): unhealthy_nodes.append(n) out["unhealthy_nodes"] = unhealthy_nodes out["healthy"] = not unhealthy_nodes return out - src/opensips_mcp/tools/cluster_tools.py:205-206 (registration)Registration of the cluster_sync_check tool via the @mcp.tool() decorator, which registers it as an MCP tool.
@mcp.tool() @require_permission("mi.read") - The @require_permission('mi.read') decorator serves as an access control/permission schema entry point. The function signature defines the input schema: ctx (Context) and cluster_id (int).
@require_permission("mi.read")