We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/marc-shade/threat-intel-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
Defense-Relevant Threat Feed Integration.
Provides integration hooks for defense-specific threat intelligence feeds
including CISA alerts/advisories, NSA cybersecurity advisories, US-CERT
feeds, and DISA IAVA format support. Implements ICD 203 confidence scoring
and DoD impact level classification.
References:
- ICD 203: Analytic Standards (IC Directive)
- CISA: https://www.cisa.gov/
- DISA IAVA: Defense Information Systems Agency
- NIST NVD: https://nvd.nist.gov/
"""
import asyncio
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Optional
from xml.etree import ElementTree

import aiohttp
# Module-level logger shared by every class below. Only the logger name is
# set here; handlers and levels are left to the application's logging setup.
logger = logging.getLogger("threat-intel-mcp.defense-feeds")
# =============================================================================
# DoD Impact Level Classification
# =============================================================================
class DoDImpactLevel(str, Enum):
    """
    Department of Defense information-system impact levels.

    Levels follow the DoD impact-level scheme (CNSSI 1253 / DoD CC SRG) that
    is used for FedRAMP and DoD authorization decisions. Threat severities
    are mapped onto these levels elsewhere in this module.

    Every member's value equals its name, and the ``str`` mixin lets members
    compare equal to plain strings.
    """

    # Publicly releasable information, low impact
    IL2 = "IL2"
    # Controlled Unclassified Information (CUI)
    IL4 = "IL4"
    # CUI and National Security Systems
    IL5 = "IL5"
    # Classified (SECRET and below)
    IL6 = "IL6"
# Lowercase severity/priority label -> DoD Impact Level.
# "low" and "info" both map to IL2, the lowest level.
SEVERITY_TO_DOD_IMPACT: dict[str, DoDImpactLevel] = dict(
    critical=DoDImpactLevel.IL6,
    high=DoDImpactLevel.IL5,
    medium=DoDImpactLevel.IL4,
    low=DoDImpactLevel.IL2,
    info=DoDImpactLevel.IL2,
)
def classify_dod_impact(severity: str) -> DoDImpactLevel:
    """
    Translate a threat severity label into its DoD Impact Level.

    The lookup is case-insensitive. Labels that are not keys of
    SEVERITY_TO_DOD_IMPACT fall back to IL2.

    Args:
        severity: Threat severity string (critical, high, medium, low, info)

    Returns:
        The mapped DoDImpactLevel, or DoDImpactLevel.IL2 for unknown labels
    """
    normalized = severity.lower()
    try:
        return SEVERITY_TO_DOD_IMPACT[normalized]
    except KeyError:
        # Unknown label: use the lowest impact level as the default.
        return DoDImpactLevel.IL2
# =============================================================================
# ICD 203: Intelligence Community Analytic Standards - Confidence Scoring
# =============================================================================
class ICD203ConfidenceLevel(str, Enum):
    """
    Confidence labels used for ICD 203-style analytic assessments.

    Intelligence Community Directive 203 sets standards for analytic
    production, including expressing uncertainty through calibrated
    confidence statements. This enum models six labels, declared from the
    lowest confidence to the highest; each value is the human-readable
    phrase emitted in assessment output.
    """

    # Lowest band
    ALMOST_NO_CONFIDENCE = "almost no confidence"
    VERY_LOW = "very low confidence"
    LOW = "low confidence"
    # Middle band
    MODERATE = "moderate confidence"
    HIGH = "high confidence"
    # Highest band
    VERY_HIGH = "very high confidence"
@dataclass
class ICD203Assessment:
    """
    Result of an ICD 203-style confidence evaluation.

    Attributes:
        confidence_level: ICD 203 confidence level
        numeric_score: Numeric score 0-100
        source_reliability: Source reliability grade (A-F)
        information_credibility: Information credibility grade (1-6)
        analytic_basis: Free-text description of what the assessment rests on
        key_assumptions: Assumptions underlying the assessment
        information_gaps: Known information gaps
        alternative_analyses: Alternative hypotheses considered
    """

    confidence_level: ICD203ConfidenceLevel
    numeric_score: int
    source_reliability: str = "C"  # "C" = fairly reliable
    information_credibility: str = "3"  # "3" = possibly true
    analytic_basis: str = ""
    key_assumptions: list[str] = field(default_factory=list)
    information_gaps: list[str] = field(default_factory=list)
    alternative_analyses: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the assessment to a plain dict.

        Besides the raw grades, the output carries their long-form
        descriptions looked up in SOURCE_RELIABILITY_CODES and
        INFO_CREDIBILITY_CODES; a grade missing from those tables is
        described as "Unknown". List fields are returned by reference,
        not copied.

        Returns:
            Dict representation with the confidence level as its string value
        """
        level_text = self.confidence_level.value
        reliability_text = SOURCE_RELIABILITY_CODES.get(
            self.source_reliability, "Unknown"
        )
        credibility_text = INFO_CREDIBILITY_CODES.get(
            self.information_credibility, "Unknown"
        )

        payload: dict[str, Any] = {}
        payload["confidence_level"] = level_text
        payload["numeric_score"] = self.numeric_score
        payload["source_reliability"] = self.source_reliability
        payload["source_reliability_description"] = reliability_text
        payload["information_credibility"] = self.information_credibility
        payload["information_credibility_description"] = credibility_text
        payload["analytic_basis"] = self.analytic_basis
        payload["key_assumptions"] = self.key_assumptions
        payload["information_gaps"] = self.information_gaps
        payload["alternative_analyses"] = self.alternative_analyses
        return payload
