"""FastMCP server entrypoint and tool registration."""
from __future__ import annotations
import functools
import inspect
import logging
from typing import Any
from fastmcp import FastMCP
from mcp.types import ToolAnnotations
from .errors import SonarQubeError, error_response
from .settings import SonarQubeSettings, load_settings
from .sonarqube_client import SonarQubeClient
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Shared annotation set passed to every ``@mcp.tool(...)`` registration in
# ``create_server``. All four hints are set explicitly rather than relying on
# the library defaults.
READ_ONLY_ANNOTATIONS = ToolAnnotations(
    # Mutation-related hints.
    readOnlyHint=True,
    destructiveHint=False,
    # Call-semantics hints.
    idempotentHint=True,
    openWorldHint=False,
)
# Comma-separated metric keys used by ``get_project_metrics`` when the caller
# does not supply ``metric_keys``. The order of the keys is preserved in the
# resulting string.
DEFAULT_METRIC_KEYS = ",".join(
    (
        # Issue counts.
        "bugs",
        "vulnerabilities",
        "code_smells",
        "security_hotspots",
        # Size, coverage and duplication.
        "coverage",
        "duplicated_lines_density",
        "ncloc",
        "sqale_index",
        # Ratings.
        "reliability_rating",
        "security_rating",
        "sqale_rating",
        # Quality gate.
        "alert_status",
        "quality_gate_details",
    )
)
# Tokens accepted by the ``severities`` filter of ``search_issues``.
VALID_SEVERITIES: frozenset[str] = frozenset(
    (
        "INFO",
        "MINOR",
        "MAJOR",
        "CRITICAL",
        "BLOCKER",
    )
)

# Tokens accepted by the ``types`` filter of ``search_issues``.
VALID_ISSUE_TYPES: frozenset[str] = frozenset(
    (
        "CODE_SMELL",
        "BUG",
        "VULNERABILITY",
        "SECURITY_HOTSPOT",
    )
)

# Tokens accepted by the ``statuses`` filter of ``search_issues``.
VALID_STATUSES: frozenset[str] = frozenset(
    (
        "OPEN",
        "CONFIRMED",
        "REOPENED",
        "RESOLVED",
        "CLOSED",
    )
)
def _validate_csv_enum(
    value: str | None, allowed: frozenset[str], label: str
) -> None:
    """Check that every comma-separated token of ``value`` is in ``allowed``.

    Tokens are compared after stripping surrounding whitespace; the original
    (unstripped) token text is what appears in the error message.

    Args:
        value: Comma-separated tokens. ``None`` or an empty string skips
            validation entirely.
        allowed: The set of acceptable tokens.
        label: Name of the field being validated, used in the error message.

    Raises:
        SonarQubeError: With code ``invalid_input`` when at least one token
            is not in ``allowed``.
    """
    if value:
        rejected = []
        for token in value.split(","):
            if token.strip() not in allowed:
                rejected.append(token)

        if rejected:
            rejected_text = ", ".join(rejected)
            allowed_text = ", ".join(sorted(allowed))
            raise SonarQubeError(
                code="invalid_input",
                message=f"Invalid {label}: {rejected_text}. Allowed: {allowed_text}",
            )
def _safe_tool(func: Any) -> Any:
    """Decorate a tool so failures become an error envelope, not an exception.

    A ``SonarQubeError`` raised by ``func`` is passed straight to
    ``error_response``. Any other ``Exception`` is logged with its traceback
    and reported as an ``internal_error`` whose details carry ``str(exc)``.

    Args:
        func: The synchronous tool function to wrap.

    Returns:
        A wrapper carrying ``func``'s metadata and call signature.
    """

    def guarded(*args: Any, **kwargs: Any) -> dict[str, Any]:
        try:
            outcome = func(*args, **kwargs)
        except SonarQubeError as known_error:
            return error_response(known_error)
        except Exception as unexpected:
            logger.exception("Unexpected tool error in %s", func.__name__)
            failure = SonarQubeError(
                code="internal_error",
                message="Unexpected server error.",
                details={"error": str(unexpected)},
            )
            return error_response(failure)
        return outcome

    # Copy name, docstring, etc. from the wrapped function, then pin the
    # signature explicitly so introspection reports the tool's own parameters.
    functools.update_wrapper(guarded, func)
    guarded.__signature__ = inspect.signature(func)
    return guarded
def _clamp(value: int, minimum: int, maximum: int) -> int:
    """Return ``value`` limited to the closed range [minimum, maximum].

    The upper bound is applied first and the lower bound second, so when
    ``minimum`` exceeds ``maximum`` the result is ``minimum``.
    """
    # Cap at the upper bound first...
    capped = maximum if maximum < value else value
    # ...then raise to the lower bound if needed.
    return capped if capped > minimum else minimum
