#!/usr/bin/env python3
"""MCP server for Exploit-DB searchsploit tool."""
import asyncio
import subprocess
from typing import Any
from mcp.server import Server
from mcp.types import Tool, TextContent
# Module-level MCP server instance, registered under the name "mcp-exploitdb".
# The list_tools/call_tool handlers below attach to it through its decorators,
# and main() runs it.
app = Server("mcp-exploitdb")
def run_searchsploit(args: list[str], timeout: float = 30) -> tuple[str, str, int]:
    """Run ``searchsploit`` with *args* and capture its result.

    Args:
        args: Command-line arguments placed after the ``searchsploit``
            executable name. They are passed as an argument list, so no
            shell is involved.
        timeout: Seconds to wait for the command to finish. Defaults to 30,
            the value that used to be hard-coded.

    Returns:
        A ``(stdout, stderr, returncode)`` tuple. This function does not
        raise for launch or runtime failures: a timeout, a missing
        executable, or any other error is reported as an empty stdout, a
        human-readable message in the stderr slot, and a return code of 1.
    """
    try:
        result = subprocess.run(
            ["searchsploit", *args],
            # This server talks MCP over its own stdin (see stdio_server in
            # main). Without an explicit stdin the child would inherit that
            # stream and could consume protocol bytes if it ever reads or
            # prompts, so give it an empty input instead.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return "", f"Command timed out after {timeout:g} seconds", 1
    except FileNotFoundError:
        return "", "searchsploit command not found. Please install exploitdb package.", 1
    except Exception as e:
        # Deliberate catch-all: callers rely on always getting a tuple back.
        return "", f"Error running searchsploit: {str(e)}", 1
    return result.stdout, result.stderr, result.returncode
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Describe every searchsploit-backed tool this server offers.

    Returns:
        One ``Tool`` per supported operation, in a fixed order, each carrying
        a JSON-Schema ``inputSchema`` for its arguments.
    """

    # Each factory returns a fresh dict so no schema fragment is shared
    # between tools.
    def flag(description: str) -> dict[str, Any]:
        # Boolean switch that is off unless the caller turns it on.
        return {"type": "boolean", "description": description, "default": False}

    def text(description: str) -> dict[str, Any]:
        return {"type": "string", "description": description}

    def edb_id() -> dict[str, Any]:
        return text("Exploit-DB ID (e.g., '39446')")

    def json_switch() -> dict[str, Any]:
        return flag("Return results in JSON format (default: false)")

    def object_schema(properties: dict[str, Any], required: Any = None) -> dict[str, Any]:
        spec: dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            spec["required"] = required
        return spec

    search_exploits = Tool(
        name="search_exploits",
        description="Search for exploits in Exploit-DB using search terms. Returns a formatted list of matching exploits with titles and paths.",
        inputSchema=object_schema(
            {
                "terms": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Search terms to find exploits (e.g., ['afd', 'windows', 'local'])",
                },
                "case_sensitive": flag("Perform case-sensitive search (default: false)"),
                "exact": flag("Perform exact & order match on exploit title (default: false)"),
                "strict": flag("Strict search - input values must exist exactly (default: false)"),
                "title_only": flag("Search only in exploit titles, not file paths (default: false)"),
                "exclude": {
                    **text("Exclude terms from results (use | to separate multiple terms)"),
                    "default": "",
                },
                "json_output": json_switch(),
                "show_urls": flag("Show Exploit-DB.com URLs instead of local paths (default: false)"),
            },
            required=["terms"],
        ),
    )

    search_cve = Tool(
        name="search_cve",
        description="Search for exploits by CVE (Common Vulnerabilities and Exposures) identifier.",
        inputSchema=object_schema(
            {
                "cve": text("CVE identifier (e.g., '2021-44228' or 'CVE-2021-44228')"),
                "json_output": json_switch(),
            },
            required=["cve"],
        ),
    )

    get_exploit_path = Tool(
        name="get_exploit_path",
        description="Get the full local filesystem path to an exploit by its EDB-ID.",
        inputSchema=object_schema({"edb_id": edb_id()}, required=["edb_id"]),
    )

    get_exploit_content = Tool(
        name="get_exploit_content",
        description="Retrieve and display the full content of an exploit by its EDB-ID.",
        inputSchema=object_schema({"edb_id": edb_id()}, required=["edb_id"]),
    )

    mirror_exploit = Tool(
        name="mirror_exploit",
        description="Copy an exploit to a specified directory by its EDB-ID.",
        inputSchema=object_schema(
            {
                "edb_id": edb_id(),
                "destination": {
                    **text("Destination directory (default: current directory)"),
                    "default": ".",
                },
            },
            required=["edb_id"],
        ),
    )

    update_exploitdb = Tool(
        name="update_exploitdb",
        description="Update the local Exploit-DB database to the latest version.",
        inputSchema=object_schema({}),
    )

    return [
        search_exploits,
        search_cve,
        get_exploit_path,
        get_exploit_content,
        mirror_exploit,
        update_exploitdb,
    ]
# (argument key, searchsploit flag) pairs for the boolean search options,
# in the order the flags are placed on the command line.
_SEARCH_FLAGS = (
    ("case_sensitive", "-c"),
    ("exact", "-e"),
    ("strict", "-s"),
    ("title_only", "-t"),
    ("json_output", "-j"),
    ("show_urls", "-w"),
)

# Tools whose only mandatory argument is an Exploit-DB ID.
_EDB_ID_TOOLS = ("get_exploit_path", "get_exploit_content", "mirror_exploit")


def _text_result(message: str) -> list[TextContent]:
    """Wrap *message* in the one-element ``TextContent`` list tools return."""
    return [TextContent(type="text", text=message)]


def _command_failure(stdout: str, stderr: str) -> list[TextContent]:
    """Format a non-zero searchsploit exit as an error result."""
    return _text_result(f"Error: {stderr}\n{stdout}")


def _as_clean_str(value: Any) -> str:
    """Return *value* as a stripped string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip()


def _is_option_like(value: str) -> bool:
    """Tell whether *value* would appear on the command line as a flag."""
    return value.startswith("-")


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Dispatch a tool call to the matching searchsploit invocation.

    Args:
        name: Tool name; one of ``search_exploits``, ``search_cve``,
            ``get_exploit_path``, ``get_exploit_content``,
            ``mirror_exploit`` or ``update_exploitdb``.
        arguments: Mapping of tool arguments (``None`` is treated as empty).
            Scalar values are coerced to strings, so an integer ``edb_id``
            or a single string for ``terms`` is accepted.

    Returns:
        A one-element list holding the text result. Problems are reported
        in that text (prefixed with ``Error:``) rather than raised; an
        unrecognised *name* yields ``Unknown tool: <name>``.

    Caller-supplied values that begin with ``-`` are rejected so that data
    cannot be passed to the command as an additional option.
    """
    arguments = arguments or {}

    if name == "search_exploits":
        raw_terms = arguments.get("terms") or []
        # A bare string (or other scalar) is one term, not a sequence of
        # characters to spread across the command line.
        if not isinstance(raw_terms, (list, tuple)):
            raw_terms = [raw_terms]
        terms = [term for term in map(_as_clean_str, raw_terms) if term]
        if not terms:
            return _text_result("Error: No search terms provided")
        if any(_is_option_like(term) for term in terms):
            return _text_result("Error: Search terms must not start with '-'")

        args = [flag for key, flag in _SEARCH_FLAGS if arguments.get(key)]
        exclude = arguments.get("exclude")
        if exclude:
            args.append(f"--exclude={exclude}")
        args.extend(terms)

        stdout, stderr, returncode = run_searchsploit(args)
        if returncode != 0:
            return _command_failure(stdout, stderr)
        return _text_result(stdout if stdout else "No results found")

    if name == "search_cve":
        cve = _as_clean_str(arguments.get("cve"))
        # Drop one leading "CVE-" prefix regardless of letter case.
        if cve[:4].upper() == "CVE-":
            cve = cve[4:].strip()
        if not cve:
            return _text_result("Error: No CVE provided")
        if _is_option_like(cve):
            return _text_result("Error: CVE must not start with '-'")

        args = ["--cve", cve]
        if arguments.get("json_output"):
            args.append("-j")

        stdout, stderr, returncode = run_searchsploit(args)
        if returncode != 0:
            return _command_failure(stdout, stderr)
        return _text_result(stdout if stdout else "No results found")

    if name in _EDB_ID_TOOLS:
        edb_id = _as_clean_str(arguments.get("edb_id"))
        if not edb_id:
            return _text_result("Error: No EDB-ID provided")
        if _is_option_like(edb_id):
            return _text_result("Error: EDB-ID must not start with '-'")

        if name == "get_exploit_path":
            stdout, stderr, returncode = run_searchsploit(["-p", edb_id])
            if returncode != 0:
                return _command_failure(stdout, stderr)
            return _text_result(stdout.strip())

        if name == "get_exploit_content":
            # Resolve the ID to a local file first, then read that file.
            stdout, stderr, returncode = run_searchsploit(["-p", edb_id])
            if returncode != 0:
                return _command_failure(stdout, stderr)

            # The path is taken from the first output line that, once
            # stripped, starts with "Path:".
            path = None
            for line in stdout.split("\n"):
                if line.strip().startswith("Path:"):
                    path = line.split("Path:", 1)[1].strip()
                    break
            if not path:
                return _text_result(f"Error: Could not find exploit path\n{stdout}")

            try:
                # Undecodable bytes are dropped instead of failing the read.
                with open(path, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
            except (OSError, ValueError) as e:
                return _text_result(f"Error reading exploit file: {str(e)}")
            return _text_result(f"Exploit: {path}\n\n{content}")

        # name == "mirror_exploit"
        import os  # local import: os is not among this file's top-level imports

        destination = _as_clean_str(arguments.get("destination")) or "."
        try:
            original_dir = os.getcwd()
            # The copy is made by running the command from inside the
            # destination directory.
            os.chdir(destination)
            try:
                stdout, stderr, returncode = run_searchsploit(["-m", edb_id])
            finally:
                # The working directory is process-wide state; always put
                # it back, even if the call above fails unexpectedly.
                os.chdir(original_dir)
        except (OSError, ValueError) as e:
            return _text_result(f"Error: {str(e)}")

        if returncode != 0:
            return _command_failure(stdout, stderr)
        return _text_result(f"Exploit mirrored successfully to {destination}\n{stdout}")

    if name == "update_exploitdb":
        stdout, stderr, returncode = run_searchsploit(["-u"])
        if returncode != 0:
            return _command_failure(stdout, stderr)
        return _text_result(stdout)

    return _text_result(f"Unknown tool: {name}")
async def main():
    """Run the MCP app over the stdio transport."""
    # Imported here rather than at module top, as in the original layout.
    from mcp.server.stdio import stdio_server

    async with stdio_server() as streams:
        incoming, outgoing = streams
        await app.run(incoming, outgoing, app.create_initialization_options())
# Script entry point: start the asyncio event loop and run main() only when
# this file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())