# NATO/IC Source Reliability Codes (Admiralty Code).
# Grade letter -> long-form description used in serialized assessments.
SOURCE_RELIABILITY_CODES: dict[str, str] = {
    "A": (
        "Completely Reliable - No doubt of authenticity, trustworthiness, "
        "or competency; has a history of complete reliability"
    ),
    "B": (
        "Usually Reliable - Minor doubt about authenticity, trustworthiness, "
        "or competency; has a history of valid information most of the time"
    ),
    "C": (
        "Fairly Reliable - Doubt of authenticity, trustworthiness, or competency "
        "but has provided valid information in the past"
    ),
    "D": (
        "Not Usually Reliable - Significant doubt about authenticity, "
        "trustworthiness, or competency but has provided valid information "
        "in the past"
    ),
    "E": (
        "Unreliable - Lacking in authenticity, trustworthiness, and competency; "
        "history of invalid information"
    ),
    "F": (
        "Reliability Cannot Be Judged - No basis exists for evaluating "
        "the reliability of the source"
    ),
}
# NATO/IC Information Credibility Codes.
# Grade digit (as a string) -> long-form description used in serialized
# assessments.
INFO_CREDIBILITY_CODES: dict[str, str] = {
    "1": (
        "Confirmed - Confirmed by other independent sources; logical in itself; "
        "consistent with other information on the subject"
    ),
    "2": (
        "Probably True - Not confirmed; logical in itself; "
        "consistent with other information on the subject"
    ),
    "3": (
        "Possibly True - Not confirmed; reasonably logical in itself; "
        "agrees with some other information on the subject"
    ),
    "4": (
        "Doubtful - Not confirmed; possible but not logical; "
        "no other information on the subject"
    ),
    "5": (
        "Improbable - Not confirmed; not logical in itself; "
        "contradicted by other information on the subject"
    ),
    "6": (
        "Truth Cannot Be Judged - No basis exists for evaluating "
        "the validity of the information"
    ),
}
class ICD203ConfidenceScorer:
    """
    Confidence scoring engine aligned with ICD 203 Analytic Standards.

    Turns a feed's raw confidence into an ICD203Assessment by weighting it
    with the feed's reliability grade and then adjusting for corroboration,
    age, available context, and targeting.
    """

    # Feed name (lowercase) -> source reliability grade
    SOURCE_RELIABILITY: dict[str, str] = {
        # Government sources - highest reliability
        "cisa_kev": "A",
        "cisa_alerts": "A",
        "cisa_advisories": "A",
        "nsa_advisories": "A",
        "us_cert": "A",
        "disa_iava": "A",
        "nvd": "A",
        # Established commercial/community threat feeds
        "virustotal": "B",
        "abuseipdb": "B",
        "shodan": "B",
        "feodo_tracker": "B",
        "urlhaus_recent": "B",
        "sslbl_botnet": "B",
        "threatfox_iocs": "B",
        "emerging_threats_compromised": "B",
        # Community feeds
        "tor_exit_nodes": "B",
        "blocklist_de_all": "C",
        "cinsscore_badguys": "C",
        "spamhaus_drop": "B",
        # Unknown sources
        "unknown": "F",
    }

    def score(
        self,
        feed_source: str,
        raw_confidence: int = 50,
        corroboration_count: int = 0,
        age_hours: float = 0,
        has_context: bool = False,
        is_targeted: bool = False,
    ) -> ICD203Assessment:
        """
        Compute an ICD 203-aligned confidence assessment.

        Steps, applied in this order:
        1. Clamp ``raw_confidence`` to 0-100 and multiply by the weight of
           the feed's reliability grade (feeds not listed in
           SOURCE_RELIABILITY are graded "F").
        2. Add 8 points per corroborating source, at most 20, capping the
           running score at 100.
        3. Multiply by an age factor: 0.85 above 24 hours, 0.7 above 168
           hours, 0.5 above 720 hours.
        4. Add 5 points for context and 10 for targeted intelligence, each
           capped at 100.
        5. Round to an integer and map it onto a confidence level.

        Args:
            feed_source: Name of the source feed
            raw_confidence: Raw confidence from the feed (0-100)
            corroboration_count: Number of independent sources confirming
            age_hours: Age of the intelligence in hours
            has_context: Whether contextual information (malware, TTP) is available
            is_targeted: Whether this is targeted (vs. commodity) intelligence

        Returns:
            ICD203Assessment with calibrated confidence
        """
        grade = self.SOURCE_RELIABILITY.get(feed_source.lower(), "F")

        # Reliability weighting of the clamped raw confidence
        grade_weights = {
            "A": 1.0,
            "B": 0.85,
            "C": 0.7,
            "D": 0.5,
            "E": 0.3,
            "F": 0.4,
        }
        running = max(0, min(100, raw_confidence)) * grade_weights.get(grade, 0.4)

        # Corroboration bonus: linear per source, capped
        is_corroborated = corroboration_count > 0
        if is_corroborated:
            running = min(100, running + min(20, corroboration_count * 8))

        # Age penalty: thresholds are checked oldest-first and only the
        # first one exceeded applies (>30 days, >7 days, >1 day)
        for threshold_hours, decay in ((720, 0.5), (168, 0.7), (24, 0.85)):
            if age_hours > threshold_hours:
                running *= decay
                break

        # Context bonus, then targeted-intelligence bonus
        for applies, bonus in ((has_context, 5), (is_targeted, 10)):
            if applies:
                running = min(100, running + bonus)

        # Map the rounded score onto the six-level scale, highest floor first
        numeric_score = int(round(running))
        level = ICD203ConfidenceLevel.ALMOST_NO_CONFIDENCE
        level_floors = (
            (90, ICD203ConfidenceLevel.VERY_HIGH),
            (70, ICD203ConfidenceLevel.HIGH),
            (50, ICD203ConfidenceLevel.MODERATE),
            (30, ICD203ConfidenceLevel.LOW),
            (15, ICD203ConfidenceLevel.VERY_LOW),
        )
        for floor, candidate in level_floors:
            if numeric_score >= floor:
                level = candidate
                break

        # Information credibility grade
        if grade in ("A", "B"):
            # Confirmed with two or more corroborating sources, else Probably True
            info_credibility = "1" if corroboration_count >= 2 else "2"
        elif corroboration_count >= 1:
            info_credibility = "3"  # Possibly True
        elif grade in ("D", "E"):
            info_credibility = "4"  # Doubtful
        else:
            info_credibility = "3"  # Default: Possibly True

        # Analytic basis, joined with "; " below
        basis_parts: list[str] = [f"Source: {feed_source} (reliability: {grade})"]
        if is_corroborated:
            basis_parts.append(
                f"Corroborated by {corroboration_count} independent source(s)"
            )
        if age_hours > 0:
            basis_parts.append(f"Intelligence age: {age_hours:.1f} hours")
        if has_context:
            basis_parts.append("Contextual information (TTP/malware) available")

        # Information gaps
        gap_checks = (
            (
                not has_context,
                "Limited contextual information (no malware/TTP association)",
            ),
            (corroboration_count == 0, "No independent corroboration"),
            (
                grade in ("E", "F"),
                "Source reliability is low or cannot be judged",
            ),
        )
        gaps: list[str] = [message for missing, message in gap_checks if missing]

        return ICD203Assessment(
            confidence_level=level,
            numeric_score=numeric_score,
            source_reliability=grade,
            information_credibility=info_credibility,
            analytic_basis="; ".join(basis_parts),
            key_assumptions=[
                "Source feed data is authentic and not compromised",
                "IOC remains relevant to current threat landscape",
            ],
            information_gaps=gaps,
        )
