#!/usr/bin/env python3
import sqlite3
from pathlib import Path
def get_db_path() -> Path:
    """Return the default location of the CVE database file.

    The result is ``data/cve.db`` beneath the directory reached by taking
    ``.parent`` three times from this module's file path (presumably the
    project root).
    """
    module_file = Path(__file__)
    # Three .parent hops: containing directory, then two levels further up.
    base_dir = module_file.parent.parent.parent
    return base_dir.joinpath("data", "cve.db")
def init_db(db_path: Path = None) -> sqlite3.Connection:
if db_path is None:
db_path = get_db_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute('''
CREATE TABLE IF NOT EXISTS cves (
cve_id TEXT PRIMARY KEY,
description TEXT,
severity TEXT,
cvss_score REAL,
published_date TEXT,
modified_date TEXT,
references_json TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT
)
''')
conn.commit()
return conn
def insert_cve(conn: sqlite3.Connection, cve_data: dict) -> bool:
    """Insert a CVE record, replacing any existing row with the same ID.

    The ID is stored upper-cased so that it agrees with ``get_cve``, which
    upper-cases the ID it looks up. The statement is not committed here;
    committing is left to the caller.

    Args:
        conn: Open connection to a database containing the ``cves`` table.
        cve_data: Mapping that may contain ``cve_id``, ``description``,
            ``severity``, ``cvss_score``, ``published_date``,
            ``modified_date`` and ``references_json``. Keys that are
            absent are stored as NULL, except ``cve_id`` which is required.

    Returns:
        ``True`` if the row was written. ``False`` if ``cve_id`` is missing,
        empty or not a string, or if the insert raised; the reason is
        printed in either case.
    """
    try:
        cve_id = cve_data.get('cve_id')
        # SQLite accepts NULL in a TEXT PRIMARY KEY column, so a missing ID
        # would otherwise be stored silently as a row that can never be
        # looked up.
        if not isinstance(cve_id, str) or not cve_id:
            print("Error inserting CVE: missing or invalid cve_id")
            return False

        conn.execute('''
            INSERT OR REPLACE INTO cves
            (cve_id, description, severity, cvss_score, published_date, modified_date, references_json)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            cve_id.upper(),
            cve_data.get('description'),
            cve_data.get('severity'),
            cve_data.get('cvss_score'),
            cve_data.get('published_date'),
            cve_data.get('modified_date'),
            cve_data.get('references_json'),
        ))
    except Exception as e:
        # Deliberately broad: this function is a best-effort boundary that
        # reports failure through its return value, and parameter binding
        # can raise errors outside sqlite3.Error (e.g. OverflowError).
        print(f"Error inserting CVE: {e}")
        return False
    return True
def get_cve(conn: sqlite3.Connection, cve_id: str) -> dict | None:
"""Retrieve a single CVE by ID."""
cursor = conn.execute(
"SELECT * FROM cves WHERE cve_id = ?",
(cve_id.upper(),)
)
row = cursor.fetchone()
if row:
return dict(row)
return None
def search_cves(conn: sqlite3.Connection, keyword: str, limit: int = 10) -> list[dict]:
    """Search CVEs whose description contains ``keyword``.

    The keyword is matched literally: ``%``, ``_`` and backslash in it are
    escaped so they are not interpreted as LIKE wildcards. Matching is done
    by SQLite's ``LIKE`` operator.

    Args:
        conn: Open connection to a database containing the ``cves`` table.
        keyword: Substring to look for in the ``description`` column.
        limit: Value passed to the query's ``LIMIT`` clause.

    Returns:
        A list of matching rows, each converted to a ``dict``.
    """
    # Escape the escape character first so later replacements are not doubled.
    escaped = (
        keyword.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    cursor = conn.execute(
        "SELECT * FROM cves WHERE description LIKE ? ESCAPE '\\' LIMIT ?",
        (f"%{escaped}%", limit),
    )
    return [dict(row) for row in cursor.fetchall()]
def get_stats(conn: sqlite3.Connection) -> dict:
    """Summarize the contents of the CVE database.

    Rows are read by column name, so ``conn`` needs a row factory that
    supports name indexing (such as ``sqlite3.Row``).

    Returns:
        A dict with ``total_cves`` (row count of ``cves``), ``oldest_cve``
        and ``newest_cve`` (minimum and maximum ``published_date``), and
        ``last_update`` (the metadata value stored under ``'last_update'``,
        or ``None`` when that key is absent).
    """
    cve_count = conn.execute("SELECT COUNT(*) as total FROM cves").fetchone()['total']
    date_span = conn.execute(
        "SELECT MIN(published_date) as oldest, MAX(published_date) as newest FROM cves"
    ).fetchone()
    update_row = conn.execute(
        "SELECT value FROM metadata WHERE key = 'last_update'"
    ).fetchone()

    stats = {
        "total_cves": cve_count,
        "oldest_cve": date_span['oldest'],
        "newest_cve": date_span['newest'],
        "last_update": None,
    }
    if update_row:
        stats["last_update"] = update_row['value']
    return stats
def set_metadata(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Store ``value`` under ``key`` in the metadata table and commit.

    An existing entry with the same key is replaced.
    """
    upsert_sql = "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)"
    params = (key, value)
    conn.execute(upsert_sql, params)
    conn.commit()