"""Database management using SQLite."""
import sqlite3
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
from ..models import (
BugBountyProgram,
ScanResult,
Finding,
Target,
AuditLogEntry,
ScanStatus,
)
logger = logging.getLogger(__name__)


class DatabaseManager:
    """Manages SQLite database for bug bounty data.

    One connection is opened on construction and shared by every method.
    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The instance can be used as a context manager; the connection is closed
    on exit.

    NOTE: the schema declares foreign keys, but this class never issues
    ``PRAGMA foreign_keys = ON``, so SQLite does not enforce them by default.
    """

    # DDL executed on every start-up; ``IF NOT EXISTS`` keeps it idempotent.
    _SCHEMA_STATEMENTS = (
        # Programs table
        """
        CREATE TABLE IF NOT EXISTS programs (
            program_id TEXT PRIMARY KEY,
            platform TEXT NOT NULL,
            name TEXT NOT NULL,
            url TEXT NOT NULL,
            enrolled BOOLEAN NOT NULL DEFAULT 0,
            max_severity TEXT,
            notes TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            data JSON
        )
        """,
        # Scans table
        """
        CREATE TABLE IF NOT EXISTS scans (
            scan_id TEXT PRIMARY KEY,
            program_id TEXT NOT NULL,
            tool TEXT NOT NULL,
            target TEXT NOT NULL,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status TEXT NOT NULL,
            error_message TEXT,
            duration_seconds REAL,
            metadata JSON,
            FOREIGN KEY (program_id) REFERENCES programs(program_id)
        )
        """,
        # Findings table
        """
        CREATE TABLE IF NOT EXISTS findings (
            finding_id INTEGER PRIMARY KEY AUTOINCREMENT,
            scan_id TEXT NOT NULL,
            title TEXT NOT NULL,
            severity TEXT NOT NULL,
            description TEXT,
            cvss_score REAL,
            cwe_id TEXT,
            evidence JSON,
            remediation TEXT,
            confirmed BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (scan_id) REFERENCES scans(scan_id)
        )
        """,
        # Targets table
        """
        CREATE TABLE IF NOT EXISTS targets (
            target_id TEXT PRIMARY KEY,
            program_id TEXT NOT NULL,
            value TEXT NOT NULL,
            type TEXT NOT NULL,
            in_scope BOOLEAN NOT NULL,
            active BOOLEAN DEFAULT 1,
            last_scanned TIMESTAMP,
            notes TEXT,
            FOREIGN KEY (program_id) REFERENCES programs(program_id)
        )
        """,
        # Audit log table
        """
        CREATE TABLE IF NOT EXISTS audit_log (
            log_id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            action TEXT NOT NULL,
            program_id TEXT,
            target TEXT,
            tool TEXT,
            success BOOLEAN NOT NULL,
            details JSON,
            validation_result TEXT
        )
        """,
    )

    # Indexes backing the lookups performed by the query methods below.
    _INDEX_STATEMENTS = (
        "CREATE INDEX IF NOT EXISTS idx_scans_program ON scans(program_id)",
        "CREATE INDEX IF NOT EXISTS idx_findings_scan ON findings(scan_id)",
        "CREATE INDEX IF NOT EXISTS idx_findings_severity ON findings(severity)",
        "CREATE INDEX IF NOT EXISTS idx_targets_program ON targets(program_id)",
        "CREATE INDEX IF NOT EXISTS idx_audit_program ON audit_log(program_id)",
    )

    def __init__(self, db_path: str = "./data/bugbounty.db"):
        """Initialize database manager.

        Creates the parent directory of ``db_path`` if needed, opens the
        connection and ensures the schema exists.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn: Optional[sqlite3.Connection] = None
        self._initialize_db()

    def __enter__(self) -> "DatabaseManager":
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when leaving a ``with`` block."""
        self.close()

    @staticmethod
    def _to_db_timestamp(value: Any) -> Any:
        """Serialize ``datetime`` values explicitly before binding them.

        The text produced (ISO 8601 with a space separator) is the same text
        sqlite3's implicit datetime adapter writes, so existing rows keep
        sorting consistently, but it avoids relying on that adapter, which is
        deprecated since Python 3.12. Non-datetime values (``None``, strings)
        are returned unchanged.
        """
        if isinstance(value, datetime):
            return value.isoformat(sep=" ")
        return value

    @staticmethod
    def _to_db_value(value: Any) -> Any:
        """Return the underlying value of an ``Enum`` member, else ``value``.

        sqlite3 cannot bind plain ``Enum`` members; storing ``.value`` matches
        how ``get_scan_result`` rebuilds the status via ``ScanStatus(...)``.
        """
        from enum import Enum  # local import: not part of the file's imports

        if isinstance(value, Enum):
            return value.value
        return value

    def _connection(self) -> sqlite3.Connection:
        """Return the open connection.

        Raises:
            sqlite3.ProgrammingError: If the manager has been closed.
        """
        if self.conn is None:
            raise sqlite3.ProgrammingError("Cannot operate on a closed database.")
        return self.conn

    def _initialize_db(self) -> None:
        """Open the connection and create tables and indexes if missing.

        Raises:
            sqlite3.Error: If the schema cannot be created; the freshly opened
                connection is closed before the error propagates.
        """
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            with conn:
                for statement in self._SCHEMA_STATEMENTS + self._INDEX_STATEMENTS:
                    conn.execute(statement)
        except sqlite3.Error:
            # Do not leak the handle when the schema cannot be created.
            conn.close()
            raise
        self.conn = conn
        logger.info("Database initialized at %s", self.db_path)

    def save_scan_result(self, scan_result: "ScanResult") -> None:
        """Save scan result to database.

        The scan row is inserted or replaced, and the scan's findings are
        replaced with ``scan_result.findings`` so that saving the same scan
        again does not duplicate findings. Everything happens in a single
        transaction that is rolled back if any statement fails.

        Args:
            scan_result: ScanResult object to save
        """
        conn = self._connection()
        scan_row = (
            scan_result.scan_id,
            scan_result.program_id,
            scan_result.tool,
            scan_result.target,
            self._to_db_timestamp(scan_result.timestamp),
            self._to_db_value(scan_result.status),
            scan_result.error_message,
            scan_result.duration_seconds,
            json.dumps(scan_result.metadata),
        )
        finding_rows = [
            (
                scan_result.scan_id,
                finding.title,
                self._to_db_value(finding.severity),
                finding.description,
                finding.cvss_score,
                finding.cwe_id,
                json.dumps(finding.evidence),
                finding.remediation,
                finding.confirmed,
            )
            for finding in scan_result.findings
        ]
        # ``with conn`` commits on success and rolls back on any exception.
        with conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO scans
                (scan_id, program_id, tool, target, timestamp, status, error_message, duration_seconds, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                scan_row,
            )
            # The scan row was replaced, so replace its findings as well.
            conn.execute(
                "DELETE FROM findings WHERE scan_id = ?",
                (scan_result.scan_id,),
            )
            conn.executemany(
                """
                INSERT INTO findings
                (scan_id, title, severity, description, cvss_score, cwe_id, evidence, remediation, confirmed)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                finding_rows,
            )

    def get_scan_result(self, scan_id: str) -> Optional["ScanResult"]:
        """Retrieve scan result by ID.

        Args:
            scan_id: Scan ID to retrieve

        Returns:
            ScanResult object (including its findings) if found, None otherwise
        """
        conn = self._connection()
        scan_row = conn.execute(
            "SELECT * FROM scans WHERE scan_id = ?", (scan_id,)
        ).fetchone()
        if scan_row is None:
            return None

        # Order by primary key so findings come back in insertion order.
        finding_rows = conn.execute(
            "SELECT * FROM findings WHERE scan_id = ? ORDER BY finding_id",
            (scan_id,),
        ).fetchall()
        findings = [
            Finding(
                title=row['title'],
                severity=row['severity'],
                description=row['description'],
                cvss_score=row['cvss_score'],
                cwe_id=row['cwe_id'],
                # JSON columns may be NULL/empty; fall back to an empty dict.
                evidence=json.loads(row['evidence']) if row['evidence'] else {},
                remediation=row['remediation'],
                confirmed=bool(row['confirmed']),
            )
            for row in finding_rows
        ]
        return ScanResult(
            scan_id=scan_row['scan_id'],
            program_id=scan_row['program_id'],
            tool=scan_row['tool'],
            target=scan_row['target'],
            timestamp=datetime.fromisoformat(scan_row['timestamp']),
            findings=findings,
            status=ScanStatus(scan_row['status']),
            error_message=scan_row['error_message'],
            metadata=json.loads(scan_row['metadata']) if scan_row['metadata'] else {},
            duration_seconds=scan_row['duration_seconds'],
        )

    def list_scans(self, program_id: Optional[str] = None, limit: int = 100) -> List["ScanResult"]:
        """List scan results, newest first.

        Args:
            program_id: Filter by program ID (optional)
            limit: Maximum number of results

        Returns:
            List of ScanResult objects
        """
        conn = self._connection()
        if program_id:
            cursor = conn.execute(
                "SELECT scan_id FROM scans WHERE program_id = ? ORDER BY timestamp DESC LIMIT ?",
                (program_id, limit),
            )
        else:
            cursor = conn.execute(
                "SELECT scan_id FROM scans ORDER BY timestamp DESC LIMIT ?",
                (limit,),
            )
        scan_ids = [row['scan_id'] for row in cursor.fetchall()]

        results = []
        for scan_id in scan_ids:
            # Load each scan exactly once and skip any that can no longer be found.
            result = self.get_scan_result(scan_id)
            if result is not None:
                results.append(result)
        return results

    def save_target(self, target: "Target") -> None:
        """Save target to database, replacing any row with the same target_id.

        Args:
            target: Target object to save
        """
        conn = self._connection()
        with conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO targets
                (target_id, program_id, value, type, in_scope, active, last_scanned, notes)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    target.target_id,
                    target.program_id,
                    target.value,
                    self._to_db_value(target.type),
                    target.in_scope,
                    target.active,
                    self._to_db_timestamp(target.last_scanned),
                    target.notes,
                ),
            )

    def log_audit(self, entry: "AuditLogEntry") -> None:
        """Save audit log entry.

        Args:
            entry: AuditLogEntry to save
        """
        conn = self._connection()
        with conn:
            conn.execute(
                """
                INSERT INTO audit_log
                (timestamp, action, program_id, target, tool, success, details, validation_result)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    self._to_db_timestamp(entry.timestamp),
                    entry.action,
                    entry.program_id,
                    entry.target,
                    entry.tool,
                    entry.success,
                    json.dumps(entry.details),
                    entry.validation_result,
                ),
            )

    def get_statistics(self, program_id: Optional[str] = None) -> Dict[str, Any]:
        """Get statistics on scans and findings.

        Args:
            program_id: Filter by program ID (optional)

        Returns:
            Dictionary with ``total_scans`` (int) and ``findings_by_severity``
            (mapping of severity to number of findings)
        """
        conn = self._connection()
        stats: Dict[str, Any] = {}

        # Total scans
        if program_id:
            cursor = conn.execute(
                "SELECT COUNT(*) as count FROM scans WHERE program_id = ?",
                (program_id,),
            )
        else:
            cursor = conn.execute("SELECT COUNT(*) as count FROM scans")
        stats['total_scans'] = cursor.fetchone()['count']

        # Findings by severity
        if program_id:
            # Findings carry no program_id, so go through their scan.
            cursor = conn.execute(
                """
                SELECT f.severity, COUNT(*) as count
                FROM findings f
                JOIN scans s ON f.scan_id = s.scan_id
                WHERE s.program_id = ?
                GROUP BY f.severity
                """,
                (program_id,),
            )
        else:
            cursor = conn.execute(
                """
                SELECT severity, COUNT(*) as count
                FROM findings
                GROUP BY severity
                """
            )
        stats['findings_by_severity'] = {
            row['severity']: row['count'] for row in cursor.fetchall()
        }
        return stats

    def close(self) -> None:
        """Close database connection.

        Safe to call more than once; later calls are no-ops.
        """
        if self.conn:
            self.conn.close()
            self.conn = None