Skip to main content
Glama
marc-shade

Threat Intelligence MCP Server

by marc-shade
server.py29.9 kB
#!/usr/bin/env python3 """ Threat Intelligence MCP Server v0.2.0 Aggregates threat feeds from multiple sources: - Feodo Tracker, URLhaus, SSL Blacklist - Emerging Threats, CISA KEV, Tor Exit Nodes - ThreatFox, Blocklist.de, CI Army, Spamhaus - VirusTotal, AbuseIPDB, Shodan (with API keys) Provides tools for: - Checking IPs/domains/hashes against threat feeds - Getting latest threats and IOCs - Bulk reputation checks - Network scan integration - MITRE ATT&CK mapping """ import asyncio import json import sys from datetime import datetime, timedelta from typing import Any, Optional import aiohttp from fastmcp import FastMCP from .config import ( # Configuration API_KEYS, THREAT_FEEDS, FeedType, Severity, IOCType, # Functions setup_logging, ensure_dirs, get_cache_dir, get_feed, get_enabled_feeds, get_ip_feeds, get_timestamp, # Validation validate_ip, validate_hash, validate_domain, validate_ioc_type, # Cache threat_cache, # Constants DEFAULT_REQUEST_TIMEOUT, MAX_RESPONSE_ITEMS, DEFAULT_CACHE_TTL, ) # Configure logging logger = setup_logging("threat-intel-mcp") # Initialize FastMCP mcp = FastMCP("threat-intel") # ============================================================================= # HTTP Helpers # ============================================================================= async def fetch_url(url: str, headers: Optional[dict] = None, timeout: int = DEFAULT_REQUEST_TIMEOUT) -> str: """ Fetch URL content with proper error handling. Args: url: URL to fetch headers: Optional request headers timeout: Request timeout in seconds Returns: Response text content """ async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, timeout=timeout) as response: response.raise_for_status() return await response.text() async def fetch_json(url: str, headers: Optional[dict] = None, timeout: int = DEFAULT_REQUEST_TIMEOUT) -> dict: """ Fetch JSON from URL with proper error handling. Args: url: URL to fetch headers: Optional request headers timeout: Request timeout in seconds Returns: Parsed JSON as dict """ async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, timeout=timeout) as response: response.raise_for_status() return await response.json() # ============================================================================= # Parsing Helpers # ============================================================================= def parse_ip_list(content: str) -> list[str]: """ Parse IP list from text content with proper validation. Args: content: Raw text content with IPs Returns: List of valid IP addresses """ import ipaddress ips = [] for line in content.split('\n'): line = line.strip() if line and not line.startswith('#'): # Extract potential IP from line parts = line.split() if parts: potential_ip = parts[0] try: # Validate IP using ipaddress module ipaddress.ip_address(potential_ip) ips.append(potential_ip) except ValueError: continue return ips def parse_url_list(content: str) -> list[str]: """ Parse URL list from text content. Args: content: Raw text content with URLs Returns: List of URLs """ urls = [] for line in content.split('\n'): line = line.strip() if line and not line.startswith('#'): if line.startswith('http://') or line.startswith('https://'): urls.append(line) return urls def parse_cidr_list(content: str) -> list[str]: """ Parse CIDR notation IP ranges. Args: content: Raw text with CIDR ranges Returns: List of CIDR strings """ import ipaddress cidrs = [] for line in content.split('\n'): line = line.strip() if line and not line.startswith(';') and not line.startswith('#'): parts = line.split() if parts: try: ipaddress.ip_network(parts[0], strict=False) cidrs.append(parts[0]) except ValueError: continue return cidrs # ============================================================================= # MCP Tools # ============================================================================= @mcp.tool() async def get_threat_feeds() -> str: """ Get list of all available threat intelligence feeds. Returns: JSON with available feeds and their descriptions """ feeds = [feed.to_dict() for feed in get_enabled_feeds().values()] return json.dumps({ "success": True, "feeds": feeds, "total_feeds": len(feeds), "api_configured": API_KEYS.to_dict() }, indent=2) @mcp.tool() async def fetch_threat_feed(feed_name: str) -> str: """ Fetch and parse a specific threat intelligence feed. Args: feed_name: Name of the feed (feodo_tracker, urlhaus_recent, etc.) Returns: JSON with IOCs from the feed """ ensure_dirs() feed = get_feed(feed_name) if not feed: available = list(THREAT_FEEDS.keys()) return json.dumps({ "success": False, "error": f"Unknown feed: {feed_name}", "available_feeds": available }, indent=2) # Check cache cache_key = f"feed_{feed_name}" cached = threat_cache.get(cache_key) if cached: return json.dumps({ "success": True, "feed": feed_name, "description": feed.description, "cached": True, **cached }, indent=2) try: result: dict[str, Any] = {} if feed.feed_type == FeedType.JSON: data = await fetch_json(feed.url) result = { "type": "json", "data": data if isinstance(data, dict) else {"items": data}, "count": len(data) if isinstance(data, (list, dict)) else 1 } elif feed.feed_type == FeedType.IP_LIST: content = await fetch_url(feed.url) ips = parse_ip_list(content) result = { "type": "ip_list", "ips": ips[:MAX_RESPONSE_ITEMS], "count": len(ips), "truncated": len(ips) > MAX_RESPONSE_ITEMS } elif feed.feed_type == FeedType.URL_LIST: content = await fetch_url(feed.url) urls = parse_url_list(content) result = { "type": "url_list", "urls": urls[:MAX_RESPONSE_ITEMS], "count": len(urls), "truncated": len(urls) > MAX_RESPONSE_ITEMS } elif feed.feed_type == FeedType.TEXT: content = await fetch_url(feed.url) cidrs = parse_cidr_list(content) result = { "type": "cidr_list", "cidrs": cidrs[:MAX_RESPONSE_ITEMS], "count": len(cidrs), "truncated": len(cidrs) > MAX_RESPONSE_ITEMS } else: content = await fetch_url(feed.url) result = { "type": "text", "content": content[:10000], "count": len(content) } # Cache result result["fetched_at"] = get_timestamp() threat_cache.set(cache_key, result) return json.dumps({ "success": True, "feed": feed_name, "description": feed.description, "cached": False, **result }, indent=2) except aiohttp.ClientError as e: logger.error(f"Network error fetching {feed_name}: {e}") return json.dumps({ "success": False, "feed": feed_name, "error": f"Network error: {str(e)}" }, indent=2) except Exception as e: logger.error(f"Error fetching {feed_name}: {e}") return json.dumps({ "success": False, "feed": feed_name, "error": str(e) }, indent=2) @mcp.tool() async def check_ip_reputation(ip: str) -> str: """ Check an IP address against multiple threat intelligence sources. Args: ip: IP address to check Returns: JSON with reputation data from multiple sources """ # Validate IP is_valid, error = validate_ip(ip) if not is_valid: return json.dumps({ "success": False, "error": error }, indent=2) ensure_dirs() results: dict[str, Any] = { "ip": ip, "checked_at": get_timestamp(), "threats_found": [], "sources_checked": [] } # Check against cached IP feeds ip_feed_names = get_ip_feeds() for feed_name in ip_feed_names: cache_key = f"feed_{feed_name}" cached = threat_cache.get(cache_key) if cached: feed_ips = cached.get("ips", []) if ip in feed_ips: feed = get_feed(feed_name) results["threats_found"].append({ "source": feed_name, "description": feed.description if feed else feed_name, "severity": Severity.HIGH.value }) results["sources_checked"].append(feed_name) # Check AbuseIPDB if API key configured if API_KEYS.has_abuseipdb: try: async with aiohttp.ClientSession() as session: headers = { "Key": API_KEYS.abuseipdb, "Accept": "application/json" } params = {"ipAddress": ip, "maxAgeInDays": 90} async with session.get( "https://api.abuseipdb.com/api/v2/check", headers=headers, params=params, timeout=DEFAULT_REQUEST_TIMEOUT ) as response: if response.status == 200: data = await response.json() results["abuseipdb"] = data.get("data", {}) results["sources_checked"].append("abuseipdb") confidence = data.get("data", {}).get("abuseConfidenceScore", 0) if confidence > 50: results["threats_found"].append({ "source": "abuseipdb", "confidence": confidence, "severity": Severity.HIGH.value if confidence > 75 else Severity.MEDIUM.value }) except Exception as e: logger.warning(f"AbuseIPDB error: {e}") results["abuseipdb_error"] = str(e) # Check VirusTotal if API key configured if API_KEYS.has_virustotal: try: async with aiohttp.ClientSession() as session: headers = {"x-apikey": API_KEYS.virustotal} async with session.get( f"https://www.virustotal.com/api/v3/ip_addresses/{ip}", headers=headers, timeout=DEFAULT_REQUEST_TIMEOUT ) as response: if response.status == 200: data = await response.json() stats = data.get("data", {}).get("attributes", {}).get("last_analysis_stats", {}) results["virustotal"] = stats results["sources_checked"].append("virustotal") malicious = stats.get("malicious", 0) if malicious > 0: results["threats_found"].append({ "source": "virustotal", "detections": malicious, "severity": Severity.HIGH.value if malicious > 5 else Severity.MEDIUM.value }) except Exception as e: logger.warning(f"VirusTotal error: {e}") results["virustotal_error"] = str(e) # Check Shodan if API key configured if API_KEYS.has_shodan: try: async with aiohttp.ClientSession() as session: async with session.get( f"https://api.shodan.io/shodan/host/{ip}?key={API_KEYS.shodan}", timeout=DEFAULT_REQUEST_TIMEOUT ) as response: if response.status == 200: data = await response.json() results["shodan"] = { "ports": data.get("ports", []), "hostnames": data.get("hostnames", []), "country": data.get("country_name"), "org": data.get("org"), "vulns": data.get("vulns", []) } results["sources_checked"].append("shodan") if data.get("vulns"): results["threats_found"].append({ "source": "shodan", "vulns": len(data["vulns"]), "severity": Severity.HIGH.value }) except Exception as e: logger.warning(f"Shodan error: {e}") results["shodan_error"] = str(e) # Calculate overall threat level results["is_malicious"] = len(results["threats_found"]) > 0 results["threat_level"] = ( Severity.HIGH.value if any(t.get("severity") == Severity.HIGH.value for t in results["threats_found"]) else Severity.MEDIUM.value if results["threats_found"] else Severity.LOW.value ) return json.dumps({"success": True, **results}, indent=2) @mcp.tool() async def check_hash_reputation(file_hash: str) -> str: """ Check a file hash (MD5/SHA1/SHA256) against threat intelligence. Args: file_hash: File hash to check Returns: JSON with reputation data """ # Validate hash is_valid, hash_type, error = validate_hash(file_hash) if not is_valid: return json.dumps({ "success": False, "error": error }, indent=2) results: dict[str, Any] = { "hash": file_hash.lower(), "hash_type": hash_type, "checked_at": get_timestamp(), "threats_found": [] } # Check VirusTotal if API key configured if API_KEYS.has_virustotal: try: async with aiohttp.ClientSession() as session: headers = {"x-apikey": API_KEYS.virustotal} async with session.get( f"https://www.virustotal.com/api/v3/files/{file_hash}", headers=headers, timeout=DEFAULT_REQUEST_TIMEOUT ) as response: if response.status == 200: data = await response.json() attrs = data.get("data", {}).get("attributes", {}) stats = attrs.get("last_analysis_stats", {}) results["virustotal"] = { "stats": stats, "names": attrs.get("names", [])[:10], "type_description": attrs.get("type_description"), "reputation": attrs.get("reputation", 0), "tags": attrs.get("tags", [])[:20] } malicious = stats.get("malicious", 0) if malicious > 0: results["threats_found"].append({ "source": "virustotal", "detections": malicious, "total_scanners": sum(stats.values()), "severity": Severity.CRITICAL.value if malicious > 10 else Severity.HIGH.value }) elif response.status == 404: results["virustotal"] = {"status": "not_found"} except Exception as e: logger.warning(f"VirusTotal error: {e}") results["virustotal_error"] = str(e) else: results["note"] = "Configure VIRUSTOTAL_API_KEY for hash lookups" results["is_malicious"] = len(results["threats_found"]) > 0 return json.dumps({"success": True, **results}, indent=2) @mcp.tool() async def check_bulk_ips(ips: str) -> str: """ Check multiple IP addresses against threat feeds in bulk. Args: ips: JSON array of IP addresses or comma-separated list Returns: JSON with reputation results for all IPs """ # Parse input try: if ips.startswith('['): ip_list = json.loads(ips) else: ip_list = [ip.strip() for ip in ips.split(',') if ip.strip()] except json.JSONDecodeError: ip_list = [ip.strip() for ip in ips.split(',') if ip.strip()] if not ip_list: return json.dumps({ "success": False, "error": "No valid IPs provided" }, indent=2) if len(ip_list) > 100: return json.dumps({ "success": False, "error": "Maximum 100 IPs per request" }, indent=2) # First, ensure we have threat data loaded ip_feed_names = get_ip_feeds() threat_ips: set[str] = set() for feed_name in ip_feed_names: cache_key = f"feed_{feed_name}" cached = threat_cache.get(cache_key) if cached: threat_ips.update(cached.get("ips", [])) else: # Fetch if not cached try: result = await fetch_threat_feed(feed_name) data = json.loads(result) if data.get("success") and data.get("ips"): threat_ips.update(data["ips"]) except Exception as e: logger.warning(f"Error loading {feed_name}: {e}") # Check each IP results: list[dict] = [] malicious_count = 0 for ip in ip_list: is_valid, error = validate_ip(ip) if not is_valid: results.append({ "ip": ip, "valid": False, "error": error }) continue is_threat = ip in threat_ips if is_threat: malicious_count += 1 results.append({ "ip": ip, "valid": True, "is_malicious": is_threat, "threat_level": Severity.HIGH.value if is_threat else Severity.LOW.value }) return json.dumps({ "success": True, "checked_at": get_timestamp(), "total_checked": len(ip_list), "malicious_count": malicious_count, "clean_count": len(ip_list) - malicious_count, "feeds_loaded": len(ip_feed_names), "threat_ips_available": len(threat_ips), "results": results }, indent=2) @mcp.tool() async def get_cisa_kev( days: int = 30, vendor: Optional[str] = None ) -> str: """ Get CISA Known Exploited Vulnerabilities. Args: days: Get vulnerabilities added in last N days (default: 30) vendor: Filter by vendor name (optional) Returns: JSON with recent KEVs """ try: feed = get_feed("cisa_kev") if not feed: return json.dumps({"success": False, "error": "CISA KEV feed not configured"}, indent=2) data = await fetch_json(feed.url) vulnerabilities = data.get("vulnerabilities", []) cutoff = datetime.now() - timedelta(days=days) recent = [] for vuln in vulnerabilities: try: date_added = datetime.strptime(vuln.get("dateAdded", "2000-01-01"), "%Y-%m-%d") if date_added >= cutoff: if vendor is None or vendor.lower() in vuln.get("vendorProject", "").lower(): recent.append({ "cve_id": vuln.get("cveID"), "vendor": vuln.get("vendorProject"), "product": vuln.get("product"), "name": vuln.get("vulnerabilityName"), "description": vuln.get("shortDescription"), "date_added": vuln.get("dateAdded"), "due_date": vuln.get("dueDate"), "known_ransomware": vuln.get("knownRansomwareCampaignUse"), "notes": vuln.get("notes") }) except ValueError: continue return json.dumps({ "success": True, "total_kev": len(vulnerabilities), "recent_count": len(recent), "days_checked": days, "vendor_filter": vendor, "vulnerabilities": recent[:50] }, indent=2) except Exception as e: logger.error(f"Error fetching CISA KEV: {e}") return json.dumps({"success": False, "error": str(e)}, indent=2) @mcp.tool() async def get_dashboard_summary() -> str: """ Get a summary of all threat intelligence for dashboard display. Returns: JSON with aggregated threat data for visualization """ ensure_dirs() summary: dict[str, Any] = { "generated_at": get_timestamp(), "feeds": {}, "totals": { "malicious_ips": 0, "malicious_urls": 0, "cidr_blocks": 0, "recent_cves": 0 }, "alerts": [] } # Fetch all feeds in parallel feed_names = list(get_enabled_feeds().keys()) tasks = [fetch_threat_feed(name) for name in feed_names] results = await asyncio.gather(*tasks, return_exceptions=True) for feed_name, result in zip(feed_names, results): if isinstance(result, Exception): summary["feeds"][feed_name] = {"error": str(result)} else: try: data = json.loads(result) summary["feeds"][feed_name] = { "count": data.get("count", 0), "type": data.get("type"), "fetched_at": data.get("fetched_at"), "success": data.get("success", False) } if data.get("type") == "ip_list": summary["totals"]["malicious_ips"] += data.get("count", 0) elif data.get("type") == "url_list": summary["totals"]["malicious_urls"] += data.get("count", 0) elif data.get("type") == "cidr_list": summary["totals"]["cidr_blocks"] += data.get("count", 0) except json.JSONDecodeError as e: logger.error(f"Failed to parse {feed_name} result: {e}") summary["feeds"][feed_name] = {"error": "parse_failed"} # Get CISA KEV count try: kev_result = await get_cisa_kev(days=7) kev_data = json.loads(kev_result) summary["totals"]["recent_cves"] = kev_data.get("recent_count", 0) if kev_data.get("recent_count", 0) > 0: summary["alerts"].append({ "type": "new_kev", "message": f"{kev_data['recent_count']} new CISA KEV in last 7 days", "severity": Severity.HIGH.value }) except Exception as e: logger.warning(f"Error getting KEV summary: {e}") # Add cache stats summary["cache_stats"] = threat_cache.stats() return json.dumps({"success": True, **summary}, indent=2) @mcp.tool() async def get_recent_iocs( ioc_type: Optional[str] = None, limit: int = 100 ) -> str: """ Get recent IOCs (Indicators of Compromise) from ThreatFox. Args: ioc_type: Filter by type (ip:port, domain, url, md5, sha256) limit: Maximum IOCs to return (default: 100, max: 500) Returns: JSON with recent IOCs """ # Validate IOC type if provided if ioc_type: is_valid, error = validate_ioc_type(ioc_type) if not is_valid: return json.dumps({ "success": False, "error": error }, indent=2) # Clamp limit limit = min(max(1, limit), MAX_RESPONSE_ITEMS) try: feed = get_feed("threatfox_iocs") if not feed: return json.dumps({"success": False, "error": "ThreatFox feed not configured"}, indent=2) data = await fetch_json(feed.url) iocs = [] for item in data.get("data", []): ioc = { "ioc": item.get("ioc"), "ioc_type": item.get("ioc_type"), "threat_type": item.get("threat_type"), "malware": item.get("malware"), "malware_printable": item.get("malware_printable"), "confidence": item.get("confidence_level"), "first_seen": item.get("first_seen"), "last_seen": item.get("last_seen"), "tags": item.get("tags", []), "reference": item.get("reference") } if ioc_type is None or ioc.get("ioc_type") == ioc_type: iocs.append(ioc) if len(iocs) >= limit: break return json.dumps({ "success": True, "count": len(iocs), "filter_type": ioc_type, "iocs": iocs }, indent=2) except Exception as e: logger.error(f"Error fetching IOCs: {e}") return json.dumps({"success": False, "error": str(e)}, indent=2) @mcp.tool() async def check_network_against_threats(scan_results: str) -> str: """ Check network scan results against threat intelligence. Args: scan_results: JSON string from network scanner with device IPs Returns: JSON with any matched threats """ try: scan_data = json.loads(scan_results) except json.JSONDecodeError as e: return json.dumps({ "success": False, "error": f"Invalid JSON: {str(e)}" }, indent=2) # Extract devices from various formats devices = scan_data.get("devices", scan_data.get("new_devices", [])) if not devices: return json.dumps({ "success": False, "error": "No devices found in scan results" }, indent=2) # Load threat IPs from all feeds ip_feed_names = get_ip_feeds() threat_ips: set[str] = set() for feed_name in ip_feed_names: try: result = await fetch_threat_feed(feed_name) data = json.loads(result) if data.get("ips"): threat_ips.update(data["ips"]) except Exception as e: logger.warning(f"Error loading {feed_name}: {e}") # Check each device matches = [] for device in devices: ip = device.get("ip") if ip and ip in threat_ips: matches.append({ "device": device, "threat_match": True, "severity": Severity.CRITICAL.value, "recommendation": "Isolate device immediately and investigate" }) return json.dumps({ "success": True, "checked_at": get_timestamp(), "devices_checked": len(devices), "threat_ips_loaded": len(threat_ips), "matches_found": len(matches), "matches": matches, "all_clear": len(matches) == 0, "recommendation": "Network is clean" if len(matches) == 0 else f"ALERT: {len(matches)} devices matched threat intelligence!" }, indent=2) @mcp.tool() async def get_threat_stats() -> str: """ Get statistics about loaded threat data and cache status. Returns: JSON with threat intelligence statistics """ stats: dict[str, Any] = { "generated_at": get_timestamp(), "cache": threat_cache.stats(), "feeds_configured": len(THREAT_FEEDS), "feeds_enabled": len(get_enabled_feeds()), "api_keys": API_KEYS.to_dict(), "ip_feeds": get_ip_feeds() } # Count cached threat data total_ips = 0 total_urls = 0 for feed_name in get_ip_feeds(): cached = threat_cache.get(f"feed_{feed_name}") if cached: total_ips += len(cached.get("ips", [])) for feed_name in ["urlhaus_recent"]: cached = threat_cache.get(f"feed_{feed_name}") if cached: total_urls += len(cached.get("urls", [])) stats["cached_data"] = { "total_threat_ips": total_ips, "total_threat_urls": total_urls } return json.dumps({"success": True, **stats}, indent=2) @mcp.tool() async def clear_threat_cache() -> str: """ Clear the threat intelligence cache to force fresh data fetch. Returns: JSON confirmation """ threat_cache.clear() return json.dumps({ "success": True, "message": "Threat cache cleared", "timestamp": get_timestamp() }, indent=2) # ============================================================================= # Entry Point # ============================================================================= def main(): """Entry point for MCP server.""" ensure_dirs() logger.info("Starting Threat Intelligence MCP server v0.2.0") mcp.run(transport="stdio") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marc-shade/threat-intel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server