Smart Code Search MCP Server

scsold.py (82.6 kB)
#!/usr/bin/env python3
"""
Smart Code Search - Find code by meaning, not just text
One file to rule them all!
"""
import ast
import sqlite3
import json
import subprocess
import os
import sys
import re
from pathlib import Path
from datetime import datetime
from typing import List, Tuple, Dict, Optional
import hashlib
import time
import threading
from queue import Queue

# Check dependencies
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np
except ImportError:
    print("Installing required dependencies...")
    subprocess.check_call([sys.executable, "-m", "pip", "install",
                           "sentence-transformers", "numpy", "torch"])
    from sentence_transformers import SentenceTransformer
    import numpy as np


class SmartCodeSearch:
    def __init__(self, project_root=".", quiet=False):
        self.root = Path(project_root)
        self.db_path = self.root / ".claude-symbols" / "search.db"
        self.db_path.parent.mkdir(exist_ok=True)
        self.debug = "--debug" in sys.argv
        self.db_operations = 0
        self.db_time = 0

        # Initialize model (downloads ~90MB on first run)
        if not quiet:
            print("🤖 Initializing AI model...")
        try:
            model_name = 'all-MiniLM-L6-v2'
            local_model_path = self.root / model_name

            # Check if model exists locally
            if local_model_path.exists():
                # Use local model
                self.model = SentenceTransformer(str(local_model_path))
                if not quiet:
                    print(f" ✅ Model loaded from local directory: {model_name}")
            else:
                # Model not found locally, need to download
                if not quiet:
                    print(f" 📥 Model not found locally. Setting up Git LFS and downloading...")

                # Check if git-lfs is installed
                try:
                    subprocess.run(["git", "lfs", "version"], check=True, capture_output=True)
                except (subprocess.CalledProcessError, FileNotFoundError):
                    if not quiet:
                        print(" 📦 Installing Git LFS...")
                    try:
                        # Try to install git-lfs
                        subprocess.run(["sudo", "apt-get", "update"], check=True, capture_output=True)
                        subprocess.run(["sudo", "apt-get", "install", "-y", "git-lfs"], check=True, capture_output=True)
                        subprocess.run(["git", "lfs", "install"], check=True, capture_output=True)
                        if not quiet:
                            print(" ✅ Git LFS installed successfully")
                    except subprocess.CalledProcessError:
                        print("\n❌ Failed to install Git LFS automatically.")
                        print(" Please install manually with: sudo apt-get install git-lfs")
                        print(" Then run: git lfs install")
                        raise

                # Clone the model repository
                try:
                    if not quiet:
                        print(f" 🔄 Cloning model from HuggingFace...")
                    # Clone the repository
                    subprocess.run([
                        "git", "clone",
                        "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
                        str(local_model_path)
                    ], check=True, capture_output=True)
                    # Pull LFS files
                    subprocess.run(
                        ["git", "lfs", "pull"],
                        cwd=str(local_model_path),
                        check=True, capture_output=True
                    )
                    if not quiet:
                        print(" ✅ Model downloaded successfully")
                    # Load the model from local directory
                    self.model = SentenceTransformer(str(local_model_path))
                except subprocess.CalledProcessError as e:
                    print(f"\n❌ Failed to download model: {e}")
                    print(" Please download manually:")
                    print(" git clone https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2")
                    print(" cd all-MiniLM-L6-v2/")
                    print(" git lfs pull")
                    raise

            # Test the model
            test_embedding = self.model.encode("test", convert_to_numpy=True)
            if test_embedding is None or len(test_embedding) == 0:
                raise Exception("Model failed to generate embeddings")

        except Exception as e:
            print(f"\n❌ Error initializing AI model: {e}")
            print(" The search tool requires the sentence-transformers library.")
            print(" Please ensure you have an internet connection for the initial download.")
            raise

        self.conn = sqlite3.connect(self.db_path)
self._init_db() # Language patterns for different file types self.language_patterns = { 'python': { 'extensions': ['.py'], 'function': re.compile(r'^(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE), 'class': re.compile(r'^class\s+(\w+)(?:\((.*?)\))?:', re.MULTILINE), 'method': re.compile(r'^\s+(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE), 'variable': re.compile(r'^([A-Z_]+)\s*=\s*[^=]', re.MULTILINE), # Constants 'import': re.compile(r'^(?:from\s+(\S+)\s+)?import\s+(.+)$', re.MULTILINE), 'decorator': re.compile(r'^@(\w+(?:\.\w+)*)', re.MULTILINE), 'type_alias': re.compile(r'^(\w+)\s*=\s*(?:Union|Optional|List|Dict|Tuple|Type|Callable)\[', re.MULTILINE) }, 'javascript': { 'extensions': ['.js', '.jsx', '.mjs'], 'function': re.compile(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE), 'class': re.compile(r'class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE), 'method': re.compile(r'(?:async\s+)?(\w+)\s*\([^)]*\)\s*{', re.MULTILINE), 'variable': re.compile(r'^(?:export\s+)?(?:const|let|var)\s+(\w+)\s*=', re.MULTILINE), 'import': re.compile(r'^import\s+(?:\{[^}]+\}|\*\s+as\s+\w+|\w+)(?:\s*,\s*\{[^}]+\})?\s+from\s+[\'"]([^\'\"]+)[\'"]', re.MULTILINE), 'export': re.compile(r'^export\s+(?:default\s+)?(?:const|let|var|function|class)\s+(\w+)', re.MULTILINE), 'component': re.compile(r'^(?:export\s+)?(?:function|const)\s+([A-Z]\w+).*?(?:=.*?=>.*?(?:<|React\.|jsx)|:\s*(?:React\.)?FC)', re.MULTILINE) }, 'typescript': { 'extensions': ['.ts', '.tsx'], 'function': re.compile(r'(?:export\s+)?(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*(?::\s*[^=]+)?\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE), 'class': re.compile(r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE), 'interface': re.compile(r'(?:export\s+)?interface\s+(\w+)', re.MULTILINE), 'type': re.compile(r'^(?:export\s+)?type\s+(\w+)\s*=', re.MULTILINE), 'enum': re.compile(r'^(?:export\s+)?enum\s+(\w+)', re.MULTILINE), 'variable': re.compile(r'^(?:export\s+)?(?:const|let|var)\s+(\w+)\s*(?::\s*[^=]+)?\s*=', re.MULTILINE), 'import': re.compile(r'^import\s+(?:type\s+)?(?:\{[^}]+\}|\*\s+as\s+\w+|\w+)(?:\s*,\s*\{[^}]+\})?\s+from\s+[\'"]([^\'\"]+)[\'"]', re.MULTILINE), 'component': re.compile(r'^(?:export\s+)?(?:function|const)\s+([A-Z]\w+).*?(?:=.*?=>.*?(?:<|React\.|jsx|tsx)|:\s*(?:React\.)?FC)', re.MULTILINE) }, 'java': { 'extensions': ['.java'], 'class': re.compile(r'(?:public\s+)?class\s+(\w+)', re.MULTILINE), 'method': re.compile(r'(?:public|private|protected)?\s*(?:static\s+)?(?:\w+\s+)?(\w+)\s*\([^)]*\)\s*(?:throws\s+[^{]+)?\s*{', re.MULTILINE), 'interface': re.compile(r'(?:public\s+)?interface\s+(\w+)', re.MULTILINE) }, 'cpp': { 'extensions': ['.cpp', '.cc', '.cxx', '.hpp', '.h', '.hxx'], 'function': re.compile(r'(?:\w+\s+)*(\w+)\s*\([^)]*\)\s*(?:const\s*)?(?:override\s*)?{', re.MULTILINE), 'class': re.compile(r'class\s+(\w+)(?:\s*:\s*(?:public|private|protected)\s+\w+)?', re.MULTILINE), 'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE) }, 'go': { 'extensions': ['.go'], 'function': re.compile(r'func\s+(?:\(\s*\w+\s+[^)]+\)\s+)?(\w+)\s*\([^)]*\)', re.MULTILINE), 'struct': re.compile(r'type\s+(\w+)\s+struct', re.MULTILINE), 'interface': re.compile(r'type\s+(\w+)\s+interface', re.MULTILINE) }, 'markdown': { 'extensions': ['.md', '.markdown'], 'header': re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE), 'code_block': re.compile(r'^```(\w+)?\n(.*?)```', re.MULTILINE | re.DOTALL) }, 'text': { 'extensions': ['.txt', '.rst', '.adoc'], 'section': 
re.compile(r'^([A-Z][^.!?]*(?:[.!?]\s*|$))', re.MULTILINE) }, 'yaml': { 'extensions': ['.yml', '.yaml'], 'key': re.compile(r'^(\w+):\s*$', re.MULTILINE), 'section': re.compile(r'^(\w+):\s*(?:#.*)?$', re.MULTILINE), 'list_item': re.compile(r'^\s*-\s+(\w+)', re.MULTILINE) }, 'docker': { 'extensions': ['Dockerfile', '.dockerfile'], 'instruction': re.compile(r'^(FROM|RUN|CMD|EXPOSE|ENV|ADD|COPY|ENTRYPOINT|VOLUME|USER|WORKDIR|ARG|ONBUILD|LABEL)\s+', re.MULTILINE | re.IGNORECASE), 'stage': re.compile(r'^FROM\s+.*?\s+AS\s+(\w+)', re.MULTILINE | re.IGNORECASE) }, 'json': { 'extensions': ['.json', '.jsonc'], 'key': re.compile(r'"(\w+)":\s*["{[]', re.MULTILINE) }, 'toml': { 'extensions': ['.toml'], 'section': re.compile(r'^\[([^\]]+)\]', re.MULTILINE), 'key': re.compile(r'^(\w+)\s*=', re.MULTILINE) }, 'ini': { 'extensions': ['.ini', '.cfg', '.conf'], 'section': re.compile(r'^\[([^\]]+)\]', re.MULTILINE), 'key': re.compile(r'^(\w+)\s*=', re.MULTILINE) }, 'shell': { 'extensions': ['.sh', '.bash', '.zsh', '.fish'], 'function': re.compile(r'^(?:function\s+)?(\w+)\s*\(\s*\)', re.MULTILINE), 'variable': re.compile(r'^export\s+(\w+)=', re.MULTILINE) }, 'makefile': { 'extensions': ['Makefile', 'makefile', '.mk'], 'target': re.compile(r'^([a-zA-Z0-9_-]+):', re.MULTILINE), 'variable': re.compile(r'^(\w+)\s*[:?]?=', re.MULTILINE) }, 'sql': { 'extensions': ['.sql'], 'table': re.compile(r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)', re.MULTILINE | re.IGNORECASE), 'function': re.compile(r'CREATE\s+(?:OR\s+REPLACE\s+)?FUNCTION\s+(\w+)', re.MULTILINE | re.IGNORECASE), 'procedure': re.compile(r'CREATE\s+(?:OR\s+REPLACE\s+)?PROCEDURE\s+(\w+)', re.MULTILINE | re.IGNORECASE) }, 'terraform': { 'extensions': ['.tf', '.tfvars'], 'resource': re.compile(r'^resource\s+"[^"]+"\s+"([^"]+)"', re.MULTILINE), 'variable': re.compile(r'^variable\s+"([^"]+)"', re.MULTILINE), 'module': re.compile(r'^module\s+"([^"]+)"', re.MULTILINE) }, 'ruby': { 'extensions': ['.rb', '.rake', 'Gemfile'], 'class': re.compile(r'class\s+(\w+)', re.MULTILINE), 'method': re.compile(r'def\s+(\w+)', re.MULTILINE), 'module': re.compile(r'module\s+(\w+)', re.MULTILINE) }, 'rust': { 'extensions': ['.rs'], 'function': re.compile(r'fn\s+(\w+)', re.MULTILINE), 'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE), 'enum': re.compile(r'enum\s+(\w+)', re.MULTILINE), 'impl': re.compile(r'impl(?:\s+\w+)?\s+for\s+(\w+)|impl\s+(\w+)', re.MULTILINE) }, 'csharp': { 'extensions': ['.cs', '.csx'], 'class': re.compile(r'(?:public\s+|private\s+|internal\s+)?class\s+(\w+)', re.MULTILINE), 'method': re.compile(r'(?:public\s+|private\s+|protected\s+|internal\s+)?(?:static\s+)?(?:async\s+)?(?:\w+\s+)?(\w+)\s*\([^)]*\)\s*{', re.MULTILINE), 'interface': re.compile(r'interface\s+(\w+)', re.MULTILINE) }, 'swift': { 'extensions': ['.swift'], 'class': re.compile(r'class\s+(\w+)', re.MULTILINE), 'func': re.compile(r'func\s+(\w+)', re.MULTILINE), 'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE), 'enum': re.compile(r'enum\s+(\w+)', re.MULTILINE) }, 'kotlin': { 'extensions': ['.kt', '.kts'], 'class': re.compile(r'class\s+(\w+)', re.MULTILINE), 'fun': re.compile(r'fun\s+(\w+)', re.MULTILINE), 'interface': re.compile(r'interface\s+(\w+)', re.MULTILINE) }, 'scala': { 'extensions': ['.scala'], 'class': re.compile(r'class\s+(\w+)', re.MULTILINE), 'def': re.compile(r'def\s+(\w+)', re.MULTILINE), 'object': re.compile(r'object\s+(\w+)', re.MULTILINE), 'trait': re.compile(r'trait\s+(\w+)', re.MULTILINE) }, 'php': { 'extensions': ['.php'], 'class': re.compile(r'class\s+(\w+)', 
re.MULTILINE), 'function': re.compile(r'function\s+(\w+)', re.MULTILINE), 'interface': re.compile(r'interface\s+(\w+)', re.MULTILINE) }, 'lua': { 'extensions': ['.lua'], 'function': re.compile(r'function\s+(\w+)|(\w+)\s*=\s*function', re.MULTILINE) }, 'r': { 'extensions': ['.r', '.R'], 'function': re.compile(r'(\w+)\s*<-\s*function', re.MULTILINE) }, 'julia': { 'extensions': ['.jl'], 'function': re.compile(r'function\s+(\w+)', re.MULTILINE), 'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE) }, 'dart': { 'extensions': ['.dart'], 'class': re.compile(r'class\s+(\w+)', re.MULTILINE), 'function': re.compile(r'(?:void|int|String|bool|double|var|dynamic)\s+(\w+)\s*\(', re.MULTILINE) }, 'elixir': { 'extensions': ['.ex', '.exs'], 'defmodule': re.compile(r'defmodule\s+([\w.]+)', re.MULTILINE), 'def': re.compile(r'def\s+(\w+)', re.MULTILINE), 'defp': re.compile(r'defp\s+(\w+)', re.MULTILINE) }, 'clojure': { 'extensions': ['.clj', '.cljs'], 'defn': re.compile(r'\(defn\s+(\S+)', re.MULTILINE), 'def': re.compile(r'\(def\s+(\S+)', re.MULTILINE) }, 'haskell': { 'extensions': ['.hs'], 'function': re.compile(r'^(\w+)\s*::', re.MULTILINE) }, 'perl': { 'extensions': ['.pl', '.pm'], 'sub': re.compile(r'sub\s+(\w+)', re.MULTILINE), 'package': re.compile(r'package\s+([\w:]+)', re.MULTILINE) }, 'objc': { 'extensions': ['.m', '.mm'], 'interface': re.compile(r'@interface\s+(\w+)', re.MULTILINE), 'implementation': re.compile(r'@implementation\s+(\w+)', re.MULTILINE), 'method': re.compile(r'[-+]\s*\([^)]+\)\s*(\w+)', re.MULTILINE) }, 'vue': { 'extensions': ['.vue'], 'component': re.compile(r'name:\s*[\'"](\w+)[\'"]', re.MULTILINE), 'script': re.compile(r'<script[^>]*>', re.MULTILINE), 'template': re.compile(r'<template[^>]*>', re.MULTILINE) }, 'svelte': { 'extensions': ['.svelte'], 'script': re.compile(r'<script[^>]*>', re.MULTILINE) }, 'html': { 'extensions': ['.html', '.htm'], 'tag': re.compile(r'<(\w+)[^>]*id=[\'"]([^\'"]*)[\'"]', re.MULTILINE) }, 'css': { 'extensions': ['.css', '.scss', '.sass', '.less'], 'class': re.compile(r'\.(\w[\w-]*)\s*\{', re.MULTILINE), 'id': re.compile(r'#(\w[\w-]*)\s*\{', re.MULTILINE) }, 'xml': { 'extensions': ['.xml', '.xsl', '.xslt'], 'element': re.compile(r'<(\w+)[^>]*>', re.MULTILINE) }, 'c': { 'extensions': ['.c', '.h'], 'function': re.compile(r'(?:\w+\s+)*(\w+)\s*\([^)]*\)\s*{', re.MULTILINE), 'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE) }, 'properties': { 'extensions': ['.properties'], 'property': re.compile(r'^(\w[\w.]*)\s*=', re.MULTILINE) }, 'env': { 'extensions': ['.env'], 'variable': re.compile(r'^(\w+)=', re.MULTILINE) }, 'gradle': { 'extensions': ['.gradle', '.gradle.kts', 'build.gradle', 'settings.gradle'], 'task': re.compile(r'task\s+(\w+)', re.MULTILINE), 'dependency': re.compile(r'dependencies\s*\{', re.MULTILINE) }, 'maven': { 'extensions': ['pom.xml'], 'artifactId': re.compile(r'<artifactId>([^<]+)</artifactId>', re.MULTILINE), 'groupId': re.compile(r'<groupId>([^<]+)</groupId>', re.MULTILINE) }, 'nginx': { 'extensions': ['nginx.conf', '.nginx'], 'server': re.compile(r'server\s*\{', re.MULTILINE), 'location': re.compile(r'location\s+([^\s{]+)', re.MULTILINE) }, 'apache': { 'extensions': ['.htaccess', 'httpd.conf'], 'directive': re.compile(r'^(\w+)\s+', re.MULTILINE) }, 'ansible': { 'extensions': ['playbook.yml', 'ansible.cfg'], 'task': re.compile(r'-\s+name:\s+(.+)', re.MULTILINE), 'role': re.compile(r'role:\s+(\w+)', re.MULTILINE) }, 'vagrant': { 'extensions': ['Vagrantfile'], 'config': re.compile(r'config\.vm\.(\w+)', re.MULTILINE) }, 'csv': { 
'extensions': ['.csv', '.tsv'], 'header': re.compile(r'^([^\n]+)', re.MULTILINE) } } # Load config if exists self.config = self._load_config() def _db_execute(self, query, params=None): """Execute database query with timing""" start = time.time() if params: result = self.conn.execute(query, params) else: result = self.conn.execute(query) elapsed = time.time() - start self.db_operations += 1 self.db_time += elapsed if self.debug and elapsed > 0.1: # Log slow queries print(f"\n⚠️ Slow DB query ({elapsed:.2f}s): {query[:50]}...") return result def _init_db(self): """Create tables for search""" self.conn.executescript(''' CREATE TABLE IF NOT EXISTS symbols ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, type TEXT NOT NULL, file_path TEXT NOT NULL, line_num INTEGER NOT NULL, end_line INTEGER, signature TEXT, docstring TEXT, code_context TEXT, embedding BLOB, last_updated TEXT, access_count INTEGER DEFAULT 0, language TEXT, file_type TEXT, UNIQUE(file_path, name, line_num) ); CREATE TABLE IF NOT EXISTS search_history ( id INTEGER PRIMARY KEY, query TEXT, timestamp TEXT, results_clicked TEXT ); CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name); CREATE INDEX IF NOT EXISTS idx_symbols_type ON symbols(type); CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_path); ''') self.conn.commit() def _load_config(self): """Load project-specific configuration""" config_path = self.root / ".scs.json" default_config = { "include": [ "**/*.py", "**/*.js", "**/*.jsx", "**/*.ts", "**/*.tsx", "**/*.java", "**/*.cpp", "**/*.cc", "**/*.h", "**/*.hpp", "**/*.go", "**/*.rs", "**/*.md", "**/*.txt", "**/*.yml", "**/*.yaml", "**/Dockerfile", "**/*.dockerfile", "**/*.json", "**/*.toml", "**/*.ini", "**/*.cfg", "**/*.conf", "**/*.sh", "**/*.bash", "**/Makefile", "**/*.mk", "**/*.sql", "**/*.tf", "**/*.tfvars" ], "exclude": [ "**/node_modules/**", "**/.venv/**", "**/venv/**", "**/__pycache__/**", "**/dist/**", "**/build/**", "**/.git/**", "**/target/**", "**/*.min.js", "**/*.min.css", "**/package-lock.json", "**/yarn.lock" ], "minScore": 0.2, "languages": [ "python", "javascript", "typescript", "java", "cpp", "go", "markdown", "yaml", "docker", "json", "toml", "ini", "shell", "makefile", "sql", "terraform" ] } if config_path.exists(): try: with open(config_path, 'r') as f: user_config = json.load(f) default_config.update(user_config) except Exception as e: print(f"Warning: Could not load config: {e}") return default_config def _fix_pattern(self, pattern): """Fix invalid ** patterns to be valid glob patterns""" if pattern.startswith('**') and not pattern.startswith('**/'): # Handle cases like **.py -> **/*.py if pattern[2:].startswith('.'): return '**/*' + pattern[2:] else: # Handle cases like **test*.py -> **/test*.py return '**/' + pattern[2:] return pattern def index_project(self, file_pattern=None, force_reindex=False, incremental=True): """Index all supported files in the project""" # Show initial message mode = 'incremental' if incremental and not force_reindex else 'full' print(f"\n🔍 Indexing {self.root} ({mode} mode)") # Collect all files based on config or pattern start_time = time.time() all_files = [] if file_pattern: # Use provided pattern raw_patterns = [file_pattern] if isinstance(file_pattern, str) else file_pattern # Apply pattern fixing to direct patterns too patterns = [self._fix_pattern(p) for p in raw_patterns] else: # Use config patterns config_patterns = self.config.get('include', ['**/*.py']) # Apply pattern fixing to all config patterns patterns = [self._fix_pattern(p) for p in 
config_patterns] # Gather files using single directory walk # Prepare exclude directory names for fast lookup exclude_dirs = { 'node_modules', '.git', '__pycache__', 'venv', '.venv', 'dist', 'build', '.next', 'coverage', '.pytest_cache', 'tmp', 'temp', '.idea', '.vscode', 'target' } # Get all exclude patterns once exclude_patterns = self.config.get('exclude', []) # Single walk through directory tree all_files = [] scanned_dirs = 0 scanned_files = 0 for root, dirs, files in os.walk(self.root): # Early directory exclusion - modify dirs in-place to skip them dirs[:] = [d for d in dirs if d not in exclude_dirs] scanned_dirs += 1 # Progress update every 100 directories if scanned_dirs % 100 == 0: print(f"\rScanning: {scanned_dirs} dirs, {len(all_files)} files found...", end='', flush=True) # Convert root to Path for easier pattern matching root_path = Path(root) # Check files in this directory for file in files: scanned_files += 1 file_path = root_path / file # Check if file matches any include pattern for pattern in patterns: # Convert pattern to be relative to current root if needed if file_path.match(pattern): all_files.append(file_path) break # Clear progress line print(f"\r" + " " * 60 + "\r", end='') # Apply exclusions more efficiently filtered_files = [] excluded_count = 0 if all_files: # Pre-compile simple exclusions for faster matching simple_excludes = set() pattern_excludes = [] for exclude in exclude_patterns: if '*' not in exclude and '?' not in exclude: # Simple filename exclusion simple_excludes.add(exclude) else: # Pattern-based exclusion pattern_excludes.append(exclude) # Filter files for i, file_path in enumerate(all_files): # Check simple excludes first (fast) if file_path.name in simple_excludes: excluded_count += 1 continue # Check pattern excludes excluded = False for exclude_pattern in pattern_excludes: if file_path.match(exclude_pattern): excluded = True excluded_count += 1 break if not excluded: filtered_files.append(file_path) files = filtered_files total_files = len(files) if total_files == 0: print(f"\n❌ No files found to index!") print(f" • Check your include patterns: {patterns[:3]}...") print(f" • Check your exclude patterns: {exclude_patterns[:3]}...") print(f" • Current directory: {self.root}") return print(f"Found {len(all_files)} files, {total_files} after filtering ({excluded_count} excluded)") # Initialize tracking variables indexed = 0 skipped = 0 errors = 0 error_details = [] # Hang detection setup last_progress_time = time.time() hang_detected = False current_file = None def check_for_hang(): nonlocal hang_detected while indexed + skipped + errors < total_files and not hang_detected: time.sleep(5) # Check every 5 seconds if current_file and time.time() - last_progress_time > 10: hang_detected = True print(f"\n\n⚠️ Indexing appears to be hanging on: {current_file}") print(f" Consider checking if the file is too large or has encoding issues.") # Start hang detection thread hang_thread = threading.Thread(target=check_for_hang, daemon=True) hang_thread.start() print(f"\nIndexing {total_files} files...") for i, file_path in enumerate(files): current_file = file_path last_progress_time = time.time() # Calculate progress and time estimates progress = (i + 1) / total_files elapsed = time.time() - start_time if i > 0: rate = i / elapsed remaining = (total_files - i) / rate if rate > 0 else 0 eta = f"ETA: {int(remaining//60)}m {int(remaining%60)}s" else: eta = "Calculating..." 
# Enhanced progress bar bar_length = 40 filled = int(bar_length * progress) bar = '█' * filled + '░' * (bar_length - filled) percent = int(progress * 100) # Show progress with file name and stats file_display = file_path.name[:25] + '...' if len(file_path.name) > 25 else file_path.name stats = f"[✓{indexed} ↪{skipped} ✗{errors}]" print(f"\r[{bar}] {percent:3d}% - {file_display:<30} {stats} {eta}", end='', flush=True) try: if incremental and not force_reindex: # Check if file needs indexing (modified since last index) if self._should_index_file(file_path, force_reindex): self._index_file(file_path) indexed += 1 else: skipped += 1 else: # Force index all files self._index_file(file_path) indexed += 1 except Exception as e: errors += 1 error_details.append({ 'file': str(file_path), 'error': str(e), 'type': type(e).__name__ }) if "--debug" in sys.argv: print(f"\n❌ Error in {file_path}: {e}") # Clear progress bar print(f"\r" + " " * 100 + "\r", end='') # Final statistics total_time = time.time() - start_time print(f"\n✅ Indexed {indexed} files in {total_time:.1f}s ({skipped} unchanged, {errors} errors)") # Show error details if any if errors > 0 and error_details: print(f"\n⚠️ Errors encountered:") # Group errors by type error_types = {} for err in error_details[:10]: # Show first 10 errors err_type = err['type'] if err_type not in error_types: error_types[err_type] = [] error_types[err_type].append(err['file']) for err_type, files in error_types.items(): print(f" • {err_type}: {len(files)} file(s)") for f in files[:3]: # Show first 3 files per error type print(f" - {f}") if len(files) > 3: print(f" ... and {len(files) - 3} more") # Show compact stats by language cursor = self.conn.execute(""" SELECT language, COUNT(DISTINCT type), COUNT(*) FROM symbols GROUP BY language ORDER BY COUNT(*) DESC """) print(f"\nSymbols indexed:") for language, type_count, symbol_count in cursor: print(f" • {language}: {symbol_count} symbols in {type_count} types") def _should_index_file(self, file_path: Path, force: bool) -> bool: """Check if file needs indexing""" if force: return True # Check if file was modified since last index try: mtime = file_path.stat().st_mtime cursor = self.conn.execute( "SELECT MAX(last_updated) FROM symbols WHERE file_path = ?", (str(file_path),) ) last_indexed = cursor.fetchone()[0] if last_indexed: last_indexed_time = datetime.fromisoformat(last_indexed).timestamp() return mtime > last_indexed_time except Exception: pass return True def _index_file(self, file_path: Path): """Index a single file based on its type""" # Check file size first try: file_size = file_path.stat().st_size max_size = 10 * 1024 * 1024 # 10MB limit if file_size > max_size: if "--debug" in sys.argv: print(f"\n⚠️ Skipping {file_path}: File too large ({file_size / 1024 / 1024:.1f}MB)") raise Exception(f"File too large: {file_size / 1024 / 1024:.1f}MB (max: 10MB)") except OSError as e: raise Exception(f"Cannot access file: {e}") # Detect language based on file extension suffix = file_path.suffix.lower() language = None for lang, info in self.language_patterns.items(): if suffix in info['extensions']: language = lang break if not language: return # Skip unsupported files try: # Try different encodings content = None encodings = ['utf-8', 'latin-1', 'cp1252', 'ascii'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() break except UnicodeDecodeError: continue if content is None: raise Exception("Could not decode file with any supported encoding") # Clear existing 
symbols for this file (for re-indexing) self.conn.execute("DELETE FROM symbols WHERE file_path = ?", (str(file_path),)) if language == 'python': self._index_python_file(file_path, content) elif language in ['javascript', 'typescript', 'java', 'cpp', 'go', 'shell', 'sql', 'ruby', 'rust', 'csharp', 'swift', 'kotlin', 'scala', 'php', 'lua', 'r', 'julia', 'dart', 'elixir', 'clojure', 'haskell', 'perl', 'objc', 'c']: self._index_generic_language(file_path, content, language) elif language in ['vue', 'svelte', 'html', 'css']: self._index_web_file(file_path, content, language) elif language in ['markdown', 'text']: self._index_document(file_path, content, language) elif language in ['yaml', 'docker', 'json', 'toml', 'ini', 'makefile', 'terraform', 'properties', 'env', 'gradle', 'maven', 'nginx', 'apache', 'ansible', 'vagrant', 'csv']: self._index_config_file(file_path, content, language) elif language == 'xml': self._index_xml_file(file_path, content) except UnicodeDecodeError as e: raise Exception(f"Encoding error: {e}") except MemoryError: raise Exception("Out of memory while processing file") except Exception as e: # Re-raise with more context raise Exception(f"{type(e).__name__}: {str(e)}") def _index_python_file(self, file_path: Path, content: str): """Index Python file using AST""" try: tree = ast.parse(content, filename=str(file_path)) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): self._index_python_symbol(node, file_path, content, 'python') except: # Fall back to regex if AST fails self._index_generic_language(file_path, content, 'python') def _index_generic_language(self, file_path: Path, content: str, language: str): """Index non-Python languages using regex""" patterns = self.language_patterns.get(language, {}) lines = content.split('\n') # Track which lines have been indexed to avoid duplicates indexed_lines = set() for pattern_type, pattern in patterns.items(): if pattern_type == 'extensions': continue for match in pattern.finditer(content): # Get the line number line_num = content[:match.start()].count('\n') + 1 if line_num in indexed_lines: continue indexed_lines.add(line_num) # Extract name (handle multiple capture groups) name = None for group in match.groups(): if group: name = group break if not name: continue # Get context context_start = max(0, line_num - 3) context_end = min(len(lines), line_num + 5) code_context = '\n'.join(lines[context_start:context_end]) # Simple signature extraction signature = "" if pattern_type in ['function', 'method']: # Try to extract parameters line = lines[line_num - 1] if line_num > 0 else "" sig_match = re.search(rf'{name}\s*\((.*?)\)', line) if sig_match: signature = f"({sig_match.group(1)})" # Create embedding with error handling try: # Build richer embedding text based on symbol type if pattern_type == 'import': # For imports, include the module being imported embedding_text = f"import {name} from module in {language}" elif pattern_type in ['variable', 'type', 'type_alias']: # Include more context for variables and types embedding_text = f"{name} {pattern_type} definition {code_context[:100]} {language}" elif pattern_type == 'component': # Special handling for React components embedding_text = f"{name} React component {signature} {language}" elif pattern_type == 'decorator': # Include decorator context embedding_text = f"{name} decorator {code_context[:50]} {language}" else: # Default for functions, classes, methods embedding_text = f"{name} {pattern_type} {signature} {language}" 
embedding = self.model.encode(embedding_text, convert_to_numpy=True) if embedding is None or len(embedding) == 0: raise Exception("Failed to generate embedding") # Store in database self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, pattern_type, str(file_path), line_num, line_num + 5, signature, "", code_context, embedding.tobytes(), datetime.now().isoformat(), language, 'code' )) except Exception as e: if "--debug" in sys.argv: print(f"\n⚠️ Embedding error for {name} in {file_path}: {e}") # Store without embedding self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, pattern_type, str(file_path), line_num, line_num + 5, signature, "", code_context, None, datetime.now().isoformat(), language, 'code' )) self.conn.commit() def _index_document(self, file_path: Path, content: str, language: str): """Index document files (markdown, text)""" patterns = self.language_patterns.get(language, {}) lines = content.split('\n') if language == 'markdown': # Index headers header_pattern = patterns.get('header') if header_pattern: for match in header_pattern.finditer(content): level = len(match.group(1)) # Number of # title = match.group(2).strip() line_num = content[:match.start()].count('\n') + 1 # Get section content (until next header of same or higher level) section_lines = [] for i in range(line_num, len(lines)): line = lines[i] if re.match(rf'^#{{{1},{level}}}\s', line) and i > line_num: break section_lines.append(line) section_content = '\n'.join(section_lines[:10]) # First 10 lines # Create embedding embedding_text = f"{title} section {section_content}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) # Store self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( title, f"h{level}", str(file_path), line_num, line_num + len(section_lines), "", "", section_content, embedding.tobytes(), datetime.now().isoformat(), language, 'document' )) self.conn.commit() def _index_config_file(self, file_path: Path, content: str, language: str): """Index configuration files (YAML, Docker, JSON, etc)""" patterns = self.language_patterns.get(language, {}) lines = content.split('\n') # Special handling for Dockerfile if language == 'docker' and file_path.name == 'Dockerfile': # Index the entire Dockerfile as one entity embedding_text = f"Dockerfile configuration {content[:500]}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
''', ( file_path.name, 'dockerfile', str(file_path), 1, len(lines), "", "", content[:1000], embedding.tobytes(), datetime.now().isoformat(), language, 'config' )) # Track which lines have been indexed indexed_lines = set() for pattern_type, pattern in patterns.items(): if pattern_type == 'extensions': continue for match in pattern.finditer(content): line_num = content[:match.start()].count('\n') + 1 if line_num in indexed_lines: continue indexed_lines.add(line_num) # Extract name name = match.group(1) if match.groups() else match.group(0) # Get context (more lines for config files) context_start = max(0, line_num - 2) context_end = min(len(lines), line_num + 10) code_context = '\n'.join(lines[context_start:context_end]) # For YAML/JSON, try to capture the value too if language in ['yaml', 'json', 'toml', 'ini']: # Look for multi-line values value_lines = [] indent_level = len(lines[line_num - 1]) - len(lines[line_num - 1].lstrip()) for i in range(line_num, min(len(lines), line_num + 20)): line = lines[i] if line.strip() and len(line) - len(line.lstrip()) <= indent_level: break value_lines.append(line) if value_lines: code_context = '\n'.join([lines[line_num - 1]] + value_lines[:10]) # Create embedding with context embedding_text = f"{name} {pattern_type} {language} {code_context[:200]}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) # Store in database self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, pattern_type, str(file_path), line_num, line_num + len(value_lines) if 'value_lines' in locals() else line_num + 3, "", "", code_context, embedding.tobytes(), datetime.now().isoformat(), language, 'config' )) self.conn.commit() def _index_web_file(self, file_path: Path, content: str, language: str): """Index web files (Vue, Svelte, HTML, CSS)""" patterns = self.language_patterns.get(language, {}) lines = content.split('\n') # For Vue/Svelte components, try to extract component name if language in ['vue', 'svelte']: # Look for component name in various ways name_match = patterns.get('component', re.compile(r'')).search(content) if name_match: component_name = name_match.group(1) else: # Use filename as component name component_name = file_path.stem # Index the whole component embedding_text = f"{language} component {component_name} {content[:500]}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
''', ( component_name, 'component', str(file_path), 1, len(lines), "", "", content[:1000], embedding.tobytes(), datetime.now().isoformat(), language, 'code' )) # For CSS files, just index the file as a whole instead of every selector elif language == 'css': # Extract a summary of main classes/IDs (first 20 of each) classes = [] ids = [] for pattern_type, pattern in patterns.items(): if pattern_type == 'class': classes = [m.group(1) for m in pattern.finditer(content)][:20] elif pattern_type == 'id': ids = [m.group(1) for m in pattern.finditer(content)][:20] summary = f"CSS file with classes: {', '.join(classes[:10])}" if ids: summary += f" and IDs: {', '.join(ids[:10])}" # Index the whole CSS file embedding_text = f"CSS stylesheet {file_path.name} {summary} {content[:500]}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( file_path.name, 'stylesheet', str(file_path), 1, len(lines), "", summary, content[:1000], embedding.tobytes(), datetime.now().isoformat(), language, 'code' )) # For HTML, index IDs but limit to reasonable number elif language == 'html': indexed_count = 0 max_items = 50 # Limit to 50 elements with IDs for pattern_type, pattern in patterns.items(): if pattern_type == 'extensions': continue for match in pattern.finditer(content): if indexed_count >= max_items: break line_num = content[:match.start()].count('\n') + 1 if pattern_type == 'tag' and len(match.groups()) >= 2: tag_name = match.group(1) id_value = match.group(2) name = f"{tag_name}#{id_value}" # Get context start_line = max(0, line_num - 2) end_line = min(len(lines), line_num + 3) code_context = '\n'.join(lines[start_line-1:end_line]) # Create embedding embedding_text = f"HTML element {name} {code_context}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, 'element', str(file_path), line_num, line_num + 1, "", "", code_context, embedding.tobytes(), datetime.now().isoformat(), language, 'code' )) indexed_count += 1 self.conn.commit() def _index_xml_file(self, file_path: Path, content: str): """Index XML files efficiently""" lines = content.split('\n') # For XML files, just index the file as a whole # Extract root element and some structure info import re root_match = re.search(r'<(\w+)[^>]*>', content[:1000]) root_element = root_match.group(1) if root_match else 'xml' # Count main element types (limit to first 5000 chars for performance) element_types = set() for match in re.finditer(r'<(\w+)[^>]*>', content[:5000]): element_types.add(match.group(1)) if len(element_types) >= 10: # Limit to first 10 unique elements break summary = f"XML file with root <{root_element}> containing elements: {', '.join(list(element_types)[:10])}" # Index the whole XML file embedding_text = f"XML configuration {file_path.name} {summary} {content[:500]}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
''', ( file_path.name, 'xml_file', str(file_path), 1, len(lines), "", summary, content[:1000], embedding.tobytes(), datetime.now().isoformat(), 'xml', 'config' )) self.conn.commit() def _index_python_symbol(self, node, file_path: Path, file_content: str, language: str): """Index a single Python symbol (function or class)""" # Basic info name = node.name sym_type = 'class' if isinstance(node, ast.ClassDef) else 'function' line_num = node.lineno end_line = node.end_lineno or line_num # Get docstring docstring = ast.get_docstring(node) or "" # Get signature for functions signature = "" if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): signature = self._get_function_signature(node) # Get surrounding code context (few lines before/after) lines = file_content.split('\n') context_start = max(0, line_num - 3) context_end = min(len(lines), end_line + 2) code_context = '\n'.join(lines[context_start:context_end]) # Create embedding from all available info with error handling try: embedding_text = f"{name} {sym_type} {signature} {docstring}" embedding = self.model.encode(embedding_text, convert_to_numpy=True) if embedding is None or len(embedding) == 0: raise Exception("Failed to generate embedding") embedding_bytes = embedding.tobytes() except Exception as e: if "--debug" in sys.argv: print(f"\n⚠️ Embedding error for {name} in {file_path}: {e}") embedding_bytes = None # Store in database self.conn.execute(''' INSERT OR REPLACE INTO symbols (name, type, file_path, line_num, end_line, signature, docstring, code_context, embedding, last_updated, language, file_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, sym_type, str(file_path), line_num, end_line, signature, docstring, code_context, embedding_bytes, datetime.now().isoformat(), language, 'code' )) self.conn.commit() def _get_function_signature(self, node): """Extract function signature""" args = [] for arg in node.args.args: args.append(arg.arg) # Add *args and **kwargs if present if node.args.vararg: args.append(f"*{node.args.vararg.arg}") if node.args.kwarg: args.append(f"**{node.args.kwarg.arg}") return f"({', '.join(args)})" def search(self, query: str, limit: int = 10, search_type: str = "hybrid", file_type: str = None, language_filter: str = None) -> List[Dict]: """ Search for symbols by meaning search_type: 'semantic', 'text', or 'hybrid' file_type: 'code', 'document', or None for all language_filter: specific language to filter by """ results = [] # Log search self._log_search(query) if search_type in ["semantic", "hybrid"]: # Semantic search using embeddings semantic_results = self._semantic_search(query, limit * 2, file_type, language_filter) results.extend(semantic_results) if search_type in ["text", "hybrid"]: # Traditional text search text_results = self._text_search(query, limit, file_type, language_filter) results.extend(text_results) # Deduplicate and sort by score seen = set() unique_results = [] for r in results: key = (r['file_path'], r['name'], r['line_num']) if key not in seen: seen.add(key) unique_results.append(r) # Sort by score and filter by minimum score unique_results.sort(key=lambda x: x['score'], reverse=True) min_score = self.config.get('minScore', 0.2) filtered_results = [r for r in unique_results if r['score'] >= min_score] return filtered_results[:limit] def _semantic_search(self, query: str, limit: int, file_type: str = None, language_filter: str = None) -> List[Dict]: """Search using AI embeddings""" # Encode query query_embedding = self.model.encode(query, convert_to_numpy=True) # Build 
SQL with filters sql = ''' SELECT id, name, type, file_path, line_num, signature, docstring, embedding, language, file_type FROM symbols WHERE embedding IS NOT NULL ''' params = [] if file_type: sql += " AND file_type = ?" params.append(file_type) if language_filter: sql += " AND language = ?" params.append(language_filter) cursor = self.conn.execute(sql, params) results = [] for row in cursor: sym_id, name, sym_type, file_path, line_num, sig, doc, emb_bytes, lang, ftype = row # Calculate similarity embedding = np.frombuffer(emb_bytes, dtype=np.float32) similarity = np.dot(query_embedding, embedding) / ( np.linalg.norm(query_embedding) * np.linalg.norm(embedding) ) # Scale semantic scores to be comparable with text scores # Cosine similarity is 0-1, scale to 0-10 range scaled_score = float(similarity) * 10 results.append({ 'id': sym_id, 'name': name, 'type': sym_type, 'file_path': file_path, 'line_num': line_num, 'signature': sig, 'docstring': doc, 'score': scaled_score, 'match_type': 'semantic', 'language': lang, 'file_type': ftype }) # Sort by similarity results.sort(key=lambda x: x['score'], reverse=True) return results[:limit] def _text_search(self, query: str, limit: int, file_type: str = None, language_filter: str = None) -> List[Dict]: """Traditional text matching with improved scoring""" query_lower = query.lower() words = query_lower.split() # Build SQL with filters sql = ''' SELECT id, name, type, file_path, line_num, signature, docstring, code_context, language, file_type FROM symbols WHERE (lower(name) LIKE ? OR lower(docstring) LIKE ? OR lower(code_context) LIKE ?) ''' base_params = [] if file_type: sql += " AND file_type = ?" base_params.append(file_type) if language_filter: sql += " AND language = ?" base_params.append(language_filter) # Get all results that match any word all_matches = {} for word in words: pattern = f"%{word}%" params = [pattern, pattern, pattern] + base_params cursor = self.conn.execute(sql, params) for row in cursor: sym_id, name, sym_type, file_path, line_num, sig, doc, context, lang, ftype = row key = (sym_id, name, file_path, line_num) if key not in all_matches: all_matches[key] = { 'id': sym_id, 'name': name, 'type': sym_type, 'file_path': file_path, 'line_num': line_num, 'signature': sig, 'docstring': doc, 'code_context': context, 'language': lang, 'file_type': ftype, 'words_matched': set() } all_matches[key]['words_matched'].add(word) # Score each result results = [] for key, match in all_matches.items(): score = 0 name_lower = match['name'].lower() doc_lower = (match['docstring'] or '').lower() context_lower = (match['code_context'] or '').lower() # Check for exact phrase match (huge bonus) if len(words) > 1 and query_lower in name_lower: score += 20 elif len(words) > 1 and query_lower in doc_lower: score += 15 elif len(words) > 1 and query_lower in context_lower: score += 10 # Score based on individual word matches words_found = len(match['words_matched']) for word in match['words_matched']: # Reduced base score for single word match if word in name_lower: score += 2 if name_lower.startswith(word): score += 3 if doc_lower and word in doc_lower: score += 1 if context_lower and word in context_lower: score += 0.5 # Bonus for multiple word matches (compound scoring) if words_found > 1: score += words_found * 3 # Special boost for command-like queries if 'git' in words and match['language'] in ['shell', 'bash', 'python']: score += 2 match['score'] = score match['match_type'] = 'text' # Remove helper fields match.pop('words_matched') 
match.pop('code_context') results.append(match) # Sort by score and return top results results.sort(key=lambda x: x['score'], reverse=True) return results[:limit * 2] # Return more for deduplication later def _log_search(self, query: str): """Log search for analytics""" self.conn.execute( "INSERT INTO search_history (query, timestamp) VALUES (?, ?)", (query, datetime.now().isoformat()) ) self.conn.commit() def show_code(self, symbol_id: int): """Display code for a symbol""" cursor = self.conn.execute( "SELECT name, file_path, line_num, code_context FROM symbols WHERE id = ?", (symbol_id,) ) row = cursor.fetchone() if row: name, file_path, line_num, context = row print(f"\n📄 {file_path}:{line_num}") print(f"🔍 {name}") print("-" * 60) print(context) print("-" * 60) # Update access count self.conn.execute( "UPDATE symbols SET access_count = access_count + 1 WHERE id = ?", (symbol_id,) ) self.conn.commit() def interactive_mode(search): """Interactive search mode with VSCode integration""" import subprocess import readline # For better input handling print(""" 🔍 Interactive Smart Code Search Type queries to search, then select files to open in VSCode. Commands: - Enter number(s) to open files (e.g., '1' or '1,3,5' or '1-5') - 'a' to open all results - 'n' for next page, 'p' for previous page - 'q' to quit - Just press Enter to search again """) last_results = [] current_page = 1 results_per_page = 20 while True: try: # Get search query query = input("\n🔎 Search (or 'q' to quit): ").strip() if query.lower() == 'q': print("👋 Goodbye!") break if not query: continue # Perform search results = search.search(query, limit=100) if not results: print(f"❌ No results found for '{query}'") continue last_results = results current_page = 1 # Reset to first page for new search # Calculate paging info total_pages = (len(results) + results_per_page - 1) // results_per_page # Display current page results while True: start_idx = (current_page - 1) * results_per_page end_idx = min(start_idx + results_per_page, len(results)) page_results = results[start_idx:end_idx] # Display results print(f"\n📋 Found {len(results)} results (Page {current_page} of {total_pages}):\n") # Print header row print(f"{'#':>2} {'File':<40} {'Symbol':<20} {'Type':<9} {'Line':<6} {'Score':<6}") # Display in compact single-line format file_map = {} seen_files = set() for r in page_results: # Calculate global index (1-based) global_idx = results.index(r) + 1 file_path = r['file_path'] # Track unique files for opening if file_path not in seen_files: seen_files.add(file_path) file_num = len(seen_files) file_map[file_num] = file_path # Truncate path for display display_path = file_path if len(display_path) > 40: # Show start and end of path display_path = display_path[:20] + '...' + display_path[-17:] # Format: number, file path, symbol name (type) line score name = r['name'] if len(name) > 20: name = name[:17] + '...' print(f"{global_idx:2d}. 
{display_path:<40} {name:<20} {r['type']:<9} {r.get('line_num', 0):<5} [{r['score']:.3f}]") # Get user action if total_pages > 1: action = input("\nOpen files by: #, range: #-#, (a)ll, (n)ext, (p)rev, or <enter> new search: ").strip().lower() else: action = input("\nOpen files by: #, range: #-#, (a)ll, or <enter> new search: ").strip().lower() # Handle paging navigation if action == 'n' and current_page < total_pages: current_page += 1 continue # Re-display current page elif action == 'p' and current_page > 1: current_page -= 1 continue # Re-display current page else: # Break inner loop to handle other actions break if not action: continue elif action == 'a': # Open all unique files files_to_open = list(seen_files) else: # Parse result numbers files_to_open = [] result_indices = [] parts = action.replace(' ', '').split(',') for part in parts: if '-' in part: # Handle range try: start, end = map(int, part.split('-')) for n in range(start, min(end + 1, len(results) + 1)): result_indices.append(n) except: continue else: # Single number try: n = int(part) if 1 <= n <= len(results): result_indices.append(n) except: continue # Get files with specific line numbers from selected results file_line_map = {} for idx in result_indices: if idx <= len(results): result = results[idx - 1] file_path = result['file_path'] line_num = result.get('line_num', 0) # Keep first occurrence or lowest line number if file_path not in file_line_map or line_num < file_line_map[file_path]: file_line_map[file_path] = line_num files_to_open = list(file_line_map.keys()) # Open in VSCode if files_to_open: try: # Build VSCode command with line numbers using relative paths vscode_args = ["code"] for f in files_to_open: # Use relative paths as stored in database if f in file_line_map and file_line_map[f] > 0: # Use -g flag for goto line vscode_args.extend(["-g", f"{f}:{file_line_map[f]}"]) else: vscode_args.append(f) subprocess.run(vscode_args) print(f"✅ Opened {len(files_to_open)} file(s) in VSCode") except Exception as e: print(f"❌ Error opening VSCode: {e}") except KeyboardInterrupt: print("\n\n👋 Goodbye!") break except EOFError: print("\n\n👋 Goodbye!") break def show_help(): """Display help information""" print(""" 🔍 Smart Code Search (scs) - Find code by meaning! 
Usage: scs index # Index your project scs interactive # Interactive search mode scs <query> # Search for code scs show <id> # Show code for result scs help # Show this help Examples: scs "validate email" scs "handle user authentication" scs "calculate tax" --docs scs interactive Options: --json Output results in JSON format --export=<type> Export results (vscode, list) --docs Search only documentation --code Search only code (default) --config Search only config files (yaml, docker, json, etc) --all Search code, docs, and config --lang=<lang> Filter by language (python, js, etc) --type=<ext> Filter by file extension --force Force re-index all files --incremental Only index changed files (default) -h, --help Show this help message """) def main(): """CLI interface""" # Check for help flags anywhere in arguments if '--help' in sys.argv or '-h' in sys.argv or (len(sys.argv) > 1 and sys.argv[1] == 'help'): show_help() return # Check if JSON output is requested to suppress initialization message quiet = "--json" in sys.argv or "--export" in sys.argv search = SmartCodeSearch(quiet=quiet) if len(sys.argv) < 2: show_help() return command = sys.argv[1] if command == "index": force = "--force" in sys.argv incremental = "--incremental" in sys.argv or not force search.index_project(force_reindex=force, incremental=incremental) elif command == "interactive": interactive_mode(search) elif command == "show" and len(sys.argv) > 2: symbol_id = int(sys.argv[2]) search.show_code(symbol_id) else: # Search args = [arg for arg in sys.argv[1:] if not arg.startswith("--")] query = " ".join(args) # Parse options json_output = "--json" in sys.argv export_type = None file_type_filter = None language_filter = None # Check for export option for arg in sys.argv: if arg.startswith("--export="): export_type = arg.split("=")[1] elif arg.startswith("--lang="): language_filter = arg.split("=")[1] elif arg.startswith("--type="): # Filter by file extension ext = arg.split("=")[1] if not ext.startswith('.'): ext = '.' 
+ ext # Map extension to language for lang, info in search.language_patterns.items(): if ext in info.get('extensions', []): language_filter = lang break # Determine file type filter if "--docs" in sys.argv: file_type_filter = "document" elif "--code" in sys.argv: file_type_filter = "code" elif "--config" in sys.argv: file_type_filter = "config" # --all means no filter results = search.search(query, file_type=file_type_filter, language_filter=language_filter) # Handle different output formats if export_type == "list": # Just output file paths unique_files = sorted(set(r['file_path'] for r in results)) for f in unique_files: print(f) elif export_type == "vscode": # Open files in VSCode unique_files = sorted(set(r['file_path'] for r in results)) if unique_files: import subprocess cmd = ["code"] + unique_files subprocess.run(cmd) print(f"✅ Opened {len(unique_files)} files in VSCode") else: print("❌ No files to open") elif json_output: # JSON output for machine consumption output = { "query": query, "results": results } print(json.dumps(output, indent=2)) else: # Human-readable output if not results: print(f"❌ No results found for '{query}'") return # Group by file type code_results = [r for r in results if r.get('file_type') == 'code'] doc_results = [r for r in results if r.get('file_type') == 'document'] config_results = [r for r in results if r.get('file_type') == 'config'] print(f"\n🔍 Results for '{query}':") if code_results: print(f"\n📄 CODE ({len(code_results)} results):") print(f"{'ID':<4} {'File':<40} {'Name':<25} {'Type':<12} {'Line':<6} {'Score':<6}") print("-" * 100) for r in code_results[:10]: # Show top 10 # Truncate file path for display file_short = r['file_path'] if len(file_short) > 40: file_short = file_short[:20] + '...' + file_short[-17:] # Truncate name if too long name = r['name'] if len(name) > 24: name = name[:21] + '...' print(f"{r['id']:<4} {file_short:<40} {name:<25} {r['type'][:11]:<12} " f"{r.get('line_num', 0):<6} {r['score']:.3f}") # Show signature or first line of docstring if r.get('signature') and r['type'] in ['function', 'method']: print(f" → {r['name']}{r['signature']}") elif r.get('docstring'): first_line = r['docstring'].split('\n')[0][:55] if first_line: print(f" 📝 {first_line}") if doc_results: print(f"\n📚 DOCUMENTATION ({len(doc_results)} results):") print(f"{'ID':<4} {'Title':<40} {'Type':<15} {'File':<35} {'Score':<6}") print("-" * 95) for r in doc_results[:10]: # Show top 10 file_short = r['file_path'] if len(file_short) > 32: file_short = "..." + file_short[-32:] print(f"{r['id']:<4} {r['name'][:39]:<40} {r['type'][:14]:<15} " f"{file_short:<35} {r['score']:.3f}") if config_results: print(f"\n⚙️ CONFIG ({len(config_results)} results):") print(f"{'ID':<4} {'File':<40} {'Name':<25} {'Type':<12} {'Line':<6} {'Score':<6}") print("-" * 100) for r in config_results[:10]: # Show top 10 # Truncate file path for display file_short = r['file_path'] if len(file_short) > 40: file_short = file_short[:20] + '...' + file_short[-17:] # Truncate name if too long name_short = r['name'][:24] if len(r['name']) > 24 else r['name'] print(f"{r['id']:<4} {file_short:<40} {name_short:<25} " f"{r['type']:<12} L{r.get('line_num', 0):<5} {r['score']:.3f}") print(f"\n💡 Tip: Use 'scs show <id>' to see the full content") if __name__ == "__main__": main()
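The file above is designed to be driven from the CLI (scs index, scs "query", scs interactive, as documented in show_help), but the SmartCodeSearch class can also be used directly. The sketch below is illustrative only: it assumes the file is importable as a module named scsold, and the config values are arbitrary examples. The constructor arguments, the optional .scs.json keys (include, exclude, minScore), and the search() parameters and result fields are taken from the code above; _load_config() shallow-merges the user config over its defaults.

# Illustrative sketch, not part of scsold.py. Assumes scsold.py is on PYTHONPATH.
import json
from pathlib import Path

from scsold import SmartCodeSearch  # hypothetical module name for the file above

project = Path(".")

# Optional project config; keys mirror the defaults in _load_config().
config = {
    "include": ["**/*.py", "**/*.md"],
    "exclude": ["**/node_modules/**", "**/.venv/**"],
    "minScore": 0.25,
}
(project / ".scs.json").write_text(json.dumps(config, indent=2))

# Index the project (incremental by default), then run a hybrid search.
search = SmartCodeSearch(project_root=str(project), quiet=True)
search.index_project()
for r in search.search("validate email", limit=5, search_type="hybrid"):
    print(f"{r['score']:.3f}  {r['file_path']}:{r['line_num']}  {r['name']} ({r['type']})")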

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'
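The same lookup can be done from Python. This is a minimal sketch that relies only on the URL shown above; the response schema is not documented on this page, so the raw JSON body is printed as-is.

# Fetch the server entry from the MCP directory API (standard library only).
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp"
with urllib.request.urlopen(url) as resp:
    print(resp.read().decode("utf-8"))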
