#!/usr/bin/env python3
import json
import subprocess
import sys
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent.parent))
from database.db import init_db, insert_cve, set_metadata
from data_ingestion.loader import parse_cve_data
from config import get_repo_url, get_repo_path
def clone_or_update_repo():
    """Make sure a local checkout of the CVE repository exists.

    When the path returned by ``get_repo_path()`` is missing, the URL from
    ``get_repo_url()`` is cloned into it with ``--depth 1 --filter=blob:none``.
    When the path already exists, ``git pull`` is run inside it; a failing
    pull is only reported and the existing checkout is used unchanged.

    Returns:
        The repository path as returned by ``get_repo_path()``.

    Raises:
        subprocess.CalledProcessError: If the initial ``git clone`` fails.
    """
    remote = get_repo_url()
    checkout = get_repo_path()

    # First run: nothing on disk yet, so fetch a fresh clone and stop here.
    if not checkout.exists():
        print("Cloning CVE repo (~5 mins)...")
        checkout.parent.mkdir(parents=True, exist_ok=True)
        clone_cmd = [
            "git", "clone", "--depth", "1", "--filter=blob:none",
            remote,
            str(checkout),
        ]
        subprocess.run(clone_cmd, check=True)
        print("Clone complete")
        return checkout

    # Later runs: refresh the existing checkout, tolerating a failed pull.
    print("Repo exists, pulling updates...")
    pull_cmd = ["git", "-C", str(checkout), "pull"]
    try:
        subprocess.run(pull_cmd, capture_output=True, check=True)
    except subprocess.CalledProcessError:
        print("Update failed, using existing repo")
    else:
        print("Repo updated")
    return checkout
def load_all_cves(db_path=None, years=None):
    """Load CVE JSON records from the local repository checkout into the database.

    The repository is cloned or updated via ``clone_or_update_repo()``, then
    every ``*.json`` file found (recursively) under ``<repo>/cves/<year>/`` is
    read, passed through ``parse_cve_data`` and stored with ``insert_cve``.
    Inserts are committed once per year, and three metadata entries
    (``last_update``, ``source``, ``load_method``) are written at the end.

    A file that cannot be read, parsed or inserted is counted as an error and
    skipped; it never aborts the run.

    Args:
        db_path: Forwarded unchanged to ``init_db``.
        years: Iterable of year directory names to load (strings or ints).
            ``None`` loads every all-digit directory under ``cves`` in sorted
            order.

    Returns:
        The number of files for which ``insert_cve`` returned a truthy value.
    """
    print("\nOptimized CVE Loader\n")
    repo_path = clone_or_update_repo()
    conn = init_db(db_path)
    # From here on the connection is open; close it even if loading fails.
    try:
        print("Database initialized\n")
        cve_base = repo_path / "cves"
        if years is None:
            years = sorted(
                d.name
                for d in cve_base.iterdir()
                if d.is_dir() and d.name.isdigit()
            )
        else:
            # Accept ints from programmatic callers; the CLI passes strings.
            years = [str(year) for year in years]
        print(f"Loading from years: {', '.join(years)}\n")

        total_loaded = 0
        total_errors = 0
        start_time = datetime.now()
        for year in years:
            year_path = cve_base / year
            if not year_path.exists():
                print(f"⚠ Year {year} not found, skipping")
                continue

            cve_files = list(year_path.rglob("*.json"))
            file_count = len(cve_files)
            print(f"Processing {year}: {file_count} files")
            year_loaded = 0
            year_errors = 0
            for i, cve_file in enumerate(cve_files, 1):
                try:
                    # Read as UTF-8 explicitly: the locale default encoding
                    # (e.g. cp1252 on Windows) fails on non-ASCII records.
                    with open(cve_file, "r", encoding="utf-8") as f:
                        raw_cve = json.load(f)
                    parsed = parse_cve_data(raw_cve)
                    if parsed and insert_cve(conn, parsed):
                        year_loaded += 1
                    else:
                        year_errors += 1
                except Exception:
                    # Best-effort bulk load: one bad file must not stop the
                    # run, so it is only counted.
                    year_errors += 1
                # Report progress outside the try so a failing file on a
                # multiple of 1000 does not suppress the progress line.
                if i % 1000 == 0:
                    print(f" {i}/{file_count}")

            conn.commit()
            print(f" {year}: {year_loaded} loaded, {year_errors} errors\n")
            total_loaded += year_loaded
            total_errors += year_errors

        set_metadata(conn, 'last_update', datetime.now().isoformat())
        set_metadata(conn, 'source', 'GitHub CVEProject/cvelistV5 (Git Clone)')
        set_metadata(conn, 'load_method', 'optimized')
        # NOTE(review): set_metadata may already commit internally; this extra
        # commit is harmless in that case and otherwise keeps the metadata
        # from being discarded when the connection is closed.
        conn.commit()
    finally:
        conn.close()

    duration = (datetime.now() - start_time).total_seconds()
    # A coarse clock can report zero elapsed time for a tiny or empty run.
    rate = total_loaded / duration if duration > 0 else 0.0
    print(f"\nDone! {total_loaded} CVEs in {duration:.1f}s ({rate:.0f} CVEs/sec)")
    if total_errors > 0:
        print(f"Errors: {total_errors}\n")
    return total_loaded
if __name__ == "__main__":
    # Command-line entry point: parse the optional --years / --db flags and
    # hand them to load_all_cves.
    import argparse

    cli = argparse.ArgumentParser(description="Optimized CVE loader using Git Clone method")
    cli.add_argument("--years", nargs="+", help="Specific years to load (default: all)")
    cli.add_argument("--db", help="Database path (default: data/cve.db)")
    options = cli.parse_args()

    load_all_cves(years=options.years, db_path=options.db)