"""
MITRE ATT&CK Framework Mapping.
Maps threat indicators to MITRE ATT&CK techniques, tactics, and procedures.
Generates ATT&CK Navigator layers for visualization.
References:
- MITRE ATT&CK: https://attack.mitre.org/
- ATT&CK Navigator: https://mitre-attack.github.io/attack-navigator/
- Enterprise Matrix: https://attack.mitre.org/matrices/enterprise/
"""
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
logger = logging.getLogger("threat-intel-mcp.mitre")
# =============================================================================
# ATT&CK Tactic Definitions (Enterprise Matrix)
# =============================================================================
class ATTACKTactic(str, Enum):
    """MITRE ATT&CK Enterprise tactics in kill chain order."""

    # Mixing in ``str`` makes every member an instance of ``str`` as well as
    # an enum member. The values are the hyphenated lowercase tactic names;
    # elsewhere in this module they are emitted as the STIX ``phase_name`` and
    # as the Navigator layer ``tactic`` field.
    RECONNAISSANCE = "reconnaissance"
    RESOURCE_DEVELOPMENT = "resource-development"
    INITIAL_ACCESS = "initial-access"
    EXECUTION = "execution"
    PERSISTENCE = "persistence"
    PRIVILEGE_ESCALATION = "privilege-escalation"
    DEFENSE_EVASION = "defense-evasion"
    CREDENTIAL_ACCESS = "credential-access"
    DISCOVERY = "discovery"
    LATERAL_MOVEMENT = "lateral-movement"
    COLLECTION = "collection"
    COMMAND_AND_CONTROL = "command-and-control"
    EXFILTRATION = "exfiltration"
    IMPACT = "impact"
# Per-tactic display metadata: official tactic ID ("TA....."), human-readable
# name and a one-line description. Built from a compact table so each tactic
# occupies a single row; the resulting dict keeps the table's row order.
TACTIC_METADATA: dict[ATTACKTactic, dict[str, str]] = {
    tactic: {"id": tactic_id, "name": display_name, "description": summary}
    for tactic, tactic_id, display_name, summary in (
        (ATTACKTactic.RECONNAISSANCE, "TA0043", "Reconnaissance",
         "Gathering information to plan future adversary operations."),
        (ATTACKTactic.RESOURCE_DEVELOPMENT, "TA0042", "Resource Development",
         "Establishing resources for adversary operations."),
        (ATTACKTactic.INITIAL_ACCESS, "TA0001", "Initial Access",
         "Trying to get into your network."),
        (ATTACKTactic.EXECUTION, "TA0002", "Execution",
         "Trying to run malicious code."),
        (ATTACKTactic.PERSISTENCE, "TA0003", "Persistence",
         "Trying to maintain their foothold."),
        (ATTACKTactic.PRIVILEGE_ESCALATION, "TA0004", "Privilege Escalation",
         "Trying to gain higher-level permissions."),
        (ATTACKTactic.DEFENSE_EVASION, "TA0005", "Defense Evasion",
         "Trying to avoid being detected."),
        (ATTACKTactic.CREDENTIAL_ACCESS, "TA0006", "Credential Access",
         "Trying to steal account names and passwords."),
        (ATTACKTactic.DISCOVERY, "TA0007", "Discovery",
         "Trying to figure out your environment."),
        (ATTACKTactic.LATERAL_MOVEMENT, "TA0008", "Lateral Movement",
         "Trying to move through your environment."),
        (ATTACKTactic.COLLECTION, "TA0009", "Collection",
         "Trying to gather data of interest to their goal."),
        (ATTACKTactic.COMMAND_AND_CONTROL, "TA0011", "Command and Control",
         "Trying to communicate with compromised systems to control them."),
        (ATTACKTactic.EXFILTRATION, "TA0010", "Exfiltration",
         "Trying to steal data."),
        (ATTACKTactic.IMPACT, "TA0040", "Impact",
         "Trying to manipulate, interrupt, or destroy systems and data."),
    )
}
# =============================================================================
# ATT&CK Technique Database
# =============================================================================
@dataclass
class ATTACKTechnique:
    """MITRE ATT&CK technique (or sub-technique) definition.

    When ``url`` is left empty it is derived from ``technique_id`` in
    ``__post_init__``.
    """

    # ATT&CK ID, e.g. "T1566", or "T1566.001" for a sub-technique.
    technique_id: str
    name: str
    # Tactics the technique is listed under; order is preserved in outputs.
    tactics: list[ATTACKTactic]
    description: str = ""
    platforms: list[str] = field(default_factory=lambda: ["Windows", "Linux", "macOS"])
    data_sources: list[str] = field(default_factory=list)
    detection: str = ""
    url: str = ""
    # IDs of known sub-techniques (only meaningful on parent techniques).
    subtechniques: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Fill in the attack.mitre.org URL when none was supplied."""
        if not self.url:
            # Sub-technique pages live under the parent: T1566.001 -> T1566/001
            self.url = f"https://attack.mitre.org/techniques/{self.technique_id.replace('.', '/')}/"

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the technique.

        ``data_sources`` and ``detection`` are not part of the output.
        List fields are copied so that callers mutating the returned dict
        cannot alter this instance, which may be shared with other callers.

        Returns:
            Dict with technique_id, name, tactics (as string values),
            description, platforms, url and subtechniques.
        """
        return {
            "technique_id": self.technique_id,
            "name": self.name,
            "tactics": [t.value for t in self.tactics],
            "description": self.description,
            "platforms": list(self.platforms),
            "url": self.url,
            "subtechniques": list(self.subtechniques),
        }

    def to_stix_kill_chain_phases(self) -> list[dict[str, str]]:
        """Convert to STIX 2.1 kill chain phases.

        Returns:
            One ``{"kill_chain_name": "mitre-attack", "phase_name": ...}``
            dict per tactic, in ``tactics`` order.
        """
        return [
            {
                "kill_chain_name": "mitre-attack",
                "phase_name": tactic.value,
            }
            for tactic in self.tactics
        ]
