Skip to main content
Glama
cve_loading_benchmark.py (8.75 kB)
#!/usr/bin/env python3
"""
Benchmark different CVE loading methods.

Compares wall-clock time, database size, and data quality across:
  1. GitHub API individual file downloads (current method)
  2. Shallow git clone of cvelistV5 + local parsing
  3. NVD bulk JSON feeds (placeholder, not implemented)
"""
import time
import json
import sqlite3
import subprocess
import requests
from pathlib import Path
from datetime import datetime
import sys

# Add src to path so the project-local packages resolve when run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from database.db import init_db, insert_cve
from data_ingestion.loader import parse_cve_data


class BenchmarkResult:
    """Collects timing, count, error, and size metrics for one loading method."""

    def __init__(self, method_name):
        self.method_name = method_name
        self.start_time = None      # epoch seconds; set by start()
        self.end_time = None        # epoch seconds; set by end()
        self.cves_loaded = 0        # successfully inserted CVE records
        self.errors = 0             # parse/insert failures (or "not implemented")
        self.db_size_bytes = 0      # on-disk size, set by measure_db_size()
        self.db_path = None         # Path of the database this run wrote to

    def start(self):
        """Record the wall-clock start time."""
        self.start_time = time.time()

    def end(self):
        """Record the wall-clock end time."""
        self.end_time = time.time()

    @property
    def duration_seconds(self):
        """Elapsed seconds between start() and end(); 0 if either is unset."""
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        return 0

    @property
    def cves_per_second(self):
        """Loading throughput; 0 when duration is zero (avoids ZeroDivisionError)."""
        if self.duration_seconds > 0:
            return self.cves_loaded / self.duration_seconds
        return 0

    def measure_db_size(self):
        """Record the on-disk size of the result database, if it exists."""
        if self.db_path and self.db_path.exists():
            self.db_size_bytes = self.db_path.stat().st_size

    def report(self):
        """Print a human-readable summary of this run."""
        print(f"\n{'='*60}")
        print(f"Method: {self.method_name}")
        print(f"{'='*60}")
        print(f"Duration: {self.duration_seconds:.2f} seconds ({self.duration_seconds/60:.2f} minutes)")
        print(f"CVEs Loaded: {self.cves_loaded}")
        print(f"Errors: {self.errors}")
        print(f"Speed: {self.cves_per_second:.2f} CVEs/second")
        print(f"DB Size: {self.db_size_bytes / 1024 / 1024:.2f} MB")
        print(f"Bytes per CVE: {self.db_size_bytes / self.cves_loaded if self.cves_loaded > 0 else 0:.2f} bytes")


def _fresh_db_path(filename):
    """Return a clean Path under benchmarks/, creating the directory and
    removing any stale database from a previous run."""
    db_path = Path("benchmarks") / filename
    # Fix: the original never created benchmarks/, which fails on a fresh checkout.
    db_path.parent.mkdir(parents=True, exist_ok=True)
    if db_path.exists():
        db_path.unlink()
    return db_path


def method_1_github_api(year="2024", limit=100):
    """Current method: GitHub API individual file downloads.

    Delegates all network work to the project loader; this function only
    times it and measures the resulting database.
    """
    print(f"\n>>> Testing Method 1: GitHub API (limit={limit})")
    result = BenchmarkResult("GitHub API (Individual Files)")
    result.db_path = _fresh_db_path("test_github_api.db")

    result.start()
    # Use existing loader (imported lazily so the other methods work even if
    # this loader's extra dependencies are missing).
    from data_ingestion.loader import load_cves
    result.cves_loaded = load_cves(db_path=result.db_path, year=year, limit=limit)
    result.end()

    result.measure_db_size()
    return result


def method_2_git_clone(year="2024", limit=100):
    """Method 2: Git clone entire repository, parse locally."""
    print(f"\n>>> Testing Method 2: Git Clone + Local Parse (limit={limit})")
    result = BenchmarkResult("Git Clone (Local Parsing)")
    result.db_path = _fresh_db_path("test_git_clone.db")

    result.start()

    # Clone repository (shallow, blob-filtered, for speed).
    repo_path = Path("benchmarks/cvelistV5")
    if not repo_path.exists():
        print(" Cloning CVE repository (this may take a few minutes)...")
        clone = subprocess.run([
            "git", "clone", "--depth", "1", "--filter=blob:none",
            "https://github.com/CVEProject/cvelistV5.git", str(repo_path)
        ], capture_output=True)
        # Fix: the original ignored clone failures and silently parsed nothing.
        if clone.returncode != 0:
            print(f" git clone failed: {clone.stderr.decode(errors='replace').strip()}")

    conn = init_db(result.db_path)

    # Parse CVE files from the cloned repo, capped at `limit`.
    cve_dir = repo_path / "cves" / year
    if cve_dir.exists():
        cve_files = list(cve_dir.rglob("*.json"))
        print(f" Found {len(cve_files)} CVE files for {year}")
        for cve_file in cve_files[:limit]:
            try:
                with open(cve_file, 'r') as f:
                    raw_cve = json.load(f)
                parsed = parse_cve_data(raw_cve)
                if parsed and insert_cve(conn, parsed):
                    result.cves_loaded += 1
                else:
                    result.errors += 1
            except Exception:
                # Best-effort bulk load: a malformed file only bumps the
                # error counter rather than aborting the whole benchmark.
                result.errors += 1

    conn.commit()
    conn.close()
    result.end()
    result.measure_db_size()
    return result


