"""
Threat Intelligence Tool
Checks IP addresses from PCAP files against threat intelligence feeds.
"""
import logging
import os
import subprocess
from typing import Dict, Any, List, Set
import httpx
logger = logging.getLogger(__name__)
def _is_ipv4(token: str) -> bool:
    """Return True if *token* is a well-formed dotted-quad IPv4 address (each octet 0-255)."""
    parts = token.split('.')
    return len(parts) == 4 and all(p.isdigit() and 0 <= int(p) <= 255 for p in parts)


def _extract_pcap_ips(pcap_path: str) -> List[str]:
    """
    Extract all unique, well-formed IPv4 addresses from a PCAP via tshark.

    Args:
        pcap_path: Filesystem path to the capture file.

    Returns:
        Sorted list of unique IPv4 address strings.

    Raises:
        subprocess.TimeoutExpired: If tshark runs longer than 60 seconds.
        subprocess.CalledProcessError: If tshark exits non-zero.
    """
    cmd = [
        "tshark",
        "-r", pcap_path,
        "-T", "fields",
        "-e", "ip.src",
        "-e", "ip.dst"
    ]
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
        timeout=60
    )
    unique_ips: Set[str] = set()
    for line in result.stdout.decode('utf-8').strip().split('\n'):
        if not line.strip():
            continue
        for field in line.split('\t'):
            # tshark joins values with commas when a packet has nested IP
            # layers (e.g. tunneling), so split those apart too.
            for candidate in field.split(','):
                candidate = candidate.strip()
                # Strict dotted-quad validation instead of a bare "contains a
                # dot" check, which admitted malformed tokens like "1.2".
                if candidate and candidate != 'unknown' and _is_ipv4(candidate):
                    unique_ips.add(candidate)
    return sorted(unique_ips)


async def _fetch_urlhaus_ips() -> Set[str]:
    """
    Download the URLhaus plaintext feed and return the IPv4 hosts it contains.

    Returns:
        Set of IPv4 address strings found as URL hosts in the feed; empty
        set if the feed returned a non-200 status.
    """
    ips: Set[str] = set()
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get("https://urlhaus.abuse.ch/downloads/text/")
        if response.status_code != 200:
            logger.warning(f"URLhaus returned status code: {response.status_code}")
            return ips
        for line in response.text.split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # Feed lines are URLs; split on '/' to isolate the host segment.
            for part in line.split('/'):
                # Strip an optional ":port" suffix so entries such as
                # "http://1.2.3.4:8080/mal" are still matched (the previous
                # character-class check rejected any token containing ':').
                host = part.split(':', 1)[0]
                if _is_ipv4(host):
                    ips.add(host)
    return ips


async def check_threats_execute(
    project_name: str,
    pcap_name: str,
    check_urlhaus: bool = True
) -> Dict[str, Any]:
    """
    Check IP addresses from a PCAP file against threat intelligence feeds.
    Currently supports:
    - URLhaus blacklist (malware C2 servers, malicious URLs)
    This tool:
    - Extracts all unique IP addresses from the PCAP
    - Checks them against threat intelligence databases
    - Returns list of IPs with threat indicators
    - Provides threat context and severity
    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file in the project
        check_urlhaus: Whether to check against URLhaus blacklist (default: True)
    Returns:
        Dictionary containing:
        - ok: Success status
        - project_name: Name of the project
        - pcap_name: Name of the PCAP file
        - unique_ips: List of all unique IPs found
        - total_ips: Count of unique IPs
        - threats_found: List of IPs with threat indicators
        - total_threats: Count of threatening IPs
        - threat_details: Detailed information about each threat
        - feeds_checked: List of threat feeds that were queried
        - error: Error message if analysis failed
    Example:
        result = await check_threats(
            project_name="incident_response",
            pcap_name="capture.pcap",
            check_urlhaus=True
        )
    """
    try:
        # Local import avoids a circular dependency at module load time.
        from .workspace_sync import get_pcap_path as get_project_pcap_path
        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {
                "ok": False,
                "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"
            }
        logger.info(f"Checking threats for: {pcap_path}")
        unique_ips_list = _extract_pcap_ips(pcap_path)
        logger.info(f"Found {len(unique_ips_list)} unique IPs")
        threats_found: List[Dict[str, Any]] = []
        feeds_checked: List[str] = []
        # Check against URLhaus
        if check_urlhaus:
            feeds_checked.append("URLhaus")
            logger.info("Checking against URLhaus blacklist")
            try:
                urlhaus_ips = await _fetch_urlhaus_ips()
                # Exact set membership: only complete-address matches count.
                for ip in unique_ips_list:
                    if ip in urlhaus_ips:
                        threats_found.append({
                            "ip": ip,
                            "source": "URLhaus",
                            "severity": "HIGH",
                            "description": "IP found in URLhaus malware database",
                            "recommendation": "Block this IP immediately and investigate all traffic"
                        })
                logger.info(f"URLhaus check complete: {len(threats_found)} threats found")
            except Exception as e:
                # Feed failures are non-fatal: report what we could check.
                logger.error(f"Error checking URLhaus: {e}")
        # Prepare response
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "unique_ips": unique_ips_list,
            "total_ips": len(unique_ips_list),
            "threats_found": threats_found,
            "total_threats": len(threats_found),
            "threat_details": {
                "high_severity": len([t for t in threats_found if t.get("severity") == "HIGH"]),
                "medium_severity": len([t for t in threats_found if t.get("severity") == "MEDIUM"]),
                "low_severity": len([t for t in threats_found if t.get("severity") == "LOW"])
            },
            "feeds_checked": feeds_checked,
            "security_notes": {
                "clean": len(threats_found) == 0,
                "message": "No threats detected" if len(threats_found) == 0 else f"{len(threats_found)} potential threat(s) detected",
                "recommendations": [
                    "Review all traffic to/from threatening IPs",
                    "Check for data exfiltration",
                    "Investigate compromised hosts",
                    "Update firewall rules to block malicious IPs"
                ] if len(threats_found) > 0 else []
            }
        }
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {
            "ok": False,
            "error": "Analysis timed out after 60 seconds"
        }
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode('utf-8') if e.stderr else str(e)
        logger.error(f"tshark command failed: {error_msg}")
        return {
            "ok": False,
            "error": f"tshark analysis failed: {error_msg}"
        }
    except Exception as e:
        logger.error(f"Error checking threats: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Analysis failed: {str(e)}"
        }
