#!/usr/bin/env python3
"""
Benchmark different CVE loading methods.
Compares time, database size, and data quality.
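
Example invocation (script path assumed):
    python benchmarks/benchmark_loading.py --methods 1 2 --limits 50 100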
"""
import time
import json
import subprocess
import requests
from pathlib import Path
from datetime import datetime
import sys
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from database.db import init_db, insert_cve
from data_ingestion.loader import parse_cve_data


class BenchmarkResult:
    def __init__(self, method_name):
        self.method_name = method_name
        self.start_time = None
        self.end_time = None
        self.cves_loaded = 0
        self.errors = 0
        self.db_size_bytes = 0
        self.db_path = None

    def start(self):
        self.start_time = time.time()

    def end(self):
        self.end_time = time.time()

    @property
    def duration_seconds(self):
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        return 0

    @property
    def cves_per_second(self):
        if self.duration_seconds > 0:
            return self.cves_loaded / self.duration_seconds
        return 0

    def measure_db_size(self):
        if self.db_path and self.db_path.exists():
            self.db_size_bytes = self.db_path.stat().st_size

    def report(self):
        print(f"\n{'='*60}")
        print(f"Method: {self.method_name}")
        print(f"{'='*60}")
        print(f"Duration: {self.duration_seconds:.2f} seconds ({self.duration_seconds/60:.2f} minutes)")
        print(f"CVEs Loaded: {self.cves_loaded}")
        print(f"Errors: {self.errors}")
        print(f"Speed: {self.cves_per_second:.2f} CVEs/second")
        print(f"DB Size: {self.db_size_bytes / 1024 / 1024:.2f} MB")
print(f"Bytes per CVE: {self.db_size_bytes / self.cves_loaded if self.cves_loaded > 0 else 0:.2f} bytes")


def method_1_github_api(year="2024", limit=100):
    """Current method: GitHub API individual file downloads."""
    print(f"\n>>> Testing Method 1: GitHub API (limit={limit})")
    result = BenchmarkResult("GitHub API (Individual Files)")
    db_path = Path("benchmarks/test_github_api.db")
    # Clean up
    if db_path.exists():
        db_path.unlink()
    result.db_path = db_path

    result.start()
    # Use existing loader
    from data_ingestion.loader import load_cves
    loaded = load_cves(db_path=db_path, year=year, limit=limit)
    result.cves_loaded = loaded
    result.end()
    result.measure_db_size()
    return result


def method_2_git_clone(year="2024", limit=100):
    """Method 2: Git clone the entire repository, then parse locally."""
    print(f"\n>>> Testing Method 2: Git Clone + Local Parse (limit={limit})")
    result = BenchmarkResult("Git Clone (Local Parsing)")
    db_path = Path("benchmarks/test_git_clone.db")
    # Clean up
    if db_path.exists():
        db_path.unlink()
    result.db_path = db_path

    result.start()
    # Shallow clone for speed. Note: --filter=blob:none only defers blob
    # downloads; the default full checkout fetches most of them right back.
    # A sparse-checkout variant is sketched after this function.
    repo_path = Path("benchmarks/cvelistV5")
    if not repo_path.exists():
        print(" Cloning CVE repository (this may take a few minutes)...")
        subprocess.run([
            "git", "clone", "--depth", "1", "--filter=blob:none",
            "https://github.com/CVEProject/cvelistV5.git",
            str(repo_path)
        ], capture_output=True, check=True)  # fail loudly if the clone fails

    # Initialize database
    conn = init_db(db_path)

    # Parse CVE files from the cloned repo
    cve_dir = repo_path / "cves" / year
    if cve_dir.exists():
        cve_files = sorted(cve_dir.rglob("*.json"))  # sorted for a reproducible subset
        print(f" Found {len(cve_files)} CVE files for {year}")
        for cve_file in cve_files[:limit]:
            try:
                with open(cve_file, "r") as f:
                    raw_cve = json.load(f)
                parsed = parse_cve_data(raw_cve)
                if parsed and insert_cve(conn, parsed):
                    result.cves_loaded += 1
                else:
                    result.errors += 1
            except Exception:
                result.errors += 1
    else:
        print(f" No CVE directory found at {cve_dir} -- did the clone succeed?")

    conn.commit()
    conn.close()
    result.end()
    result.measure_db_size()
    return result
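

# Possible refinement of Method 2 (untested sketch): combine the blob filter
# with a sparse checkout so git only materializes the one year directory the
# benchmark actually reads. The flags below are standard git (>= 2.25); the
# cves/<year> layout matches the repository structure used above.
def sparse_clone_one_year(year="2024", repo_path=Path("benchmarks/cvelistV5-sparse")):
    if not repo_path.exists():
        subprocess.run([
            "git", "clone", "--depth", "1", "--filter=blob:none", "--sparse",
            "https://github.com/CVEProject/cvelistV5.git", str(repo_path)
        ], capture_output=True, check=True)
        # Limit the working tree to the single directory we need
        subprocess.run([
            "git", "-C", str(repo_path), "sparse-checkout", "set", f"cves/{year}"
        ], capture_output=True, check=True)
    return repo_path / "cves" / year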


def method_3_nvd_feeds(year="2024"):
    """Method 3: NVD bulk JSON feeds (not implemented; see the sketch below)."""
    print(f"\n>>> Testing Method 3: NVD JSON Feeds (year={year})")
    result = BenchmarkResult("NVD JSON Feeds (Bulk Download)")
    db_path = Path("benchmarks/test_nvd_feeds.db")
    # Clean up
    if db_path.exists():
        db_path.unlink()
    result.db_path = db_path

    result.start()
    # Legacy NVD 1.1 feed URL format (deprecated by NVD in favor of the 2.0 API)
    nvd_url = f"https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
    print(" Note: NVD feeds use an older schema and would need a dedicated parser")
    print(f" URL: {nvd_url}")
    print(" Skipping actual download for now (would need format conversion)")
    result.errors = 1  # Mark as not implemented
    result.end()
    return result
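

# A minimal sketch of what Method 3 could look like, kept out of the benchmark
# until a proper NVD parser exists. It assumes the legacy 1.1 feed is still
# downloadable and follows the published 1.1 schema (top-level "CVE_Items",
# IDs under cve.CVE_data_meta.ID, descriptions under
# cve.description.description_data). The keys of the yielded dicts are
# hypothetical -- they would still need to be mapped onto whatever structure
# parse_cve_data / insert_cve actually expect.
def load_nvd_feed_sketch(year="2024"):
    import gzip

    nvd_url = f"https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
    resp = requests.get(nvd_url, timeout=60)
    resp.raise_for_status()
    feed = json.loads(gzip.decompress(resp.content))
    for item in feed.get("CVE_Items", []):
        cve = item.get("cve", {})
        descriptions = cve.get("description", {}).get("description_data", [])
        english = next((d["value"] for d in descriptions if d.get("lang") == "en"), "")
        yield {
            "id": cve.get("CVE_data_meta", {}).get("ID"),  # hypothetical key
            "description": english,                        # hypothetical key
            "published": item.get("publishedDate"),        # hypothetical key
        }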


def run_benchmark(methods_to_test=None, limits=None):
    """Run the benchmark, comparing the selected loading methods."""
    if methods_to_test is None:
        methods_to_test = [1, 2]  # Default to GitHub API and Git Clone
    if limits is None:
        limits = [50, 100, 500]  # Different dataset sizes

    print(f"\n{'#'*60}")
    print("CVE Loading Benchmark Study")
    print(f"{'#'*60}")
    print(f"Date: {datetime.now().isoformat()}")
    print(f"Testing methods: {methods_to_test}")
    print(f"Dataset sizes: {limits}")

    all_results = []
    for limit in limits:
        print(f"\n\n{'='*60}")
        print(f"Testing with limit={limit} CVEs")
        print(f"{'='*60}")
        if 1 in methods_to_test:
            result = method_1_github_api(year="2024", limit=limit)
            result.report()
            all_results.append(result)
        if 2 in methods_to_test:
            result = method_2_git_clone(year="2024", limit=limit)
            result.report()
            all_results.append(result)
        if 3 in methods_to_test:
            result = method_3_nvd_feeds(year="2024")
            result.report()
            all_results.append(result)

    # Summary comparison
    print(f"\n\n{'#'*60}")
    print("SUMMARY COMPARISON")
    print(f"{'#'*60}\n")
    print(f"{'Method':<35} {'CVEs':<10} {'Time(s)':<12} {'Speed':<15} {'Size(MB)':<10}")
    print("-" * 90)
    for r in all_results:
        if r.cves_loaded > 0:
            print(f"{r.method_name:<35} {r.cves_loaded:<10} {r.duration_seconds:<12.2f} "
                  f"{r.cves_per_second:<15.2f} {r.db_size_bytes/1024/1024:<10.2f}")

    # Recommendations (guarded: max()/min() on an empty list would raise ValueError)
    successful = [r for r in all_results if r.cves_loaded > 0]
    if successful:
        print(f"\n{'#'*60}")
        print("RECOMMENDATIONS")
        print(f"{'#'*60}\n")
        fastest = max(successful, key=lambda x: x.cves_per_second)
        print(f"✓ Fastest method: {fastest.method_name}")
        print(f" Speed: {fastest.cves_per_second:.2f} CVEs/second")
        most_efficient = min(successful, key=lambda x: x.db_size_bytes / x.cves_loaded)
        print(f"\n✓ Most space-efficient: {most_efficient.method_name}")
        print(f" Size per CVE: {most_efficient.db_size_bytes / most_efficient.cves_loaded:.2f} bytes")

    # Extrapolate linearly to the full dataset. Note: linear scaling is
    # optimistic for the API method, which may hit rate limits at this volume.
    print(f"\n{'#'*60}")
    print("FULL DATASET PROJECTIONS (240,000 CVEs)")
    print(f"{'#'*60}\n")
    for r in all_results:
        if r.cves_loaded >= 100:  # Only extrapolate from substantial samples
            scale = 240_000 / r.cves_loaded
            time_for_full = scale * r.duration_seconds
            size_for_full = scale * r.db_size_bytes
            print(f"{r.method_name}:")
            print(f" Estimated time: {time_for_full/60:.2f} minutes ({time_for_full/3600:.2f} hours)")
            print(f" Estimated size: {size_for_full/1024/1024:.2f} MB ({size_for_full/1024/1024/1024:.2f} GB)")
            print()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Benchmark CVE loading methods")
    parser.add_argument("--methods", nargs="+", type=int, default=[1, 2],
                        help="Methods to test (1=GitHub API, 2=Git Clone, 3=NVD Feeds)")
    parser.add_argument("--limits", nargs="+", type=int, default=[50, 100],
                        help="Dataset sizes to test")
    args = parser.parse_args()
    run_benchmark(methods_to_test=args.methods, limits=args.limits)