Skip to main content
Glama
loader.py (5.61 kB)
#!/usr/bin/env python3
"""Load CVE records from the GitHub-hosted CVE list into the local database.

The loader lists one year's directory through the GitHub contents API,
collects the download URLs of the CVE JSON files it finds, downloads and
flattens each record, and stores the result with ``insert_cve``.
"""

import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator

import requests

# Make the project root importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

from database.db import init_db, insert_cve, set_metadata
from config import get_github_api_base

# Headers sent with every GitHub contents-API listing request.
_GITHUB_HEADERS = {"Accept": "application/vnd.github.v3+json"}

# Seconds to wait on any single HTTP request; without a timeout, requests
# blocks indefinitely on a stalled connection.
_REQUEST_TIMEOUT = 10


def fetch_recent_cve_files(
    year: str = "2024",
    limit: int = 100,
    max_subdirs: int = 5,
) -> list[str]:
    """Return download URLs of CVE JSON files for *year*.

    Args:
        year: Year directory to list under the configured GitHub API base.
        limit: Maximum number of URLs to return. Values <= 0 yield ``[]``.
        max_subdirs: How many entries of the year listing to scan (the
            listing is truncated before non-directories are skipped).
            Kept small by default for faster loading.

    Returns:
        A list of at most *limit* download URLs; empty if the year listing
        could not be fetched.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    print(f"Getting CVE list for year {year}...")

    if limit <= 0:
        return []

    # Get year directory listing
    github_api_base = get_github_api_base()
    year_url = f"{github_api_base}/{year}"

    response = requests.get(
        year_url, headers=_GITHUB_HEADERS, timeout=_REQUEST_TIMEOUT
    )
    if response.status_code != 200:
        print(f"Error getting year listing: {response.status_code}")
        return []

    subdirs = response.json()
    cve_urls: list[str] = []

    # Iterate through ID range subdirectories (e.g., 0xxx, 1xxx)
    for subdir in subdirs[:max_subdirs]:
        if subdir['type'] != 'dir':
            continue

        print(f"  Scanning {subdir['name']}...")
        response = requests.get(
            subdir['url'], headers=_GITHUB_HEADERS, timeout=_REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            # Best effort: skip subdirectories that cannot be listed.
            continue

        for entry in response.json():
            if entry['name'].endswith('.json'):
                cve_urls.append(entry['download_url'])
                if len(cve_urls) >= limit:
                    return cve_urls

    return cve_urls


def download_cve(url: str) -> dict | None:
    """Download and decode one CVE JSON document.

    Returns:
        The decoded JSON, or ``None`` on a non-200 response, a network
        error, or a body that is not valid JSON. Failures are printed.
    """
    try:
        resp = requests.get(url, timeout=_REQUEST_TIMEOUT)
        if resp.status_code == 200:
            return resp.json()
        print(f"Error downloading {url}: HTTP {resp.status_code}")
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures raised by resp.json().
        print(f"Error downloading {url}: {e}")
    return None


def parse_cve_data(raw_cve: dict) -> dict | None:
    """Flatten a raw CVE record into the fields passed to ``insert_cve``.

    Reads ``cveMetadata`` and ``containers.cna`` from *raw_cve*.

    Returns:
        A dict with keys ``cve_id``, ``description``, ``severity``,
        ``cvss_score``, ``published_date``, ``modified_date`` and
        ``references_json``; or ``None`` when the record has no CVE id or
        is malformed.
    """
    try:
        cve_metadata = raw_cve.get('cveMetadata', {})
        cve_id = cve_metadata.get('cveId')
        if not cve_id:
            return None

        # Get containers
        cna = raw_cve.get('containers', {}).get('cna', {})

        # Get description: prefer the first English entry, otherwise fall
        # back to the first entry of any language.
        descriptions = cna.get('descriptions', [])
        description = ""
        for desc in descriptions:
            # `or ''` guards against an explicit null language tag.
            if (desc.get('lang') or '').startswith('en'):
                description = desc.get('value', '')
                break
        if not description and descriptions:
            description = descriptions[0].get('value', '')

        # Get severity and CVSS score from the first metric that carries
        # CVSS v3.1 or, failing that within the same metric, v3.0 data.
        severity = "UNKNOWN"
        cvss_score = None
        for metric in cna.get('metrics', []):
            cvss_data = None
            if 'cvssV3_1' in metric:
                cvss_data = metric['cvssV3_1']
            elif 'cvssV3_0' in metric:
                cvss_data = metric['cvssV3_0']
            if cvss_data is not None:
                severity = cvss_data.get('baseSeverity', severity)
                cvss_score = cvss_data.get('baseScore')
                break

        # Get dates, truncated to YYYY-MM-DD. `or ''` keeps a null date
        # from raising and discarding the entire record.
        published_date = (cve_metadata.get('datePublished') or '')[:10]
        modified_date = (cve_metadata.get('dateUpdated') or '')[:10]

        # Get references
        references = [
            ref['url'] for ref in cna.get('references', []) if 'url' in ref
        ]

        return {
            'cve_id': cve_id,
            'description': description,
            'severity': severity,
            'cvss_score': cvss_score,
            'published_date': published_date,
            'modified_date': modified_date,
            'references_json': json.dumps(references[:10]),  # Limit references
        }
    except Exception as e:
        # Deliberately broad: one malformed record must not abort a bulk load.
        print(f"Error parsing CVE: {e}")
        return None


def load_cves(
    db_path: Path | None = None,
    year: str = "2024",
    limit: int = 100,
) -> int:
    """Fetch, parse and store CVEs, returning how many were inserted.

    Args:
        db_path: Database location handed to ``init_db``; ``None`` is
            passed through unchanged (reported as "default location").
        year: Year directory to load from.
        limit: Maximum number of CVE files to process.

    Returns:
        The number of records for which ``insert_cve`` returned a truthy
        value.
    """
    print("\nCVE Data Loader\n")

    conn = init_db(db_path)
    # Close the connection on every exit path, including the early return
    # and any exception raised while loading.
    try:
        print(f"Database: {db_path or 'default location'}\n")

        cve_urls = fetch_recent_cve_files(year=year, limit=limit)
        total = len(cve_urls)
        print(f"\nFound {total} CVE files\n")

        if not cve_urls:
            print("No CVEs found")
            return 0

        loaded = 0
        errors = 0

        for i, url in enumerate(cve_urls, 1):
            if i % 10 == 0 or i == total:
                print(f"Processing: {i}/{total}")

            raw_cve = download_cve(url)
            if not raw_cve:
                errors += 1
                continue

            parsed = parse_cve_data(raw_cve)
            if not parsed:
                errors += 1
                continue

            if insert_cve(conn, parsed):
                loaded += 1
            else:
                errors += 1

        conn.commit()

        set_metadata(conn, 'last_update', datetime.now().isoformat())
        set_metadata(conn, 'source', 'GitHub CVEProject/cvelistV5')
        # NOTE(review): set_metadata lives in database.db and is not visible
        # here; if it does not commit itself, the metadata written above
        # would be lost on close. A second commit is harmless either way.
        conn.commit()
    finally:
        conn.close()

    print(f"\nDone! Loaded {loaded} CVEs, {errors} errors\n")
    return loaded


if __name__ == "__main__":
    # When run directly, load CVEs with defaults
    import argparse

    parser = argparse.ArgumentParser(description="Load CVE data database")
    parser.add_argument("--year", default="2024", help="Year to get CVEs from")
    parser.add_argument("--limit", type=int, default=100, help="Max CVEs to load")

    args = parser.parse_args()
    load_cves(year=args.year, limit=args.limit)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidculver/cve-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.