async def check_ip_threat_execute(
    ip_address: str,
    check_urlhaus: bool = True
) -> Dict[str, Any]:
    """
    Check a specific IP address against threat intelligence feeds.
    Currently supports:
    - URLhaus blacklist
    Args:
        ip_address: IP address to check (e.g., "192.168.1.1")
        check_urlhaus: Whether to check against URLhaus blacklist (default: True)
    Returns:
        Dictionary containing:
        - ok: Success status
        - ip_address: IP that was checked
        - is_threat: Boolean indicating if IP is threatening
        - threat_sources: List of feeds where IP was found
        - threat_level: Severity level (HIGH, MEDIUM, LOW, CLEAN)
        - details: Detailed threat information
        - error: Error message if check failed
    Example:
        result = await check_ip_threat(
            ip_address="192.0.2.1",
            check_urlhaus=True
        )
    """
    try:
        # Validate IP format before doing anything else, so malformed input
        # is rejected cheaply and without side effects.
        parts = ip_address.split('.')
        if len(parts) != 4 or not all(p.isdigit() and 0 <= int(p) <= 255 for p in parts):
            return {
                "ok": False,
                "error": f"Invalid IP address format: {ip_address}"
            }
        logger.info(f"Checking IP threat for: {ip_address}")
        threats_found: List[Dict[str, Any]] = []
        feeds_checked: List[str] = []
        # Check against URLhaus
        if check_urlhaus:
            feeds_checked.append("URLhaus")
            logger.info("Checking IP against URLhaus blacklist")
            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.get("https://urlhaus.abuse.ch/downloads/text/")
                    if response.status_code == 200:
                        # Match the IP as a complete URL host, not a raw
                        # substring: a plain `ip in text` search would flag
                        # e.g. "2.3.4.5" found inside "12.3.4.56".
                        found_in_urlhaus = False
                        for feed_line in response.text.split('\n'):
                            feed_line = feed_line.strip()
                            if not feed_line or feed_line.startswith('#'):
                                continue
                            # Feed lines are URLs; split on '/' to isolate the
                            # host and drop any ":port" suffix before comparing.
                            for part in feed_line.split('/'):
                                if part.split(':', 1)[0] == ip_address:
                                    found_in_urlhaus = True
                                    break
                            if found_in_urlhaus:
                                break
                        if found_in_urlhaus:
                            threats_found.append({
                                "source": "URLhaus",
                                "severity": "HIGH",
                                "description": "IP found in URLhaus malware database",
                                "recommendation": "Block this IP immediately"
                            })
                        logger.info(f"URLhaus check complete: {'THREAT' if found_in_urlhaus else 'CLEAN'}")
                    else:
                        logger.warning(f"URLhaus returned status code: {response.status_code}")
            except Exception as e:
                # Feed failures are non-fatal: report the IP as unchecked/clean.
                logger.error(f"Error checking URLhaus: {e}")
        is_threat = len(threats_found) > 0
        threat_level = "HIGH" if is_threat else "CLEAN"
        return {
            "ok": True,
            "ip_address": ip_address,
            "is_threat": is_threat,
            "threat_sources": [t["source"] for t in threats_found],
            "threat_level": threat_level,
            "details": threats_found,
            "feeds_checked": feeds_checked,
            "message": f"IP {ip_address} is {'THREATENING' if is_threat else 'CLEAN'}"
        }
    except Exception as e:
        logger.error(f"Error checking IP threat: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Check failed: {str(e)}",
            "ip_address": ip_address
        }