def method_3_nvd_feeds(year="2024"):
    """Method 3: NVD bulk JSON feeds.

    Placeholder only — the NVD 1.1 feed format differs from cvelistV5 and
    would need its own parser, so the download is skipped and the result
    is flagged as an error.
    """
    print(f"\n>>> Testing Method 3: NVD JSON Feeds (year={year})")
    result = BenchmarkResult("NVD JSON Feeds (Bulk Download)")
    result.db_path = _fresh_db_path("test_nvd_feeds.db")

    result.start()
    nvd_url = f"https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
    print(f" Note: NVD feeds use older format, may require different parsing")
    print(f" URL: {nvd_url}")
    print(f" Skipping actual download for now (would need format conversion)")
    result.errors = 1  # Mark as not implemented
    result.end()
    return result


def run_benchmark(methods_to_test=None, limits=None):
    """Run the benchmark for each (method, limit) combination and print
    per-run reports, a summary table, recommendations, and projections.

    Args:
        methods_to_test: list of method ids (1, 2, 3); defaults to [1, 2].
        limits: list of dataset sizes to test; defaults to [50, 100, 500].
    """
    if methods_to_test is None:
        methods_to_test = [1, 2]  # Default to GitHub API and Git Clone
    if limits is None:
        limits = [50, 100, 500]  # Different dataset sizes

    print(f"\n{'#'*60}")
    print(f"CVE Loading Benchmark Study")
    print(f"{'#'*60}")
    print(f"Date: {datetime.now().isoformat()}")
    print(f"Testing methods: {methods_to_test}")
    print(f"Dataset sizes: {limits}")

    all_results = []
    for limit in limits:
        print(f"\n\n{'='*60}")
        print(f"Testing with limit={limit} CVEs")
        print(f"{'='*60}")

        if 1 in methods_to_test:
            result = method_1_github_api(year="2024", limit=limit)
            result.report()
            all_results.append(result)
        if 2 in methods_to_test:
            result = method_2_git_clone(year="2024", limit=limit)
            result.report()
            all_results.append(result)
        if 3 in methods_to_test:
            result = method_3_nvd_feeds(year="2024")
            result.report()
            all_results.append(result)

    # Summary comparison table (only runs that actually loaded data).
    print(f"\n\n{'#'*60}")
    print(f"SUMMARY COMPARISON")
    print(f"{'#'*60}\n")
    print(f"{'Method':<35} {'CVEs':<10} {'Time(s)':<12} {'Speed':<15} {'Size(MB)':<10}")
    print(f"{'-'*90}")
    successful = [r for r in all_results if r.cves_loaded > 0]
    for r in successful:
        print(f"{r.method_name:<35} {r.cves_loaded:<10} {r.duration_seconds:<12.2f} "
              f"{r.cves_per_second:<15.2f} {r.db_size_bytes/1024/1024:<10.2f}")

    # Recommendations.
    print(f"\n{'#'*60}")
    print(f"RECOMMENDATIONS")
    print(f"{'#'*60}\n")
    if successful:
        fastest = max(successful, key=lambda x: x.cves_per_second)
        print(f"✓ Fastest method: {fastest.method_name}")
        print(f" Speed: {fastest.cves_per_second:.2f} CVEs/second")

        # cves_loaded > 0 is guaranteed for every entry in `successful`,
        # so the per-CVE ratio is always well defined here.
        most_efficient = min(successful, key=lambda x: x.db_size_bytes / x.cves_loaded)
        print(f"\n✓ Most space-efficient: {most_efficient.method_name}")
        print(f" Size per CVE: {most_efficient.db_size_bytes / most_efficient.cves_loaded:.2f} bytes")
    else:
        # Fix: the original called max()/min() on an empty list here and
        # crashed with ValueError when every method failed.
        print("No method loaded any CVEs; skipping recommendations.")

    # Extrapolate to full dataset.
    print(f"\n{'#'*60}")
    print(f"FULL DATASET PROJECTIONS (240,000 CVEs)")
    print(f"{'#'*60}\n")
    for r in all_results:
        if r.cves_loaded >= 100:  # Only extrapolate from substantial samples
            scale = 240000 / r.cves_loaded
            time_for_full = scale * r.duration_seconds
            size_for_full = scale * r.db_size_bytes
            print(f"{r.method_name}:")
            print(f" Estimated time: {time_for_full/60:.2f} minutes ({time_for_full/3600:.2f} hours)")
            print(f" Estimated size: {size_for_full/1024/1024:.2f} MB ({size_for_full/1024/1024/1024:.2f} GB)")
            print()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Benchmark CVE loading methods")
    parser.add_argument("--methods", nargs="+", type=int, default=[1, 2],
                        help="Methods to test (1=GitHub API, 2=Git Clone, 3=NVD Feeds)")
    parser.add_argument("--limits", nargs="+", type=int, default=[50, 100],
                        help="Dataset sizes to test")
    args = parser.parse_args()
    run_benchmark(methods_to_test=args.methods, limits=args.limits)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidculver/cve-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server