def create_server(settings: SonarQubeSettings | None = None) -> FastMCP:
    """Create a configured SonarQube MCP server with all tools registered.

    Builds one ``SonarQubeClient`` from the effective settings and registers
    six read-only tools on a new ``FastMCP`` instance. Every tool is wrapped
    in ``_safe_tool``, so failures are returned as an error envelope instead
    of propagating.

    Args:
        settings: Settings to use. When omitted (or falsy),
            ``load_settings()`` is called instead.

    Returns:
        The ``FastMCP`` instance with the tools registered. The caller is
        responsible for running it.
    """
    runtime_settings = settings or load_settings()
    client = SonarQubeClient(runtime_settings)
    mcp = FastMCP(
        name="sonarqube-mcp-server",
        instructions=(
            "SonarQube MCP server. All tools are read-only. "
            "Use check_status to verify connectivity, then explore projects, "
            "issues, metrics, and rules."
        ),
    )

    # NOTE(review): the tool docstrings below are presumably surfaced to MCP
    # clients as tool descriptions, so their wording is kept as-is.

    # ------------------------------------------------------------------
    # Tool 1: check_status
    # ------------------------------------------------------------------
    @mcp.tool(annotations=READ_ONLY_ANNOTATIONS)
    @_safe_tool
    def check_status() -> dict[str, Any]:
        """Verify SonarQube connectivity and return server version/status."""
        result = client.get_system_status()
        return {
            "ok": True,
            "server_url": runtime_settings.sonarqube_url,
            "status": result.get("status"),
            "version": result.get("version"),
        }

    # ------------------------------------------------------------------
    # Tool 2: list_projects
    # ------------------------------------------------------------------
    @mcp.tool(annotations=READ_ONLY_ANNOTATIONS)
    @_safe_tool
    def list_projects(
        query: str | None = None,
        page: int = 1,
        page_size: int = 20,
    ) -> dict[str, Any]:
        """List or search SonarQube projects with pagination.

        Args:
            query: Optional search string to filter project names/keys.
            page: Page number (1-indexed).
            page_size: Results per page (1–500).
        """
        # Normalise once so the request and the fallback values reported in
        # the response agree even for out-of-range caller input.
        page = max(1, page)
        page_size = _clamp(page_size, 1, 500)
        result = client.search_projects(
            query=query,
            page=page,
            page_size=page_size,
        )
        paging = result.get("paging", {})
        return {
            "ok": True,
            "total": paging.get("total", 0),
            "page": paging.get("pageIndex", page),
            "page_size": paging.get("pageSize", page_size),
            "projects": [
                {
                    "key": c.get("key"),
                    "name": c.get("name"),
                    "qualifier": c.get("qualifier"),
                }
                for c in result.get("components", [])
            ],
        }

    # ------------------------------------------------------------------
    # Tool 3: search_issues
    # ------------------------------------------------------------------
    @mcp.tool(annotations=READ_ONLY_ANNOTATIONS)
    @_safe_tool
    def search_issues(
        project_key: str | None = None,
        severities: str | None = None,
        types: str | None = None,
        statuses: str | None = None,
        tags: str | None = None,
        assigned: bool | None = None,
        page: int = 1,
        page_size: int = 20,
    ) -> dict[str, Any]:
        """Search SonarQube issues with filters.

        Args:
            project_key: Project key to scope issues.
            severities: Comma-separated: INFO,MINOR,MAJOR,CRITICAL,BLOCKER.
            types: Comma-separated: CODE_SMELL,BUG,VULNERABILITY,SECURITY_HOTSPOT.
            statuses: Comma-separated: OPEN,CONFIRMED,REOPENED,RESOLVED,CLOSED.
            tags: Comma-separated tag names.
            assigned: Filter by assignment (true=assigned, false=unassigned).
            page: Page number (1-indexed).
            page_size: Results per page (1–500).
        """
        # Reject unknown enum tokens before calling the client; each check
        # raises SonarQubeError(code="invalid_input") on failure.
        _validate_csv_enum(severities, VALID_SEVERITIES, "severities")
        _validate_csv_enum(types, VALID_ISSUE_TYPES, "types")
        _validate_csv_enum(statuses, VALID_STATUSES, "statuses")

        # Normalise once so the request and the fallback values reported in
        # the response agree even for out-of-range caller input.
        page = max(1, page)
        page_size = _clamp(page_size, 1, 500)
        result = client.search_issues(
            project_key=project_key,
            severities=severities,
            types=types,
            statuses=statuses,
            tags=tags,
            assigned=assigned,
            page=page,
            page_size=page_size,
        )
        paging = result.get("paging", {})
        issues = [
            {
                "key": i.get("key"),
                "rule": i.get("rule"),
                "severity": i.get("severity"),
                "component": i.get("component"),
                "project": i.get("project"),
                "line": i.get("line"),
                "status": i.get("status"),
                "message": i.get("message"),
                "type": i.get("type"),
                "tags": i.get("tags", []),
                "creation_date": i.get("creationDate"),
            }
            for i in result.get("issues", [])
        ]
        return {
            "ok": True,
            "total": paging.get("total", 0),
            "page": paging.get("pageIndex", page),
            "page_size": paging.get("pageSize", page_size),
            "issues": issues,
        }

    # ------------------------------------------------------------------
    # Tool 4: get_issue
    # ------------------------------------------------------------------
    @mcp.tool(annotations=READ_ONLY_ANNOTATIONS)
    @_safe_tool
    def get_issue(issue_key: str) -> dict[str, Any]:
        """Get detailed information for a single SonarQube issue by key.

        Args:
            issue_key: The issue key (e.g., AXy1k...).
        """
        if not issue_key:
            raise SonarQubeError(
                code="invalid_input", message="issue_key is required."
            )
        result = client.get_issue(issue_key)

        # An unknown key yields no issues. Report that explicitly instead of
        # letting an IndexError/KeyError surface as a generic internal error.
        # NOTE(review): "not_found" is a new code string; confirm it fits the
        # codes used by .errors.
        matches = result.get("issues") or []
        if not matches:
            raise SonarQubeError(
                code="not_found",
                message=f"No issue found for key {issue_key!r}.",
            )
        issue = matches[0]

        # Index the side-loaded components and rules by key so the issue's
        # references can be resolved to display names.
        components = {
            c["key"]: c for c in result.get("components", [])
        }
        rules = {r["key"]: r for r in result.get("rules", [])}
        component_info = components.get(issue.get("component"), {})
        rule_info = rules.get(issue.get("rule"), {})
        return {
            "ok": True,
            "issue": {
                "key": issue.get("key"),
                "rule": issue.get("rule"),
                "rule_name": rule_info.get("name"),
                "severity": issue.get("severity"),
                "component": issue.get("component"),
                "component_name": component_info.get("name"),
                "project": issue.get("project"),
                "line": issue.get("line"),
                "text_range": issue.get("textRange"),
                "status": issue.get("status"),
                "resolution": issue.get("resolution"),
                "message": issue.get("message"),
                "type": issue.get("type"),
                "tags": issue.get("tags", []),
                "effort": issue.get("effort"),
                "debt": issue.get("debt"),
                "author": issue.get("author"),
                "assignee": issue.get("assignee"),
                "creation_date": issue.get("creationDate"),
                "update_date": issue.get("updateDate"),
                "comments": issue.get("comments", []),
                "flows": issue.get("flows", []),
            },
        }

    # ------------------------------------------------------------------
    # Tool 5: get_project_metrics
    # ------------------------------------------------------------------
    @mcp.tool(annotations=READ_ONLY_ANNOTATIONS)
    @_safe_tool
    def get_project_metrics(
        project_key: str,
        metric_keys: str | None = None,
    ) -> dict[str, Any]:
        """Get quality metrics for a SonarQube project.

        Args:
            project_key: The project key.
            metric_keys: Comma-separated metric keys. Defaults to a standard
                quality dashboard set (bugs, coverage, smells, ratings, etc.).
        """
        if not project_key:
            raise SonarQubeError(
                code="invalid_input", message="project_key is required."
            )
        keys = metric_keys or DEFAULT_METRIC_KEYS
        result = client.get_measures(project_key, keys)
        component = result.get("component", {})
        # Flatten the list of measures into a {metric key: value} mapping.
        measures = {
            m["metric"]: m.get("value")
            for m in component.get("measures", [])
        }
        return {
            "ok": True,
            "project_key": component.get("key", project_key),
            "project_name": component.get("name"),
            "metrics": measures,
        }

    # ------------------------------------------------------------------
    # Tool 6: get_rule
    # ------------------------------------------------------------------
    @mcp.tool(annotations=READ_ONLY_ANNOTATIONS)
    @_safe_tool
    def get_rule(rule_key: str) -> dict[str, Any]:
        """Get the description and metadata for a SonarQube rule.

        Args:
            rule_key: Rule key (e.g., python:S1192, java:S106).
        """
        if not rule_key:
            raise SonarQubeError(
                code="invalid_input", message="rule_key is required."
            )
        result = client.get_rule(rule_key)
        rule = result.get("rule", {})
        return {
            "ok": True,
            "rule": {
                "key": rule.get("key"),
                "name": rule.get("name"),
                "severity": rule.get("severity"),
                "type": rule.get("type"),
                "lang": rule.get("lang"),
                "lang_name": rule.get("langName"),
                "status": rule.get("status"),
                "tags": rule.get("tags", []),
                "system_tags": rule.get("sysTags", []),
                "html_description": rule.get("htmlDesc"),
                "markdown_description": rule.get("mdDesc"),
                "debt_remediation": rule.get("debtRemFnCoeff"),
            },
        }

    return mcp
def main() -> None:
    """Configure logging, build the server and run it.

    The root logger is configured at WARNING, while the ``sonarqube_mcp``
    logger is raised to INFO.
    """
    logging.basicConfig(level=logging.WARNING)
    package_logger = logging.getLogger("sonarqube_mcp")
    package_logger.setLevel(logging.INFO)

    server = create_server()
    server.run()


# Start the server only when this module is executed as a script.
if __name__ == "__main__":
    main()