#!/usr/bin/env python3
"""
Threat Intelligence Data Fetcher
Background service that fetches and caches threat data.
Uses shared configuration from config.py to avoid duplication.
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Any
import aiohttp
from .config import (
THREAT_FEEDS,
FeedType,
get_cache_dir,
setup_logging,
)
# Module-wide logger, configured via the shared config helper.
logger = setup_logging("threat-intel-fetcher")
# CISA KEV URL (also in THREAT_FEEDS but duplicated here for direct JSON fetch)
CISA_KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
async def fetch_url(url: str, timeout: int = 30) -> str:
    """Retrieve the body of *url* as decoded text.

    Args:
        url: Address to fetch.
        timeout: Total request budget in seconds.

    Raises:
        aiohttp.ClientResponseError: On a non-2xx HTTP status.
    """
    budget = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=budget) as resp:
            resp.raise_for_status()
            # Read the body while the response is still open.
            body = await resp.text()
    return body
async def fetch_json(url: str, timeout: int = 30) -> dict:
    """Retrieve *url* and decode its body as JSON.

    Args:
        url: Address to fetch.
        timeout: Total request budget in seconds.

    Raises:
        aiohttp.ClientResponseError: On a non-2xx HTTP status.
    """
    budget = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=budget) as resp:
            resp.raise_for_status()
            # Parse while the response is still open.
            payload = await resp.json()
    return payload
def parse_ip_list(content: str) -> list[str]:
    """Extract validated IP addresses from a plain-text feed.

    Blank lines and lines starting with '#' are ignored. Only the first
    whitespace-separated token of each remaining line is considered, and
    it is kept only if it parses as a valid IPv4/IPv6 address.
    """
    import ipaddress

    found: list[str] = []
    for raw in content.split('\n'):
        stripped = raw.strip()
        # Skip blanks and comment lines.
        if not stripped or stripped.startswith('#'):
            continue
        tokens = stripped.split()
        if not tokens:
            continue
        candidate = tokens[0]
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            # Not a well-formed address; drop it.
            continue
        found.append(candidate)
    return found
def parse_url_list(content: str) -> list[str]:
    """Extract HTTP/HTTPS URLs from a plain-text feed.

    Blank lines and '#' comment lines are ignored; a line is kept only
    if it begins with an http:// or https:// scheme.
    """
    stripped_lines = (raw.strip() for raw in content.split('\n'))
    return [
        line
        for line in stripped_lines
        if line
        and not line.startswith('#')
        and line.startswith(('http://', 'https://'))
    ]
async def fetch_all_feeds() -> dict[str, Any]:
    """Fetch all threat feeds and cache results.

    Iterates over every enabled feed in THREAT_FEEDS, parses it according
    to its FeedType, writes a per-feed JSON cache file, then fetches the
    CISA KEV catalog and finally writes an aggregate summary cache file.
    Individual feed failures are logged and recorded but never abort the
    cycle.

    Returns:
        Summary dict with "totals" counters, per-feed "feeds" entries,
        and a list of "alerts".
    """
    cache_dir = get_cache_dir()
    cache_dir.mkdir(parents=True, exist_ok=True)
    summary: dict[str, Any] = {
        "totals": {"malicious_ips": 0, "malicious_urls": 0, "recent_cves": 0},
        "feeds": {},
        "alerts": [],
    }
    logger.info("Fetching threat feeds...")
    for name, feed in THREAT_FEEDS.items():
        if not feed.enabled:
            continue
        try:
            logger.info(f" Fetching {name}...")
            items: list[str] = []
            if feed.feed_type == FeedType.IP_LIST:
                content = await fetch_url(feed.url)
                items = parse_ip_list(content)
                summary["feeds"][name] = {"count": len(items), "type": "ip_list"}
                summary["totals"]["malicious_ips"] += len(items)
            elif feed.feed_type == FeedType.URL_LIST:
                content = await fetch_url(feed.url)
                items = parse_url_list(content)
                summary["feeds"][name] = {"count": len(items), "type": "url_list"}
                summary["totals"]["malicious_urls"] += len(items)
            elif feed.feed_type == FeedType.JSON:
                # JSON feeds are handled separately (e.g., CISA KEV below)
                summary["feeds"][name] = {"count": 0, "type": "json", "note": "JSON feed"}
                # Skip the per-feed cache write below for JSON feeds.
                continue
            else:
                # Unknown feed type: fall back to raw line splitting.
                content = await fetch_url(feed.url)
                items = content.split('\n')
                summary["feeds"][name] = {"count": len(items), "type": feed.feed_type.value}
            # Cache individual feed (items capped at 1000 to bound file size;
            # the "count" field still reflects the full item count).
            feed_cache = cache_dir / f"{name}.json"
            with open(feed_cache, 'w') as f:
                json.dump({
                    "timestamp": datetime.now().isoformat(),
                    "count": len(items),
                    "items": items[:1000],
                }, f)
            logger.info(f" {name}: {len(items)} items")
        except Exception as e:
            # One failing feed must not abort the cycle; record and move on.
            logger.error(f" {name}: ERROR - {e}")
            summary["feeds"][name] = {"count": 0, "error": str(e)}
    # Fetch CISA KEV
    try:
        logger.info(" Fetching CISA KEV...")
        kev_data = await fetch_json(CISA_KEV_URL)
        vulnerabilities = kev_data.get("vulnerabilities", [])
        # Filter recent: keep only entries added within the last 7 days.
        # NOTE(review): naive local datetimes are compared here; assumes the
        # feed's dateAdded is in a timezone close enough to local — confirm.
        cutoff = datetime.now() - timedelta(days=7)
        recent: list[dict[str, Any]] = []
        for vuln in vulnerabilities:
            try:
                date_added = datetime.strptime(
                    vuln.get("dateAdded", "2000-01-01"), "%Y-%m-%d"
                )
                if date_added >= cutoff:
                    recent.append({
                        "cve_id": vuln.get("cveID"),
                        "vendor": vuln.get("vendorProject"),
                        "product": vuln.get("product"),
                        "name": vuln.get("vulnerabilityName"),
                        "description": vuln.get("shortDescription"),
                        "date_added": vuln.get("dateAdded"),
                    })
            except (ValueError, TypeError):
                # Malformed or non-string dateAdded: skip this entry.
                continue
        summary["totals"]["recent_cves"] = len(recent)
        # Cache KEV
        kev_cache = cache_dir / "kev_cache.json"
        with open(kev_cache, 'w') as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "total": len(vulnerabilities),
                "recent_count": len(recent),
                "vulnerabilities": recent,
            }, f)
        logger.info(f" CISA KEV: {len(recent)} recent / {len(vulnerabilities)} total")
        if len(recent) > 0:
            summary["alerts"].append({
                "type": "new_kev",
                "message": f"{len(recent)} new CISA KEV in last 7 days",
                "severity": "high",
            })
    except Exception as e:
        # KEV failure is non-fatal; summary simply lacks recent CVE data.
        logger.error(f" CISA KEV: ERROR - {e}")
    # Save summary cache
    summary_cache = cache_dir / "summary_cache.json"
    with open(summary_cache, 'w') as f:
        json.dump({
            "timestamp": datetime.now().isoformat(),
            "data": summary,
        }, f)
    logger.info("Fetch complete. Summary cached.")
    return summary
async def main() -> None:
    """Run forever: fetch all feeds, then sleep 30 minutes, repeat.

    Any exception raised by a fetch cycle is logged and the loop
    continues with the next cycle.
    """
    interval_seconds = 1800  # 30 minutes between fetch cycles
    logger.info("Threat Intelligence Data Fetcher starting...")
    while True:
        try:
            await fetch_all_feeds()
        except Exception as e:
            logger.error(f"Error in fetch cycle: {e}")
        logger.info("Sleeping for 30 minutes...")
        await asyncio.sleep(interval_seconds)


if __name__ == "__main__":
    asyncio.run(main())