# sqlite_store.py (5.88 kB)
"""
SQLite storage layer for deep code index data.
This module centralizes SQLite setup, schema management, and connection
pragmas so higher-level builders/managers can focus on data orchestration.
"""
from __future__ import annotations
import json
import os
import sqlite3
import threading
from contextlib import contextmanager
from typing import Any, Dict, Generator, Optional
# On-disk schema version. A database with no "schema_version" metadata entry
# gets this value recorded; a database whose stored value differs is rejected
# with SQLiteSchemaMismatchError.
SCHEMA_VERSION = 1
class SQLiteSchemaMismatchError(RuntimeError):
    """Signal that the schema stored in the database file is not safe to use."""
class SQLiteIndexStore:
    """Utility wrapper around an on-disk SQLite database for the deep index.

    The store keeps the database path and a re-entrant lock. Every connection
    handed out by :meth:`connect` is opened, configured, committed or rolled
    back, and closed while that lock is held, so other threads using the same
    store instance block until the current ``with`` block exits.
    """

    # Side files SQLite may keep next to the main database file: the rollback
    # journal, the write-ahead log, and the WAL shared-memory index.
    _SIDECAR_SUFFIXES = ("-journal", "-wal", "-shm")

    def __init__(self, db_path: str) -> None:
        """Remember the database location; no file is touched here.

        Args:
            db_path: Path of the SQLite database file.

        Raises:
            ValueError: If ``db_path`` is empty or not a string.
        """
        if not db_path or not isinstance(db_path, str):
            raise ValueError("db_path must be a non-empty string")
        self.db_path = db_path
        self._lock = threading.RLock()

    def initialize_schema(self) -> None:
        """Create database schema if needed and validate schema version.

        Also creates the parent directory of the database file when the path
        has one, and seeds the ``project_path`` metadata entry with an empty
        string if it is absent.

        Raises:
            SQLiteSchemaMismatchError: If the stored schema version differs
                from ``SCHEMA_VERSION`` or cannot be read as an integer.
        """
        with self._lock:
            # dirname() is "" for a bare file name and os.makedirs("") raises,
            # so only create a directory when the path actually contains one.
            parent_dir = os.path.dirname(self.db_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with self.connect(for_build=True) as conn:
                self._create_tables(conn)
                self._ensure_schema_version(conn)
                # Ensure metadata contains the canonical project path placeholder
                if self.get_metadata(conn, "project_path") is None:
                    self.set_metadata(conn, "project_path", "")

    @contextmanager
    def connect(self, *, for_build: bool = False) -> Generator[sqlite3.Connection, None, None]:
        """
        Context manager yielding a configured SQLite connection.

        The connection uses ``sqlite3.Row`` as row factory. On a clean exit the
        transaction is committed; if the body raises an ``Exception`` it is
        rolled back and the exception propagates. The connection is always
        closed afterwards.

        Args:
            for_build: Use the write-optimized settings (``synchronous=NORMAL``
                and in-memory temp store) instead of ``synchronous=FULL``.
                Journal mode and cache size are requested either way.
        """
        with self._lock:
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            try:
                # Configuration happens inside the try block so the connection
                # is closed even if setup fails unexpectedly.
                conn.row_factory = sqlite3.Row
                self._apply_pragmas(conn, for_build)
                yield conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            finally:
                conn.close()

    def clear(self) -> None:
        """Remove the database file together with its SQLite side files.

        Stale ``-wal``/``-shm``/``-journal`` files left behind next to a
        deleted database could otherwise be paired with a database created
        later at the same path. Files that do not exist are skipped.
        """
        with self._lock:
            for suffix in ("",) + self._SIDECAR_SUFFIXES:
                try:
                    os.remove(self.db_path + suffix)
                except FileNotFoundError:
                    # Nothing to delete for this file; that is the goal state.
                    pass

    # Metadata helpers -------------------------------------------------
    def set_metadata(self, conn: sqlite3.Connection, key: str, value: Any) -> None:
        """Persist a metadata key/value pair (value stored as JSON string).

        An existing entry with the same key is overwritten. The write is not
        committed here; that is left to the owner of ``conn``.
        """
        conn.execute(
            """
            INSERT INTO metadata(key, value)
            VALUES(?, ?)
            ON CONFLICT(key) DO UPDATE SET value=excluded.value
            """,
            (key, json.dumps(value)),
        )

    def get_metadata(self, conn: sqlite3.Connection, key: str) -> Optional[Any]:
        """Retrieve a metadata value (deserialized from JSON).

        Returns:
            The decoded value, the raw stored text if it is not valid JSON,
            or ``None`` when the key is absent. A stored JSON ``null`` also
            comes back as ``None``.
        """
        row = conn.execute("SELECT value FROM metadata WHERE key=?", (key,)).fetchone()
        if row is None:
            return None
        raw_value = row["value"]
        try:
            return json.loads(raw_value)
        except json.JSONDecodeError:
            return raw_value

    # Internal helpers -------------------------------------------------
    def _create_tables(self, conn: sqlite3.Connection) -> None:
        """Create the ``metadata``, ``files`` and ``symbols`` tables and the
        two ``symbols`` indexes if they do not exist yet.

        NOTE(review): ``symbols.file_id`` declares ``ON DELETE CASCADE``, but
        this class never issues ``PRAGMA foreign_keys=ON``; SQLite normally
        leaves foreign-key enforcement off per connection, so the cascade is
        presumably inactive unless callers enable it - confirm the intent.
        """
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS metadata (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY,
                path TEXT UNIQUE NOT NULL,
                language TEXT,
                line_count INTEGER,
                imports TEXT,
                exports TEXT,
                package TEXT,
                docstring TEXT
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS symbols (
                id INTEGER PRIMARY KEY,
                symbol_id TEXT UNIQUE NOT NULL,
                file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
                type TEXT,
                line INTEGER,
                signature TEXT,
                docstring TEXT,
                called_by TEXT,
                short_name TEXT
            )
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_id)
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_symbols_short_name ON symbols(short_name)
            """
        )

    def _ensure_schema_version(self, conn: sqlite3.Connection) -> None:
        """Record ``SCHEMA_VERSION`` on first use, otherwise verify it.

        Raises:
            SQLiteSchemaMismatchError: If the stored version differs from
                ``SCHEMA_VERSION`` or is not convertible to an integer.
        """
        stored = self.get_metadata(conn, "schema_version")
        if stored is None:
            self.set_metadata(conn, "schema_version", SCHEMA_VERSION)
            return
        try:
            stored_version = int(stored)
        except (TypeError, ValueError) as err:
            # A garbled version entry is a schema problem, not a caller bug;
            # report it through the dedicated exception type.
            raise SQLiteSchemaMismatchError(
                f"Unreadable schema version {stored!r} (expected {SCHEMA_VERSION})"
            ) from err
        if stored_version != SCHEMA_VERSION:
            raise SQLiteSchemaMismatchError(
                f"Unexpected schema version {stored} (expected {SCHEMA_VERSION})"
            )

    def _apply_pragmas(self, conn: sqlite3.Connection, for_build: bool) -> None:
        """Apply connection pragmas on a best-effort basis.

        WAL journaling and the cache size are requested for every connection.
        ``for_build`` selects ``synchronous=NORMAL`` plus an in-memory temp
        store; otherwise ``synchronous=FULL`` is used. A pragma that raises
        ``sqlite3.DatabaseError`` is skipped.
        """
        pragmas: Dict[str, Any] = {
            "journal_mode": "WAL",
            "synchronous": "NORMAL" if for_build else "FULL",
            "cache_size": -262144,  # negative => size in KB, ~256MB
        }
        if for_build:
            pragmas["temp_store"] = "MEMORY"
        # Names and values are the fixed constants above, never caller input,
        # so formatting them into the statement is safe.
        for pragma, value in pragmas.items():
            try:
                conn.execute(f"PRAGMA {pragma}={value}")
            except sqlite3.DatabaseError:
                # PRAGMA not supported or rejected; continue best-effort.
                continue