Skip to main content
Glama
marc-shade

Threat Intelligence MCP Server

by marc-shade
data_fetcher.py (6.17 kB)
#!/usr/bin/env python3
"""
Threat Intelligence Data Fetcher

Background service that fetches and caches threat data.

Each cycle downloads every feed listed in ``THREAT_FEEDS`` plus the CISA KEV
JSON document, then writes one JSON cache file per feed, ``kev_cache.json``
and ``summary_cache.json`` into ``CACHE_DIR``.
"""

import asyncio
import ipaddress
import json
import os
import re
from datetime import datetime, timedelta
from pathlib import Path

import aiohttp

# Paths
# The install root comes from AGENTIC_SYSTEM_PATH. The fallback must be a real
# path: Python does not expand shell syntax such as "${VAR:-default}", so a
# shell-style default would create a directory with that literal name.
DATA_DIR = (
    Path(os.environ.get("AGENTIC_SYSTEM_PATH", "/opt/agentic"))
    / "mcp-servers"
    / "threat-intel-mcp"
    / "data"
)
CACHE_DIR = DATA_DIR / "cache"

# Tunables
REQUEST_TIMEOUT_SECONDS = 30    # total time budget for one HTTP request
MAX_CACHED_ITEMS = 1000         # per-feed cap on items stored in the cache file
KEV_RECENT_DAYS = 7             # KEV entries newer than this count as "recent"
FETCH_INTERVAL_SECONDS = 1800   # pause between fetch cycles (30 minutes)

THREAT_FEEDS = {
    "feodo_tracker": {
        "url": "https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.txt",
        "type": "ip_list",
    },
    "urlhaus_recent": {
        "url": "https://urlhaus.abuse.ch/downloads/text_recent/",
        "type": "url_list",
    },
    "sslbl_botnet": {
        "url": "https://sslbl.abuse.ch/blacklist/sslipblacklist.txt",
        "type": "ip_list",
    },
    "tor_exit_nodes": {
        "url": "https://check.torproject.org/torbulkexitlist",
        "type": "ip_list",
    },
}

CISA_KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

# Dotted quad at the start of a line. The trailing lookahead stops the match
# from silently truncating an over-long last octet ("1.2.3.4567").
_IPV4_PREFIX = re.compile(r"^(\d{1,3}(?:\.\d{1,3}){3})(?!\d)")


def _client_timeout() -> "aiohttp.ClientTimeout":
    """Return the timeout applied to every outgoing HTTP request."""
    # aiohttp deprecates passing a bare number as ``timeout``.
    return aiohttp.ClientTimeout(total=REQUEST_TIMEOUT_SECONDS)


async def fetch_url(url: str) -> str:
    """Download *url* and return the response body as text.

    Raises:
        aiohttp.ClientResponseError: the server answered with a 4xx/5xx status.
        asyncio.TimeoutError: the request exceeded REQUEST_TIMEOUT_SECONDS.
    """
    async with aiohttp.ClientSession(timeout=_client_timeout()) as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()


async def fetch_json(url: str) -> dict:
    """Download *url* and return the decoded JSON body.

    Raises:
        aiohttp.ClientResponseError: the server answered with a 4xx/5xx status.
        asyncio.TimeoutError: the request exceeded REQUEST_TIMEOUT_SECONDS.
    """
    async with aiohttp.ClientSession(timeout=_client_timeout()) as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()


def parse_ip_list(content: str) -> list:
    """Extract the leading IPv4 address from each data line of *content*.

    Blank lines and lines starting with ``#`` are ignored. Anything after the
    address on a line (port, extra columns) is discarded. Candidates that are
    not valid IPv4 addresses, e.g. ``999.1.1.1``, are dropped.
    """
    ips = []
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        match = _IPV4_PREFIX.match(line)
        if match is None:
            continue
        candidate = match.group(1)
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            # Looks like a dotted quad but is not a valid address.
            continue
        ips.append(candidate)
    return ips


def parse_url_list(content: str) -> list:
    """Return every stripped line of *content* that starts with ``http``.

    Blank lines and ``#`` comments never start with ``http``, so they are
    excluded by the same test.
    """
    stripped_lines = (raw_line.strip() for raw_line in content.splitlines())
    return [line for line in stripped_lines if line.startswith("http")]


# Feed "type" -> (parser, key in summary["totals"] that the item count feeds).
_FEED_PARSERS = {
    "ip_list": (parse_ip_list, "malicious_ips"),
    "url_list": (parse_url_list, "malicious_urls"),
}


