#!/usr/bin/env python3
import json
import requests
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Generator
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from database.db import init_db, insert_cve, set_metadata
from config import get_github_api_base
def fetch_recent_cve_files(year: str = "2024", limit: int = 100) -> list[str]:
    """Collect raw-download URLs for up to *limit* CVE JSON files for *year*.

    Walks the GitHub contents API: lists the year directory, then scans the
    first few ID-range subdirectories (e.g. "0xxx", "1xxx") for ``*.json``
    files.

    Returns a list of download URLs; empty on an HTTP error for the year
    listing. Subdirectory errors are skipped (best-effort).
    """
    print(f"Getting CVE list for year {year}...")
    # Hoisted: the same header is sent with every request below.
    headers = {"Accept": "application/vnd.github.v3+json"}
    github_api_base = get_github_api_base()
    year_url = f"{github_api_base}/{year}"
    response = requests.get(year_url, headers=headers)
    if response.status_code != 200:
        print(f"Error getting year listing: {response.status_code}")
        return []
    subdirs = response.json()
    cve_urls: list[str] = []
    # Iterate through ID range subdirectories (e.g., 0xxx, 1xxx);
    # only the first 5 are scanned to keep loading fast.
    for subdir in subdirs[:5]:
        if subdir['type'] != 'dir':
            continue
        print(f"  Scanning {subdir['name']}...")
        response = requests.get(subdir['url'], headers=headers)
        if response.status_code != 200:
            continue  # skip unreadable subdirectory, best-effort
        for entry in response.json():  # renamed from `file` (shadowed a builtin)
            if entry['name'].endswith('.json'):
                cve_urls.append(entry['download_url'])
                if len(cve_urls) >= limit:
                    return cve_urls
    return cve_urls
def download_cve(url: str) -> dict | None:
    """Download and decode one CVE JSON record from *url*.

    Returns the parsed dict on HTTP 200, or None on a non-200 status,
    a network error, or a JSON decode failure (errors are printed,
    never raised).
    """
    try:
        response = requests.get(url, timeout=10)
        # Decode inside the try: a malformed body also yields None.
        return response.json() if response.status_code == 200 else None
    except Exception as exc:
        print(f"Error downloading {url}: {exc}")
    return None
def parse_cve_data(raw_cve: dict) -> dict | None:
try:
cve_id = raw_cve.get('cveMetadata', {}).get('cveId')
if not cve_id:
return None
# Get containers
containers = raw_cve.get('containers', {})
cna = containers.get('cna', {})
# Get description
description = ""
descriptions = cna.get('descriptions', [])
for desc in descriptions:
if desc.get('lang', '').startswith('en'):
description = desc.get('value', '')
break
if not description and descriptions:
description = descriptions[0].get('value', '')
# Get severity and CVSS score
severity = "UNKNOWN"
cvss_score = None
metrics = cna.get('metrics', [])
for metric in metrics:
# Check for CVSS v3.1
if 'cvssV3_1' in metric:
cvss_data = metric['cvssV3_1']
severity = cvss_data.get('baseSeverity', severity)
cvss_score = cvss_data.get('baseScore')
break
# Check for CVSS v3.0
elif 'cvssV3_0' in metric:
cvss_data = metric['cvssV3_0']
severity = cvss_data.get('baseSeverity', severity)
cvss_score = cvss_data.get('baseScore')
break
# Get dates
cve_metadata = raw_cve.get('cveMetadata', {})
published_date = cve_metadata.get('datePublished', '')[:10] # YYYY-MM-DD
modified_date = cve_metadata.get('dateUpdated', '')[:10]
# Get references
references = []
for ref in cna.get('references', []):
if 'url' in ref:
references.append(ref['url'])
return {
'cve_id': cve_id,
'description': description,
'severity': severity,
'cvss_score': cvss_score,
'published_date': published_date,
'modified_date': modified_date,
'references_json': json.dumps(references[:10]) # Limit references
}
except Exception as e:
print(f"Error parsing CVE: {e}")
return None
def load_cves(db_path: Path | None = None, year: str = "2024", limit: int = 100) -> int:
    """Download up to *limit* CVEs for *year* and store them in the database.

    Opens (or initializes) the SQLite database at *db_path* (None means the
    default location chosen by init_db), fetches CVE file URLs, downloads
    and parses each record, and inserts the successes.

    Returns the number of CVEs successfully inserted.
    """
    print("\nCVE Data Loader\n")
    conn = init_db(db_path)
    print(f"Database: {db_path or 'default location'}\n")
    # try/finally guarantees the connection is closed; previously the
    # early "No CVEs found" return leaked it.
    try:
        cve_urls = fetch_recent_cve_files(year=year, limit=limit)
        print(f"\nFound {len(cve_urls)} CVE files\n")
        if not cve_urls:
            print("No CVEs found")
            return 0
        loaded = 0
        errors = 0
        for i, url in enumerate(cve_urls, 1):
            # Progress line every 10 items and on the final item.
            if i % 10 == 0 or i == len(cve_urls):
                print(f"Processing: {i}/{len(cve_urls)}")
            raw_cve = download_cve(url)
            if not raw_cve:
                errors += 1
                continue
            parsed = parse_cve_data(raw_cve)
            if not parsed:
                errors += 1
                continue
            if insert_cve(conn, parsed):
                loaded += 1
            else:
                errors += 1
        conn.commit()
        # NOTE(review): metadata is written after the commit above —
        # presumably set_metadata commits on its own; verify in database.db.
        set_metadata(conn, 'last_update', datetime.now().isoformat())
        set_metadata(conn, 'source', 'GitHub CVEProject/cvelistV5')
        print(f"\nDone! Loaded {loaded} CVEs, {errors} errors\n")
        return loaded
    finally:
        conn.close()
if __name__ == "__main__":
    # Command-line entry point: parse options, then run the loader.
    import argparse

    cli = argparse.ArgumentParser(description="Load CVE data database")
    cli.add_argument("--year", default="2024", help="Year to get CVEs from")
    cli.add_argument("--limit", type=int, default=100, help="Max CVEs to load")
    options = cli.parse_args()
    load_cves(year=options.year, limit=options.limit)