#!/usr/bin/env python3
"""
Threat Intelligence MCP Server v0.3.0
Defense-contractor-grade threat intelligence aggregation with:
- Multi-source threat feeds (10+ sources)
- STIX/TAXII 2.1 protocol support (OASIS standard)
- MITRE ATT&CK framework mapping with Navigator export
- Traffic Light Protocol (TLP 2.0) classification and enforcement
- Tamper-evident IOC provenance chain (SHA-256 hash chain)
- Defense feed integration (CISA, NSA, DISA IAVA)
- NIST SP 800-53 Rev. 5 control mapping
- ICD 203 confidence scoring (Intelligence Community standard)
- IP/Hash/Domain reputation checking (VT, AbuseIPDB, Shodan)
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Any, Optional
import aiohttp
from fastmcp import FastMCP
from .config import (
API_KEYS,
THREAT_FEEDS,
FeedType,
Severity,
setup_logging,
ensure_dirs,
get_feed,
get_enabled_feeds,
get_ip_feeds,
get_timestamp,
validate_ip,
validate_hash,
validate_ioc_type,
threat_cache,
DEFAULT_REQUEST_TIMEOUT,
MAX_RESPONSE_ITEMS,
)
from .compliance.stix_taxii import (
TLPMarking,
TLPLevel,
export_to_stix_bundle,
)
from .compliance.mitre_attack import (
map_ioc_to_attack,
generate_attack_navigator_layer,
)
from .compliance.provenance import (
record_provenance,
verify_provenance_chain,
export_provenance_report,
)
from .compliance.defense_feeds import (
DefenseFeedManager,
ICD203ConfidenceScorer,
classify_dod_impact,
)
from .compliance.nist_mapping import (
generate_compliance_report,
)
# Configure logging
logger = setup_logging("threat-intel-mcp")
# Initialize FastMCP
mcp = FastMCP("threat-intel")
# =============================================================================
# HTTP Helpers
# =============================================================================
async def fetch_url(url: str, headers: Optional[dict] = None, timeout: int = DEFAULT_REQUEST_TIMEOUT) -> str:
    """
    Retrieve the body of a URL as text.

    Raises aiohttp.ClientError subclasses (via raise_for_status) on HTTP
    error statuses or network failures; callers are expected to handle them.

    Args:
        url: URL to fetch
        headers: Optional request headers
        timeout: Request timeout in seconds

    Returns:
        Response body decoded as text
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(url, headers=headers, timeout=timeout) as resp:
            resp.raise_for_status()
            return await resp.text()
