"""
SQLite-backed index manager coordinating builder and store.
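
Typical usage (illustrative sketch; ``/path/to/project`` is a placeholder for an
existing project directory)::

    manager = SQLiteIndexManager()
    if manager.set_project_path("/path/to/project"):
        manager.build_index()
        stats = manager.get_index_stats()
        matches = manager.find_files("*.py")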
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import tempfile
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional
from .sqlite_index_builder import SQLiteIndexBuilder
from .sqlite_store import SQLiteIndexStore, SQLiteSchemaMismatchError
from ..constants import INDEX_FILE_DB, INDEX_FILE, INDEX_FILE_SHALLOW, SETTINGS_DIR
logger = logging.getLogger(__name__)
class SQLiteIndexManager:
    """Manage lifecycle of SQLite-backed deep index."""
    def __init__(self) -> None:
        self.project_path: Optional[str] = None
        self.index_builder: Optional[SQLiteIndexBuilder] = None
        self.store: Optional[SQLiteIndexStore] = None
        self.temp_dir: Optional[str] = None
        self.index_path: Optional[str] = None
        self.shallow_index_path: Optional[str] = None
        self._shallow_file_list: Optional[List[str]] = None
        self._is_loaded = False
        self._lock = threading.RLock()
        logger.info("Initialized SQLite Index Manager")
    def set_project_path(self, project_path: str) -> bool:
        """Configure project path and underlying storage location."""
        with self._lock:
            if not project_path or not isinstance(project_path, str):
                logger.error("Invalid project path: %s", project_path)
                return False
            project_path = project_path.strip()
            if not project_path or not os.path.isdir(project_path):
                logger.error("Project path does not exist: %s", project_path)
                return False
            self.project_path = project_path
            project_hash = _hash_project_path(project_path)
            self.temp_dir = os.path.join(tempfile.gettempdir(), SETTINGS_DIR, project_hash)
            os.makedirs(self.temp_dir, exist_ok=True)
            self.index_path = os.path.join(self.temp_dir, INDEX_FILE_DB)
            legacy_path = os.path.join(self.temp_dir, INDEX_FILE)
            if os.path.exists(legacy_path):
                try:
                    os.remove(legacy_path)
                    logger.info("Removed legacy JSON index at %s", legacy_path)
                except OSError as exc:  # pragma: no cover - best effort
                    logger.warning("Failed to remove legacy index %s: %s", legacy_path, exc)
            self.shallow_index_path = os.path.join(self.temp_dir, INDEX_FILE_SHALLOW)
            self.store = SQLiteIndexStore(self.index_path)
            self.index_builder = SQLiteIndexBuilder(project_path, self.store)
            self._is_loaded = False
            logger.info("SQLite index storage: %s", self.index_path)
            return True
    def build_index(self, force_rebuild: bool = False) -> bool:
        """Build or rebuild the SQLite index.

        ``force_rebuild`` is accepted for API compatibility but is not
        forwarded to the builder, which is invoked the same way in both cases.
        """
        with self._lock:
            if not self.index_builder:
                logger.error("Index builder not initialized")
                return False
            try:
                stats = self.index_builder.build_index()
                logger.info(
                    "SQLite index build complete: %s files, %s symbols",
                    stats.get("files"),
                    stats.get("symbols"),
                )
                self._is_loaded = True
                return True
            except SQLiteSchemaMismatchError:
                logger.warning("Schema mismatch detected; recreating database")
                self.store.clear()  # type: ignore[union-attr]
                stats = self.index_builder.build_index()
                logger.info(
                    "SQLite index rebuild after schema reset: %s files, %s symbols",
                    stats.get("files"),
                    stats.get("symbols"),
                )
                self._is_loaded = True
                return True
            except Exception as exc:  # pragma: no cover - defensive
                logger.error("Failed to build SQLite index: %s", exc)
                self._is_loaded = False
                return False
    def load_index(self) -> bool:
        """Validate that an index database exists and schema is current."""
        with self._lock:
            if not self.store:
                logger.error("Index store not initialized")
                return False
            try:
                self.store.initialize_schema()
                with self.store.connect() as conn:
                    metadata = self.store.get_metadata(conn, "index_metadata")
            except SQLiteSchemaMismatchError:
                logger.info("Schema mismatch on load; forcing rebuild on next build_index()")
                self._is_loaded = False
                return False
            except Exception as exc:  # pragma: no cover
                logger.error("Failed to load SQLite index: %s", exc)
                self._is_loaded = False
                return False
            self._is_loaded = metadata is not None
            return self._is_loaded
    def refresh_index(self) -> bool:
        """Force rebuild of the SQLite index."""
        with self._lock:
            logger.info("Refreshing SQLite deep index...")
            if self.build_index(force_rebuild=True):
                return self.load_index()
            return False
    def build_shallow_index(self) -> bool:
        """Build the shallow index file list using existing builder helper."""
        with self._lock:
            if not self.index_builder or not self.project_path or not self.shallow_index_path:
                logger.error("Index builder not initialized for shallow index")
                return False
            try:
                file_list = self.index_builder.build_shallow_file_list()
                with open(self.shallow_index_path, "w", encoding="utf-8") as handle:
                    json.dump(file_list, handle, ensure_ascii=False)
                self._shallow_file_list = file_list
                return True
            except Exception as exc:  # pragma: no cover
                logger.error("Failed to build shallow index: %s", exc)
                return False
    def load_shallow_index(self) -> bool:
        """Load shallow index from disk."""
        with self._lock:
            if not self.shallow_index_path or not os.path.exists(self.shallow_index_path):
                return False
            try:
                with open(self.shallow_index_path, "r", encoding="utf-8") as handle:
                    data = json.load(handle)
                if isinstance(data, list):
                    self._shallow_file_list = [_normalize_path(p) for p in data if isinstance(p, str)]
                    return True
            except Exception as exc:  # pragma: no cover
                logger.error("Failed to load shallow index: %s", exc)
            return False
    def find_files(self, pattern: str = "*") -> List[str]:
        """Find files from the shallow index using glob semantics."""
        with self._lock:
            if not isinstance(pattern, str):
                logger.error("Pattern must be a string, got %s", type(pattern))
                return []
            pattern = pattern.strip() or "*"
            norm_pattern = pattern.replace("\\\\", "/").replace("\\", "/")
            regex = _compile_glob_regex(norm_pattern)
            if self._shallow_file_list is None:
                if not self.load_shallow_index():
                    if self.build_shallow_index():
                        self.load_shallow_index()
            files = list(self._shallow_file_list or [])
            if norm_pattern == "*":
                return files
            return [f for f in files if regex.match(f)]
    def get_file_summary(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Return summary information for a file from SQLite storage."""
        with self._lock:
            if not isinstance(file_path, str):
                logger.error("File path must be a string, got %s", type(file_path))
                return None
            if not self.store or not self._is_loaded:
                if not self.load_index():
                    return None
            normalized = _normalize_path(file_path)
            with self.store.connect() as conn:
                row = conn.execute(
                    """
                    SELECT id, language, line_count, imports, exports, docstring
                    FROM files WHERE path = ?
                    """,
                    (normalized,),
                ).fetchone()
                if not row:
                    logger.warning("File not found in index: %s", normalized)
                    return None
                symbol_rows = conn.execute(
                    """
                    SELECT type, line, signature, docstring, called_by, short_name
                    FROM symbols
                    WHERE file_id = ?
                    ORDER BY line ASC
                    """,
                    (row["id"],),
                ).fetchall()
            imports = _safe_json_loads(row["imports"])
            exports = _safe_json_loads(row["exports"])
            categorized = _categorize_symbols(symbol_rows)
            return {
                "file_path": normalized,
                "language": row["language"],
                "line_count": row["line_count"],
                "symbol_count": len(symbol_rows),
                "functions": categorized["functions"],
                "classes": categorized["classes"],
                "methods": categorized["methods"],
                "imports": imports,
                "exports": exports,
                "docstring": row["docstring"],
            }
    def get_index_stats(self) -> Dict[str, Any]:
        """Return basic statistics for the current index."""
        with self._lock:
            if not self.store:
                return {"status": "not_loaded"}
            try:
                with self.store.connect() as conn:
                    metadata = self.store.get_metadata(conn, "index_metadata")
            except SQLiteSchemaMismatchError:
                return {"status": "not_loaded"}
            if not metadata:
                return {"status": "not_loaded"}
            return {
                "status": "loaded" if self._is_loaded else "not_loaded",
                "indexed_files": metadata.get("indexed_files", 0),
                "total_symbols": metadata.get("total_symbols", 0),
                "symbol_types": metadata.get("symbol_types", {}),
                "languages": metadata.get("languages", []),
                "project_path": metadata.get("project_path"),
                "timestamp": metadata.get("timestamp"),
            }
    def cleanup(self) -> None:
        """Reset internal state."""
        with self._lock:
            self.project_path = None
            self.index_builder = None
            self.store = None
            self.temp_dir = None
            self.index_path = None
            self.shallow_index_path = None
            self._shallow_file_list = None
            self._is_loaded = False
def _hash_project_path(project_path: str) -> str:
    # MD5 is used only to derive a short, stable per-project directory name.
    return hashlib.md5(project_path.encode()).hexdigest()[:12]
def _compile_glob_regex(pattern: str):
    i = 0
    out = []
    special = ".^$+{}[]|()"
    while i < len(pattern):
        c = pattern[i]
        if c == "*":
            if i + 1 < len(pattern) and pattern[i + 1] == "*":
                out.append(".*")
                i += 2
                continue
            out.append("[^/]*")
        elif c == "?":
            out.append("[^/]")
        elif c in special:
            out.append("\\" + c)
        else:
            out.append(c)
        i += 1
    return re.compile("^" + "".join(out) + "$")
def _normalize_path(path: str) -> str:
    result = path.replace("\\\\", "/").replace("\\", "/")
    if result.startswith("./"):
        result = result[2:]
    return result
def _safe_json_loads(value: Any) -> List[Any]:
    if not value:
        return []
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)
        return parsed if isinstance(parsed, list) else []
    except json.JSONDecodeError:
        return []
def _categorize_symbols(symbol_rows) -> Dict[str, List[Dict[str, Any]]]:
    functions: List[Dict[str, Any]] = []
    classes: List[Dict[str, Any]] = []
    methods: List[Dict[str, Any]] = []
    for row in symbol_rows:
        symbol_type = row["type"]
        called_by = _safe_json_loads(row["called_by"])
        info = {
            "name": row["short_name"],
            "called_by": called_by,
            "line": row["line"],
            "signature": row["signature"],
            "docstring": row["docstring"],
        }
        signature = row["signature"] or ""
        if signature.startswith("def ") and "::" in signature:
            methods.append(info)
        elif signature.startswith("def "):
            functions.append(info)
        elif signature.startswith("class ") or symbol_type == "class":
            classes.append(info)
        else:
            if symbol_type == "method":
                methods.append(info)
            elif symbol_type == "class":
                classes.append(info)
            else:
                functions.append(info)
    functions.sort(key=lambda item: item.get("line") or 0)
    classes.sort(key=lambda item: item.get("line") or 0)
    methods.sort(key=lambda item: item.get("line") or 0)
    return {
        "functions": functions,
        "classes": classes,
        "methods": methods,
    }