#!/usr/bin/env python3
"""CVE data ingestion from GitHub CVE database."""
import json
import requests
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Generator
# Path for imports
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from database.db import init_db, insert_cve, set_metadata
# GitHub API URL for CVE content
GITHUB_API_BASE = "https://api.github.com/repos/CVEProject/cvelistV5/contents/cves"
def fetch_recent_cve_files(year: str = "2024", limit: int = 100) -> list[str]:
    """Return up to *limit* raw-download URLs for CVE JSON files of *year*.

    Walks the GitHub contents API: first the year directory, then its
    ID-range subdirectories (e.g. ``0xxx``, ``1xxx``).  Network failures
    and non-200 responses are reported and skipped rather than raised.

    Args:
        year: Year directory to scan (e.g. ``"2024"``).
        limit: Maximum number of file URLs to collect.

    Returns:
        List of raw ``download_url`` strings (possibly empty).
    """
    print(f"Get CVE list for year {year}...")
    headers = {"Accept": "application/vnd.github.v3+json"}
    # Get year directory listing.  A timeout is mandatory: without one a
    # stalled connection would hang the whole loader indefinitely.
    year_url = f"{GITHUB_API_BASE}/{year}"
    try:
        response = requests.get(year_url, headers=headers, timeout=10)
    except requests.RequestException as e:
        print(f"Error getting year listing: {e}")
        return []
    if response.status_code != 200:
        print(f"Error getting year listing: {response.status_code}")
        return []
    subdirs = response.json()
    cve_urls: list[str] = []
    # Iterate through ID range subdirectories; cap at 5 for faster loading.
    for subdir in subdirs[:5]:
        if subdir['type'] != 'dir':
            continue
        print(f"  Scanning {subdir['name']}...")
        try:
            response = requests.get(subdir['url'], headers=headers, timeout=10)
        except requests.RequestException:
            # Best-effort: a failed subdirectory just gets skipped.
            continue
        if response.status_code != 200:
            continue
        for file in response.json():
            if file['name'].endswith('.json'):
                cve_urls.append(file['download_url'])
                if len(cve_urls) >= limit:
                    return cve_urls
    return cve_urls
def download_cve(url: str) -> dict | None:
    """Fetch one CVE JSON document from *url*; return None on any failure."""
    result = None
    try:
        # json() stays inside the try so a malformed body is also swallowed.
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200:
            result = resp.json()
    except Exception as exc:
        print(f"Error downloading {url}: {exc}")
    return result
def parse_cve_data(raw_cve: dict) -> dict | None:
"""
Parse raw CVE JSON intodatabase format.
Handles the CVE JSON schema.
"""
try:
cve_id = raw_cve.get('cveMetadata', {}).get('cveId')
if not cve_id:
return None
# Get containers
containers = raw_cve.get('containers', {})
cna = containers.get('cna', {})
# Get description
description = ""
descriptions = cna.get('descriptions', [])
for desc in descriptions:
if desc.get('lang', '').startswith('en'):
description = desc.get('value', '')
break
if not description and descriptions:
description = descriptions[0].get('value', '')
# Get severity and CVSS score
severity = "UNKNOWN"
cvss_score = None
metrics = cna.get('metrics', [])
for metric in metrics:
# Check for CVSS v3.1
if 'cvssV3_1' in metric:
cvss_data = metric['cvssV3_1']
severity = cvss_data.get('baseSeverity', severity)
cvss_score = cvss_data.get('baseScore')
break
# Check for CVSS v3.0
elif 'cvssV3_0' in metric:
cvss_data = metric['cvssV3_0']
severity = cvss_data.get('baseSeverity', severity)
cvss_score = cvss_data.get('baseScore')
break
# Get dates
cve_metadata = raw_cve.get('cveMetadata', {})
published_date = cve_metadata.get('datePublished', '')[:10] # YYYY-MM-DD
modified_date = cve_metadata.get('dateUpdated', '')[:10]
# Get references
references = []
for ref in cna.get('references', []):
if 'url' in ref:
references.append(ref['url'])
return {
'cve_id': cve_id,
'description': description,
'severity': severity,
'cvss_score': cvss_score,
'published_date': published_date,
'modified_date': modified_date,
'references_json': json.dumps(references[:10]) # Limit references
}
except Exception as e:
print(f"Error parsing CVE: {e}")
return None
def load_cves(db_path: Path | None = None, year: str = "2024", limit: int = 100) -> int:
    """Download CVEs from GitHub and load them into the local database.

    Args:
        db_path: Database file path; None uses the project default.
        year: Year directory to fetch CVEs from.
        limit: Maximum number of CVEs to process.

    Returns:
        Count of CVEs successfully inserted.
    """
    print(f"\n{'='*50}")
    print(f"CVE Data Loader")
    print(f"{'='*50}\n")
    # Initialize database
    conn = init_db(db_path)
    print(f"Database initialized at: {db_path or 'default location'}\n")
    # Fetch CVE file URLs
    cve_urls = fetch_recent_cve_files(year=year, limit=limit)
    print(f"\nFound {len(cve_urls)} CVE files to process\n")
    if not cve_urls:
        print("No CVEs found to load.")
        return 0
    # Download, parse, and insert each CVE
    loaded_count = 0
    errors = 0
    total = len(cve_urls)
    for i, url in enumerate(cve_urls, 1):
        if i % 10 == 0 or i == total:
            print(f"Processing: {i}/{total}")
        raw_cve = download_cve(url)
        if not raw_cve:
            errors += 1
            continue
        parsed = parse_cve_data(raw_cve)
        if not parsed:
            errors += 1
            continue
        if insert_cve(conn, parsed):
            loaded_count += 1
        else:
            errors += 1
    # Record provenance, then commit once so the metadata rows are
    # persisted together with the inserts before closing.
    set_metadata(conn, 'last_update', datetime.now().isoformat())
    set_metadata(conn, 'source', 'GitHub CVEProject/cvelistV5')
    conn.commit()
    conn.close()
    print(f"\n{'='*50}")
    print(f"Loading complete!")
    print(f" ✓ Loaded: {loaded_count} CVEs")
    print(f" ✗ Errors: {errors}")
    print(f"{'='*50}\n")
    return loaded_count
if __name__ == "__main__":
    # CLI entry point: load CVEs for one year, capped at --limit.
    import argparse

    cli = argparse.ArgumentParser(description="Load CVE data database")
    cli.add_argument("--year", default="2024", help="Year to get CVEs from")
    cli.add_argument("--limit", type=int, default=100, help="Max CVEs to load")
    ns = cli.parse_args()
    load_cves(year=ns.year, limit=ns.limit)