# =============================================================================
# CISA Advisory Fetcher
# =============================================================================
class CISAAdvisoryFetcher:
    """
    Fetch and parse CISA (Cybersecurity and Infrastructure Security Agency)
    alerts and advisories.

    Covers:
    - CISA Known Exploited Vulnerabilities (KEV) catalog
    - CISA Cybersecurity Alerts
    - CISA Cybersecurity Advisories
    - ICS-CERT Advisories

    Every fetch opens its own short-lived aiohttp session and applies
    ``timeout`` (seconds) as the total request timeout.
    """

    # CISA Feed URLs
    KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
    ALERTS_RSS_URL = "https://www.cisa.gov/cybersecurity-alerts/all.xml"
    ADVISORIES_RSS_URL = "https://www.cisa.gov/cybersecurity-advisories/all.xml"
    ICS_ADVISORIES_RSS_URL = "https://www.cisa.gov/uscert/ics/advisories/advisories.xml"

    # Compiled once at class creation rather than once per RSS item.
    _CVE_PATTERN = re.compile(r"CVE-\d{4}-\d{4,}")

    def __init__(self, timeout: int = 30) -> None:
        """
        Args:
            timeout: Total timeout in seconds applied to each HTTP request
        """
        self.timeout = timeout

    async def fetch_kev(
        self,
        days: int = 30,
        vendor: Optional[str] = None,
    ) -> dict[str, Any]:
        """
        Fetch CISA Known Exploited Vulnerabilities catalog.

        Network, HTTP-status, and timeout errors are not handled here and
        propagate to the caller.

        Args:
            days: Get vulnerabilities added in last N days
            vendor: Filter by vendor name (optional, case-insensitive substring)

        Returns:
            KEV data with filtered vulnerabilities
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self.KEV_URL,
                timeout=aiohttp.ClientTimeout(total=self.timeout),
            ) as response:
                response.raise_for_status()
                data = await response.json()

        vulnerabilities = data.get("vulnerabilities", [])
        filtered = self._filter_kev(vulnerabilities, days, vendor)

        return {
            "source": "CISA KEV",
            "source_url": self.KEV_URL,
            "catalog_version": data.get("catalogVersion"),
            "total_vulnerabilities": len(vulnerabilities),
            "filtered_count": len(filtered),
            "days_checked": days,
            "vendor_filter": vendor,
            "vulnerabilities": filtered,
            "fetched_at": datetime.now(timezone.utc).isoformat(),
        }

    @staticmethod
    def _filter_kev(
        vulnerabilities: list[dict[str, Any]],
        days: int,
        vendor: Optional[str],
    ) -> list[dict[str, Any]]:
        """Select KEV entries added in the last ``days`` days (optionally by vendor) and normalize them."""
        # dateAdded has no time zone, so both sides of the comparison are
        # naive local datetimes.
        cutoff = datetime.now() - timedelta(days=days)
        vendor_needle = vendor.lower() if vendor is not None else None

        selected: list[dict[str, Any]] = []
        for vuln in vulnerabilities:
            try:
                date_added = datetime.strptime(
                    vuln.get("dateAdded", "2000-01-01"), "%Y-%m-%d"
                )
                if date_added < cutoff:
                    continue
                # `or ""` tolerates an explicit JSON null for these fields.
                vendor_project = (vuln.get("vendorProject") or "").lower()
                if vendor_needle is not None and vendor_needle not in vendor_project:
                    continue
                is_ransomware = (
                    vuln.get("knownRansomwareCampaignUse") or ""
                ).lower() == "known"
            except (ValueError, TypeError, AttributeError):
                # Malformed entry (bad date, wrong field type, non-dict item):
                # skip it rather than fail the whole catalog.
                continue

            # Enhance with DoD impact classification. Every KEV entry is at
            # least "high"; known ransomware use raises it to "critical".
            severity = "critical" if is_ransomware else "high"
            selected.append({
                "cve_id": vuln.get("cveID"),
                "vendor": vuln.get("vendorProject"),
                "product": vuln.get("product"),
                "name": vuln.get("vulnerabilityName"),
                "description": vuln.get("shortDescription"),
                "date_added": vuln.get("dateAdded"),
                "due_date": vuln.get("dueDate"),
                "known_ransomware": is_ransomware,
                "required_action": vuln.get("requiredAction"),
                "notes": vuln.get("notes"),
                "dod_impact_level": classify_dod_impact(severity).value,
                "severity": severity,
            })
        return selected

    async def fetch_alerts_rss(self) -> list[dict[str, Any]]:
        """
        Fetch CISA Cybersecurity Alerts via RSS feed.

        Returns:
            List of alert dicts
        """
        return await self._fetch_rss(self.ALERTS_RSS_URL, "CISA Alert")

    async def fetch_advisories_rss(self) -> list[dict[str, Any]]:
        """
        Fetch CISA Cybersecurity Advisories via RSS feed.

        Returns:
            List of advisory dicts
        """
        return await self._fetch_rss(self.ADVISORIES_RSS_URL, "CISA Advisory")

    async def fetch_ics_advisories_rss(self) -> list[dict[str, Any]]:
        """
        Fetch CISA ICS-CERT Advisories via RSS feed.

        Returns:
            List of ICS advisory dicts
        """
        return await self._fetch_rss(self.ICS_ADVISORIES_RSS_URL, "ICS-CERT Advisory")

    async def _fetch_rss(self, url: str, source_type: str) -> list[dict[str, Any]]:
        """
        Fetch and parse an RSS feed from CISA.

        This is best-effort: client errors, timeouts, and malformed XML are
        logged and produce an empty list instead of an exception.

        Args:
            url: RSS feed URL
            source_type: Type label for the advisories

        Returns:
            List of parsed advisory dicts (empty on any fetch or parse failure)
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                ) as response:
                    response.raise_for_status()
                    content = await response.text()
            return self._parse_rss(content, source_type)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # A total-timeout expiry surfaces as asyncio.TimeoutError, which
            # is not an aiohttp.ClientError, so it is caught explicitly.
            logger.error("Error fetching %s RSS: %s", source_type, e)
            return []
        except ElementTree.ParseError as e:
            logger.error("Error parsing %s RSS XML: %s", source_type, e)
            return []

    @classmethod
    def _parse_rss(cls, content: str, source_type: str) -> list[dict[str, Any]]:
        """Turn RSS 2.0 XML text into a list of entry dicts; raises ElementTree.ParseError on bad XML."""
        # NOTE(security): the stdlib ElementTree parser is not hardened
        # against maliciously constructed XML. The content comes from the
        # fixed CISA HTTPS URLs above; revisit if other sources are added.
        root = ElementTree.fromstring(content)

        # Only the RSS 2.0 layout (<channel>/<item>) is understood; any other
        # document shape, including Atom, yields no entries.
        channel = root.find("channel")
        if channel is None:
            return []

        items: list[dict[str, Any]] = []
        for item in channel.findall("item"):
            entry: dict[str, Any] = {
                "source_type": source_type,
                "title": cls._get_text(item, "title"),
                "link": cls._get_text(item, "link"),
                "description": cls._get_text(item, "description"),
                "published": cls._get_text(item, "pubDate"),
                "guid": cls._get_text(item, "guid"),
            }
            # Extract CVE IDs from title and description; sorted so the
            # output is deterministic across runs.
            combined_text = f"{entry['title']} {entry['description']}"
            cves = sorted(set(cls._CVE_PATTERN.findall(combined_text)))
            if cves:
                entry["cve_ids"] = cves
            items.append(entry)
        return items

    @staticmethod
    def _get_text(element: ElementTree.Element, tag: str) -> str:
        """Safely get stripped text of the first ``tag`` child, or "" when absent or empty."""
        child = element.find(tag)
        return child.text.strip() if child is not None and child.text else ""
