get_exploit_availability
Identify public exploits and PoCs for a specific CVE by analyzing sources like ExploitDB, Metasploit, GitHub, and NVD. Assess exploit availability, active exploitation indicators, and weaponization status to determine immediate risk.
Instructions
Check for public exploits and PoCs for a CVE
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| cve_id | Yes | Check for public exploits and proof-of-concepts (PoCs) for a CVE across multiple sources including ExploitDB, Metasploit, GitHub, and NVD references. Provide a CVE ID in the format CVE-YYYY-NNNN (e.g., CVE-2021-44228). Returns threat intelligence about exploit availability, active exploitation indicators, and weaponization status to assess immediate risk. |
Implementation Reference
- Main handler function implementing the tool logic: validates CVE, queries NVD API for references, checks MITRE page, provides manual search links for ExploitDB/Metasploit/GitHub, assesses risk level, and returns formatted markdown report.async def get_exploit_availability( cve_id: str, ) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Check for the availability of public exploits for a CVE. Searches multiple databases and sources including ExploitDB, Metasploit, and others. Args: cve_id: CVE identifier in format CVE-YYYY-NNNN Returns: List of content containing exploit availability information or error messages """ # Clean up CVE ID format cve_id = cve_id.upper().strip() if not cve_id.startswith("CVE-"): cve_id = f"CVE-{cve_id}" # Validate CVE ID format (CVE-YYYY-NNNN) if not re.match(r"^CVE-\d{4}-\d{4,}$", cve_id): return [ types.TextContent( type="text", text=f"Error: Invalid CVE ID format. Expected format: CVE-YYYY-NNNN (e.g., CVE-2021-44228). Got: {cve_id}", ) ] headers = { "User-Agent": "MCP Exploit Availability Checker v1.0", "Accept": "application/json", } exploit_sources = {} try: timeout = httpx.Timeout(15.0, connect=10.0) async with httpx.AsyncClient( follow_redirects=True, headers=headers, timeout=timeout ) as client: # Check 1: NIST NVD for CVE references that might indicate exploits nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}" try: nvd_response = await client.get(nvd_url) if nvd_response.status_code == 200: nvd_data = nvd_response.json() if nvd_data.get("totalResults", 0) > 0: cve_item = nvd_data["vulnerabilities"][0]["cve"] # Check references for exploit indicators exploit_indicators = [] references = cve_item.get("references", []) for ref in references: url = ref.get("url", "").lower() tags = [tag.lower() for tag in ref.get("tags", [])] # Look for exploit-related keywords if ( "exploit" in tags or "exploit" in url or "poc" in url or "proof" in url or "github.com" in url or "packetstorm" in url or "metasploit" in url or "rapid7" in url ): exploit_indicators.append( { "url": ref.get("url", ""), "source": ref.get("source", ""), "tags": ref.get("tags", []), } ) if exploit_indicators: exploit_sources["NVD_References"] = { "status": "POTENTIAL_EXPLOITS_FOUND", "count": len(exploit_indicators), "details": exploit_indicators[:5], # Limit to first 5 } else: exploit_sources["NVD_References"] = { "status": "NO_EXPLOIT_INDICATORS", "count": 0, "details": [], } except Exception as e: exploit_sources["NVD_References"] = { "status": "ERROR", "error": str(e), } # Check 2: Search CVE Mitre for additional exploit references try: mitre_url = f"https://cve.mitre.org/cgi-bin/cvekey.cgi?keyword={cve_id}" # Note: This is a basic check since MITRE doesn't have a simple API mitre_response = await client.get(mitre_url) if mitre_response.status_code == 200: content = mitre_response.text.lower() exploit_keywords = [ "exploit", "proof of concept", "poc", "metasploit", "weaponized", ] found_keywords = [kw for kw in exploit_keywords if kw in content] if found_keywords: exploit_sources["MITRE_Page"] = { "status": "EXPLOIT_KEYWORDS_FOUND", "keywords": found_keywords, } else: exploit_sources["MITRE_Page"] = { "status": "NO_EXPLOIT_KEYWORDS", "keywords": [], } except Exception as e: exploit_sources["MITRE_Page"] = { "status": "ERROR", "error": str(e), } # Check 3: Search GitHub for potential PoCs (indirect check) try: # Note: This would require GitHub API for full search # For now, we'll provide guidance on manual checking exploit_sources["GitHub_Search"] = { "status": "MANUAL_CHECK_RECOMMENDED", "search_url": f"https://github.com/search?q={cve_id}+exploit&type=repositories", "search_url_poc": f"https://github.com/search?q={cve_id}+poc&type=repositories", } except Exception as e: exploit_sources["GitHub_Search"] = { "status": "ERROR", "error": str(e), } # Check 4: ExploitDB search guidance (since they don't have a public API) exploit_sources["ExploitDB"] = { "status": "MANUAL_CHECK_RECOMMENDED", "search_url": f"https://www.exploit-db.com/search?cve={cve_id}", "description": "Check ExploitDB manually for verified exploits", } # Check 5: Metasploit modules guidance exploit_sources["Metasploit"] = { "status": "MANUAL_CHECK_RECOMMENDED", "search_guidance": f"Search for '{cve_id}' in Metasploit Framework", "command": f"msfconsole -q -x 'search cve:{cve_id}; exit'", } except Exception as e: return [ types.TextContent( type="text", text=f"Error: Failed to check exploit availability: {str(e)}", ) ] # Analyze results and determine overall risk exploit_found = False potential_exploits = False for source, data in exploit_sources.items(): if isinstance(data, dict): status = data.get("status", "") if "FOUND" in status or "POTENTIAL" in status: if "POTENTIAL" in status: potential_exploits = True else: exploit_found = True # Format the response result = f"π‘οΈ **Exploit Availability Report: {cve_id}**\n\n" # Overall assessment if exploit_found: risk_level = "π΄ HIGH RISK" risk_desc = "Public exploits appear to be available" elif potential_exploits: risk_level = "π MEDIUM RISK" risk_desc = "Potential exploit indicators found" else: risk_level = "π’ LOW RISK" risk_desc = "No obvious public exploits found" result += f"β οΈ **Risk Assessment:** {risk_level}\n" result += f"π **Assessment:** {risk_desc}\n\n" result += "π **Detailed Source Analysis:**\n\n" # NVD References Analysis nvd_data = exploit_sources.get("NVD_References", {}) if nvd_data.get("status") == "POTENTIAL_EXPLOITS_FOUND": result += f"**π NVD References:** β Found {nvd_data.get('count', 0)} potential exploit references\n" for detail in nvd_data.get("details", [])[:3]: # Show first 3 result += f" β’ {detail.get('url', '')}\n" if detail.get("tags"): result += f" Tags: {', '.join(detail.get('tags', []))}\n" elif nvd_data.get("status") == "NO_EXPLOIT_INDICATORS": result += "**π NVD References:** βͺ No exploit indicators in references\n" elif nvd_data.get("status") == "ERROR": result += f"**π NVD References:** β Error checking ({nvd_data.get('error', 'Unknown')})\n" # MITRE Analysis mitre_data = exploit_sources.get("MITRE_Page", {}) if mitre_data.get("status") == "EXPLOIT_KEYWORDS_FOUND": result += f"**ποΈ MITRE Page:** β οΈ Exploit-related keywords found: {', '.join(mitre_data.get('keywords', []))}\n" elif mitre_data.get("status") == "NO_EXPLOIT_KEYWORDS": result += "**ποΈ MITRE Page:** βͺ No exploit keywords detected\n" result += "\nπ **Manual Verification Recommended:**\n\n" # GitHub Search github_data = exploit_sources.get("GitHub_Search", {}) result += "**π GitHub Search:**\n" result += f" β’ Repository search: {github_data.get('search_url', '')}\n" result += f" β’ PoC search: {github_data.get('search_url_poc', '')}\n\n" # ExploitDB exploitdb_data = exploit_sources.get("ExploitDB", {}) result += "**π₯ ExploitDB:**\n" result += f" β’ Search URL: {exploitdb_data.get('search_url', '')}\n" result += f" β’ Description: {exploitdb_data.get('description', '')}\n\n" # Metasploit metasploit_data = exploit_sources.get("Metasploit", {}) result += "**π― Metasploit Framework:**\n" result += f" β’ Command: `{metasploit_data.get('command', '')}`\n" result += f" β’ Guidance: {metasploit_data.get('search_guidance', '')}\n\n" result += "π‘ **Additional Resources:**\n" result += f" β’ VulnCheck: https://vulncheck.com/search?q={cve_id}\n" result += f" β’ PacketStorm: https://packetstormsecurity.com/search/?q={cve_id}\n" result += " β’ SecLists: Check security mailing lists and advisories\n\n" result += "π¨ **Security Recommendations:**\n" if risk_level.startswith("π΄"): result += " β’ π¨ **URGENT:** Treat as actively exploited\n" result += " β’ π‘οΈ Implement immediate mitigations\n" result += " β’ π Enhanced monitoring for attack attempts\n" result += " β’ β‘ Emergency patching procedures\n" elif risk_level.startswith("π "): result += " β’ β οΈ **HIGH PRIORITY:** Assume exploitable\n" result += " β’ π Accelerated patching timeline\n" result += " β’ π Monitor for new exploit releases\n" result += " β’ π Additional security controls\n" else: result += " β’ π **STANDARD:** Follow normal patching schedule\n" result += " β’ π Continue monitoring threat landscape\n" result += " β’ π Periodic re-evaluation recommended\n" result += ( f"\nπ **Report Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) return [types.TextContent(type="text", text=result)]
- mcp_simple_tool/server.py:247-256 (schema)Input schema definition for the get_exploit_availability tool, specifying required 'cve_id' parameter.inputSchema={ "type": "object", "required": ["cve_id"], "properties": { "cve_id": { "type": "string", "description": exploit_description, } }, },
- mcp_simple_tool/server.py:244-257 (registration)Tool registration in the list_tools() function, defining name, description, and input schema.types.Tool( name="get_exploit_availability", description="Check for public exploits and PoCs for a CVE", inputSchema={ "type": "object", "required": ["cve_id"], "properties": { "cve_id": { "type": "string", "description": exploit_description, } }, }, ),
- mcp_simple_tool/server.py:132-139 (registration)Dispatch logic in fetch_tool() that routes calls to the get_exploit_availability handler.elif name == "get_exploit_availability": if "cve_id" not in arguments: return [ types.TextContent( type="text", text="Error: Missing required argument 'cve_id'" ) ] return await get_exploit_availability(arguments["cve_id"])
- mcp_simple_tool/tools/__init__.py:10-22 (registration)Import and export of the handler function in the tools package __init__.from .exploit_availability import get_exploit_availability from .package_vulnerability import check_package_vulnerabilities from .vex_status import get_vex_status from .vulnerability_search import search_vulnerabilities from .vulnerability_timeline import get_vulnerability_timeline __all__ = [ "lookup_cve", "check_package_vulnerabilities", "get_epss_score", "calculate_cvss_score", "search_vulnerabilities", "get_exploit_availability",