async def fetch_json(url: str, headers: Optional[dict] = None, timeout: int = DEFAULT_REQUEST_TIMEOUT) -> dict:
    """
    Fetch and parse JSON from a URL.

    Raises aiohttp.ClientError subclasses (via raise_for_status) on HTTP
    error statuses or network failures.

    Args:
        url: URL to fetch
        headers: Optional request headers
        timeout: Request timeout in seconds

    Returns:
        Parsed JSON (dict for most feeds; may be a list for some)
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, timeout=timeout) as response:
            response.raise_for_status()
            # Some threat feeds serve JSON with a text/plain Content-Type;
            # content_type=None disables aiohttp's strict application/json
            # check so such feeds parse instead of raising ContentTypeError.
            return await response.json(content_type=None)
# =============================================================================
# Parsing Helpers
# =============================================================================
def parse_ip_list(content: str) -> list[str]:
    """
    Extract valid IP addresses from raw feed text.

    Blank lines and '#' comment lines are skipped. Only the first
    whitespace-separated token of each line is considered, and it must
    parse as a valid IPv4 or IPv6 address to be kept.

    Args:
        content: Raw text content with IPs

    Returns:
        List of valid IP addresses, in order of appearance
    """
    import ipaddress
    collected: list[str] = []
    for raw in content.split('\n'):
        entry = raw.strip()
        if not entry or entry.startswith('#'):
            continue
        tokens = entry.split()
        if not tokens:
            continue
        candidate = tokens[0]
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            continue
        collected.append(candidate)
    return collected
def parse_url_list(content: str) -> list[str]:
    """
    Extract HTTP/HTTPS URLs from raw feed text.

    Blank lines and '#' comment lines are skipped; only lines beginning
    with an http:// or https:// scheme are kept.

    Args:
        content: Raw text content with URLs

    Returns:
        List of URLs, in order of appearance
    """
    collected: list[str] = []
    for raw in content.split('\n'):
        entry = raw.strip()
        if not entry or entry.startswith('#'):
            continue
        if entry.startswith(('http://', 'https://')):
            collected.append(entry)
    return collected
def parse_cidr_list(content: str) -> list[str]:
    """
    Extract CIDR ranges (or bare IPs) from raw feed text.

    Skips blank lines and lines starting with ';' or '#'. Only the first
    whitespace-separated token is considered; it is kept if it parses as
    an IP network (strict=False also accepts bare host addresses).

    Args:
        content: Raw text with CIDR ranges

    Returns:
        List of CIDR strings, in order of appearance
    """
    import ipaddress
    networks: list[str] = []
    for raw in content.split('\n'):
        entry = raw.strip()
        if not entry or entry.startswith((';', '#')):
            continue
        tokens = entry.split()
        if not tokens:
            continue
        try:
            ipaddress.ip_network(tokens[0], strict=False)
        except ValueError:
            continue
        networks.append(tokens[0])
    return networks
# =============================================================================
# MCP Tools
# =============================================================================
@mcp.tool()
async def get_threat_feeds() -> str:
    """
    List every enabled threat intelligence feed.

    Returns:
        JSON string with feed metadata, the total feed count, and which
        external API keys are configured.
    """
    feed_info = [entry.to_dict() for entry in get_enabled_feeds().values()]
    payload = {
        "success": True,
        "feeds": feed_info,
        "total_feeds": len(feed_info),
        "api_configured": API_KEYS.to_dict()
    }
    return json.dumps(payload, indent=2)
@mcp.tool()
async def fetch_threat_feed(feed_name: str) -> str:
    """
    Fetch and parse a specific threat intelligence feed.

    Results are cached in threat_cache under "feed_<name>"; a cache hit
    short-circuits the network fetch. The parser used depends on the
    feed's declared FeedType (JSON, IP_LIST, URL_LIST, TEXT), with raw
    truncated text as the fallback.

    Args:
        feed_name: Name of the feed (feodo_tracker, urlhaus_recent, etc.)

    Returns:
        JSON string with IOCs from the feed; on an unknown feed name, an
        error payload listing the available feeds.
    """
    ensure_dirs()
    feed = get_feed(feed_name)
    if not feed:
        available = list(THREAT_FEEDS.keys())
        return json.dumps({
            "success": False,
            "error": f"Unknown feed: {feed_name}",
            "available_feeds": available
        }, indent=2)
    # Check cache first to avoid re-fetching from upstream providers
    cache_key = f"feed_{feed_name}"
    cached = threat_cache.get(cache_key)
    if cached:
        return json.dumps({
            "success": True,
            "feed": feed_name,
            "description": feed.description,
            "cached": True,
            **cached
        }, indent=2)
    try:
        result: dict[str, Any] = {}
        if feed.feed_type == FeedType.JSON:
            data = await fetch_json(feed.url)
            # Wrap top-level JSON lists so "data" is always a dict
            result = {
                "type": "json",
                "data": data if isinstance(data, dict) else {"items": data},
                "count": len(data) if isinstance(data, (list, dict)) else 1
            }
        elif feed.feed_type == FeedType.IP_LIST:
            content = await fetch_url(feed.url)
            ips = parse_ip_list(content)
            # Cap the returned list; "count" reflects the full size
            result = {
                "type": "ip_list",
                "ips": ips[:MAX_RESPONSE_ITEMS],
                "count": len(ips),
                "truncated": len(ips) > MAX_RESPONSE_ITEMS
            }
        elif feed.feed_type == FeedType.URL_LIST:
            content = await fetch_url(feed.url)
            urls = parse_url_list(content)
            result = {
                "type": "url_list",
                "urls": urls[:MAX_RESPONSE_ITEMS],
                "count": len(urls),
                "truncated": len(urls) > MAX_RESPONSE_ITEMS
            }
        elif feed.feed_type == FeedType.TEXT:
            # TEXT feeds are parsed as CIDR lists (e.g. bogon/range feeds)
            content = await fetch_url(feed.url)
            cidrs = parse_cidr_list(content)
            result = {
                "type": "cidr_list",
                "cidrs": cidrs[:MAX_RESPONSE_ITEMS],
                "count": len(cidrs),
                "truncated": len(cidrs) > MAX_RESPONSE_ITEMS
            }
        else:
            # Unknown feed type: return raw text, truncated to 10k chars;
            # "count" here is the character count of the full body
            content = await fetch_url(feed.url)
            result = {
                "type": "text",
                "content": content[:10000],
                "count": len(content)
            }
        # Cache result (fetched_at is stored so cached replies carry it too)
        result["fetched_at"] = get_timestamp()
        threat_cache.set(cache_key, result)
        return json.dumps({
            "success": True,
            "feed": feed_name,
            "description": feed.description,
            "cached": False,
            **result
        }, indent=2)
    except aiohttp.ClientError as e:
        # Network-level failures get a distinct error prefix
        logger.error(f"Network error fetching {feed_name}: {e}")
        return json.dumps({
            "success": False,
            "feed": feed_name,
            "error": f"Network error: {str(e)}"
        }, indent=2)
    except Exception as e:
        logger.error(f"Error fetching {feed_name}: {e}")
        return json.dumps({
            "success": False,
            "feed": feed_name,
            "error": str(e)
        }, indent=2)
@mcp.tool()
async def check_ip_reputation(ip: str) -> str:
    """
    Check an IP address against multiple threat intelligence sources.

    Consults locally cached IP feeds first, then (when the corresponding
    API keys are configured) AbuseIPDB, VirusTotal, and Shodan. A failure
    in any external source is logged and recorded per-source; it does not
    abort the overall check.

    Args:
        ip: IP address to check

    Returns:
        JSON string with reputation data from each consulted source,
        matched threats, and an overall threat_level.
    """
    # Validate before doing any lookups
    is_valid, error = validate_ip(ip)
    if not is_valid:
        return json.dumps({
            "success": False,
            "error": error
        }, indent=2)
    ensure_dirs()
    results: dict[str, Any] = {
        "ip": ip,
        "checked_at": get_timestamp(),
        "threats_found": [],
        "sources_checked": []
    }
    # Check against cached IP feeds. Only feeds with cached data are
    # recorded in sources_checked: previously every configured feed name
    # was appended even on a cache miss, overstating actual coverage.
    ip_feed_names = get_ip_feeds()
    for feed_name in ip_feed_names:
        cache_key = f"feed_{feed_name}"
        cached = threat_cache.get(cache_key)
        if cached:
            feed_ips = cached.get("ips", [])
            if ip in feed_ips:
                feed = get_feed(feed_name)
                results["threats_found"].append({
                    "source": feed_name,
                    "description": feed.description if feed else feed_name,
                    "severity": Severity.HIGH.value
                })
            results["sources_checked"].append(feed_name)
    # Check AbuseIPDB if API key configured
    if API_KEYS.has_abuseipdb:
        try:
            async with aiohttp.ClientSession() as session:
                headers = {
                    "Key": API_KEYS.abuseipdb,
                    "Accept": "application/json"
                }
                params = {"ipAddress": ip, "maxAgeInDays": 90}
                async with session.get(
                    "https://api.abuseipdb.com/api/v2/check",
                    headers=headers,
                    params=params,
                    timeout=DEFAULT_REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        results["abuseipdb"] = data.get("data", {})
                        results["sources_checked"].append("abuseipdb")
                        # abuseConfidenceScore is 0-100; >50 is treated as a threat
                        confidence = data.get("data", {}).get("abuseConfidenceScore", 0)
                        if confidence > 50:
                            results["threats_found"].append({
                                "source": "abuseipdb",
                                "confidence": confidence,
                                "severity": Severity.HIGH.value if confidence > 75 else Severity.MEDIUM.value
                            })
        except Exception as e:
            logger.warning(f"AbuseIPDB error: {e}")
            results["abuseipdb_error"] = str(e)
    # Check VirusTotal if API key configured
    if API_KEYS.has_virustotal:
        try:
            async with aiohttp.ClientSession() as session:
                headers = {"x-apikey": API_KEYS.virustotal}
                async with session.get(
                    f"https://www.virustotal.com/api/v3/ip_addresses/{ip}",
                    headers=headers,
                    timeout=DEFAULT_REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        stats = data.get("data", {}).get("attributes", {}).get("last_analysis_stats", {})
                        results["virustotal"] = stats
                        results["sources_checked"].append("virustotal")
                        # Any engine flagging the IP counts as a threat
                        malicious = stats.get("malicious", 0)
                        if malicious > 0:
                            results["threats_found"].append({
                                "source": "virustotal",
                                "detections": malicious,
                                "severity": Severity.HIGH.value if malicious > 5 else Severity.MEDIUM.value
                            })
        except Exception as e:
            logger.warning(f"VirusTotal error: {e}")
            results["virustotal_error"] = str(e)
    # Check Shodan if API key configured
    if API_KEYS.has_shodan:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"https://api.shodan.io/shodan/host/{ip}?key={API_KEYS.shodan}",
                    timeout=DEFAULT_REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        results["shodan"] = {
                            "ports": data.get("ports", []),
                            "hostnames": data.get("hostnames", []),
                            "country": data.get("country_name"),
                            "org": data.get("org"),
                            "vulns": data.get("vulns", [])
                        }
                        results["sources_checked"].append("shodan")
                        # Known vulnerabilities on the host are treated as high severity
                        if data.get("vulns"):
                            results["threats_found"].append({
                                "source": "shodan",
                                "vulns": len(data["vulns"]),
                                "severity": Severity.HIGH.value
                            })
        except Exception as e:
            logger.warning(f"Shodan error: {e}")
            results["shodan_error"] = str(e)
    # Calculate overall threat level: HIGH if any source reported HIGH,
    # MEDIUM if anything was found at all, LOW otherwise
    results["is_malicious"] = len(results["threats_found"]) > 0
    results["threat_level"] = (
        Severity.HIGH.value if any(t.get("severity") == Severity.HIGH.value for t in results["threats_found"])
        else Severity.MEDIUM.value if results["threats_found"]
        else Severity.LOW.value
    )
    return json.dumps({"success": True, **results}, indent=2)
@mcp.tool()
async def check_hash_reputation(file_hash: str) -> str:
    """
    Check a file hash (MD5/SHA1/SHA256) against threat intelligence.

    Currently VirusTotal is the only hash lookup source; without a
    configured key, the response carries a note instead of results.

    Args:
        file_hash: File hash to check (type is auto-detected by validate_hash)

    Returns:
        JSON string with reputation data and any matched threats.
    """
    # Validate hash format and detect its type (md5/sha1/sha256)
    is_valid, hash_type, error = validate_hash(file_hash)
    if not is_valid:
        return json.dumps({
            "success": False,
            "error": error
        }, indent=2)
    results: dict[str, Any] = {
        # Normalize to lowercase for consistent reporting
        "hash": file_hash.lower(),
        "hash_type": hash_type,
        "checked_at": get_timestamp(),
        "threats_found": []
    }
    # Check VirusTotal if API key configured
    if API_KEYS.has_virustotal:
        try:
            async with aiohttp.ClientSession() as session:
                headers = {"x-apikey": API_KEYS.virustotal}
                async with session.get(
                    f"https://www.virustotal.com/api/v3/files/{file_hash}",
                    headers=headers,
                    timeout=DEFAULT_REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        attrs = data.get("data", {}).get("attributes", {})
                        stats = attrs.get("last_analysis_stats", {})
                        # Keep a bounded summary (10 names, 20 tags)
                        results["virustotal"] = {
                            "stats": stats,
                            "names": attrs.get("names", [])[:10],
                            "type_description": attrs.get("type_description"),
                            "reputation": attrs.get("reputation", 0),
                            "tags": attrs.get("tags", [])[:20]
                        }
                        malicious = stats.get("malicious", 0)
                        if malicious > 0:
                            results["threats_found"].append({
                                "source": "virustotal",
                                "detections": malicious,
                                "total_scanners": sum(stats.values()),
                                # >10 engine detections escalates to CRITICAL
                                "severity": Severity.CRITICAL.value if malicious > 10 else Severity.HIGH.value
                            })
                    elif response.status == 404:
                        # Hash unknown to VirusTotal is not an error
                        results["virustotal"] = {"status": "not_found"}
        except Exception as e:
            logger.warning(f"VirusTotal error: {e}")
            results["virustotal_error"] = str(e)
    else:
        results["note"] = "Configure VIRUSTOTAL_API_KEY for hash lookups"
    results["is_malicious"] = len(results["threats_found"]) > 0
    return json.dumps({"success": True, **results}, indent=2)
@mcp.tool()
async def check_bulk_ips(ips: str) -> str:
    """
    Check multiple IP addresses against threat feeds in bulk.

    Args:
        ips: JSON array of IP addresses or comma-separated list (max 100)

    Returns:
        JSON string with a per-IP verdict plus aggregate counts.
    """
    # Parse input: accept a JSON array or a comma-separated string;
    # malformed JSON falls back to comma-splitting
    try:
        if ips.startswith('['):
            ip_list = json.loads(ips)
        else:
            ip_list = [ip.strip() for ip in ips.split(',') if ip.strip()]
    except json.JSONDecodeError:
        ip_list = [ip.strip() for ip in ips.split(',') if ip.strip()]
    if not ip_list:
        return json.dumps({
            "success": False,
            "error": "No valid IPs provided"
        }, indent=2)
    if len(ip_list) > 100:
        return json.dumps({
            "success": False,
            "error": "Maximum 100 IPs per request"
        }, indent=2)
    # Build the aggregated threat-IP set: cached feeds first, fetching
    # any uncached feed on demand; a failing feed is logged and skipped
    ip_feed_names = get_ip_feeds()
    threat_ips: set[str] = set()
    for feed_name in ip_feed_names:
        cache_key = f"feed_{feed_name}"
        cached = threat_cache.get(cache_key)
        if cached:
            threat_ips.update(cached.get("ips", []))
        else:
            # Fetch if not cached
            try:
                result = await fetch_threat_feed(feed_name)
                data = json.loads(result)
                if data.get("success") and data.get("ips"):
                    threat_ips.update(data["ips"])
            except Exception as e:
                logger.warning(f"Error loading {feed_name}: {e}")
    # Check each IP; invalid entries are reported but don't abort the batch
    results: list[dict] = []
    malicious_count = 0
    for ip in ip_list:
        is_valid, error = validate_ip(ip)
        if not is_valid:
            results.append({
                "ip": ip,
                "valid": False,
                "error": error
            })
            continue
        is_threat = ip in threat_ips
        if is_threat:
            malicious_count += 1
        results.append({
            "ip": ip,
            "valid": True,
            "is_malicious": is_threat,
            "threat_level": Severity.HIGH.value if is_threat else Severity.LOW.value
        })
    return json.dumps({
        "success": True,
        "checked_at": get_timestamp(),
        "total_checked": len(ip_list),
        "malicious_count": malicious_count,
        "clean_count": len(ip_list) - malicious_count,
        "feeds_loaded": len(ip_feed_names),
        "threat_ips_available": len(threat_ips),
        "results": results
    }, indent=2)
@mcp.tool()
async def get_cisa_kev(
    days: int = 30,
    vendor: Optional[str] = None
) -> str:
    """
    Get CISA Known Exploited Vulnerabilities added recently.

    Args:
        days: Get vulnerabilities added in last N days (default: 30)
        vendor: Case-insensitive substring filter on vendorProject (optional)

    Returns:
        JSON string with recent KEVs (at most 50 entries; recent_count
        reflects the unlimited total of matches).
    """
    try:
        feed = get_feed("cisa_kev")
        if not feed:
            return json.dumps({"success": False, "error": "CISA KEV feed not configured"}, indent=2)
        data = await fetch_json(feed.url)
        vulnerabilities = data.get("vulnerabilities", [])
        # Entries older than the cutoff are excluded
        cutoff = datetime.now() - timedelta(days=days)
        recent = []
        for vuln in vulnerabilities:
            try:
                # Missing dateAdded defaults to 2000-01-01, which always
                # falls before the cutoff and is therefore skipped
                date_added = datetime.strptime(vuln.get("dateAdded", "2000-01-01"), "%Y-%m-%d")
                if date_added >= cutoff:
                    if vendor is None or vendor.lower() in vuln.get("vendorProject", "").lower():
                        recent.append({
                            "cve_id": vuln.get("cveID"),
                            "vendor": vuln.get("vendorProject"),
                            "product": vuln.get("product"),
                            "name": vuln.get("vulnerabilityName"),
                            "description": vuln.get("shortDescription"),
                            "date_added": vuln.get("dateAdded"),
                            "due_date": vuln.get("dueDate"),
                            "known_ransomware": vuln.get("knownRansomwareCampaignUse"),
                            "notes": vuln.get("notes")
                        })
            except ValueError:
                # Unparseable dateAdded: skip the entry rather than fail
                continue
        return json.dumps({
            "success": True,
            "total_kev": len(vulnerabilities),
            "recent_count": len(recent),
            "days_checked": days,
            "vendor_filter": vendor,
            "vulnerabilities": recent[:50]
        }, indent=2)
    except Exception as e:
        logger.error(f"Error fetching CISA KEV: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def get_dashboard_summary() -> str:
    """
    Get a summary of all threat intelligence for dashboard display.

    Fetches every enabled feed concurrently, aggregates IOC counts per
    feed type, and appends an alert when new CISA KEVs appeared in the
    last 7 days.

    Returns:
        JSON string with aggregated threat data for visualization.
    """
    ensure_dirs()
    summary: dict[str, Any] = {
        "generated_at": get_timestamp(),
        "feeds": {},
        "totals": {
            "malicious_ips": 0,
            "malicious_urls": 0,
            "cidr_blocks": 0,
            "recent_cves": 0
        },
        "alerts": []
    }
    # Fetch all feeds in parallel; return_exceptions keeps one failing
    # feed from sinking the whole gather
    feed_names = list(get_enabled_feeds().keys())
    tasks = [fetch_threat_feed(name) for name in feed_names]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # gather preserves input order, so zip pairs each name with its result
    for feed_name, result in zip(feed_names, results):
        if isinstance(result, Exception):
            summary["feeds"][feed_name] = {"error": str(result)}
        else:
            try:
                data = json.loads(result)
                summary["feeds"][feed_name] = {
                    "count": data.get("count", 0),
                    "type": data.get("type"),
                    "fetched_at": data.get("fetched_at"),
                    "success": data.get("success", False)
                }
                # Roll the per-feed count into the matching total
                if data.get("type") == "ip_list":
                    summary["totals"]["malicious_ips"] += data.get("count", 0)
                elif data.get("type") == "url_list":
                    summary["totals"]["malicious_urls"] += data.get("count", 0)
                elif data.get("type") == "cidr_list":
                    summary["totals"]["cidr_blocks"] += data.get("count", 0)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse {feed_name} result: {e}")
                summary["feeds"][feed_name] = {"error": "parse_failed"}
    # Get CISA KEV count for the last 7 days; failure is non-fatal
    try:
        kev_result = await get_cisa_kev(days=7)
        kev_data = json.loads(kev_result)
        summary["totals"]["recent_cves"] = kev_data.get("recent_count", 0)
        if kev_data.get("recent_count", 0) > 0:
            summary["alerts"].append({
                "type": "new_kev",
                "message": f"{kev_data['recent_count']} new CISA KEV in last 7 days",
                "severity": Severity.HIGH.value
            })
    except Exception as e:
        logger.warning(f"Error getting KEV summary: {e}")
    # Add cache stats
    summary["cache_stats"] = threat_cache.stats()
    return json.dumps({"success": True, **summary}, indent=2)
@mcp.tool()
async def get_recent_iocs(
    ioc_type: Optional[str] = None,
    limit: int = 100
) -> str:
    """
    Get recent IOCs (Indicators of Compromise) from ThreatFox.

    Args:
        ioc_type: Filter by type (ip:port, domain, url, md5, sha256)
        limit: Maximum IOCs to return (default: 100; clamped to
            [1, MAX_RESPONSE_ITEMS])

    Returns:
        JSON string with recent IOCs normalized to a compact schema.
    """
    # Validate IOC type if provided
    if ioc_type:
        is_valid, error = validate_ioc_type(ioc_type)
        if not is_valid:
            return json.dumps({
                "success": False,
                "error": error
            }, indent=2)
    # Clamp limit into [1, MAX_RESPONSE_ITEMS]
    limit = min(max(1, limit), MAX_RESPONSE_ITEMS)
    try:
        feed = get_feed("threatfox_iocs")
        if not feed:
            return json.dumps({"success": False, "error": "ThreatFox feed not configured"}, indent=2)
        data = await fetch_json(feed.url)
        iocs = []
        for item in data.get("data", []):
            # Normalize each ThreatFox record to a compact IOC dict
            ioc = {
                "ioc": item.get("ioc"),
                "ioc_type": item.get("ioc_type"),
                "threat_type": item.get("threat_type"),
                "malware": item.get("malware"),
                "malware_printable": item.get("malware_printable"),
                "confidence": item.get("confidence_level"),
                "first_seen": item.get("first_seen"),
                "last_seen": item.get("last_seen"),
                "tags": item.get("tags", []),
                "reference": item.get("reference")
            }
            # Apply the optional type filter and stop at the limit
            if ioc_type is None or ioc.get("ioc_type") == ioc_type:
                iocs.append(ioc)
                if len(iocs) >= limit:
                    break
        return json.dumps({
            "success": True,
            "count": len(iocs),
            "filter_type": ioc_type,
            "iocs": iocs
        }, indent=2)
    except Exception as e:
        logger.error(f"Error fetching IOCs: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def check_network_against_threats(scan_results: str) -> str:
    """
    Check network scan results against threat intelligence.

    Args:
        scan_results: JSON string from a network scanner; devices are
            read from the "devices" or "new_devices" key. Each device is
            assumed to be a dict with an "ip" key — TODO confirm the
            scanner's output schema.

    Returns:
        JSON string with any matched threats and a recommendation.
    """
    try:
        scan_data = json.loads(scan_results)
    except json.JSONDecodeError as e:
        return json.dumps({
            "success": False,
            "error": f"Invalid JSON: {str(e)}"
        }, indent=2)
    # Extract devices from various formats ("devices" takes precedence)
    devices = scan_data.get("devices", scan_data.get("new_devices", []))
    if not devices:
        return json.dumps({
            "success": False,
            "error": "No devices found in scan results"
        }, indent=2)
    # Load threat IPs from all feeds (fetch_threat_feed serves cached
    # data when available); a failing feed is logged and skipped
    ip_feed_names = get_ip_feeds()
    threat_ips: set[str] = set()
    for feed_name in ip_feed_names:
        try:
            result = await fetch_threat_feed(feed_name)
            data = json.loads(result)
            if data.get("ips"):
                threat_ips.update(data["ips"])
        except Exception as e:
            logger.warning(f"Error loading {feed_name}: {e}")
    # Check each device's IP against the aggregated threat set
    matches = []
    for device in devices:
        ip = device.get("ip")
        if ip and ip in threat_ips:
            matches.append({
                "device": device,
                "threat_match": True,
                "severity": Severity.CRITICAL.value,
                "recommendation": "Isolate device immediately and investigate"
            })
    return json.dumps({
        "success": True,
        "checked_at": get_timestamp(),
        "devices_checked": len(devices),
        "threat_ips_loaded": len(threat_ips),
        "matches_found": len(matches),
        "matches": matches,
        "all_clear": len(matches) == 0,
        "recommendation": "Network is clean" if len(matches) == 0 else f"ALERT: {len(matches)} devices matched threat intelligence!"
    }, indent=2)
@mcp.tool()
async def get_threat_stats() -> str:
    """
    Report statistics about configured feeds and cached threat data.

    Returns:
        JSON string with cache stats, feed counts, API key status, and
        totals of threat IPs/URLs currently held in the cache.
    """
    stats: dict[str, Any] = {
        "generated_at": get_timestamp(),
        "cache": threat_cache.stats(),
        "feeds_configured": len(THREAT_FEEDS),
        "feeds_enabled": len(get_enabled_feeds()),
        "api_keys": API_KEYS.to_dict(),
        "ip_feeds": get_ip_feeds()
    }
    # Tally IOC counts already sitting in the cache
    ip_total = 0
    for name in get_ip_feeds():
        entry = threat_cache.get(f"feed_{name}")
        if entry:
            ip_total += len(entry.get("ips", []))
    url_total = 0
    for name in ["urlhaus_recent"]:
        entry = threat_cache.get(f"feed_{name}")
        if entry:
            url_total += len(entry.get("urls", []))
    stats["cached_data"] = {
        "total_threat_ips": ip_total,
        "total_threat_urls": url_total
    }
    return json.dumps({"success": True, **stats}, indent=2)
@mcp.tool()
async def clear_threat_cache() -> str:
    """
    Drop all cached threat data so the next request fetches fresh feeds.

    Returns:
        JSON confirmation with a timestamp.
    """
    threat_cache.clear()
    payload = {
        "success": True,
        "message": "Threat cache cleared",
        "timestamp": get_timestamp()
    }
    return json.dumps(payload, indent=2)
# =============================================================================
# Defense Compliance MCP Tools
# =============================================================================
@mcp.tool()
async def threat_stix_export(
    feed_name: Optional[str] = None,
    ioc_type: Optional[str] = None,
    tlp_level: str = "TLP:AMBER",
    limit: int = 100,
) -> str:
    """
    Export threat indicators as a STIX 2.1 bundle with TLP marking.

    Generates OASIS-compliant STIX 2.1 objects including indicators,
    malware objects, and relationships. Suitable for sharing via TAXII
    or with sector ISACs.

    Args:
        feed_name: Specific feed to export (default: ThreatFox recent IOCs)
        ioc_type: Filter by IOC type (ip, domain, url, md5, sha256, ip:port);
            only applied on the ThreatFox path, not with feed_name
        tlp_level: TLP marking level (TLP:CLEAR, TLP:GREEN, TLP:AMBER,
            TLP:AMBER+STRICT, TLP:RED)
        limit: Maximum indicators to include (clamped to [1, MAX_RESPONSE_ITEMS])

    Returns:
        JSON string wrapping the STIX 2.1 Bundle plus export metadata.
    """
    try:
        # Parse TLP level; reject anything outside the TLPLevel enum
        try:
            tlp = TLPLevel(tlp_level)
        except ValueError:
            valid_levels = [level.value for level in TLPLevel]
            return json.dumps({
                "success": False,
                "error": f"Invalid TLP level: {tlp_level}. Valid: {valid_levels}",
            }, indent=2)
        limit = min(max(1, limit), MAX_RESPONSE_ITEMS)
        # Collect indicators
        indicators: list[dict[str, Any]] = []
        if feed_name:
            # Export from specific feed; propagate its error payload as-is
            result = await fetch_threat_feed(feed_name)
            data = json.loads(result)
            if not data.get("success"):
                return result
            # Convert plain feed entries to the indicator dict format
            # expected by export_to_stix_bundle (fixed confidence of 75
            # since raw list feeds carry no per-entry confidence)
            if data.get("type") == "ip_list":
                for ip in data.get("ips", [])[:limit]:
                    indicators.append({
                        "ioc_type": "ip",
                        "ioc": ip,
                        "name": f"Malicious IP from {feed_name}",
                        "confidence": 75,
                        "threat_type": "malicious-activity",
                    })
            elif data.get("type") == "url_list":
                for url in data.get("urls", [])[:limit]:
                    indicators.append({
                        "ioc_type": "url",
                        "ioc": url,
                        "name": f"Malicious URL from {feed_name}",
                        "confidence": 75,
                        "threat_type": "malicious-activity",
                    })
        else:
            # Export from ThreatFox (richest IOC data)
            result = await get_recent_iocs(ioc_type=ioc_type, limit=limit)
            data = json.loads(result)
            if data.get("success") and data.get("iocs"):
                indicators = data["iocs"]
        if not indicators:
            # Nothing to export is still a success: return an empty bundle
            return json.dumps({
                "success": True,
                "message": "No indicators to export",
                "bundle": {"type": "bundle", "objects": []},
            }, indent=2)
        # Generate STIX bundle
        bundle = export_to_stix_bundle(
            indicators=indicators,
            tlp_marking=tlp,
            include_relationships=True,
            identity_name="Threat Intel MCP",
        )
        return json.dumps({
            "success": True,
            "tlp_level": tlp_level,
            "indicators_exported": len(indicators),
            "stix_objects_count": len(bundle.get("objects", [])),
            "bundle": bundle,
        }, indent=2)
    except Exception as e:
        logger.error(f"Error exporting STIX bundle: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_attack_map(
    ioc_type: str,
    ioc_value: str,
    context: str = "",
) -> str:
    """
    Map an IOC to MITRE ATT&CK techniques with context-aware matching.

    Delegates matching to map_ioc_to_attack, then groups the matched
    techniques by the tactics they belong to for a kill-chain view.

    Args:
        ioc_type: IOC type (ip, domain, url, md5, sha1, sha256, email, ip:port)
        ioc_value: The IOC value to map
        context: Additional context (malware name, tags, threat description)

    Returns:
        JSON string with matched ATT&CK techniques and tactical breakdown.
    """
    try:
        matched = map_ioc_to_attack(ioc_type, ioc_value, context)
        # Bucket every matched technique under each tactic it belongs to
        by_tactic: dict[str, list[dict[str, Any]]] = {}
        for technique in matched:
            summary = {
                "technique_id": technique["technique_id"],
                "name": technique["name"],
                "url": technique["url"],
            }
            for tactic in technique.get("tactics", []):
                by_tactic.setdefault(tactic, []).append(summary)
        return json.dumps({
            "success": True,
            "ioc_type": ioc_type,
            "ioc_value": ioc_value,
            "context": context,
            "techniques_matched": len(matched),
            "techniques": matched,
            "tactic_breakdown": by_tactic,
            "kill_chain_coverage": list(by_tactic.keys()),
        }, indent=2)
    except Exception as e:
        logger.error(f"Error mapping IOC to ATT&CK: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_attack_navigator(
    feed_name: Optional[str] = None,
    ioc_type: Optional[str] = None,
    limit: int = 200,
    layer_name: str = "Threat Intelligence Coverage",
) -> str:
    """
    Generate a MITRE ATT&CK Navigator layer from threat indicators.

    Produces a JSON file compatible with the ATT&CK Navigator tool,
    showing technique coverage and heatmap based on indicator volume.

    Args:
        feed_name: Specific feed to analyze (default: ThreatFox recent IOCs)
        ioc_type: Filter by IOC type (ThreatFox path only)
        limit: Maximum indicators to analyze (default: 200)
        layer_name: Name for the Navigator layer

    Returns:
        JSON string wrapping the Navigator layer plus mapping counts.
    """
    try:
        # Collect indicators, either from one named feed or from ThreatFox
        indicators: list[dict[str, Any]] = []
        if feed_name:
            result = await fetch_threat_feed(feed_name)
            data = json.loads(result)
            if data.get("success"):
                # Plain list feeds only carry the IOC value and type
                if data.get("type") == "ip_list":
                    for ip in data.get("ips", [])[:limit]:
                        indicators.append({"ioc_type": "ip", "ioc": ip})
                elif data.get("type") == "url_list":
                    for url in data.get("urls", [])[:limit]:
                        indicators.append({"ioc_type": "url", "ioc": url})
        else:
            result = await get_recent_iocs(ioc_type=ioc_type, limit=limit)
            data = json.loads(result)
            if data.get("success") and data.get("iocs"):
                indicators = data["iocs"]
        if not indicators:
            # No data is not an error: report success with a message
            return json.dumps({
                "success": True,
                "message": "No indicators available for mapping",
            }, indent=2)
        layer = generate_attack_navigator_layer(
            indicators=indicators,
            layer_name=layer_name,
        )
        return json.dumps({
            "success": True,
            "indicators_analyzed": len(indicators),
            "techniques_mapped": len(layer.get("techniques", [])),
            "navigator_layer": layer,
        }, indent=2)
    except Exception as e:
        logger.error(f"Error generating ATT&CK Navigator layer: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_provenance(
    action: str,
    ioc_id: Optional[str] = None,
    actor: str = "system",
    details: str = "{}",
    ioc_type: str = "",
    ioc_value: str = "",
) -> str:
    """
    Query or update the IOC provenance chain for chain-of-custody tracking.

    Supports recording actions, verifying chain integrity, and exporting
    legal/audit reports with tamper-evident SHA-256 hash chains.

    Actions:
        - 'record': Record a new provenance entry
        - 'verify': Verify chain integrity for an IOC
        - 'report': Export full provenance report
        - 'query': Get chain summary for an IOC

    Args:
        action: One of 'record', 'verify', 'report', 'query'
        ioc_id: IOC identifier (required for all actions)
        actor: Entity performing the action (for 'record')
        details: JSON string with action details (for 'record'); a
            'provenance_action' key inside selects the entry type
        ioc_type: IOC type (required for first 'record' of a new IOC)
        ioc_value: IOC value (required for first 'record' of a new IOC)

    Returns:
        JSON string with provenance data or an error payload.
    """
    try:
        # ioc_id is mandatory; 'record' reports it with a tailored message
        if not ioc_id and action != "record":
            return json.dumps({
                "success": False,
                "error": "ioc_id is required",
            }, indent=2)
        if action == "record":
            if not ioc_id:
                return json.dumps({
                    "success": False,
                    "error": "ioc_id is required for recording",
                }, indent=2)
            # details may arrive as a JSON string or already-parsed dict;
            # unparseable strings are preserved verbatim under "raw"
            try:
                detail_dict = json.loads(details) if isinstance(details, str) else details
            except json.JSONDecodeError:
                detail_dict = {"raw": details}
            # Determine provenance action type from details (pop so it
            # isn't duplicated inside the stored details)
            prov_action = detail_dict.pop("provenance_action", "ingestion")
            record = record_provenance(
                ioc_id=ioc_id,
                action=prov_action,
                actor=actor,
                details=detail_dict,
                ioc_type=ioc_type,
                ioc_value=ioc_value,
            )
            return json.dumps({
                "success": True,
                "action": "recorded",
                "record": record,
            }, indent=2)
        elif action == "verify":
            result = verify_provenance_chain(ioc_id)
            return json.dumps({
                "success": True,
                "action": "verify",
                **result,
            }, indent=2)
        elif action == "report":
            report = export_provenance_report(ioc_id)
            return json.dumps({
                "success": True,
                "action": "report",
                "report": report,
            }, indent=2)
        elif action == "query":
            # Local import of a private helper; only needed on this path
            from .compliance.provenance import _get_default_chain
            chain = _get_default_chain()
            summary = chain.get_ioc_summary(ioc_id)
            return json.dumps({
                "success": True,
                "action": "query",
                **summary,
            }, indent=2)
        else:
            return json.dumps({
                "success": False,
                "error": f"Unknown action: {action}. Valid: record, verify, report, query",
            }, indent=2)
    except ValueError as e:
        # Domain validation errors from the provenance layer are returned
        # without the generic error log below
        return json.dumps({"success": False, "error": str(e)}, indent=2)
    except Exception as e:
        logger.error(f"Error in provenance operation: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_compliance_report(
    baseline: str = "MODERATE",
    include_details: bool = True,
) -> str:
    """
    Generate NIST SP 800-53 Rev. 5 compliance posture report.

    Assesses the threat intelligence program's alignment with NIST
    security controls and Cybersecurity Framework functions by mapping
    the server's active capabilities (derived from runtime configuration)
    to specific controls for compliance evidence.

    Args:
        baseline: Target baseline level (LOW, MODERATE, HIGH)
        include_details: Include full control details in report

    Returns:
        JSON string with control mapping and gap analysis.
    """
    try:
        if baseline.upper() not in ("LOW", "MODERATE", "HIGH"):
            return json.dumps({
                "success": False,
                "error": f"Invalid baseline: {baseline}. Valid: LOW, MODERATE, HIGH",
            }, indent=2)
        # Derive capability flags from actual runtime state
        kev_available = get_feed("cisa_kev") is not None
        ip_feeds_present = len(get_ip_feeds()) > 0
        any_reputation_key = (
            API_KEYS.has_abuseipdb or API_KEYS.has_virustotal or API_KEYS.has_shodan
        )
        capability_status: dict[str, bool] = {
            # Core capabilities: active if feeds are configured
            "threat_feed_aggregation": len(get_enabled_feeds()) > 0,
            "cisa_kev_monitoring": kev_available,
            "defense_feed_integration": kev_available,
            # Reputation checks: active if relevant API keys are set
            "ip_reputation_check": any_reputation_key,
            "hash_reputation_check": API_KEYS.has_virustotal,
            "domain_reputation_check": API_KEYS.has_virustotal,
            "bulk_ip_check": ip_feeds_present,
            "network_threat_check": ip_feeds_present,
            # Standards capabilities: always available (built-in modules)
            "stix_export": True,
            "mitre_attack_mapping": True,
            "provenance_tracking": True,
            "tlp_enforcement": True,
            "confidence_scoring": True,
            "compliance_reporting": True,
            "dashboard_visualization": True,
            # TAXII sharing: independent of VT/AbuseIPDB keys
            "taxii_sharing": True,
        }
        report = generate_compliance_report(
            capability_status=capability_status,
            include_control_details=include_details,
            baseline=baseline,
        )
        return json.dumps({
            "success": True,
            **report,
        }, indent=2)
    except Exception as e:
        logger.error(f"Error generating compliance report: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_tlp_classify(
    tlp_level: str,
    target_scope: str = "organization",
    indicators: str = "[]",
) -> str:
    """
    Classify and enforce Traffic Light Protocol (TLP 2.0) markings.

    Validates that sharing at the requested scope is permitted under
    TLP rules. Applies TLP markings to indicator sets for export.

    TLP Levels:
        - TLP:CLEAR - No restrictions
        - TLP:GREEN - Community sharing only
        - TLP:AMBER - Organization and clients (need-to-know)
        - TLP:AMBER+STRICT - Organization only
        - TLP:RED - Named recipients only

    Args:
        tlp_level: TLP marking level
        target_scope: Sharing scope (public, community, organization, named_recipients)
        indicators: JSON array of indicator dicts to mark (optional)

    Returns:
        JSON with TLP classification, sharing rules, and enforcement result
    """
    try:
        # Parse the TLP level; reject anything outside the TLPLevel enum
        # with an explicit list of accepted values.
        try:
            tlp = TLPLevel(tlp_level)
        except ValueError:
            valid_levels = [level.value for level in TLPLevel]
            return json.dumps({
                "success": False,
                "error": f"Invalid TLP level: {tlp_level}. Valid: {valid_levels}",
            }, indent=2)
        marking = TLPMarking(level=tlp)
        # Validate the requested sharing scope before consulting TLP rules.
        valid_scopes = ["public", "community", "organization", "named_recipients"]
        if target_scope not in valid_scopes:
            return json.dumps({
                "success": False,
                "error": f"Invalid target scope: {target_scope}. Valid: {valid_scopes}",
            }, indent=2)
        can_share = marking.can_share_with(target_scope)
        result: dict[str, Any] = {
            "success": True,
            "tlp_level": tlp_level,
            "target_scope": target_scope,
            "sharing_permitted": can_share,
            "sharing_rules": marking.sharing_rules,
            "marking_definition_id": marking.marking_definition_id,
        }
        if not can_share:
            result["violation_warning"] = (
                f"Sharing at scope '{target_scope}' would violate {tlp_level}. "
                f"Restriction: {marking.sharing_rules.get('restrictions', '')}"
            )
        # Mark indicators if provided. Previously, malformed JSON was silently
        # ignored and valid-but-non-list JSON (e.g. "5") raised a TypeError on
        # len(); normalize to a list and surface parse problems to the caller.
        if isinstance(indicators, str):
            try:
                parsed = json.loads(indicators)
            except json.JSONDecodeError:
                parsed = None
                result["indicators_warning"] = (
                    "indicators was not valid JSON; no indicators were marked"
                )
        else:
            parsed = indicators
        indicator_list: list[Any] = parsed if isinstance(parsed, list) else []
        if parsed is not None and not isinstance(parsed, list) \
                and "indicators_warning" not in result:
            result["indicators_warning"] = (
                "indicators must be a JSON array; no indicators were marked"
            )
        if indicator_list:
            result["marked_indicators_count"] = len(indicator_list)
            result["stix_marking_object"] = marking.to_stix_object()
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error in TLP classification: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_taxii_fetch(
    server_url: str,
    collection_id: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_key: Optional[str] = None,
    limit: int = 50,
    added_after: Optional[str] = None,
) -> str:
    """
    Fetch threat intelligence from a TAXII 2.1 server.

    Connects to a TAXII 2.1 compliant server to discover collections
    and retrieve STIX 2.1 objects. Supports HTTP Basic and Bearer token
    authentication.

    Args:
        server_url: TAXII server base URL (e.g., https://taxii.example.com)
        collection_id: Specific collection ID to fetch from. If not provided,
            lists available collections from the first API root.
        username: HTTP Basic auth username (optional)
        password: HTTP Basic auth password (optional)
        api_key: Bearer token API key (optional)
        limit: Maximum objects to retrieve (default: 50, capped at MAX_RESPONSE_ITEMS)
        added_after: ISO 8601 timestamp to filter objects added after this date

    Returns:
        JSON with TAXII server info, collections, or STIX objects
    """
    # Imported lazily to keep the module import path light at startup.
    from .compliance.stix_taxii import TAXIIClient
    try:
        # Clamp the requested limit into [1, MAX_RESPONSE_ITEMS].
        limit = min(max(1, limit), MAX_RESPONSE_ITEMS)
        client = TAXIIClient(
            server_url=server_url,
            username=username,
            password=password,
            api_key=api_key,
        )
        # Step 1: Discover server
        discovery = await client.discover()
        api_roots = discovery.get("api_roots", [])
        if not api_roots:
            return json.dumps({
                "success": True,
                "server_title": discovery.get("title", "Unknown"),
                "message": "No API roots available on this TAXII server",
                "discovery": discovery,
            }, indent=2)
        # Step 2: List collections from first API root
        api_root_path = api_roots[0]
        if not api_root_path.startswith("/"):
            # Handle full URLs by extracting the path
            from urllib.parse import urlparse
            api_root_path = urlparse(api_root_path).path
        collections = await client.get_collections(api_root_path)
        if not collection_id:
            # Return available collections
            return json.dumps({
                "success": True,
                "server_title": discovery.get("title", "Unknown"),
                "api_root": api_root_path,
                "collections": collections,
                "collection_count": len(collections),
                "message": "Provide collection_id to fetch objects from a specific collection",
            }, indent=2)
        # Step 3: Fetch objects from specific collection
        objects = await client.get_objects(
            api_root_path=api_root_path,
            collection_id=collection_id,
            added_after=added_after,
            limit=limit,
        )
        stix_objects = objects.get("objects", [])
        # Slice once so objects_retrieved always matches the list actually
        # returned (previously it counted the pre-truncation server response).
        returned_objects = stix_objects[:limit]
        locally_truncated = len(stix_objects) > limit
        return json.dumps({
            "success": True,
            "server_title": discovery.get("title", "Unknown"),
            "collection_id": collection_id,
            "objects_retrieved": len(returned_objects),
            "objects": returned_objects,
            # "more" is True if either the server has more pages or we
            # truncated the response locally to honor the limit.
            "more": objects.get("more", False) or locally_truncated,
        }, indent=2)
    except Exception as e:
        logger.error(f"Error fetching from TAXII server: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def threat_defense_feeds() -> str:
    """
    Fetch and aggregate defense-specific threat intelligence feeds.

    Retrieves data from CISA advisories, DISA IAVA bulletins, and other
    defense-community sources. Applies ICD 203 confidence scoring to
    all retrieved intelligence.

    Returns:
        JSON with aggregated defense feed data, confidence scores, and
        DoD impact classifications
    """
    try:
        manager = DefenseFeedManager()
        aggregate = await manager.fetch_all_feeds()
        feeds = aggregate.get("feeds", {})
        scorer = ICD203ConfidenceScorer()

        # Score each advisory/item with ICD 203 methodology and tag it
        # with a DoD impact classification derived from its severity.
        scored: list[dict[str, Any]] = []
        for name, data in feeds.items():
            if not isinstance(data, dict):
                continue
            entries = data.get("advisories", data.get("items", []))
            if not isinstance(entries, list):
                continue
            for entry in entries[:20]:  # Top 20 per feed
                if not isinstance(entry, dict):
                    continue
                sev = entry.get("severity", entry.get("cvss_severity", "medium"))
                conf = scorer.score(
                    feed_source=name,
                    raw_confidence=entry.get("confidence", 50),
                    has_context=bool(entry.get("description")),
                )
                scored.append({
                    "source": name,
                    "title": entry.get("title", entry.get("id", "Unknown")),
                    "confidence": {
                        "level": conf.confidence_level.value,
                        "numeric": conf.numeric_score,
                        "source_reliability": conf.source_reliability,
                    },
                    "dod_impact": classify_dod_impact(str(sev)).value,
                })

        # Per-feed status/count rollup; non-dict payloads mean the fetch failed.
        summary: dict[str, Any] = {
            name: (
                {"status": data.get("status", "unknown"), "count": data.get("count", 0)}
                if isinstance(data, dict)
                else {"status": "error", "count": 0}
            )
            for name, data in feeds.items()
        }

        return json.dumps({
            "success": True,
            "feeds_fetched": len(feeds),
            "feed_summary": summary,
            "totals": aggregate.get("totals", {}),
            "scored_items": scored,
            "scored_item_count": len(scored),
            "confidence_methodology": "ICD 203 (Intelligence Community Directive)",
        }, indent=2)
    except Exception as e:
        logger.error(f"Error fetching defense feeds: {e}")
        return json.dumps({"success": False, "error": str(e)}, indent=2)
# =============================================================================
# Entry Point
# =============================================================================
def main():
    """Entry point for MCP server."""
    # Create data/cache directories before anything tries to write to them.
    ensure_dirs()
    logger.info("Starting Threat Intelligence MCP server v0.3.0 (Defense-Grade)")
    # Serve over stdio, the standard transport for locally-spawned MCP servers.
    mcp.run(transport="stdio")
if __name__ == "__main__":
    main()