# =============================================================================
# DISA IAVA (Information Assurance Vulnerability Alert) Format
# =============================================================================
class DIVAFormatParser:
    """
    Parser for DISA IAVA/IAVB/IAVT formats.

    DISA issues three types of vulnerability notifications:
    - IAVA: Information Assurance Vulnerability Alert (must comply)
    - IAVB: Information Assurance Vulnerability Bulletin (should comply)
    - IAVT: Information Assurance Vulnerability Technical Advisory (informational)

    These are typically received as structured messages or documents
    within DoD networks.
    """

    class IAVAType(str, Enum):
        """The three DISA notification types."""

        IAVA = "IAVA"  # Alert - mandatory compliance required
        IAVB = "IAVB"  # Bulletin - recommended compliance
        IAVT = "IAVT"  # Technical Advisory - informational

    @dataclass
    class IAVARecord:
        """Parsed IAVA/IAVB/IAVT record."""

        iava_id: str
        iava_type: str
        title: str
        description: str
        severity: str
        cve_ids: list[str]
        affected_systems: list[str]
        required_actions: list[str]
        compliance_date: str
        release_date: str
        supersedes: list[str] = field(default_factory=list)
        dod_impact_level: str = ""

        def to_dict(self) -> dict[str, Any]:
            """Return the record as a plain dict (list fields are not copied)."""
            return {
                "iava_id": self.iava_id,
                "iava_type": self.iava_type,
                "title": self.title,
                "description": self.description,
                "severity": self.severity,
                "cve_ids": self.cve_ids,
                "affected_systems": self.affected_systems,
                "required_actions": self.required_actions,
                "compliance_date": self.compliance_date,
                "release_date": self.release_date,
                "supersedes": self.supersedes,
                "dod_impact_level": self.dod_impact_level,
            }

    @staticmethod
    def _copy_list(value: Any) -> list[Any]:
        """Return a fresh list for a list-like field: None -> [], a bare string -> [string]."""
        if value is None:
            return []
        if isinstance(value, str):
            # list("CVE-...") would split the string into characters.
            return [value]
        return list(value)

    @staticmethod
    def _iava_type_for_severity(severity_lower: str) -> str:
        """Pick the notification type for an already-lowercased severity label."""
        if severity_lower in ("critical", "high"):
            return "IAVA"
        if severity_lower == "medium":
            return "IAVB"
        return "IAVT"

    def parse_iava_document(self, document: dict[str, Any]) -> "DIVAFormatParser.IAVARecord":
        """
        Parse an IAVA document into a structured record.

        Accepts a dict with IAVA fields. This is a framework for parsing
        IAVA-formatted documents received via DoD channels.

        A non-standard ID is logged as a warning but still accepted. The
        ``iava_type`` value is upper-cased and otherwise passed through
        without validation. List fields are copied, so the returned record
        does not share list objects with ``document``.

        Args:
            document: Dict with IAVA document fields

        Returns:
            Parsed IAVARecord
        """
        raw_id = document.get("iava_id", document.get("id", ""))
        iava_id = "" if raw_id is None else str(raw_id)
        # `or` covers both a missing key and an explicit None value.
        iava_type_str = str(document.get("iava_type") or "IAVA").upper()

        # Validate IAVA ID format: YYYY-X-NNNN. fullmatch, unlike match with
        # a trailing "$", also rejects an ID that ends in a newline.
        if not re.fullmatch(r"\d{4}-[ABT]-\d{4}", iava_id):
            logger.warning("Non-standard IAVA ID format: %s", iava_id)

        severity = str(document.get("severity") or "medium").lower()
        dod_impact = classify_dod_impact(severity)

        return self.IAVARecord(
            iava_id=iava_id,
            iava_type=iava_type_str,
            title=document.get("title", ""),
            description=document.get("description", ""),
            severity=severity,
            cve_ids=self._copy_list(document.get("cve_ids")),
            affected_systems=self._copy_list(document.get("affected_systems")),
            required_actions=self._copy_list(document.get("required_actions")),
            compliance_date=document.get("compliance_date", ""),
            release_date=document.get("release_date", ""),
            supersedes=self._copy_list(document.get("supersedes")),
            dod_impact_level=dod_impact.value,
        )

    def generate_iava_format(
        self,
        cve_id: str,
        title: str,
        description: str,
        severity: str,
        affected_systems: list[str],
        required_actions: list[str],
        compliance_date: str,
    ) -> dict[str, Any]:
        """
        Generate an IAVA-format document from vulnerability data.

        Useful for converting CISA KEV or NVD data into DISA-compatible format.
        The notification type follows severity: critical/high -> IAVA,
        medium -> IAVB, anything else -> IAVT. The result carries no
        ``iava_id``; the release date is today's date in UTC.

        Args:
            cve_id: CVE identifier
            title: Vulnerability title
            description: Vulnerability description
            severity: Severity level
            affected_systems: List of affected systems/products
            required_actions: List of required remediation actions
            compliance_date: Deadline for compliance

        Returns:
            IAVA-formatted document dict
        """
        severity_lower = severity.lower()
        iava_type = self._iava_type_for_severity(severity_lower)

        return {
            "iava_type": iava_type,
            "title": title,
            "description": description,
            "severity": severity_lower,
            "cve_ids": [cve_id],
            # Copied so later edits to the document do not reach back into
            # the caller's lists.
            "affected_systems": self._copy_list(affected_systems),
            "required_actions": self._copy_list(required_actions),
            "compliance_date": compliance_date,
            "release_date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            "dod_impact_level": classify_dod_impact(severity_lower).value,
            "mandatory_compliance": iava_type == "IAVA",
        }