def _write_json_cache(path: Path, payload: dict) -> None:
    """Write *payload* as JSON to *path* without exposing a partial file.

    The data goes to a sibling ``.tmp`` file first and is then moved into
    place with ``os.replace``, so a process reading the cache sees either the
    previous file or the new one.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(payload, f)
    os.replace(tmp_path, path)


def _recent_kev_entries(vulnerabilities: list, cutoff: datetime) -> list:
    """Return trimmed KEV records whose ``dateAdded`` is on or after *cutoff*.

    Entries with a missing date are treated as old; entries that are malformed
    (not a mapping, or an unparsable date) are skipped.
    """
    recent = []
    for vuln in vulnerabilities:
        try:
            date_added = datetime.strptime(
                vuln.get("dateAdded", "2000-01-01"), "%Y-%m-%d"
            )
        except (AttributeError, TypeError, ValueError):
            continue
        if date_added >= cutoff:
            recent.append({
                "cve_id": vuln.get("cveID"),
                "vendor": vuln.get("vendorProject"),
                "product": vuln.get("product"),
                "name": vuln.get("vulnerabilityName"),
                "description": vuln.get("shortDescription"),
                "date_added": vuln.get("dateAdded"),
            })
    return recent


async def fetch_all_feeds():
    """Fetch all threat feeds and cache results.

    A failure in one feed (or in the KEV download) is printed and recorded,
    and does not stop the remaining work. Failures while creating the cache
    directory or writing the summary file propagate to the caller.

    Returns:
        dict: the summary with keys ``totals``, ``feeds`` and ``alerts``; the
        same structure is stored under ``data`` in ``summary_cache.json``.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)

    summary = {
        "totals": {"malicious_ips": 0, "malicious_urls": 0, "recent_cves": 0},
        "feeds": {},
        "alerts": [],
    }

    # NOTE(review): timestamps stay naive local time because the cache readers
    # are not visible here; switching to aware datetimes could break them.
    print(f"[{datetime.now().isoformat()}] Fetching threat feeds...")

    for name, info in THREAT_FEEDS.items():
        try:
            print(f" Fetching {name}...")
            feed_type = info["type"]
            if feed_type not in _FEED_PARSERS:
                # Reject before downloading; otherwise nothing could parse it.
                raise ValueError(f"unsupported feed type: {feed_type!r}")
            parser, total_key = _FEED_PARSERS[feed_type]

            content = await fetch_url(info["url"])
            items = parser(content)

            # Cache individual feed
            _write_json_cache(CACHE_DIR / f"{name}.json", {
                "timestamp": datetime.now().isoformat(),
                "count": len(items),
                "items": items[:MAX_CACHED_ITEMS],
            })

            # Update the summary only after the cache write succeeded, so a
            # failed feed never contributes to the totals.
            summary["feeds"][name] = {"count": len(items), "type": feed_type}
            summary["totals"][total_key] += len(items)

            print(f" {name}: {summary['feeds'][name]['count']} items")
        except Exception as e:
            # Per-feed boundary: record the failure and move on.
            print(f" {name}: ERROR - {e}")
            summary["feeds"][name] = {"count": 0, "error": str(e)}

    # Fetch CISA KEV
    try:
        print(" Fetching CISA KEV...")
        kev_data = await fetch_json(CISA_KEV_URL)
        vulnerabilities = kev_data.get("vulnerabilities", [])

        # Filter recent
        cutoff = datetime.now() - timedelta(days=KEV_RECENT_DAYS)
        recent = _recent_kev_entries(vulnerabilities, cutoff)
        summary["totals"]["recent_cves"] = len(recent)

        # Cache KEV
        _write_json_cache(CACHE_DIR / "kev_cache.json", {
            "timestamp": datetime.now().isoformat(),
            "total": len(vulnerabilities),
            "recent_count": len(recent),
            "vulnerabilities": recent,
        })

        print(f" CISA KEV: {len(recent)} recent / {len(vulnerabilities)} total")

        if recent:
            summary["alerts"].append({
                "type": "new_kev",
                "message": f"{len(recent)} new CISA KEV in last {KEV_RECENT_DAYS} days",
                "severity": "high",
            })
    except Exception as e:
        print(f" CISA KEV: ERROR - {e}")

    # Save summary cache
    _write_json_cache(CACHE_DIR / "summary_cache.json", {
        "timestamp": datetime.now().isoformat(),
        "data": summary,
    })

    print(f"[{datetime.now().isoformat()}] Fetch complete. Summary cached.")
    return summary


async def main():
    """Main loop - fetch data every 30 minutes."""
    print("Threat Intelligence Data Fetcher starting...")

    while True:
        try:
            await fetch_all_feeds()
        except Exception as e:
            # Top-level boundary: one bad cycle must not stop the service.
            print(f"Error in fetch cycle: {e}")

        # Wait until the next cycle
        print(f"Sleeping for {FETCH_INTERVAL_SECONDS // 60} minutes...")
        await asyncio.sleep(FETCH_INTERVAL_SECONDS)


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marc-shade/threat-intel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server