We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/sarveshkapre/cve-risk-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
from __future__ import annotations
import json
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from . import __version__
from .clients import CveClient, EpssClient, KevClient
from .errors import StructuredError
from .tools import CveRiskService, handle_tool_call, tool_definitions
@dataclass(slots=True)
class McpServer:
cve_client: CveClient
kev_client: KevClient
epss_client: EpssClient | None = None
def serve(self) -> None:
status_provider = StatusProvider(
cve_client=self.cve_client,
kev_client=self.kev_client,
epss_client=self.epss_client,
)
service = CveRiskService(
self.cve_client,
self.kev_client,
epss_client=self.epss_client,
status_provider=status_provider,
)
for line in sys.stdin:
if not line.strip():
continue
try:
request = json.loads(line)
except json.JSONDecodeError as exc:
self._write_error(None, -32700, f"Invalid JSON: {exc}")
continue
self._handle_request(request, service)
def _handle_request(self, request: dict[str, Any], service: CveRiskService) -> None:
method = request.get("method")
request_id = request.get("id")
if method == "initialize":
params = request.get("params", {})
protocol_version = params.get("protocolVersion", "2025-06-18")
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": protocol_version,
"serverInfo": {
"name": "cve-risk-mcp",
"version": __version__,
},
"capabilities": {
"tools": {
"listChanged": False,
}
},
},
}
self._write(response)
return
if method in {"initialized", "notifications/initialized"}:
return
if method == "tools/list":
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tool_definitions(),
},
}
self._write(response)
return
if method == "tools/call":
params = request.get("params", {})
name = params.get("name")
arguments = params.get("arguments", {})
if not isinstance(name, str):
self._write_error(request_id, -32602, "Invalid tool name.")
return
if not isinstance(arguments, dict):
self._write_error(request_id, -32602, "Invalid tool arguments.")
return
try:
content = handle_tool_call(name, arguments, service)
except StructuredError as exc:
self._write_error(request_id, -32000, exc.message, data=exc.to_data())
return
except Exception as exc: # pragma: no cover - exercised via error path test
self._write_error(request_id, -32000, str(exc))
return
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": content,
"isError": False,
},
}
self._write(response)
return
if request_id is not None:
self._write_error(request_id, -32601, f"Unknown method: {method}")
def _write(self, payload: dict[str, Any]) -> None:
sys.stdout.write(json.dumps(payload))
sys.stdout.write("\n")
sys.stdout.flush()
def _write_error(
self, request_id: Any, code: int, message: str, *, data: dict[str, Any] | None = None
) -> None:
payload = {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": code, "message": message},
}
if data is not None:
payload["error"]["data"] = data
self._write(payload)
@dataclass(slots=True)
class StatusProvider:
    """Build a per-source status report from the configured clients."""

    # Clients whose last-success / last-error bookkeeping is reported.
    cve_client: CveClient
    kev_client: KevClient
    epss_client: EpssClient | None = None

    def status(self) -> dict[str, Any]:
        """Return ``{"sources": {...}}`` with one entry each for cve, kev and epss.

        The epss entry is filled with empty values (and zero cache entries)
        when no EPSS client is configured; its ``enabled`` flag reflects
        whether a client object was supplied.
        """
        cve = self.cve_client
        cve_report = _status_for_client(
            cve.last_success_at,
            cve.last_error_at,
            cve.last_error,
            cache_entries=len(cve._cache),
            url=cve.base_url,
        )

        kev = self.kev_client
        kev_report = _status_for_client(
            kev.last_success_at,
            kev.last_error_at,
            kev.last_error,
            # The KEV cache may be unset; count it as empty in that case.
            cache_entries=len(kev._cache or {}),
            url=kev.primary_url,
        )

        epss = self.epss_client
        epss_enabled = epss is not None
        if epss:
            epss_report = _status_for_client(
                epss.last_success_at,
                epss.last_error_at,
                epss.last_error,
                cache_entries=len(epss._cache),
                enabled=epss_enabled,
                url=epss.base_url,
            )
        else:
            epss_report = _status_for_client(
                None,
                None,
                None,
                cache_entries=0,
                enabled=epss_enabled,
                url=None,
            )

        sources = {"cve": cve_report, "kev": kev_report, "epss": epss_report}
        return {"sources": sources}
def _status_for_client(
    last_success_at: float | None,
    last_error_at: float | None,
    last_error: dict[str, Any] | None,
    *,
    cache_entries: int,
    enabled: bool = True,
    url: str | None = None,
) -> dict[str, Any]:
    """Assemble the status dictionary for a single data source.

    Args:
        last_success_at: Timestamp of the last successful fetch, if any.
        last_error_at: Timestamp of the last failed fetch, if any.
        last_error: Details of the last failure, if any.
        cache_entries: Number of entries currently cached for the source.
        enabled: Whether the source is configured.
        url: Endpoint the source is fetched from, if known.

    Returns:
        A dict holding the raw values plus ISO-formatted timestamps and a
        one-line summary of the last error.
    """
    success_iso = _format_iso(last_success_at)
    error_iso = _format_iso(last_error_at)
    error_summary = _summarize_error(last_error)

    report: dict[str, Any] = {"enabled": enabled}
    report["last_success_at"] = last_success_at
    report["last_success_iso"] = success_iso
    report["last_error_at"] = last_error_at
    report["last_error_iso"] = error_iso
    report["last_error"] = last_error
    report["last_error_summary"] = error_summary
    report["cache_entries"] = cache_entries
    report["url"] = url
    return report
def _format_iso(timestamp: float | None) -> str | None:
    """Render a POSIX timestamp as an ISO 8601 string in UTC, or ``None`` if absent."""
    if timestamp is not None:
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return moment.isoformat()
    return None
def _summarize_error(error: dict[str, Any] | None) -> str | None:
    """Condense an error record into a short one-line summary.

    Args:
        error: Error details; the ``"type"`` and ``"status_code"`` keys are
            read when present.

    Returns:
        ``"<type> (HTTP <status>)"`` when both are known, ``"HTTP <status>"``
        when only the status code is known, the type alone when only it is
        known, and ``None`` when there is no error or neither is available.
    """
    if not error:
        return None
    error_type = error.get("type")
    status_code = error.get("status_code")
    if status_code is None:
        return str(error_type) if error_type else None
    if not error_type:
        # Avoid rendering a missing type as the literal text "None".
        return f"HTTP {status_code}"
    return f"{error_type} (HTTP {status_code})"