#!/usr/bin/env python3
import sqlite3
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import get_db_path as config_get_db_path
def get_db_path() -> Path:
    """Return the database path reported by ``config.get_db_path``.

    This is a thin pass-through so that callers of this module do not have
    to import the project configuration themselves.
    """
    configured_path = config_get_db_path()
    return configured_path
def init_db(db_path: Path = None) -> sqlite3.Connection:
# FIXME: should add index on description for faster LIKE queries
if db_path is None:
db_path = get_db_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute('''
CREATE TABLE IF NOT EXISTS cves (
cve_id TEXT PRIMARY KEY,
description TEXT,
severity TEXT,
cvss_score REAL,
published_date TEXT,
modified_date TEXT,
references_json TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT
)
''')
conn.commit()
return conn
def insert_cve(conn: sqlite3.Connection, cve_data: dict) -> bool:
    """Insert one CVE record, replacing any existing row with the same ID.

    The statement is executed but not committed; committing is left to the
    caller.

    Args:
        conn: Open connection to a database containing the ``cves`` table.
        cve_data: Mapping that may contain the keys ``cve_id``,
            ``description``, ``severity``, ``cvss_score``,
            ``published_date``, ``modified_date`` and ``references_json``.
            Missing keys other than ``cve_id`` are stored as NULL.

    Returns:
        ``True`` if the row was written, ``False`` if ``cve_id`` is missing
        or empty, or if any error occurred (a message is printed).
    """
    try:
        cve_id = cve_data.get('cve_id')
        if not cve_id:
            # SQLite accepts NULL in a TEXT PRIMARY KEY column, so without
            # this guard every record lacking an ID would become a separate
            # row that can never be looked up.
            print("Error inserting CVE: missing cve_id")
            return False
        if isinstance(cve_id, str):
            # get_cve() looks rows up by the upper-cased ID, so store the
            # same canonical form.
            cve_id = cve_id.upper()

        conn.execute('''
            INSERT OR REPLACE INTO cves
            (cve_id, description, severity, cvss_score, published_date, modified_date, references_json)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            cve_id,
            cve_data.get('description'),
            cve_data.get('severity'),
            cve_data.get('cvss_score'),
            cve_data.get('published_date'),
            cve_data.get('modified_date'),
            cve_data.get('references_json'),
        ))
        return True
    except Exception as e:
        # Deliberately broad: this function is a best-effort boundary that
        # reports failure through its return value and never raises.
        print(f"Error inserting CVE: {e}")
        return False
def get_cve(conn: sqlite3.Connection, cve_id: str) -> dict | None:
cursor = conn.execute("SELECT * FROM cves WHERE cve_id = ?", (cve_id.upper(),))
row = cursor.fetchone()
return dict(row) if row else None
def search_cves(conn: sqlite3.Connection, keyword: str, limit: int = 10) -> list[dict]:
    """Return CVE rows whose description contains ``keyword``.

    The keyword is matched as literal text: ``%``, ``_`` and ``\\`` in it are
    escaped so they are not interpreted as LIKE wildcards. Matching uses
    SQLite's ``LIKE`` operator.

    Args:
        conn: Open connection whose ``row_factory`` yields mapping-like rows
            (e.g. ``sqlite3.Row``).
        keyword: Substring to look for in the ``description`` column.
        limit: Maximum number of rows to return.

    Returns:
        A list of rows as dictionaries, at most ``limit`` long. No ordering
        is requested from the database.
    """
    # Escape the escape character first so the backslashes added for the
    # wildcards are not doubled afterwards.
    escaped_keyword = (
        keyword.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    cursor = conn.execute(
        "SELECT * FROM cves WHERE description LIKE ? ESCAPE '\\' LIMIT ?",
        (f"%{escaped_keyword}%", limit),
    )
    return [dict(row) for row in cursor.fetchall()]
def get_stats(conn: sqlite3.Connection) -> dict:
    """Summarise the contents of the CVE database.

    Args:
        conn: Open connection whose ``row_factory`` allows column access by
            name (e.g. ``sqlite3.Row``).

    Returns:
        A dictionary with ``total_cves`` (row count of ``cves``),
        ``oldest_cve`` / ``newest_cve`` (minimum / maximum
        ``published_date``, ``None`` when the table is empty) and
        ``last_update`` (the ``metadata`` value stored under
        ``'last_update'``, or ``None`` when absent).
    """
    count_row = conn.execute("SELECT COUNT(*) as total FROM cves").fetchone()
    date_range = conn.execute(
        "SELECT MIN(published_date) as oldest, MAX(published_date) as newest FROM cves"
    ).fetchone()
    update_row = conn.execute(
        "SELECT value FROM metadata WHERE key = 'last_update'"
    ).fetchone()

    stats = {
        "total_cves": count_row['total'],
        "oldest_cve": date_range['oldest'],
        "newest_cve": date_range['newest'],
        "last_update": None,
    }
    if update_row:
        stats["last_update"] = update_row['value']
    return stats
def set_metadata(conn: sqlite3.Connection, key: str, value: str):
    """Store ``value`` under ``key`` in the ``metadata`` table and commit.

    An existing entry with the same key is replaced. The commit applies to
    the whole connection, so any other pending changes on ``conn`` are
    committed as well.
    """
    upsert_sql = "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)"
    entry = (key, value)
    conn.execute(upsert_sql, entry)
    conn.commit()