Skip to main content
Glama
loader_optimized.py (3.67 kB)
#!/usr/bin/env python3
"""Bulk CVE loader: clone/update the CVE git repo and load its JSON files into the DB."""
import json
import subprocess
import sys
from pathlib import Path
from datetime import datetime

# Make the project root importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

from database.db import init_db, insert_cve, set_metadata
from data_ingestion.loader import parse_cve_data
from config import get_repo_url, get_repo_path

# Upper bound on per-year failure details printed, so a bad year cannot flood stdout.
_MAX_ERROR_SAMPLES = 3


def clone_or_update_repo():
    """Ensure a local checkout of the CVE repository exists and return its path.

    If the path from ``get_repo_path()`` already exists, ``git pull`` is run in
    it; a failing pull is reported and the existing checkout is used as-is.
    Otherwise the repository at ``get_repo_url()`` is cloned with
    ``--depth 1 --filter=blob:none``.

    Returns:
        The repository path as returned by ``get_repo_path()``.

    Raises:
        subprocess.CalledProcessError: If the initial ``git clone`` fails.
    """
    repo_url = get_repo_url()
    repo_path = get_repo_path()

    if repo_path.exists():
        print("Repo exists, pulling updates...")
        try:
            subprocess.run(
                ["git", "-C", str(repo_path), "pull"],
                capture_output=True,
                check=True,
            )
            print("Repo updated")
        except subprocess.CalledProcessError:
            # Best effort: a stale checkout is still usable for loading.
            print("Update failed, using existing repo")
    else:
        print("Cloning CVE repo (~5 mins)...")
        repo_path.parent.mkdir(parents=True, exist_ok=True)
        subprocess.run(
            [
                "git", "clone",
                "--depth", "1",
                "--filter=blob:none",
                repo_url,
                str(repo_path),
            ],
            check=True,
        )
        print("Clone complete")

    return repo_path


def _load_year(conn, year, year_path):
    """Load every ``*.json`` file under *year_path*; return ``(loaded, errors)``."""
    cve_files = list(year_path.rglob("*.json"))
    file_count = len(cve_files)
    print(f"Processing {year}: {file_count} files")

    loaded = 0
    errors = 0
    samples_shown = 0

    for i, cve_file in enumerate(cve_files, 1):
        try:
            # Read explicitly as UTF-8 rather than the platform default encoding.
            with open(cve_file, "r", encoding="utf-8") as f:
                raw_cve = json.load(f)
            parsed = parse_cve_data(raw_cve)
            if parsed and insert_cve(conn, parsed):
                loaded += 1
            else:
                errors += 1
        except Exception as exc:
            # Best effort per file: one bad record must not abort the whole load,
            # but surface the first few failures instead of hiding them entirely.
            errors += 1
            if samples_shown < _MAX_ERROR_SAMPLES:
                samples_shown += 1
                print(f" ! {cve_file.name}: {type(exc).__name__}: {exc}")

        if i % 1000 == 0:
            print(f" {i}/{file_count}")

    conn.commit()
    print(f" {year}: {loaded} loaded, {errors} errors\n")
    return loaded, errors


def load_all_cves(db_path=None, years=None):
    """Load CVE JSON files from the local repo checkout into the database.

    Args:
        db_path: Passed through to ``init_db``; ``None`` lets it pick its default.
        years: Iterable of year directory names (strings or ints) under
            ``<repo>/cves`` to load. ``None`` loads every all-digit directory,
            in sorted order.

    Returns:
        The number of CVE records successfully inserted.
    """
    print("\nOptimized CVE Loader\n")

    repo_path = clone_or_update_repo()
    conn = init_db(db_path)
    print("Database initialized\n")

    try:
        cve_base = repo_path / "cves"

        if years is None:
            year_dirs = [
                d for d in cve_base.iterdir() if d.is_dir() and d.name.isdigit()
            ]
            years = [d.name for d in sorted(year_dirs)]
        else:
            # Accept ints from programmatic callers; the CLI already passes strings.
            years = [str(y) for y in years]

        print(f"Loading from years: {', '.join(years)}\n")

        total_loaded = 0
        total_errors = 0
        start_time = datetime.now()

        for year in years:
            year_path = cve_base / year
            if not year_path.exists():
                print(f"⚠ Year {year} not found, skipping")
                continue

            year_loaded, year_errors = _load_year(conn, year, year_path)
            total_loaded += year_loaded
            total_errors += year_errors

        set_metadata(conn, 'last_update', datetime.now().isoformat())
        set_metadata(conn, 'source', 'GitHub CVEProject/cvelistV5 (Git Clone)')
        set_metadata(conn, 'load_method', 'optimized')
    finally:
        # Release the connection even if discovery or loading raised.
        conn.close()

    duration = (datetime.now() - start_time).total_seconds()
    # Guard against a zero elapsed time (e.g. nothing to load on a coarse clock).
    rate = total_loaded / duration if duration > 0 else 0.0
    print(f"\nDone! {total_loaded} CVEs in {duration:.1f}s ({rate:.0f} CVEs/sec)")
    if total_errors > 0:
        print(f"Errors: {total_errors}\n")

    return total_loaded


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Optimized CVE loader using Git Clone method"
    )
    parser.add_argument(
        "--years",
        nargs="+",
        help="Specific years to load (default: all)"
    )
    parser.add_argument(
        "--db",
        help="Database path (default: data/cve.db)"
    )

    args = parser.parse_args()
    load_all_cves(db_path=args.db, years=args.years)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidculver/cve-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.