get_exploit_availability
Check for public exploits and proof-of-concepts for a CVE across multiple sources to assess immediate risk and weaponization status.
Instructions
Check for public exploits and PoCs for a CVE
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| cve_id | Yes | Check for public exploits and proof-of-concepts (PoCs) for a CVE across multiple sources including ExploitDB, Metasploit, GitHub, and NVD references. Provide a CVE ID in the format CVE-YYYY-NNNN (e.g., CVE-2021-44228). Returns threat intelligence about exploit availability, active exploitation indicators, and weaponization status to assess immediate risk. |
Implementation Reference
- The main asynchronous handler function that implements the 'get_exploit_availability' tool logic. It validates the CVE ID, queries NVD API for references, checks MITRE page, and provides links and guidance for manual checks on ExploitDB, GitHub, Metasploit, etc. Generates a formatted risk assessment report.async def get_exploit_availability( cve_id: str, ) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Check for the availability of public exploits for a CVE. Searches multiple databases and sources including ExploitDB, Metasploit, and others. Args: cve_id: CVE identifier in format CVE-YYYY-NNNN Returns: List of content containing exploit availability information or error messages """ # Clean up CVE ID format cve_id = cve_id.upper().strip() if not cve_id.startswith("CVE-"): cve_id = f"CVE-{cve_id}" # Validate CVE ID format (CVE-YYYY-NNNN) if not re.match(r"^CVE-\d{4}-\d{4,}$", cve_id): return [ types.TextContent( type="text", text=f"Error: Invalid CVE ID format. Expected format: CVE-YYYY-NNNN (e.g., CVE-2021-44228). Got: {cve_id}", ) ] headers = { "User-Agent": "MCP Exploit Availability Checker v1.0", "Accept": "application/json", } exploit_sources = {} try: timeout = httpx.Timeout(15.0, connect=10.0) async with httpx.AsyncClient( follow_redirects=True, headers=headers, timeout=timeout ) as client: # Check 1: NIST NVD for CVE references that might indicate exploits nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}" try: nvd_response = await client.get(nvd_url) if nvd_response.status_code == 200: nvd_data = nvd_response.json() if nvd_data.get("totalResults", 0) > 0: cve_item = nvd_data["vulnerabilities"][0]["cve"] # Check references for exploit indicators exploit_indicators = [] references = cve_item.get("references", []) for ref in references: url = ref.get("url", "").lower() tags = [tag.lower() for tag in ref.get("tags", [])] # Look for exploit-related keywords if ( "exploit" in tags or "exploit" in url or "poc" in url or "proof" in url or "github.com" in url or "packetstorm" in url or "metasploit" in url or "rapid7" in url ): exploit_indicators.append( { "url": ref.get("url", ""), "source": ref.get("source", ""), "tags": ref.get("tags", []), } ) if exploit_indicators: exploit_sources["NVD_References"] = { "status": "POTENTIAL_EXPLOITS_FOUND", "count": len(exploit_indicators), "details": exploit_indicators[:5], # Limit to first 5 } else: exploit_sources["NVD_References"] = { "status": "NO_EXPLOIT_INDICATORS", "count": 0, "details": [], } except Exception as e: exploit_sources["NVD_References"] = { "status": "ERROR", "error": str(e), } # Check 2: Search CVE Mitre for additional exploit references try: mitre_url = f"https://cve.mitre.org/cgi-bin/cvekey.cgi?keyword={cve_id}" # Note: This is a basic check since MITRE doesn't have a simple API mitre_response = await client.get(mitre_url) if mitre_response.status_code == 200: content = mitre_response.text.lower() exploit_keywords = [ "exploit", "proof of concept", "poc", "metasploit", "weaponized", ] found_keywords = [kw for kw in exploit_keywords if kw in content] if found_keywords: exploit_sources["MITRE_Page"] = { "status": "EXPLOIT_KEYWORDS_FOUND", "keywords": found_keywords, } else: exploit_sources["MITRE_Page"] = { "status": "NO_EXPLOIT_KEYWORDS", "keywords": [], } except Exception as e: exploit_sources["MITRE_Page"] = { "status": "ERROR", "error": str(e), } # Check 3: Search GitHub for potential PoCs (indirect check) try: # Note: This would require GitHub API for full search # For now, we'll provide guidance on manual checking exploit_sources["GitHub_Search"] = { "status": "MANUAL_CHECK_RECOMMENDED", "search_url": f"https://github.com/search?q={cve_id}+exploit&type=repositories", "search_url_poc": f"https://github.com/search?q={cve_id}+poc&type=repositories", } except Exception as e: exploit_sources["GitHub_Search"] = { "status": "ERROR", "error": str(e), } # Check 4: ExploitDB search guidance (since they don't have a public API) exploit_sources["ExploitDB"] = { "status": "MANUAL_CHECK_RECOMMENDED", "search_url": f"https://www.exploit-db.com/search?cve={cve_id}", "description": "Check ExploitDB manually for verified exploits", } # Check 5: Metasploit modules guidance exploit_sources["Metasploit"] = { "status": "MANUAL_CHECK_RECOMMENDED", "search_guidance": f"Search for '{cve_id}' in Metasploit Framework", "command": f"msfconsole -q -x 'search cve:{cve_id}; exit'", } except Exception as e: return [ types.TextContent( type="text", text=f"Error: Failed to check exploit availability: {str(e)}", ) ] # Analyze results and determine overall risk exploit_found = False potential_exploits = False for source, data in exploit_sources.items(): if isinstance(data, dict): status = data.get("status", "") if "FOUND" in status or "POTENTIAL" in status: if "POTENTIAL" in status: potential_exploits = True else: exploit_found = True # Format the response result = f"š”ļø **Exploit Availability Report: {cve_id}**\n\n" # Overall assessment if exploit_found: risk_level = "š“ HIGH RISK" risk_desc = "Public exploits appear to be available" elif potential_exploits: risk_level = "š MEDIUM RISK" risk_desc = "Potential exploit indicators found" else: risk_level = "š¢ LOW RISK" risk_desc = "No obvious public exploits found" result += f"ā ļø **Risk Assessment:** {risk_level}\n" result += f"š **Assessment:** {risk_desc}\n\n" result += "š **Detailed Source Analysis:**\n\n" # NVD References Analysis nvd_data = exploit_sources.get("NVD_References", {}) if nvd_data.get("status") == "POTENTIAL_EXPLOITS_FOUND": result += f"**š NVD References:** ā Found {nvd_data.get('count', 0)} potential exploit references\n" for detail in nvd_data.get("details", [])[:3]: # Show first 3 result += f" ⢠{detail.get('url', '')}\n" if detail.get("tags"): result += f" Tags: {', '.join(detail.get('tags', []))}\n" elif nvd_data.get("status") == "NO_EXPLOIT_INDICATORS": result += "**š NVD References:** āŖ No exploit indicators in references\n" elif nvd_data.get("status") == "ERROR": result += f"**š NVD References:** ā Error checking ({nvd_data.get('error', 'Unknown')})\n" # MITRE Analysis mitre_data = exploit_sources.get("MITRE_Page", {}) if mitre_data.get("status") == "EXPLOIT_KEYWORDS_FOUND": result += f"**šļø MITRE Page:** ā ļø Exploit-related keywords found: {', '.join(mitre_data.get('keywords', []))}\n" elif mitre_data.get("status") == "NO_EXPLOIT_KEYWORDS": result += "**šļø MITRE Page:** āŖ No exploit keywords detected\n" result += "\nš **Manual Verification Recommended:**\n\n" # GitHub Search github_data = exploit_sources.get("GitHub_Search", {}) result += "**š GitHub Search:**\n" result += f" ⢠Repository search: {github_data.get('search_url', '')}\n" result += f" ⢠PoC search: {github_data.get('search_url_poc', '')}\n\n" # ExploitDB exploitdb_data = exploit_sources.get("ExploitDB", {}) result += "**š„ ExploitDB:**\n" result += f" ⢠Search URL: {exploitdb_data.get('search_url', '')}\n" result += f" ⢠Description: {exploitdb_data.get('description', '')}\n\n" # Metasploit metasploit_data = exploit_sources.get("Metasploit", {}) result += "**šÆ Metasploit Framework:**\n" result += f" ⢠Command: `{metasploit_data.get('command', '')}`\n" result += f" ⢠Guidance: {metasploit_data.get('search_guidance', '')}\n\n" result += "š” **Additional Resources:**\n" result += f" ⢠VulnCheck: https://vulncheck.com/search?q={cve_id}\n" result += f" ⢠PacketStorm: https://packetstormsecurity.com/search/?q={cve_id}\n" result += " ⢠SecLists: Check security mailing lists and advisories\n\n" result += "šØ **Security Recommendations:**\n" if risk_level.startswith("š“"): result += " ⢠šØ **URGENT:** Treat as actively exploited\n" result += " ⢠š”ļø Implement immediate mitigations\n" result += " ⢠š Enhanced monitoring for attack attempts\n" result += " ⢠┠Emergency patching procedures\n" elif risk_level.startswith("š "): result += " ⢠ā ļø **HIGH PRIORITY:** Assume exploitable\n" result += " ⢠š Accelerated patching timeline\n" result += " ⢠š Monitor for new exploit releases\n" result += " ⢠š Additional security controls\n" else: result += " ⢠š **STANDARD:** Follow normal patching schedule\n" result += " ⢠š Continue monitoring threat landscape\n" result += " ⢠š Periodic re-evaluation recommended\n" result += ( f"\nš **Report Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) return [types.TextContent(type="text", text=result)]
- mcp_simple_tool/server.py:244-257 (registration)Registers the 'get_exploit_availability' tool in the MCP server's list_tools() function, including its name, description, and input schema requiring 'cve_id'.types.Tool( name="get_exploit_availability", description="Check for public exploits and PoCs for a CVE", inputSchema={ "type": "object", "required": ["cve_id"], "properties": { "cve_id": { "type": "string", "description": exploit_description, } }, }, ),
- mcp_simple_tool/server.py:132-139 (registration)Dispatch logic in the server's fetch_tool() handler that routes calls to 'get_exploit_availability' by invoking the imported handler function with the 'cve_id' argument.elif name == "get_exploit_availability": if "cve_id" not in arguments: return [ types.TextContent( type="text", text="Error: Missing required argument 'cve_id'" ) ] return await get_exploit_availability(arguments["cve_id"])
- mcp_simple_tool/server.py:247-256 (schema)Input schema definition for the tool, specifying the required 'cve_id' string parameter with description.inputSchema={ "type": "object", "required": ["cve_id"], "properties": { "cve_id": { "type": "string", "description": exploit_description, } }, },
- Re-exports the get_exploit_availability handler from its module for convenient import in the server.from .exploit_availability import get_exploit_availability from .package_vulnerability import check_package_vulnerabilities from .vex_status import get_vex_status from .vulnerability_search import search_vulnerabilities from .vulnerability_timeline import get_vulnerability_timeline __all__ = [ "lookup_cve", "check_package_vulnerabilities", "get_epss_score", "calculate_cvss_score", "search_vulnerabilities", "get_exploit_availability",