#!/usr/bin/env python3
"""MCP Server for CVE database queries."""
import json

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

from .tools import (
    tool_get_cve_details,
    tool_search_cves,
    tool_get_statistics
)
# Module-level MCP server instance named "cve-database"; the tool handlers in
# this module register themselves on it through its decorators.
app = Server("cve-database")
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool definitions this server advertises.

    Three tools are declared, each with a JSON-schema style ``inputSchema``:

    * ``get_cve_details`` -- requires a string ``cve_id``.
    * ``search_cves`` -- requires a string ``keyword``; optional integer
      ``limit`` with a declared default of 10.
    * ``get_statistics`` -- takes no arguments.
    """
    # TODO: add filtering by severity/CVSS score
    cve_details_tool = Tool(
        name="get_cve_details",
        description=(
            "Get detailed information about a specific CVE by its ID. "
            "Returns severity, CVSS score, description, and references."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "cve_id": {
                    "type": "string",
                    "description": "The CVE identifier (e.g., 'CVE-2024-0001')",
                },
            },
            "required": ["cve_id"],
        },
    )

    search_tool = Tool(
        name="search_cves",
        description="Search CVEs by keyword in description",
        inputSchema={
            "type": "object",
            "properties": {
                "keyword": {
                    "type": "string",
                    "description": "Search term to look for in CVE descriptions",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10, max: 50)",
                    "default": 10,
                },
            },
            "required": ["keyword"],
        },
    )

    # No parameters: the schema is an empty object with nothing required.
    statistics_tool = Tool(
        name="get_statistics",
        description="Get database stats (count, date range, last update)",
        inputSchema={
            "type": "object",
            "properties": {},
            "required": [],
        },
    )

    return [cve_details_tool, search_tool, statistics_tool]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool call to the matching CVE tool function.

    Args:
        name: Name of the requested tool: ``"get_cve_details"``,
            ``"search_cves"`` or ``"get_statistics"``.
        arguments: Tool arguments keyed by parameter name. Missing keys fall
            back to ``""`` for ``cve_id`` / ``keyword`` and ``10`` for
            ``limit``. A ``None`` payload is treated as empty.

    Returns:
        A one-element list containing a text ``TextContent``. Its text is the
        value returned by the selected tool function, or a JSON object with
        an ``"error"`` key when ``name`` does not match any known tool.
    """
    # Tolerate a missing arguments payload so the .get() lookups cannot fail.
    args = arguments or {}

    if name == "get_cve_details":
        result = tool_get_cve_details(args.get("cve_id", ""))
    elif name == "search_cves":
        # NOTE(review): the tool schema advertises "max: 50" for limit, but no
        # cap is applied here; presumably tool_search_cves enforces it -- confirm.
        result = tool_search_cves(
            args.get("keyword", ""),
            args.get("limit", 10),
        )
    elif name == "get_statistics":
        result = tool_get_statistics()
    else:
        # Build the payload with json.dumps rather than string interpolation:
        # `name` is client-supplied, and a quote, backslash or control
        # character in it would otherwise produce malformed JSON.
        # ensure_ascii=False keeps non-ASCII names unescaped, so the output
        # matches the previous format for every name that was already valid.
        result = json.dumps(
            {"error": f"Unknown tool: {name}"},
            ensure_ascii=False,
        )

    return [TextContent(type="text", text=result)]
async def run_server():
    """Run the ``app`` server on the streams provided by ``stdio_server()``.

    The stream pair is held open by the ``async with`` block for as long as
    ``app.run`` is executing.
    """
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = app.create_initialization_options()
        await app.run(reader, writer, init_options)