# =============================================================================
# Defense Feed Manager
# =============================================================================
class DefenseFeedManager:
    """
    Manages integration with multiple defense-specific threat intelligence feeds.

    Aggregates data from CISA, NSA advisories, US-CERT, and DISA sources
    into a unified threat picture with ICD 203 confidence scoring.
    """

    def __init__(self, timeout: int = 30) -> None:
        """
        Args:
            timeout: Request timeout in seconds handed to the CISA fetcher
        """
        self.cisa_fetcher = CISAAdvisoryFetcher(timeout=timeout)
        self.iava_parser = DIVAFormatParser()
        self.confidence_scorer = ICD203ConfidenceScorer()
        self.timeout = timeout

    async def fetch_all_feeds(self) -> dict[str, Any]:
        """
        Fetch all available defense feeds and aggregate results.

        Feeds are fetched one after another. A failure in one feed is logged
        and recorded as ``{"status": "error", ...}`` for that feed only; the
        remaining feeds are still fetched. Severity and CVE totals are
        derived from the KEV feed alone.

        Returns:
            Aggregated feed data with status for each source
        """
        results: dict[str, Any] = {
            "fetched_at": datetime.now(timezone.utc).isoformat(),
            "feeds": {},
            "totals": {
                "critical_alerts": 0,
                "high_alerts": 0,
                "total_cves": 0,
                "kev_count": 0,
            },
        }
        feeds = results["feeds"]
        totals = results["totals"]

        # Fetch CISA KEV. Catching Exception is deliberate here: this method
        # is the aggregation boundary and reports per-feed status.
        try:
            kev_data = await self.cisa_fetcher.fetch_kev(days=30)
            feeds["cisa_kev"] = {
                "status": "success",
                "count": kev_data["filtered_count"],
                "total_catalog": kev_data["total_vulnerabilities"],
            }
            totals["kev_count"] = kev_data["filtered_count"]
            # Count by severity
            for vuln in kev_data.get("vulnerabilities", []):
                if vuln.get("severity") == "critical":
                    totals["critical_alerts"] += 1
                elif vuln.get("severity") == "high":
                    totals["high_alerts"] += 1
                totals["total_cves"] += 1
        except Exception as e:
            logger.error("Error fetching CISA KEV: %s", e)
            feeds["cisa_kev"] = {"status": "error", "error": str(e)}

        # The RSS feeds share one shape: (result key, fetch coroutine
        # function, label used in the error log message).
        rss_feeds = (
            ("cisa_alerts", self.cisa_fetcher.fetch_alerts_rss, "CISA alerts"),
            ("cisa_advisories", self.cisa_fetcher.fetch_advisories_rss, "CISA advisories"),
            ("ics_cert", self.cisa_fetcher.fetch_ics_advisories_rss, "ICS-CERT advisories"),
        )
        for feed_key, fetch, label in rss_feeds:
            try:
                entries = await fetch()
                feeds[feed_key] = {
                    "status": "success",
                    "count": len(entries),
                }
            except Exception as e:
                logger.error("Error fetching %s: %s", label, e)
                feeds[feed_key] = {"status": "error", "error": str(e)}

        return results

    def score_indicator(
        self,
        feed_source: str,
        raw_confidence: int = 50,
        corroboration_count: int = 0,
        age_hours: float = 0,
        has_context: bool = False,
        is_targeted: bool = False,
    ) -> dict[str, Any]:
        """
        Score an indicator using ICD 203 standards.

        Thin wrapper that forwards every argument to the confidence scorer
        and serializes the resulting assessment.

        Args:
            feed_source: Source feed name
            raw_confidence: Raw confidence from feed
            corroboration_count: Number of corroborating sources
            age_hours: Age of intelligence in hours
            has_context: Whether TTP/malware context is available
            is_targeted: Whether this is targeted intelligence

        Returns:
            ICD 203 assessment as dict
        """
        assessment = self.confidence_scorer.score(
            feed_source=feed_source,
            raw_confidence=raw_confidence,
            corroboration_count=corroboration_count,
            age_hours=age_hours,
            has_context=has_context,
            is_targeted=is_targeted,
        )
        return assessment.to_dict()

    @staticmethod
    def _elevate_priority(
        severity: str,
        is_kev: bool,
        has_active_exploitation: bool,
        affects_critical_infrastructure: bool,
    ) -> tuple[str, list[str]]:
        """Apply the elevation rules to a severity; return (lowercase priority, reasons that applied)."""
        priority = severity.strip().lower()
        elevation_factors: list[str] = []

        if is_kev:
            elevation_factors.append("CISA KEV listed (mandatory remediation)")
            # Anything below "high" is raised, including "info" and
            # unrecognized labels, not only "medium" and "low".
            if priority not in ("critical", "high"):
                priority = "high"
        if has_active_exploitation:
            elevation_factors.append("Active exploitation confirmed")
            priority = "critical"
        if affects_critical_infrastructure:
            elevation_factors.append("Affects critical infrastructure")
            if priority not in ("critical", "high"):
                priority = "high"

        return priority, elevation_factors

    def classify_priority(
        self,
        severity: str,
        is_kev: bool = False,
        has_active_exploitation: bool = False,
        affects_critical_infrastructure: bool = False,
    ) -> dict[str, Any]:
        """
        Classify threat priority per DoD standards.

        Elevation rules: a KEV listing or critical-infrastructure impact
        raises anything below "high" to "high"; confirmed active
        exploitation raises the priority to "critical".

        Args:
            severity: Base severity (critical, high, medium, low)
            is_kev: Whether this is in the CISA KEV catalog
            has_active_exploitation: Whether active exploitation is confirmed
            affects_critical_infrastructure: Whether critical infrastructure is affected

        Returns:
            Priority classification dict
        """
        priority, elevation_factors = self._elevate_priority(
            severity,
            is_kev,
            has_active_exploitation,
            affects_critical_infrastructure,
        )

        dod_impact = classify_dod_impact(priority)

        # Determine response timeline; priorities without an entry get the
        # generic fallback below.
        response_timelines: dict[str, str] = {
            "critical": "Immediate action required (24 hours)",
            "high": "Urgent action required (72 hours)",
            "medium": "Action required within remediation window (30 days)",
            "low": "Schedule remediation during next maintenance window",
        }

        return {
            "priority": priority.upper(),
            "dod_impact_level": dod_impact.value,
            "response_timeline": response_timelines.get(priority, "Evaluate and schedule"),
            "elevation_factors": elevation_factors,
            "mandatory_compliance": is_kev or has_active_exploitation,
            "notification_level": "FLASH" if priority == "critical" else "ROUTINE",
        }