# Curated subset of Enterprise ATT&CK techniques (real MITRE ATT&CK IDs),
# keyed by technique ID. Every technique ID referenced by the IOC mapping
# rules in this module must have an entry here; IDs without an entry are
# dropped when rules are resolved.
TECHNIQUE_DATABASE: dict[str, ATTACKTechnique] = {
    # Initial Access (TA0001)
    "T1566": ATTACKTechnique(
        technique_id="T1566",
        name="Phishing",
        tactics=[ATTACKTactic.INITIAL_ACCESS],
        description="Adversaries may send phishing messages to gain access to victim systems.",
        data_sources=["Application Log", "Network Traffic"],
        detection="Monitor for suspicious email attachments and links.",
        subtechniques=["T1566.001", "T1566.002", "T1566.003"],
    ),
    "T1566.001": ATTACKTechnique(
        technique_id="T1566.001",
        name="Phishing: Spearphishing Attachment",
        tactics=[ATTACKTactic.INITIAL_ACCESS],
        description="Adversaries send spearphishing emails with a malicious attachment.",
        data_sources=["Application Log", "File", "Network Traffic"],
    ),
    "T1566.002": ATTACKTechnique(
        technique_id="T1566.002",
        name="Phishing: Spearphishing Link",
        tactics=[ATTACKTactic.INITIAL_ACCESS],
        description="Adversaries send spearphishing emails with a malicious link.",
        data_sources=["Application Log", "Network Traffic"],
    ),
    "T1190": ATTACKTechnique(
        technique_id="T1190",
        name="Exploit Public-Facing Application",
        tactics=[ATTACKTactic.INITIAL_ACCESS],
        description="Adversaries exploit vulnerabilities in internet-facing systems.",
        data_sources=["Application Log", "Network Traffic"],
    ),
    "T1133": ATTACKTechnique(
        technique_id="T1133",
        name="External Remote Services",
        tactics=[ATTACKTactic.INITIAL_ACCESS, ATTACKTactic.PERSISTENCE],
        description="Adversaries leverage external-facing remote services to initially access or persist.",
        data_sources=["Logon Session", "Network Traffic"],
    ),
    "T1078": ATTACKTechnique(
        technique_id="T1078",
        name="Valid Accounts",
        tactics=[ATTACKTactic.INITIAL_ACCESS, ATTACKTactic.PERSISTENCE,
                 ATTACKTactic.PRIVILEGE_ESCALATION, ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries obtain and abuse credentials of existing accounts.",
        data_sources=["Logon Session", "User Account"],
        subtechniques=["T1078.001", "T1078.002", "T1078.003", "T1078.004"],
    ),
    "T1195": ATTACKTechnique(
        technique_id="T1195",
        name="Supply Chain Compromise",
        tactics=[ATTACKTactic.INITIAL_ACCESS],
        description="Adversaries manipulate products or delivery mechanisms prior to receipt.",
        data_sources=["File"],
        subtechniques=["T1195.001", "T1195.002", "T1195.003"],
    ),
    # Execution (TA0002)
    "T1059": ATTACKTechnique(
        technique_id="T1059",
        name="Command and Scripting Interpreter",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries abuse command and script interpreters to execute commands.",
        data_sources=["Command", "Process", "Script"],
        subtechniques=["T1059.001", "T1059.003", "T1059.004", "T1059.005", "T1059.006"],
    ),
    "T1059.001": ATTACKTechnique(
        technique_id="T1059.001",
        name="Command and Scripting Interpreter: PowerShell",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries abuse PowerShell commands and scripts for execution.",
        data_sources=["Command", "Module", "Process", "Script"],
    ),
    "T1059.003": ATTACKTechnique(
        technique_id="T1059.003",
        name="Command and Scripting Interpreter: Windows Command Shell",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries abuse the Windows command shell (cmd) for execution.",
        data_sources=["Command", "Process"],
    ),
    # Referenced by the file-hash "macro"/"vba"/"office" mapping rule.
    "T1059.005": ATTACKTechnique(
        technique_id="T1059.005",
        name="Command and Scripting Interpreter: Visual Basic",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries abuse Visual Basic (VB, VBA, VBScript) for execution.",
        data_sources=["Command", "Module", "Process", "Script"],
    ),
    "T1203": ATTACKTechnique(
        technique_id="T1203",
        name="Exploitation for Client Execution",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries exploit software vulnerabilities in client applications.",
        data_sources=["Application Log", "Process"],
    ),
    # Referenced by the URL "malware"/"payload"/"download" mapping rule.
    "T1204": ATTACKTechnique(
        technique_id="T1204",
        name="User Execution",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries rely on specific actions by a user, such as opening a malicious file or link, to gain execution.",
        data_sources=["Application Log", "Command", "File", "Network Traffic", "Process"],
        subtechniques=["T1204.001", "T1204.002", "T1204.003"],
    ),
    "T1053": ATTACKTechnique(
        technique_id="T1053",
        name="Scheduled Task/Job",
        tactics=[ATTACKTactic.EXECUTION, ATTACKTactic.PERSISTENCE, ATTACKTactic.PRIVILEGE_ESCALATION],
        description="Adversaries abuse task scheduling to execute malicious code.",
        data_sources=["Command", "File", "Process", "Scheduled Job"],
        subtechniques=["T1053.002", "T1053.003", "T1053.005"],
    ),
    "T1047": ATTACKTechnique(
        technique_id="T1047",
        name="Windows Management Instrumentation",
        tactics=[ATTACKTactic.EXECUTION],
        description="Adversaries abuse WMI to execute malicious commands and payloads.",
        data_sources=["Command", "Network Traffic", "Process"],
        platforms=["Windows"],
    ),
    # Persistence (TA0003)
    "T1547": ATTACKTechnique(
        technique_id="T1547",
        name="Boot or Logon Autostart Execution",
        tactics=[ATTACKTactic.PERSISTENCE, ATTACKTactic.PRIVILEGE_ESCALATION],
        description="Adversaries configure system settings to automatically execute a program during boot or logon.",
        data_sources=["Command", "File", "Process", "Windows Registry"],
        subtechniques=["T1547.001", "T1547.004", "T1547.009"],
    ),
    # Referenced by the file-hash "emotet" mapping rule.
    "T1547.001": ATTACKTechnique(
        technique_id="T1547.001",
        name="Boot or Logon Autostart Execution: Registry Run Keys / Startup Folder",
        tactics=[ATTACKTactic.PERSISTENCE, ATTACKTactic.PRIVILEGE_ESCALATION],
        description="Adversaries add a program to a Registry run key or the startup folder to achieve persistence.",
        data_sources=["Command", "File", "Process", "Windows Registry"],
        platforms=["Windows"],
    ),
    "T1136": ATTACKTechnique(
        technique_id="T1136",
        name="Create Account",
        tactics=[ATTACKTactic.PERSISTENCE],
        description="Adversaries create accounts to maintain access to victim systems.",
        data_sources=["Process", "User Account"],
        subtechniques=["T1136.001", "T1136.002", "T1136.003"],
    ),
    "T1543": ATTACKTechnique(
        technique_id="T1543",
        name="Create or Modify System Process",
        tactics=[ATTACKTactic.PERSISTENCE, ATTACKTactic.PRIVILEGE_ESCALATION],
        description="Adversaries create or modify system-level processes to repeatedly execute malicious payloads.",
        data_sources=["Command", "File", "Process", "Service", "Windows Registry"],
        subtechniques=["T1543.001", "T1543.002", "T1543.003"],
    ),
    # Referenced by the URL and file-hash "webshell" mapping rules.
    "T1505.003": ATTACKTechnique(
        technique_id="T1505.003",
        name="Server Software Component: Web Shell",
        tactics=[ATTACKTactic.PERSISTENCE],
        description="Adversaries backdoor web servers with web shells to establish persistent access.",
        data_sources=["Application Log", "File", "Network Traffic", "Process"],
    ),
    # Privilege Escalation (TA0004)
    "T1055": ATTACKTechnique(
        technique_id="T1055",
        name="Process Injection",
        tactics=[ATTACKTactic.PRIVILEGE_ESCALATION, ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries inject code into processes to evade process-based defenses and elevate privileges.",
        data_sources=["File", "Module", "Process"],
        subtechniques=["T1055.001", "T1055.002", "T1055.003", "T1055.004", "T1055.012"],
    ),
    "T1068": ATTACKTechnique(
        technique_id="T1068",
        name="Exploitation for Privilege Escalation",
        tactics=[ATTACKTactic.PRIVILEGE_ESCALATION],
        description="Adversaries exploit software vulnerabilities to elevate privileges.",
        data_sources=["Process"],
    ),
    # Defense Evasion (TA0005)
    "T1070": ATTACKTechnique(
        technique_id="T1070",
        name="Indicator Removal",
        tactics=[ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries delete or modify artifacts generated on a host system to remove evidence.",
        data_sources=["Command", "File", "Firewall", "Network Traffic", "Process",
                      "User Account", "Windows Registry"],
        subtechniques=["T1070.001", "T1070.002", "T1070.003", "T1070.004", "T1070.006"],
    ),
    "T1070.001": ATTACKTechnique(
        technique_id="T1070.001",
        name="Indicator Removal: Clear Windows Event Logs",
        tactics=[ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries clear Windows Event Logs to hide activity.",
        data_sources=["Command", "Process"],
        platforms=["Windows"],
    ),
    "T1027": ATTACKTechnique(
        technique_id="T1027",
        name="Obfuscated Files or Information",
        tactics=[ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries make payloads difficult to discover or analyze by encrypting, encoding, or obfuscating.",
        data_sources=["Command", "File", "Process"],
        subtechniques=["T1027.001", "T1027.002", "T1027.003", "T1027.004", "T1027.005",
                       "T1027.006", "T1027.009", "T1027.010", "T1027.011"],
    ),
    "T1562": ATTACKTechnique(
        technique_id="T1562",
        name="Impair Defenses",
        tactics=[ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries maliciously modify security tools to avoid detection.",
        data_sources=["Command", "Process", "Sensor Health", "Service", "Windows Registry"],
        subtechniques=["T1562.001", "T1562.002", "T1562.004", "T1562.006"],
    ),
    "T1036": ATTACKTechnique(
        technique_id="T1036",
        name="Masquerading",
        tactics=[ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries manipulate features of their artifacts to make them appear legitimate.",
        data_sources=["Command", "File", "Image", "Process", "Scheduled Job", "Service"],
        subtechniques=["T1036.001", "T1036.003", "T1036.004", "T1036.005", "T1036.007"],
    ),
    # Referenced by the file-hash "rootkit" mapping rule.
    "T1014": ATTACKTechnique(
        technique_id="T1014",
        name="Rootkit",
        tactics=[ATTACKTactic.DEFENSE_EVASION],
        description="Adversaries use rootkits to hide the presence of programs, files, network connections, and other system components.",
        data_sources=["Drive", "File", "Firmware"],
    ),
    # Credential Access (TA0006)
    "T1003": ATTACKTechnique(
        technique_id="T1003",
        name="OS Credential Dumping",
        tactics=[ATTACKTactic.CREDENTIAL_ACCESS],
        description="Adversaries attempt to dump credentials to obtain account login information.",
        data_sources=["Active Directory", "Command", "File", "Process"],
        subtechniques=["T1003.001", "T1003.002", "T1003.003", "T1003.004",
                       "T1003.005", "T1003.006", "T1003.007", "T1003.008"],
    ),
    "T1110": ATTACKTechnique(
        technique_id="T1110",
        name="Brute Force",
        tactics=[ATTACKTactic.CREDENTIAL_ACCESS],
        description="Adversaries use brute force techniques to gain access to accounts.",
        data_sources=["Application Log", "User Account"],
        subtechniques=["T1110.001", "T1110.002", "T1110.003", "T1110.004"],
    ),
    "T1557": ATTACKTechnique(
        technique_id="T1557",
        name="Adversary-in-the-Middle",
        tactics=[ATTACKTactic.CREDENTIAL_ACCESS, ATTACKTactic.COLLECTION],
        description="Adversaries position themselves between two or more networked devices to support credential collection.",
        data_sources=["Network Traffic", "Service"],
        subtechniques=["T1557.001", "T1557.002", "T1557.003"],
    ),
    # Referenced by the file-hash "keylog"/"stealer" mapping rule.
    "T1056": ATTACKTechnique(
        technique_id="T1056",
        name="Input Capture",
        tactics=[ATTACKTactic.COLLECTION, ATTACKTactic.CREDENTIAL_ACCESS],
        description="Adversaries capture user input, such as keystrokes, to obtain credentials or collect information.",
        data_sources=["Driver", "File", "Process", "Windows Registry"],
        subtechniques=["T1056.001", "T1056.002", "T1056.003", "T1056.004"],
    ),
    # Discovery (TA0007)
    "T1046": ATTACKTechnique(
        technique_id="T1046",
        name="Network Service Discovery",
        tactics=[ATTACKTactic.DISCOVERY],
        description="Adversaries attempt to discover services running on remote hosts.",
        data_sources=["Cloud Service", "Command", "Network Traffic"],
    ),
    "T1082": ATTACKTechnique(
        technique_id="T1082",
        name="System Information Discovery",
        tactics=[ATTACKTactic.DISCOVERY],
        description="Adversaries attempt to get detailed information about the operating system and hardware.",
        data_sources=["Command", "OS API", "Process"],
    ),
    # Lateral Movement (TA0008)
    "T1021": ATTACKTechnique(
        technique_id="T1021",
        name="Remote Services",
        tactics=[ATTACKTactic.LATERAL_MOVEMENT],
        description="Adversaries use valid accounts to log into a service for remote access.",
        data_sources=["Command", "Logon Session", "Module", "Network Share",
                      "Network Traffic", "Process"],
        subtechniques=["T1021.001", "T1021.002", "T1021.003", "T1021.004",
                       "T1021.005", "T1021.006"],
    ),
    "T1021.001": ATTACKTechnique(
        technique_id="T1021.001",
        name="Remote Services: Remote Desktop Protocol",
        tactics=[ATTACKTactic.LATERAL_MOVEMENT],
        description="Adversaries use RDP to log into systems using valid credentials.",
        data_sources=["Logon Session", "Network Traffic", "Process"],
        platforms=["Windows"],
    ),
    "T1021.002": ATTACKTechnique(
        technique_id="T1021.002",
        name="Remote Services: SMB/Windows Admin Shares",
        tactics=[ATTACKTactic.LATERAL_MOVEMENT],
        description="Adversaries use valid SMB/Windows Admin Share credentials for lateral movement.",
        data_sources=["Command", "Logon Session", "Network Share", "Network Traffic"],
        platforms=["Windows"],
    ),
    "T1570": ATTACKTechnique(
        technique_id="T1570",
        name="Lateral Tool Transfer",
        tactics=[ATTACKTactic.LATERAL_MOVEMENT],
        description="Adversaries transfer tools or other files between systems in a compromised environment.",
        data_sources=["Command", "File", "Named Pipe", "Network Share", "Network Traffic", "Process"],
    ),
    # Command and Control (TA0011)
    "T1071": ATTACKTechnique(
        technique_id="T1071",
        name="Application Layer Protocol",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries communicate using OSI application layer protocols to avoid detection.",
        data_sources=["Network Traffic"],
        subtechniques=["T1071.001", "T1071.002", "T1071.003", "T1071.004"],
    ),
    "T1071.001": ATTACKTechnique(
        technique_id="T1071.001",
        name="Application Layer Protocol: Web Protocols",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries communicate using HTTP/HTTPS to blend with normal web traffic.",
        data_sources=["Network Traffic"],
    ),
    # Referenced by the IP "dns"/"dga" and domain "tunnel" mapping rules.
    "T1071.004": ATTACKTechnique(
        technique_id="T1071.004",
        name="Application Layer Protocol: DNS",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries communicate using the DNS protocol to blend with normal name-resolution traffic.",
        data_sources=["Network Traffic"],
    ),
    "T1572": ATTACKTechnique(
        technique_id="T1572",
        name="Protocol Tunneling",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries tunnel network communications to avoid detection using encapsulated protocols.",
        data_sources=["Network Traffic"],
    ),
    "T1573": ATTACKTechnique(
        technique_id="T1573",
        name="Encrypted Channel",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries employ encryption to conceal command and control traffic.",
        data_sources=["Network Traffic"],
        subtechniques=["T1573.001", "T1573.002"],
    ),
    "T1105": ATTACKTechnique(
        technique_id="T1105",
        name="Ingress Tool Transfer",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries transfer tools from an external system into a compromised environment.",
        data_sources=["File", "Network Traffic"],
    ),
    "T1090": ATTACKTechnique(
        technique_id="T1090",
        name="Proxy",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries use connection proxies to direct network traffic between systems.",
        data_sources=["Network Traffic"],
        subtechniques=["T1090.001", "T1090.002", "T1090.003"],
    ),
    "T1219": ATTACKTechnique(
        technique_id="T1219",
        name="Remote Access Software",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries use legitimate desktop support and remote access tools as C2 channels.",
        data_sources=["Network Traffic", "Process"],
    ),
    "T1568": ATTACKTechnique(
        technique_id="T1568",
        name="Dynamic Resolution",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries dynamically establish connections to C2 infrastructure to evade common detections.",
        data_sources=["Network Traffic"],
        subtechniques=["T1568.001", "T1568.002", "T1568.003"],
    ),
    # Referenced by the domain "dga"/"domain generation" mapping rule.
    "T1568.002": ATTACKTechnique(
        technique_id="T1568.002",
        name="Dynamic Resolution: Domain Generation Algorithms",
        tactics=[ATTACKTactic.COMMAND_AND_CONTROL],
        description="Adversaries use domain generation algorithms to dynamically identify a destination domain for C2 traffic.",
        data_sources=["Network Traffic"],
    ),
    # Exfiltration (TA0010)
    "T1041": ATTACKTechnique(
        technique_id="T1041",
        name="Exfiltration Over C2 Channel",
        tactics=[ATTACKTactic.EXFILTRATION],
        description="Adversaries steal data by exfiltrating it over an existing command and control channel.",
        data_sources=["Command", "File", "Network Traffic"],
    ),
    "T1048": ATTACKTechnique(
        technique_id="T1048",
        name="Exfiltration Over Alternative Protocol",
        tactics=[ATTACKTactic.EXFILTRATION],
        description="Adversaries steal data by exfiltrating it over a different protocol than the existing C2 channel.",
        data_sources=["Command", "File", "Network Traffic"],
        subtechniques=["T1048.001", "T1048.002", "T1048.003"],
    ),
    "T1567": ATTACKTechnique(
        technique_id="T1567",
        name="Exfiltration Over Web Service",
        tactics=[ATTACKTactic.EXFILTRATION],
        description="Adversaries use web services to exfiltrate data rather than their primary C2 channel.",
        data_sources=["Command", "File", "Network Traffic"],
        subtechniques=["T1567.001", "T1567.002"],
    ),
    # Impact (TA0040)
    "T1486": ATTACKTechnique(
        technique_id="T1486",
        name="Data Encrypted for Impact",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries encrypt data on target systems to interrupt availability (ransomware).",
        data_sources=["Cloud Storage", "Command", "File", "Process"],
    ),
    "T1489": ATTACKTechnique(
        technique_id="T1489",
        name="Service Stop",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries stop or disable services on a system to render those services unavailable.",
        data_sources=["Command", "File", "Process", "Service", "Windows Registry"],
    ),
    "T1490": ATTACKTechnique(
        technique_id="T1490",
        name="Inhibit System Recovery",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries delete or remove built-in OS data and disable services to prevent recovery.",
        data_sources=["Cloud Storage", "Command", "File", "Process", "Service", "Windows Registry"],
    ),
    # Referenced by the file-hash "miner"/"cryptominer" mapping rule.
    "T1496": ATTACKTechnique(
        technique_id="T1496",
        name="Resource Hijacking",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries leverage the resources of co-opted systems for resource-intensive tasks such as cryptocurrency mining.",
        data_sources=["Command", "File", "Network Traffic", "Process", "Sensor Health"],
    ),
    "T1498": ATTACKTechnique(
        technique_id="T1498",
        name="Network Denial of Service",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries perform network denial of service to degrade or block availability.",
        data_sources=["Network Traffic", "Sensor Health"],
        subtechniques=["T1498.001", "T1498.002"],
    ),
    "T1529": ATTACKTechnique(
        technique_id="T1529",
        name="System Shutdown/Reboot",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries shut down/reboot systems to interrupt access or aid destruction.",
        data_sources=["Command", "Process", "Sensor Health"],
    ),
    "T1531": ATTACKTechnique(
        technique_id="T1531",
        name="Account Access Removal",
        tactics=[ATTACKTactic.IMPACT],
        description="Adversaries interrupt availability of resources by inhibiting access to accounts.",
        data_sources=["Active Directory", "User Account"],
    ),
}
# =============================================================================
# IOC-to-ATT&CK Mapping Rules
# =============================================================================
# Mapping rules: (ioc_type, context_keywords) -> list of technique_ids
# Context keywords are matched case-insensitively against the context string
# One entry per IOC family. Each entry lists the IOC type names it applies
# to, the techniques used when no context keyword matches, and the context
# rules (keyword list -> technique IDs) evaluated against the context string.
IOC_TECHNIQUE_MAPPING: list[dict[str, Any]] = [
    # IP indicators - typically C2 or scanning
    {
        "ioc_types": ["ip", "ipv4", "ipv6"],
        "default_techniques": ["T1071", "T1071.001"],
        "context_rules": [
            {
                "keywords": ["c2", "c&c", "command", "control", "beacon"],
                "techniques": ["T1071", "T1071.001", "T1573", "T1105"],
            },
            {
                "keywords": ["botnet", "bot"],
                "techniques": ["T1071", "T1071.001", "T1573", "T1219"],
            },
            {
                "keywords": ["scan", "recon", "probe"],
                "techniques": ["T1046"],
            },
            {
                "keywords": ["tor", "proxy", "vpn"],
                "techniques": ["T1090", "T1572"],
            },
            {
                "keywords": ["brute", "login", "auth"],
                "techniques": ["T1110", "T1078"],
            },
            {
                "keywords": ["exfil", "data theft", "steal"],
                "techniques": ["T1041", "T1048"],
            },
            {
                "keywords": ["rdp", "remote desktop"],
                "techniques": ["T1021.001", "T1021"],
            },
            {
                "keywords": ["smb", "windows share"],
                "techniques": ["T1021.002", "T1570"],
            },
            {
                "keywords": ["dns", "domain generation", "dga"],
                "techniques": ["T1568", "T1071.004"],
            },
        ],
    },
    # IP:Port indicators - C2 with specific service identification
    {
        "ioc_types": ["ip:port"],
        "default_techniques": ["T1071", "T1105"],
        "context_rules": [
            {
                "keywords": ["c2", "c&c", "command", "control"],
                "techniques": ["T1071", "T1573", "T1105", "T1219"],
            },
            {
                "keywords": ["botnet", "bot"],
                "techniques": ["T1071", "T1573"],
            },
            {
                "keywords": ["cobalt", "strike", "beacon"],
                "techniques": ["T1071.001", "T1573", "T1055"],
            },
            {
                "keywords": ["metasploit", "meterpreter"],
                "techniques": ["T1071.001", "T1573", "T1055"],
            },
        ],
    },
    # Domain indicators
    {
        "ioc_types": ["domain"],
        "default_techniques": ["T1071.001", "T1568"],
        "context_rules": [
            {
                "keywords": ["phish", "spoof", "fake"],
                "techniques": ["T1566", "T1566.002"],
            },
            {
                "keywords": ["c2", "c&c", "command", "control"],
                "techniques": ["T1071.001", "T1568", "T1573"],
            },
            {
                "keywords": ["malware", "payload", "download", "dropper"],
                "techniques": ["T1105", "T1071.001"],
            },
            {
                "keywords": ["dga", "domain generation"],
                "techniques": ["T1568.002"],
            },
            {
                "keywords": ["dns tunnel", "tunnel"],
                "techniques": ["T1572", "T1071.004"],
            },
            {
                "keywords": ["exfil"],
                "techniques": ["T1048", "T1567"],
            },
        ],
    },
    # URL indicators
    {
        "ioc_types": ["url"],
        "default_techniques": ["T1071.001", "T1105"],
        "context_rules": [
            {
                "keywords": ["phish", "credential", "login", "harvest"],
                "techniques": ["T1566.002", "T1078"],
            },
            {
                "keywords": ["malware", "payload", "download", "dropper", "exe", "dll"],
                "techniques": ["T1105", "T1204"],
            },
            {
                "keywords": ["exploit", "kit", "drive-by"],
                "techniques": ["T1190", "T1203"],
            },
            {
                "keywords": ["webshell", "web shell"],
                "techniques": ["T1505.003"],
            },
            {
                "keywords": ["c2", "beacon", "callback"],
                "techniques": ["T1071.001", "T1573"],
            },
            {
                "keywords": ["exfil", "upload", "steal"],
                "techniques": ["T1567", "T1041"],
            },
        ],
    },
    # File hash indicators
    {
        "ioc_types": ["md5", "sha1", "sha256"],
        "default_techniques": ["T1027", "T1059"],
        "context_rules": [
            {
                "keywords": ["ransomware", "ransom", "encrypt"],
                "techniques": ["T1486", "T1490", "T1489"],
            },
            {
                "keywords": ["trojan", "rat", "remote access"],
                "techniques": ["T1219", "T1105", "T1055"],
            },
            {
                "keywords": ["backdoor"],
                "techniques": ["T1547", "T1543", "T1133"],
            },
            {
                "keywords": ["worm"],
                "techniques": ["T1570", "T1021"],
            },
            {
                "keywords": ["rootkit"],
                "techniques": ["T1014", "T1055"],
            },
            {
                "keywords": ["keylog", "stealer", "infostealer"],
                "techniques": ["T1056", "T1003"],
            },
            {
                "keywords": ["miner", "cryptominer", "crypto"],
                "techniques": ["T1496"],
            },
            {
                "keywords": ["cobalt", "strike"],
                "techniques": ["T1055", "T1071.001", "T1573"],
            },
            {
                "keywords": ["emotet"],
                "techniques": ["T1566.001", "T1059.001", "T1547.001"],
            },
            {
                "keywords": ["trickbot"],
                "techniques": ["T1055", "T1003", "T1071.001"],
            },
            {
                "keywords": ["powershell", "ps1"],
                "techniques": ["T1059.001", "T1027"],
            },
            {
                "keywords": ["macro", "vba", "office"],
                "techniques": ["T1059.005", "T1566.001"],
            },
            {
                "keywords": ["webshell"],
                "techniques": ["T1505.003"],
            },
            {
                "keywords": ["loader", "dropper"],
                "techniques": ["T1105", "T1059"],
            },
        ],
    },
    # Email indicators
    {
        "ioc_types": ["email"],
        "default_techniques": ["T1566", "T1566.001"],
        "context_rules": [
            {
                "keywords": ["phish", "spear"],
                "techniques": ["T1566", "T1566.001", "T1566.002"],
            },
            {
                "keywords": ["bec", "business email"],
                "techniques": ["T1566.002", "T1078"],
            },
            {
                "keywords": ["spam", "bulk"],
                "techniques": ["T1566"],
            },
        ],
    },
]
# =============================================================================
# ATT&CK Mapper
# =============================================================================
class MITREATTACKMapper:
    """
    Maps threat indicators to MITRE ATT&CK techniques with context-aware matching.

    The mapper holds references to the module-level ``TECHNIQUE_DATABASE``
    and ``IOC_TECHNIQUE_MAPPING`` tables (not copies).
    """

    def __init__(self) -> None:
        self.techniques = TECHNIQUE_DATABASE
        self.mapping_rules = IOC_TECHNIQUE_MAPPING

    def get_technique(self, technique_id: str) -> Optional[ATTACKTechnique]:
        """
        Look up a technique by ID.

        Args:
            technique_id: MITRE ATT&CK technique ID (e.g., 'T1566')

        Returns:
            ATTACKTechnique or None
        """
        return self.techniques.get(technique_id)

    def get_techniques_for_tactic(self, tactic: ATTACKTactic) -> list[ATTACKTechnique]:
        """
        Get all techniques for a given tactic.

        Args:
            tactic: MITRE ATT&CK tactic

        Returns:
            List of techniques mapped to that tactic
        """
        return [
            t for t in self.techniques.values()
            if tactic in t.tactics
        ]

    def map_ioc(
        self,
        ioc_type: str,
        ioc_value: str,
        context: str = "",
    ) -> list[ATTACKTechnique]:
        """
        Map an IOC to relevant ATT&CK techniques using type and context.

        Every rule set whose ``ioc_types`` contains the (case-insensitive)
        IOC type is evaluated. Within a rule set, all context rules with at
        least one keyword occurring in the context contribute their
        techniques; if none match, the rule set's defaults are used instead.

        Args:
            ioc_type: IOC type (ip, domain, url, md5, sha1, sha256, email, ip:port).
                ``None`` or an unknown type yields an empty result.
            ioc_value: The IOC value (accepted for interface symmetry; the
                matching itself does not inspect it)
            context: Additional context (malware name, threat type, tags, description)

        Returns:
            De-duplicated list of matched ATTACKTechnique objects in the order
            the rules produced them. Technique IDs that are not present in
            the database are skipped (and logged at debug level).
        """
        # Feeds are loosely typed: tolerate None / non-string values instead
        # of raising AttributeError on .lower().
        normalized_type = str(ioc_type or "").strip().lower()
        context_lower = str(context or "").lower()

        # dict keys give ordered de-duplication with O(1) membership checks.
        matched_ids: dict[str, None] = {}
        for rule_set in self.mapping_rules:
            if normalized_type not in rule_set["ioc_types"]:
                continue

            # Check context rules first.
            # NOTE: keywords are matched as plain substrings of the context,
            # so short keywords also match inside longer words.
            context_matched = False
            for ctx_rule in rule_set.get("context_rules", []):
                if any(kw in context_lower for kw in ctx_rule["keywords"]):
                    matched_ids.update(dict.fromkeys(ctx_rule["techniques"]))
                    context_matched = True

            # Apply defaults if no context matched
            if not context_matched:
                matched_ids.update(dict.fromkeys(rule_set.get("default_techniques", [])))

        # Resolve technique IDs to objects
        techniques: list[ATTACKTechnique] = []
        for tid in matched_ids:
            tech = self.techniques.get(tid)
            if tech:
                techniques.append(tech)
            else:
                # Log techniques referenced in rules but not in database
                logger.debug("Technique %s referenced in mapping rules but not in database", tid)
        return techniques

    def map_indicators_batch(
        self,
        indicators: list[dict[str, Any]],
    ) -> dict[str, list[dict[str, Any]]]:
        """
        Map multiple indicators to ATT&CK techniques in batch.

        The IOC type is read from ``ioc_type`` or ``type``; the value from
        ``ioc``, ``ioc_value`` or ``value`` (first non-empty wins). Context
        is assembled from ``malware``, ``malware_printable``, ``threat_type``,
        ``description`` and ``tags``.

        Args:
            indicators: List of indicator dicts with ioc_type, ioc/ioc_value, and context fields

        Returns:
            Dict mapping technique_ids to lists of matching indicators. Each
            entry has ``ioc_type``, ``ioc_value`` (always a string) and
            ``context`` (truncated to 200 characters).
        """
        technique_map: dict[str, list[dict[str, Any]]] = {}
        for ind in indicators:
            # ``or`` chains (rather than nested .get defaults) so that a key
            # present with a None/empty value still falls through to the
            # alternative key names.
            ioc_type = str(ind.get("ioc_type") or ind.get("type") or "")
            ioc_value = str(ind.get("ioc") or ind.get("ioc_value") or ind.get("value") or "")

            # Build context from available fields
            context_parts: list[Any] = [
                ind.get("malware", ""),
                ind.get("malware_printable", ""),
                ind.get("threat_type", ""),
                ind.get("description", ""),
            ]
            tags = ind.get("tags")
            if isinstance(tags, str):
                # A bare string must stay whole: extend() would split it into
                # single characters and break keyword matching.
                context_parts.append(tags)
            elif tags:
                try:
                    context_parts.extend(tags)
                except TypeError:
                    # Non-iterable tag value (e.g. a number): keep it as-is.
                    context_parts.append(tags)
            context = " ".join(str(p) for p in context_parts if p)

            for tech in self.map_ioc(ioc_type, ioc_value, context):
                technique_map.setdefault(tech.technique_id, []).append({
                    "ioc_type": ioc_type,
                    "ioc_value": ioc_value,
                    "context": context[:200],
                })
        return technique_map
# =============================================================================
# ATT&CK Navigator Layer Generation
# =============================================================================
def generate_attack_navigator_layer(
    indicators: list[dict[str, Any]],
    layer_name: str = "Threat Intelligence Coverage",
    layer_description: str = "ATT&CK techniques mapped from threat intelligence indicators",
) -> dict[str, Any]:
    """
    Generate a MITRE ATT&CK Navigator JSON layer from threat indicators.

    The output conforms to the ATT&CK Navigator layer file format v4.5:
    https://mitre-attack.github.io/attack-navigator/

    Each mapped technique gets a score from 1 to 100 that is relative to the
    technique with the most indicators in this batch (that technique scores
    100), and a colour derived from that score.

    Args:
        indicators: List of indicator dicts with ioc_type, ioc/ioc_value, and context
        layer_name: Name for the Navigator layer
        layer_description: Description for the layer

    Returns:
        ATT&CK Navigator layer JSON dict
    """
    mapper = MITREATTACKMapper()
    technique_map = mapper.map_indicators_batch(indicators)

    # Scores are normalised against the busiest technique. default=1 keeps
    # max() from raising when nothing was mapped.
    max_count = max((len(inds) for inds in technique_map.values()), default=1)

    techniques_layer: list[dict[str, Any]] = []
    for tech_id, matching_indicators in technique_map.items():
        tech = mapper.get_technique(tech_id)
        if not tech:
            continue
        count = len(matching_indicators)

        # Normalize score to 1-100. Integer arithmetic avoids float
        # truncation errors such as int(29 / 100 * 100) == 28.
        score = max(1, count * 100 // max_count) if max_count > 0 else 1

        # Color gradient: low (yellow) -> high (red)
        color = _score_to_color(score)

        # Coerce to str so a non-string IOC value cannot break the slice.
        sample = str(matching_indicators[0].get("ioc_value", ""))[:50] if matching_indicators else ""

        tech_entry: dict[str, Any] = {
            "techniqueID": tech.technique_id,
            # Only the technique's first listed tactic is annotated.
            "tactic": tech.tactics[0].value if tech.tactics else "",
            "color": color,
            "comment": f"{count} indicator(s) mapped. Sample: {sample}",
            "enabled": True,
            "metadata": [
                {"name": "indicator_count", "value": str(count)},
            ],
            "links": [
                {"label": "MITRE ATT&CK", "url": tech.url},
            ],
            "showSubtechniques": bool(tech.subtechniques),
            "score": score,
        }
        techniques_layer.append(tech_entry)

    # Build the Navigator layer
    layer: dict[str, Any] = {
        "name": layer_name,
        "versions": {
            "attack": "15",
            "navigator": "4.9.5",
            "layer": "4.5",
        },
        "domain": "enterprise-attack",
        "description": layer_description,
        "filters": {
            "platforms": ["Windows", "Linux", "macOS", "Network", "Cloud"],
        },
        "sorting": 3,  # Sort by score descending
        "layout": {
            "layout": "side",
            "aggregateFunction": "average",
            "showID": True,
            "showName": True,
            "showAggregateScores": True,
            "countUnscored": False,
            "expandedSubtechniques": "annotated",
        },
        "hideDisabled": False,
        "techniques": techniques_layer,
        "gradient": {
            "colors": ["#ffe766", "#ff6666"],
            "minValue": 0,
            "maxValue": 100,
        },
        # The bands describe the normalised score, not raw indicator counts:
        # colours are derived from the score, which is relative to the
        # technique with the most indicators.
        "legendItems": [
            {"label": "Low coverage (score 1-25)", "color": "#ffe766"},
            {"label": "Medium coverage (score 26-50)", "color": "#ffb366"},
            {"label": "High coverage (score 51-75)", "color": "#ff8c66"},
            {"label": "Critical coverage (score 76-100)", "color": "#ff6666"},
        ],
        "metadata": [
            {"name": "generated_by", "value": "Threat Intel MCP - Defense Compliance Module"},
            {"name": "generated_at", "value": datetime.now(timezone.utc).isoformat()},
            {"name": "total_indicators", "value": str(len(indicators))},
            {"name": "techniques_mapped", "value": str(len(techniques_layer))},
        ],
        "links": [],
        "showTacticRowBackground": True,
        "tacticRowBackground": "#205b8f",
        "selectTechniquesAcrossTactics": True,
        "selectSubtechniquesWithParent": False,
        "selectVisibleTechniques": False,
    }
    return layer
def _score_to_color(score: int) -> str:
"""Convert a score (1-100) to a hex color on a yellow-to-red gradient."""
# Clamp
score = max(0, min(100, score))
# Yellow (#ffe766) to Red (#ff6666)
# R stays at 0xff
# G: 0xe7 (231) -> 0x66 (102), delta = -129
# B: 0x66 (102) stays at 0x66
g = int(231 - (129 * score / 100))
return f"#ff{g:02x}66"
# =============================================================================
# Public Convenience Functions
# =============================================================================
def map_ioc_to_attack(
    ioc_type: str,
    ioc_value: str,
    context: str = "",
) -> list[dict[str, Any]]:
    """
    Map a single IOC to MITRE ATT&CK techniques and return plain dicts.

    Convenience wrapper: creates a fresh ``MITREATTACKMapper``, runs
    ``map_ioc`` and serialises every matched technique with ``to_dict()``.

    Args:
        ioc_type: IOC type (ip, domain, url, md5, sha1, sha256, email, ip:port)
        ioc_value: The IOC value
        context: Additional context string (malware name, tags, description)

    Returns:
        List of technique dicts as produced by ``ATTACKTechnique.to_dict()``
    """
    serialized: list[dict[str, Any]] = []
    for technique in MITREATTACKMapper().map_ioc(ioc_type, ioc_value, context):
        serialized.append(technique.to_dict())
    return serialized