#!/usr/bin/env python3
"""
Smart Code Search - Find code by meaning, not just text
One file to rule them all!
"""
import ast
import sqlite3
import json
import subprocess
import os
import sys
import re
from pathlib import Path
from datetime import datetime
from typing import List, Tuple, Dict, Optional
import hashlib
import time
import threading
from queue import Queue
# Check dependencies
try:
from sentence_transformers import SentenceTransformer
import numpy as np
except ImportError:
print("Installing required dependencies...")
subprocess.check_call([sys.executable, "-m", "pip", "install",
"sentence-transformers", "numpy", "torch"])
from sentence_transformers import SentenceTransformer
import numpy as np
class SmartCodeSearch:
def __init__(self, project_root=".", quiet=False):
self.root = Path(project_root)
self.db_path = self.root / ".claude-symbols" / "search.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.debug = "--debug" in sys.argv
self.db_operations = 0
self.db_time = 0
# Initialize model (downloads ~90MB on first run)
if not quiet:
print("🤖 Initializing AI model...")
try:
model_name = 'all-MiniLM-L6-v2'
local_model_path = self.root / model_name
# Check if model exists locally
if local_model_path.exists():
# Use local model
self.model = SentenceTransformer(str(local_model_path))
if not quiet:
print(f" ✅ Model loaded from local directory: {model_name}")
else:
# Model not found locally, need to download
if not quiet:
print(f" 📥 Model not found locally. Setting up Git LFS and downloading...")
# Check if git-lfs is installed
try:
subprocess.run(["git", "lfs", "version"], check=True, capture_output=True)
except (subprocess.CalledProcessError, FileNotFoundError):
if not quiet:
print(" 📦 Installing Git LFS...")
try:
# Try to install git-lfs
subprocess.run(["sudo", "apt-get", "update"], check=True, capture_output=True)
subprocess.run(["sudo", "apt-get", "install", "-y", "git-lfs"], check=True, capture_output=True)
subprocess.run(["git", "lfs", "install"], check=True, capture_output=True)
if not quiet:
print(" ✅ Git LFS installed successfully")
except subprocess.CalledProcessError:
print("\n❌ Failed to install Git LFS automatically.")
print(" Please install manually with: sudo apt-get install git-lfs")
print(" Then run: git lfs install")
raise
# Clone the model repository
try:
if not quiet:
print(f" 🔄 Cloning model from HuggingFace...")
# Clone the repository
subprocess.run([
"git", "clone",
"https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
str(local_model_path)
], check=True, capture_output=True)
# Pull LFS files
subprocess.run(
["git", "lfs", "pull"],
cwd=str(local_model_path),
check=True,
capture_output=True
)
if not quiet:
print(" ✅ Model downloaded successfully")
# Load the model from local directory
self.model = SentenceTransformer(str(local_model_path))
except subprocess.CalledProcessError as e:
print(f"\n❌ Failed to download model: {e}")
print(" Please download manually:")
print(" git clone https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2")
print(" cd all-MiniLM-L6-v2/")
print(" git lfs pull")
raise
# Test the model
test_embedding = self.model.encode("test", convert_to_numpy=True)
if test_embedding is None or len(test_embedding) == 0:
raise Exception("Model failed to generate embeddings")
except Exception as e:
print(f"\n❌ Error initializing AI model: {e}")
print(" The search tool requires the sentence-transformers library.")
print(" Please ensure you have an internet connection for the initial download.")
raise
self.conn = sqlite3.connect(self.db_path)
self._init_db()
# Language patterns for different file types
self.language_patterns = {
'python': {
'extensions': ['.py'],
'function': re.compile(r'^(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE),
'class': re.compile(r'^class\s+(\w+)(?:\((.*?)\))?:', re.MULTILINE),
'method': re.compile(r'^\s+(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE),
'variable': re.compile(r'^([A-Z_]+)\s*=\s*[^=]', re.MULTILINE), # Constants
'import': re.compile(r'^(?:from\s+(\S+)\s+)?import\s+(.+)$', re.MULTILINE),
'decorator': re.compile(r'^@(\w+(?:\.\w+)*)', re.MULTILINE),
'type_alias': re.compile(r'^(\w+)\s*=\s*(?:Union|Optional|List|Dict|Tuple|Type|Callable)\[', re.MULTILINE)
},
'javascript': {
'extensions': ['.js', '.jsx', '.mjs'],
'function': re.compile(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE),
'class': re.compile(r'class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE),
'method': re.compile(r'(?:async\s+)?(\w+)\s*\([^)]*\)\s*{', re.MULTILINE),
'variable': re.compile(r'^(?:export\s+)?(?:const|let|var)\s+(\w+)\s*=', re.MULTILINE),
'import': re.compile(r'^import\s+(?:\{[^}]+\}|\*\s+as\s+\w+|\w+)(?:\s*,\s*\{[^}]+\})?\s+from\s+[\'"]([^\'\"]+)[\'"]', re.MULTILINE),
'export': re.compile(r'^export\s+(?:default\s+)?(?:const|let|var|function|class)\s+(\w+)', re.MULTILINE),
'component': re.compile(r'^(?:export\s+)?(?:function|const)\s+([A-Z]\w+).*?(?:=.*?=>.*?(?:<|React\.|jsx)|:\s*(?:React\.)?FC)', re.MULTILINE)
},
'typescript': {
'extensions': ['.ts', '.tsx'],
'function': re.compile(r'(?:export\s+)?(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*(?::\s*[^=]+)?\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE),
'class': re.compile(r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE),
'interface': re.compile(r'(?:export\s+)?interface\s+(\w+)', re.MULTILINE),
'type': re.compile(r'^(?:export\s+)?type\s+(\w+)\s*=', re.MULTILINE),
'enum': re.compile(r'^(?:export\s+)?enum\s+(\w+)', re.MULTILINE),
'variable': re.compile(r'^(?:export\s+)?(?:const|let|var)\s+(\w+)\s*(?::\s*[^=]+)?\s*=', re.MULTILINE),
'import': re.compile(r'^import\s+(?:type\s+)?(?:\{[^}]+\}|\*\s+as\s+\w+|\w+)(?:\s*,\s*\{[^}]+\})?\s+from\s+[\'"]([^\'\"]+)[\'"]', re.MULTILINE),
'component': re.compile(r'^(?:export\s+)?(?:function|const)\s+([A-Z]\w+).*?(?:=.*?=>.*?(?:<|React\.|jsx|tsx)|:\s*(?:React\.)?FC)', re.MULTILINE)
},
'java': {
'extensions': ['.java'],
'class': re.compile(r'(?:public\s+)?class\s+(\w+)', re.MULTILINE),
'method': re.compile(r'(?:public|private|protected)?\s*(?:static\s+)?(?:\w+\s+)?(\w+)\s*\([^)]*\)\s*(?:throws\s+[^{]+)?\s*{', re.MULTILINE),
'interface': re.compile(r'(?:public\s+)?interface\s+(\w+)', re.MULTILINE)
},
'cpp': {
'extensions': ['.cpp', '.cc', '.cxx', '.hpp', '.h', '.hxx'],
'function': re.compile(r'(?:\w+\s+)*(\w+)\s*\([^)]*\)\s*(?:const\s*)?(?:override\s*)?{', re.MULTILINE),
'class': re.compile(r'class\s+(\w+)(?:\s*:\s*(?:public|private|protected)\s+\w+)?', re.MULTILINE),
'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE)
},
'go': {
'extensions': ['.go'],
'function': re.compile(r'func\s+(?:\(\s*\w+\s+[^)]+\)\s+)?(\w+)\s*\([^)]*\)', re.MULTILINE),
'struct': re.compile(r'type\s+(\w+)\s+struct', re.MULTILINE),
'interface': re.compile(r'type\s+(\w+)\s+interface', re.MULTILINE)
},
'markdown': {
'extensions': ['.md', '.markdown'],
'header': re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE),
'code_block': re.compile(r'^```(\w+)?\n(.*?)```', re.MULTILINE | re.DOTALL)
},
'text': {
'extensions': ['.txt', '.rst', '.adoc'],
'section': re.compile(r'^([A-Z][^.!?]*(?:[.!?]\s*|$))', re.MULTILINE)
},
'yaml': {
'extensions': ['.yml', '.yaml'],
'key': re.compile(r'^(\w+):\s*$', re.MULTILINE),
'section': re.compile(r'^(\w+):\s*(?:#.*)?$', re.MULTILINE),
'list_item': re.compile(r'^\s*-\s+(\w+)', re.MULTILINE)
},
'docker': {
'extensions': ['Dockerfile', '.dockerfile'],
'instruction': re.compile(r'^(FROM|RUN|CMD|EXPOSE|ENV|ADD|COPY|ENTRYPOINT|VOLUME|USER|WORKDIR|ARG|ONBUILD|LABEL)\s+', re.MULTILINE | re.IGNORECASE),
'stage': re.compile(r'^FROM\s+.*?\s+AS\s+(\w+)', re.MULTILINE | re.IGNORECASE)
},
'json': {
'extensions': ['.json', '.jsonc'],
'key': re.compile(r'"(\w+)":\s*["{[]', re.MULTILINE)
},
'toml': {
'extensions': ['.toml'],
'section': re.compile(r'^\[([^\]]+)\]', re.MULTILINE),
'key': re.compile(r'^(\w+)\s*=', re.MULTILINE)
},
'ini': {
'extensions': ['.ini', '.cfg', '.conf'],
'section': re.compile(r'^\[([^\]]+)\]', re.MULTILINE),
'key': re.compile(r'^(\w+)\s*=', re.MULTILINE)
},
'shell': {
'extensions': ['.sh', '.bash', '.zsh', '.fish'],
'function': re.compile(r'^(?:function\s+)?(\w+)\s*\(\s*\)', re.MULTILINE),
'variable': re.compile(r'^export\s+(\w+)=', re.MULTILINE)
},
'makefile': {
'extensions': ['Makefile', 'makefile', '.mk'],
'target': re.compile(r'^([a-zA-Z0-9_-]+):', re.MULTILINE),
'variable': re.compile(r'^(\w+)\s*[:?]?=', re.MULTILINE)
},
'sql': {
'extensions': ['.sql'],
'table': re.compile(r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)', re.MULTILINE | re.IGNORECASE),
'function': re.compile(r'CREATE\s+(?:OR\s+REPLACE\s+)?FUNCTION\s+(\w+)', re.MULTILINE | re.IGNORECASE),
'procedure': re.compile(r'CREATE\s+(?:OR\s+REPLACE\s+)?PROCEDURE\s+(\w+)', re.MULTILINE | re.IGNORECASE)
},
'terraform': {
'extensions': ['.tf', '.tfvars'],
'resource': re.compile(r'^resource\s+"[^"]+"\s+"([^"]+)"', re.MULTILINE),
'variable': re.compile(r'^variable\s+"([^"]+)"', re.MULTILINE),
'module': re.compile(r'^module\s+"([^"]+)"', re.MULTILINE)
},
'ruby': {
'extensions': ['.rb', '.rake', 'Gemfile'],
'class': re.compile(r'class\s+(\w+)', re.MULTILINE),
'method': re.compile(r'def\s+(\w+)', re.MULTILINE),
'module': re.compile(r'module\s+(\w+)', re.MULTILINE)
},
'rust': {
'extensions': ['.rs'],
'function': re.compile(r'fn\s+(\w+)', re.MULTILINE),
'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE),
'enum': re.compile(r'enum\s+(\w+)', re.MULTILINE),
'impl': re.compile(r'impl(?:\s+\w+)?\s+for\s+(\w+)|impl\s+(\w+)', re.MULTILINE)
},
'csharp': {
'extensions': ['.cs', '.csx'],
'class': re.compile(r'(?:public\s+|private\s+|internal\s+)?class\s+(\w+)', re.MULTILINE),
'method': re.compile(r'(?:public\s+|private\s+|protected\s+|internal\s+)?(?:static\s+)?(?:async\s+)?(?:\w+\s+)?(\w+)\s*\([^)]*\)\s*{', re.MULTILINE),
'interface': re.compile(r'interface\s+(\w+)', re.MULTILINE)
},
'swift': {
'extensions': ['.swift'],
'class': re.compile(r'class\s+(\w+)', re.MULTILINE),
'func': re.compile(r'func\s+(\w+)', re.MULTILINE),
'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE),
'enum': re.compile(r'enum\s+(\w+)', re.MULTILINE)
},
'kotlin': {
'extensions': ['.kt', '.kts'],
'class': re.compile(r'class\s+(\w+)', re.MULTILINE),
'fun': re.compile(r'fun\s+(\w+)', re.MULTILINE),
'interface': re.compile(r'interface\s+(\w+)', re.MULTILINE)
},
'scala': {
'extensions': ['.scala'],
'class': re.compile(r'class\s+(\w+)', re.MULTILINE),
'def': re.compile(r'def\s+(\w+)', re.MULTILINE),
'object': re.compile(r'object\s+(\w+)', re.MULTILINE),
'trait': re.compile(r'trait\s+(\w+)', re.MULTILINE)
},
'php': {
'extensions': ['.php'],
'class': re.compile(r'class\s+(\w+)', re.MULTILINE),
'function': re.compile(r'function\s+(\w+)', re.MULTILINE),
'interface': re.compile(r'interface\s+(\w+)', re.MULTILINE)
},
'lua': {
'extensions': ['.lua'],
'function': re.compile(r'function\s+(\w+)|(\w+)\s*=\s*function', re.MULTILINE)
},
'r': {
'extensions': ['.r', '.R'],
'function': re.compile(r'(\w+)\s*<-\s*function', re.MULTILINE)
},
'julia': {
'extensions': ['.jl'],
'function': re.compile(r'function\s+(\w+)', re.MULTILINE),
'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE)
},
'dart': {
'extensions': ['.dart'],
'class': re.compile(r'class\s+(\w+)', re.MULTILINE),
'function': re.compile(r'(?:void|int|String|bool|double|var|dynamic)\s+(\w+)\s*\(', re.MULTILINE)
},
'elixir': {
'extensions': ['.ex', '.exs'],
'defmodule': re.compile(r'defmodule\s+([\w.]+)', re.MULTILINE),
'def': re.compile(r'def\s+(\w+)', re.MULTILINE),
'defp': re.compile(r'defp\s+(\w+)', re.MULTILINE)
},
'clojure': {
'extensions': ['.clj', '.cljs'],
'defn': re.compile(r'\(defn\s+(\S+)', re.MULTILINE),
'def': re.compile(r'\(def\s+(\S+)', re.MULTILINE)
},
'haskell': {
'extensions': ['.hs'],
'function': re.compile(r'^(\w+)\s*::', re.MULTILINE)
},
'perl': {
'extensions': ['.pl', '.pm'],
'sub': re.compile(r'sub\s+(\w+)', re.MULTILINE),
'package': re.compile(r'package\s+([\w:]+)', re.MULTILINE)
},
'objc': {
'extensions': ['.m', '.mm'],
'interface': re.compile(r'@interface\s+(\w+)', re.MULTILINE),
'implementation': re.compile(r'@implementation\s+(\w+)', re.MULTILINE),
'method': re.compile(r'[-+]\s*\([^)]+\)\s*(\w+)', re.MULTILINE)
},
'vue': {
'extensions': ['.vue'],
'component': re.compile(r'name:\s*[\'"](\w+)[\'"]', re.MULTILINE),
'script': re.compile(r'<script[^>]*>', re.MULTILINE),
'template': re.compile(r'<template[^>]*>', re.MULTILINE)
},
'svelte': {
'extensions': ['.svelte'],
'script': re.compile(r'<script[^>]*>', re.MULTILINE)
},
'html': {
'extensions': ['.html', '.htm'],
'tag': re.compile(r'<(\w+)[^>]*id=[\'"]([^\'"]*)[\'"]', re.MULTILINE)
},
'css': {
'extensions': ['.css', '.scss', '.sass', '.less'],
'class': re.compile(r'\.(\w[\w-]*)\s*\{', re.MULTILINE),
'id': re.compile(r'#(\w[\w-]*)\s*\{', re.MULTILINE)
},
'xml': {
'extensions': ['.xml', '.xsl', '.xslt'],
'element': re.compile(r'<(\w+)[^>]*>', re.MULTILINE)
},
'c': {
'extensions': ['.c', '.h'],
'function': re.compile(r'(?:\w+\s+)*(\w+)\s*\([^)]*\)\s*{', re.MULTILINE),
'struct': re.compile(r'struct\s+(\w+)', re.MULTILINE)
},
'properties': {
'extensions': ['.properties'],
'property': re.compile(r'^(\w[\w.]*)\s*=', re.MULTILINE)
},
'env': {
'extensions': ['.env'],
'variable': re.compile(r'^(\w+)=', re.MULTILINE)
},
'gradle': {
'extensions': ['.gradle', '.gradle.kts', 'build.gradle', 'settings.gradle'],
'task': re.compile(r'task\s+(\w+)', re.MULTILINE),
'dependency': re.compile(r'dependencies\s*\{', re.MULTILINE)
},
'maven': {
'extensions': ['pom.xml'],
'artifactId': re.compile(r'<artifactId>([^<]+)</artifactId>', re.MULTILINE),
'groupId': re.compile(r'<groupId>([^<]+)</groupId>', re.MULTILINE)
},
'nginx': {
'extensions': ['nginx.conf', '.nginx'],
'server': re.compile(r'server\s*\{', re.MULTILINE),
'location': re.compile(r'location\s+([^\s{]+)', re.MULTILINE)
},
'apache': {
'extensions': ['.htaccess', 'httpd.conf'],
'directive': re.compile(r'^(\w+)\s+', re.MULTILINE)
},
'ansible': {
'extensions': ['playbook.yml', 'ansible.cfg'],
'task': re.compile(r'-\s+name:\s+(.+)', re.MULTILINE),
'role': re.compile(r'role:\s+(\w+)', re.MULTILINE)
},
'vagrant': {
'extensions': ['Vagrantfile'],
'config': re.compile(r'config\.vm\.(\w+)', re.MULTILINE)
},
'csv': {
'extensions': ['.csv', '.tsv'],
'header': re.compile(r'^([^\n]+)', re.MULTILINE)
}
}
# Load config if exists
self.config = self._load_config()
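    # Illustrative note: the language_patterns table drives the regex-based
    # indexers below. For example, the Go 'function' pattern matches both of
    # these (hypothetical) lines and captures the function name:
    #   "func Connect(host string) error"  -> "Connect"
    #   "func (s *Server) Start() error"   -> "Start"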
def _db_execute(self, query, params=None):
"""Execute database query with timing"""
start = time.time()
if params:
result = self.conn.execute(query, params)
else:
result = self.conn.execute(query)
elapsed = time.time() - start
self.db_operations += 1
self.db_time += elapsed
if self.debug and elapsed > 0.1: # Log slow queries
print(f"\n⚠️ Slow DB query ({elapsed:.2f}s): {query[:50]}...")
return result
def _init_db(self):
"""Create tables for search"""
self.conn.executescript('''
CREATE TABLE IF NOT EXISTS symbols (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
file_path TEXT NOT NULL,
line_num INTEGER NOT NULL,
end_line INTEGER,
signature TEXT,
docstring TEXT,
code_context TEXT,
embedding BLOB,
last_updated TEXT,
access_count INTEGER DEFAULT 0,
language TEXT,
file_type TEXT,
UNIQUE(file_path, name, line_num)
);
CREATE TABLE IF NOT EXISTS search_history (
id INTEGER PRIMARY KEY,
query TEXT,
timestamp TEXT,
results_clicked TEXT
);
CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name);
CREATE INDEX IF NOT EXISTS idx_symbols_type ON symbols(type);
CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_path);
''')
self.conn.commit()
def _load_config(self):
"""Load project-specific configuration"""
config_path = self.root / ".scs.json"
default_config = {
"include": [
"**/*.py", "**/*.js", "**/*.jsx", "**/*.ts", "**/*.tsx",
"**/*.java", "**/*.cpp", "**/*.cc", "**/*.h", "**/*.hpp",
"**/*.go", "**/*.rs", "**/*.md", "**/*.txt",
"**/*.yml", "**/*.yaml", "**/Dockerfile", "**/*.dockerfile",
"**/*.json", "**/*.toml", "**/*.ini", "**/*.cfg", "**/*.conf",
"**/*.sh", "**/*.bash", "**/Makefile", "**/*.mk",
"**/*.sql", "**/*.tf", "**/*.tfvars"
],
"exclude": [
"**/node_modules/**", "**/.venv/**", "**/venv/**", "**/__pycache__/**",
"**/dist/**", "**/build/**", "**/.git/**", "**/target/**",
"**/*.min.js", "**/*.min.css", "**/package-lock.json", "**/yarn.lock"
],
"minScore": 0.2,
"languages": [
"python", "javascript", "typescript", "java", "cpp", "go",
"markdown", "yaml", "docker", "json", "toml", "ini", "shell",
"makefile", "sql", "terraform"
]
}
if config_path.exists():
try:
with open(config_path, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
except Exception as e:
print(f"Warning: Could not load config: {e}")
return default_config
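    # A minimal .scs.json might look like this (illustrative values; note
    # that _load_config uses dict.update, so a supplied key replaces the
    # default list wholesale rather than merging with it):
    #   {
    #     "include": ["**/*.py", "**/*.md"],
    #     "exclude": ["**/.venv/**"],
    #     "minScore": 0.3
    #   }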
def _fix_pattern(self, pattern):
"""Fix invalid ** patterns to be valid glob patterns"""
if pattern.startswith('**') and not pattern.startswith('**/'):
# Handle cases like **.py -> **/*.py
if pattern[2:].startswith('.'):
return '**/*' + pattern[2:]
else:
# Handle cases like **test*.py -> **/test*.py
return '**/' + pattern[2:]
return pattern
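    # Examples (illustrative):
    #   _fix_pattern("**.py")      -> "**/*.py"
    #   _fix_pattern("**test*.py") -> "**/test*.py"
    #   _fix_pattern("**/*.py")    -> "**/*.py"  (already valid, unchanged)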
def index_project(self, file_pattern=None, force_reindex=False, incremental=True):
"""Index all supported files in the project"""
# Show initial message
mode = 'incremental' if incremental and not force_reindex else 'full'
print(f"\n🔍 Indexing {self.root} ({mode} mode)")
# Collect all files based on config or pattern
start_time = time.time()
all_files = []
if file_pattern:
# Use provided pattern
raw_patterns = [file_pattern] if isinstance(file_pattern, str) else file_pattern
# Apply pattern fixing to direct patterns too
patterns = [self._fix_pattern(p) for p in raw_patterns]
else:
# Use config patterns
config_patterns = self.config.get('include', ['**/*.py'])
# Apply pattern fixing to all config patterns
patterns = [self._fix_pattern(p) for p in config_patterns]
# Gather files using single directory walk
# Prepare exclude directory names for fast lookup
exclude_dirs = {
'node_modules', '.git', '__pycache__', 'venv', '.venv',
'dist', 'build', '.next', 'coverage', '.pytest_cache',
'tmp', 'temp', '.idea', '.vscode', 'target'
}
# Get all exclude patterns once
exclude_patterns = self.config.get('exclude', [])
# Single walk through directory tree
all_files = []
scanned_dirs = 0
scanned_files = 0
for root, dirs, files in os.walk(self.root):
# Early directory exclusion - modify dirs in-place to skip them
dirs[:] = [d for d in dirs if d not in exclude_dirs]
scanned_dirs += 1
# Progress update every 100 directories
if scanned_dirs % 100 == 0:
print(f"\rScanning: {scanned_dirs} dirs, {len(all_files)} files found...", end='', flush=True)
# Convert root to Path for easier pattern matching
root_path = Path(root)
# Check files in this directory
for file in files:
scanned_files += 1
file_path = root_path / file
# Check if file matches any include pattern
for pattern in patterns:
# Convert pattern to be relative to current root if needed
if file_path.match(pattern):
all_files.append(file_path)
break
# Clear progress line
print(f"\r" + " " * 60 + "\r", end='')
# Apply exclusions more efficiently
filtered_files = []
excluded_count = 0
if all_files:
# Pre-compile simple exclusions for faster matching
simple_excludes = set()
pattern_excludes = []
for exclude in exclude_patterns:
if '*' not in exclude and '?' not in exclude:
# Simple filename exclusion
simple_excludes.add(exclude)
else:
# Pattern-based exclusion
pattern_excludes.append(exclude)
# Filter files
for i, file_path in enumerate(all_files):
# Check simple excludes first (fast)
if file_path.name in simple_excludes:
excluded_count += 1
continue
# Check pattern excludes
excluded = False
for exclude_pattern in pattern_excludes:
if file_path.match(exclude_pattern):
excluded = True
excluded_count += 1
break
if not excluded:
filtered_files.append(file_path)
files = filtered_files
total_files = len(files)
if total_files == 0:
print(f"\n❌ No files found to index!")
print(f" • Check your include patterns: {patterns[:3]}...")
print(f" • Check your exclude patterns: {exclude_patterns[:3]}...")
print(f" • Current directory: {self.root}")
return
print(f"Found {len(all_files)} files, {total_files} after filtering ({excluded_count} excluded)")
# Initialize tracking variables
indexed = 0
skipped = 0
errors = 0
error_details = []
# Hang detection setup
last_progress_time = time.time()
hang_detected = False
current_file = None
def check_for_hang():
nonlocal hang_detected
while indexed + skipped + errors < total_files and not hang_detected:
time.sleep(5) # Check every 5 seconds
if current_file and time.time() - last_progress_time > 10:
hang_detected = True
print(f"\n\n⚠️ Indexing appears to be hanging on: {current_file}")
print(f" Consider checking if the file is too large or has encoding issues.")
# Start hang detection thread
hang_thread = threading.Thread(target=check_for_hang, daemon=True)
hang_thread.start()
print(f"\nIndexing {total_files} files...")
for i, file_path in enumerate(files):
current_file = file_path
last_progress_time = time.time()
# Calculate progress and time estimates
progress = (i + 1) / total_files
elapsed = time.time() - start_time
if i > 0:
rate = i / elapsed
remaining = (total_files - i) / rate if rate > 0 else 0
eta = f"ETA: {int(remaining//60)}m {int(remaining%60)}s"
else:
eta = "Calculating..."
# Enhanced progress bar
bar_length = 40
filled = int(bar_length * progress)
bar = '█' * filled + '░' * (bar_length - filled)
percent = int(progress * 100)
# Show progress with file name and stats
file_display = file_path.name[:25] + '...' if len(file_path.name) > 25 else file_path.name
stats = f"[✓{indexed} ↪{skipped} ✗{errors}]"
print(f"\r[{bar}] {percent:3d}% - {file_display:<30} {stats} {eta}", end='', flush=True)
try:
if incremental and not force_reindex:
# Check if file needs indexing (modified since last index)
if self._should_index_file(file_path, force_reindex):
self._index_file(file_path)
indexed += 1
else:
skipped += 1
else:
# Force index all files
self._index_file(file_path)
indexed += 1
except Exception as e:
errors += 1
error_details.append({
'file': str(file_path),
'error': str(e),
'type': type(e).__name__
})
if "--debug" in sys.argv:
print(f"\n❌ Error in {file_path}: {e}")
# Clear progress bar
print(f"\r" + " " * 100 + "\r", end='')
# Final statistics
total_time = time.time() - start_time
print(f"\n✅ Indexed {indexed} files in {total_time:.1f}s ({skipped} unchanged, {errors} errors)")
# Show error details if any
if errors > 0 and error_details:
print(f"\n⚠️ Errors encountered:")
# Group errors by type
error_types = {}
for err in error_details[:10]: # Show first 10 errors
err_type = err['type']
if err_type not in error_types:
error_types[err_type] = []
error_types[err_type].append(err['file'])
for err_type, files in error_types.items():
print(f" • {err_type}: {len(files)} file(s)")
for f in files[:3]: # Show first 3 files per error type
print(f" - {f}")
if len(files) > 3:
print(f" ... and {len(files) - 3} more")
# Show compact stats by language
cursor = self.conn.execute("""
SELECT language, COUNT(DISTINCT type), COUNT(*)
FROM symbols
GROUP BY language
ORDER BY COUNT(*) DESC
""")
print(f"\nSymbols indexed:")
for language, type_count, symbol_count in cursor:
print(f" • {language}: {symbol_count} symbols in {type_count} types")
def _should_index_file(self, file_path: Path, force: bool) -> bool:
"""Check if file needs indexing"""
if force:
return True
# Check if file was modified since last index
try:
mtime = file_path.stat().st_mtime
cursor = self.conn.execute(
"SELECT MAX(last_updated) FROM symbols WHERE file_path = ?",
(str(file_path),)
)
last_indexed = cursor.fetchone()[0]
if last_indexed:
last_indexed_time = datetime.fromisoformat(last_indexed).timestamp()
return mtime > last_indexed_time
except Exception:
pass
return True
def _index_file(self, file_path: Path):
"""Index a single file based on its type"""
# Check file size first
try:
file_size = file_path.stat().st_size
max_size = 10 * 1024 * 1024 # 10MB limit
if file_size > max_size:
if "--debug" in sys.argv:
print(f"\n⚠️ Skipping {file_path}: File too large ({file_size / 1024 / 1024:.1f}MB)")
raise Exception(f"File too large: {file_size / 1024 / 1024:.1f}MB (max: 10MB)")
except OSError as e:
raise Exception(f"Cannot access file: {e}")
        # Detect language: exact file-name entries (e.g. 'Dockerfile',
        # 'Makefile', 'pom.xml') have no usable suffix, so check those
        # first, then fall back to the file extension
        suffix = file_path.suffix.lower()
        language = None
        for lang, info in self.language_patterns.items():
            if file_path.name in info['extensions']:
                language = lang
                break
        if not language:
            for lang, info in self.language_patterns.items():
                if suffix in info['extensions']:
                    language = lang
                    break
if not language:
return # Skip unsupported files
try:
# Try different encodings
content = None
encodings = ['utf-8', 'latin-1', 'cp1252', 'ascii']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
break
except UnicodeDecodeError:
continue
if content is None:
raise Exception("Could not decode file with any supported encoding")
# Clear existing symbols for this file (for re-indexing)
self.conn.execute("DELETE FROM symbols WHERE file_path = ?", (str(file_path),))
if language == 'python':
self._index_python_file(file_path, content)
elif language in ['javascript', 'typescript', 'java', 'cpp', 'go', 'shell', 'sql',
'ruby', 'rust', 'csharp', 'swift', 'kotlin', 'scala', 'php',
'lua', 'r', 'julia', 'dart', 'elixir', 'clojure', 'haskell',
'perl', 'objc', 'c']:
self._index_generic_language(file_path, content, language)
elif language in ['vue', 'svelte', 'html', 'css']:
self._index_web_file(file_path, content, language)
elif language in ['markdown', 'text']:
self._index_document(file_path, content, language)
elif language in ['yaml', 'docker', 'json', 'toml', 'ini', 'makefile', 'terraform',
'properties', 'env', 'gradle', 'maven', 'nginx', 'apache',
'ansible', 'vagrant', 'csv']:
self._index_config_file(file_path, content, language)
elif language == 'xml':
self._index_xml_file(file_path, content)
except UnicodeDecodeError as e:
raise Exception(f"Encoding error: {e}")
except MemoryError:
raise Exception("Out of memory while processing file")
except Exception as e:
# Re-raise with more context
raise Exception(f"{type(e).__name__}: {str(e)}")
def _index_python_file(self, file_path: Path, content: str):
"""Index Python file using AST"""
try:
tree = ast.parse(content, filename=str(file_path))
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
self._index_python_symbol(node, file_path, content, 'python')
        except (SyntaxError, ValueError):
            # Fall back to regex if AST parsing fails
            self._index_generic_language(file_path, content, 'python')
def _index_generic_language(self, file_path: Path, content: str, language: str):
"""Index non-Python languages using regex"""
patterns = self.language_patterns.get(language, {})
lines = content.split('\n')
# Track which lines have been indexed to avoid duplicates
indexed_lines = set()
for pattern_type, pattern in patterns.items():
if pattern_type == 'extensions':
continue
for match in pattern.finditer(content):
# Get the line number
line_num = content[:match.start()].count('\n') + 1
if line_num in indexed_lines:
continue
indexed_lines.add(line_num)
# Extract name (handle multiple capture groups)
name = None
for group in match.groups():
if group:
name = group
break
if not name:
continue
# Get context
context_start = max(0, line_num - 3)
context_end = min(len(lines), line_num + 5)
code_context = '\n'.join(lines[context_start:context_end])
# Simple signature extraction
signature = ""
if pattern_type in ['function', 'method']:
# Try to extract parameters
line = lines[line_num - 1] if line_num > 0 else ""
sig_match = re.search(rf'{name}\s*\((.*?)\)', line)
if sig_match:
signature = f"({sig_match.group(1)})"
# Create embedding with error handling
try:
# Build richer embedding text based on symbol type
if pattern_type == 'import':
# For imports, include the module being imported
embedding_text = f"import {name} from module in {language}"
elif pattern_type in ['variable', 'type', 'type_alias']:
# Include more context for variables and types
embedding_text = f"{name} {pattern_type} definition {code_context[:100]} {language}"
elif pattern_type == 'component':
# Special handling for React components
embedding_text = f"{name} React component {signature} {language}"
elif pattern_type == 'decorator':
# Include decorator context
embedding_text = f"{name} decorator {code_context[:50]} {language}"
else:
# Default for functions, classes, methods
embedding_text = f"{name} {pattern_type} {signature} {language}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
if embedding is None or len(embedding) == 0:
raise Exception("Failed to generate embedding")
# Store in database
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, pattern_type, str(file_path), line_num, line_num + 5,
signature, "", code_context, embedding.tobytes(),
datetime.now().isoformat(), language, 'code'
))
except Exception as e:
if "--debug" in sys.argv:
print(f"\n⚠️ Embedding error for {name} in {file_path}: {e}")
# Store without embedding
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, pattern_type, str(file_path), line_num, line_num + 5,
signature, "", code_context, None,
datetime.now().isoformat(), language, 'code'
))
self.conn.commit()
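    # Example of the default embedding text built above (hypothetical Go
    # symbol): a function "Connect" with signature "(host string)" is
    # encoded as "Connect function (host string) go".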
def _index_document(self, file_path: Path, content: str, language: str):
"""Index document files (markdown, text)"""
patterns = self.language_patterns.get(language, {})
lines = content.split('\n')
if language == 'markdown':
# Index headers
header_pattern = patterns.get('header')
if header_pattern:
for match in header_pattern.finditer(content):
level = len(match.group(1)) # Number of #
title = match.group(2).strip()
line_num = content[:match.start()].count('\n') + 1
# Get section content (until next header of same or higher level)
section_lines = []
for i in range(line_num, len(lines)):
line = lines[i]
                        if re.match(rf'^#{{1,{level}}}\s', line):
break
section_lines.append(line)
section_content = '\n'.join(section_lines[:10]) # First 10 lines
# Create embedding
embedding_text = f"{title} section {section_content}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
# Store
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
title, f"h{level}", str(file_path), line_num,
line_num + len(section_lines), "", "", section_content,
embedding.tobytes(), datetime.now().isoformat(), language, 'document'
))
self.conn.commit()
def _index_config_file(self, file_path: Path, content: str, language: str):
"""Index configuration files (YAML, Docker, JSON, etc)"""
patterns = self.language_patterns.get(language, {})
lines = content.split('\n')
# Special handling for Dockerfile
if language == 'docker' and file_path.name == 'Dockerfile':
# Index the entire Dockerfile as one entity
embedding_text = f"Dockerfile configuration {content[:500]}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
file_path.name, 'dockerfile', str(file_path), 1, len(lines),
"", "", content[:1000], embedding.tobytes(),
datetime.now().isoformat(), language, 'config'
))
# Track which lines have been indexed
indexed_lines = set()
for pattern_type, pattern in patterns.items():
if pattern_type == 'extensions':
continue
for match in pattern.finditer(content):
line_num = content[:match.start()].count('\n') + 1
if line_num in indexed_lines:
continue
indexed_lines.add(line_num)
# Extract name
                # Extract name: first non-empty capture group, else whole match
                name = next((g for g in match.groups() if g), match.group(0))
# Get context (more lines for config files)
context_start = max(0, line_num - 2)
context_end = min(len(lines), line_num + 10)
code_context = '\n'.join(lines[context_start:context_end])
                # For YAML/JSON, try to capture the value too
                value_lines = []
                if language in ['yaml', 'json', 'toml', 'ini']:
                    # Look for multi-line values
indent_level = len(lines[line_num - 1]) - len(lines[line_num - 1].lstrip())
for i in range(line_num, min(len(lines), line_num + 20)):
line = lines[i]
if line.strip() and len(line) - len(line.lstrip()) <= indent_level:
break
value_lines.append(line)
if value_lines:
code_context = '\n'.join([lines[line_num - 1]] + value_lines[:10])
# Create embedding with context
embedding_text = f"{name} {pattern_type} {language} {code_context[:200]}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
# Store in database
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, pattern_type, str(file_path), line_num,
                    line_num + len(value_lines) if value_lines else line_num + 3,
"", "", code_context, embedding.tobytes(),
datetime.now().isoformat(), language, 'config'
))
self.conn.commit()
def _index_web_file(self, file_path: Path, content: str, language: str):
"""Index web files (Vue, Svelte, HTML, CSS)"""
patterns = self.language_patterns.get(language, {})
lines = content.split('\n')
# For Vue/Svelte components, try to extract component name
if language in ['vue', 'svelte']:
            # Look for an explicit component name; Svelte defines no
            # 'component' pattern, so guard against a missing key
            component_pattern = patterns.get('component')
            name_match = component_pattern.search(content) if component_pattern else None
            if name_match:
                component_name = name_match.group(1)
            else:
                # Use filename as component name
                component_name = file_path.stem
# Index the whole component
embedding_text = f"{language} component {component_name} {content[:500]}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
component_name, 'component', str(file_path), 1, len(lines),
"", "", content[:1000], embedding.tobytes(),
datetime.now().isoformat(), language, 'code'
))
# For CSS files, just index the file as a whole instead of every selector
elif language == 'css':
# Extract a summary of main classes/IDs (first 20 of each)
classes = []
ids = []
for pattern_type, pattern in patterns.items():
if pattern_type == 'class':
classes = [m.group(1) for m in pattern.finditer(content)][:20]
elif pattern_type == 'id':
ids = [m.group(1) for m in pattern.finditer(content)][:20]
summary = f"CSS file with classes: {', '.join(classes[:10])}"
if ids:
summary += f" and IDs: {', '.join(ids[:10])}"
# Index the whole CSS file
embedding_text = f"CSS stylesheet {file_path.name} {summary} {content[:500]}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
file_path.name, 'stylesheet', str(file_path), 1, len(lines),
"", summary, content[:1000], embedding.tobytes(),
datetime.now().isoformat(), language, 'code'
))
# For HTML, index IDs but limit to reasonable number
elif language == 'html':
indexed_count = 0
max_items = 50 # Limit to 50 elements with IDs
for pattern_type, pattern in patterns.items():
if pattern_type == 'extensions':
continue
for match in pattern.finditer(content):
if indexed_count >= max_items:
break
line_num = content[:match.start()].count('\n') + 1
if pattern_type == 'tag' and len(match.groups()) >= 2:
tag_name = match.group(1)
id_value = match.group(2)
name = f"{tag_name}#{id_value}"
# Get context
                        start_idx = max(0, line_num - 3)
                        end_idx = min(len(lines), line_num + 3)
                        code_context = '\n'.join(lines[start_idx:end_idx])
# Create embedding
embedding_text = f"HTML element {name} {code_context}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, 'element', str(file_path), line_num, line_num + 1,
"", "", code_context, embedding.tobytes(),
datetime.now().isoformat(), language, 'code'
))
indexed_count += 1
self.conn.commit()
def _index_xml_file(self, file_path: Path, content: str):
"""Index XML files efficiently"""
lines = content.split('\n')
# For XML files, just index the file as a whole
        # Extract root element and some structure info
        root_match = re.search(r'<(\w+)[^>]*>', content[:1000])
root_element = root_match.group(1) if root_match else 'xml'
# Count main element types (limit to first 5000 chars for performance)
element_types = set()
for match in re.finditer(r'<(\w+)[^>]*>', content[:5000]):
element_types.add(match.group(1))
if len(element_types) >= 10: # Limit to first 10 unique elements
break
summary = f"XML file with root <{root_element}> containing elements: {', '.join(list(element_types)[:10])}"
# Index the whole XML file
embedding_text = f"XML configuration {file_path.name} {summary} {content[:500]}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
file_path.name, 'xml_file', str(file_path), 1, len(lines),
"", summary, content[:1000], embedding.tobytes(),
datetime.now().isoformat(), 'xml', 'config'
))
self.conn.commit()
def _index_python_symbol(self, node, file_path: Path, file_content: str, language: str):
"""Index a single Python symbol (function or class)"""
# Basic info
name = node.name
sym_type = 'class' if isinstance(node, ast.ClassDef) else 'function'
line_num = node.lineno
end_line = node.end_lineno or line_num
# Get docstring
docstring = ast.get_docstring(node) or ""
# Get signature for functions
signature = ""
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
signature = self._get_function_signature(node)
# Get surrounding code context (few lines before/after)
lines = file_content.split('\n')
context_start = max(0, line_num - 3)
context_end = min(len(lines), end_line + 2)
code_context = '\n'.join(lines[context_start:context_end])
# Create embedding from all available info with error handling
try:
embedding_text = f"{name} {sym_type} {signature} {docstring}"
embedding = self.model.encode(embedding_text, convert_to_numpy=True)
if embedding is None or len(embedding) == 0:
raise Exception("Failed to generate embedding")
embedding_bytes = embedding.tobytes()
except Exception as e:
if "--debug" in sys.argv:
print(f"\n⚠️ Embedding error for {name} in {file_path}: {e}")
embedding_bytes = None
# Store in database
self.conn.execute('''
INSERT OR REPLACE INTO symbols
(name, type, file_path, line_num, end_line, signature,
docstring, code_context, embedding, last_updated, language, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, sym_type, str(file_path), line_num, end_line,
signature, docstring, code_context,
embedding_bytes, datetime.now().isoformat(), language, 'code'
))
self.conn.commit()
def _get_function_signature(self, node):
"""Extract function signature"""
args = []
for arg in node.args.args:
args.append(arg.arg)
# Add *args and **kwargs if present
if node.args.vararg:
args.append(f"*{node.args.vararg.arg}")
if node.args.kwarg:
args.append(f"**{node.args.kwarg.arg}")
return f"({', '.join(args)})"
def search(self, query: str, limit: int = 10,
search_type: str = "hybrid", file_type: str = None,
language_filter: str = None) -> List[Dict]:
"""
Search for symbols by meaning
search_type: 'semantic', 'text', or 'hybrid'
file_type: 'code', 'document', or None for all
language_filter: specific language to filter by
"""
results = []
# Log search
self._log_search(query)
if search_type in ["semantic", "hybrid"]:
# Semantic search using embeddings
semantic_results = self._semantic_search(query, limit * 2, file_type, language_filter)
results.extend(semantic_results)
if search_type in ["text", "hybrid"]:
# Traditional text search
text_results = self._text_search(query, limit, file_type, language_filter)
results.extend(text_results)
# Deduplicate and sort by score
seen = set()
unique_results = []
for r in results:
key = (r['file_path'], r['name'], r['line_num'])
if key not in seen:
seen.add(key)
unique_results.append(r)
# Sort by score and filter by minimum score
unique_results.sort(key=lambda x: x['score'], reverse=True)
min_score = self.config.get('minScore', 0.2)
filtered_results = [r for r in unique_results if r['score'] >= min_score]
return filtered_results[:limit]
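    # Sketch of programmatic usage (assumes the project is already indexed):
    #   scs = SmartCodeSearch(".", quiet=True)
    #   for hit in scs.search("validate email", limit=5):
    #       print(hit['file_path'], hit['line_num'], hit['name'], hit['score'])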
def _semantic_search(self, query: str, limit: int, file_type: str = None,
language_filter: str = None) -> List[Dict]:
"""Search using AI embeddings"""
# Encode query
query_embedding = self.model.encode(query, convert_to_numpy=True)
# Build SQL with filters
sql = '''
SELECT id, name, type, file_path, line_num, signature,
docstring, embedding, language, file_type
FROM symbols
WHERE embedding IS NOT NULL
'''
params = []
if file_type:
sql += " AND file_type = ?"
params.append(file_type)
if language_filter:
sql += " AND language = ?"
params.append(language_filter)
cursor = self.conn.execute(sql, params)
results = []
for row in cursor:
sym_id, name, sym_type, file_path, line_num, sig, doc, emb_bytes, lang, ftype = row
# Calculate similarity
embedding = np.frombuffer(emb_bytes, dtype=np.float32)
similarity = np.dot(query_embedding, embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(embedding)
)
            # Scale semantic scores to be comparable with text scores:
            # cosine similarity lies in [-1, 1] (typically 0-1 for these
            # embeddings), so multiply by 10 to land in a roughly 0-10 range
            scaled_score = float(similarity) * 10
results.append({
'id': sym_id,
'name': name,
'type': sym_type,
'file_path': file_path,
'line_num': line_num,
'signature': sig,
'docstring': doc,
'score': scaled_score,
'match_type': 'semantic',
'language': lang,
'file_type': ftype
})
# Sort by similarity
results.sort(key=lambda x: x['score'], reverse=True)
return results[:limit]
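    # Worked example of the scaling above: a cosine similarity of 0.62
    # between the query and a symbol embedding becomes a score of 6.2,
    # putting it on the same footing as the _text_search scores below.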
def _text_search(self, query: str, limit: int, file_type: str = None,
language_filter: str = None) -> List[Dict]:
"""Traditional text matching with improved scoring"""
query_lower = query.lower()
words = query_lower.split()
# Build SQL with filters
sql = '''
SELECT id, name, type, file_path, line_num, signature, docstring, code_context, language, file_type
FROM symbols
WHERE (lower(name) LIKE ?
OR lower(docstring) LIKE ?
OR lower(code_context) LIKE ?)
'''
base_params = []
if file_type:
sql += " AND file_type = ?"
base_params.append(file_type)
if language_filter:
sql += " AND language = ?"
base_params.append(language_filter)
# Get all results that match any word
all_matches = {}
for word in words:
pattern = f"%{word}%"
params = [pattern, pattern, pattern] + base_params
cursor = self.conn.execute(sql, params)
for row in cursor:
sym_id, name, sym_type, file_path, line_num, sig, doc, context, lang, ftype = row
key = (sym_id, name, file_path, line_num)
if key not in all_matches:
all_matches[key] = {
'id': sym_id,
'name': name,
'type': sym_type,
'file_path': file_path,
'line_num': line_num,
'signature': sig,
'docstring': doc,
'code_context': context,
'language': lang,
'file_type': ftype,
'words_matched': set()
}
all_matches[key]['words_matched'].add(word)
# Score each result
results = []
for key, match in all_matches.items():
score = 0
name_lower = match['name'].lower()
doc_lower = (match['docstring'] or '').lower()
context_lower = (match['code_context'] or '').lower()
# Check for exact phrase match (huge bonus)
if len(words) > 1 and query_lower in name_lower:
score += 20
elif len(words) > 1 and query_lower in doc_lower:
score += 15
elif len(words) > 1 and query_lower in context_lower:
score += 10
# Score based on individual word matches
words_found = len(match['words_matched'])
for word in match['words_matched']:
# Reduced base score for single word match
if word in name_lower:
score += 2
if name_lower.startswith(word):
score += 3
if doc_lower and word in doc_lower:
score += 1
if context_lower and word in context_lower:
score += 0.5
# Bonus for multiple word matches (compound scoring)
if words_found > 1:
score += words_found * 3
# Special boost for command-like queries
            if 'git' in words and match['language'] in ['shell', 'python']:
score += 2
match['score'] = score
match['match_type'] = 'text'
# Remove helper fields
match.pop('words_matched')
match.pop('code_context')
results.append(match)
# Sort by score and return top results
results.sort(key=lambda x: x['score'], reverse=True)
return results[:limit * 2] # Return more for deduplication later
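    # Worked scoring example (hypothetical symbol): query "parse config"
    # against a symbol named "parse_config_file" with no docstring/context:
    #   'parse' in name (+2), name startswith 'parse' (+3),
    #   'config' in name (+2), two words matched (+2*3 = 6)  -> score 13
    # There is no exact-phrase bonus because "parse config" (with a space)
    # is not a substring of "parse_config_file".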
def _log_search(self, query: str):
"""Log search for analytics"""
self.conn.execute(
"INSERT INTO search_history (query, timestamp) VALUES (?, ?)",
(query, datetime.now().isoformat())
)
self.conn.commit()
def show_code(self, symbol_id: int):
"""Display code for a symbol"""
cursor = self.conn.execute(
"SELECT name, file_path, line_num, code_context FROM symbols WHERE id = ?",
(symbol_id,)
)
row = cursor.fetchone()
if row:
name, file_path, line_num, context = row
print(f"\n📄 {file_path}:{line_num}")
print(f"🔍 {name}")
print("-" * 60)
print(context)
print("-" * 60)
# Update access count
self.conn.execute(
"UPDATE symbols SET access_count = access_count + 1 WHERE id = ?",
(symbol_id,)
)
self.conn.commit()
def interactive_mode(search):
"""Interactive search mode with VSCode integration"""
    import readline  # side-effect import: enables line editing for input()
print("""
🔍 Interactive Smart Code Search
Type queries to search, then select files to open in VSCode.
Commands:
- Enter number(s) to open files (e.g., '1' or '1,3,5' or '1-5')
- 'a' to open all results
- 'n' for next page, 'p' for previous page
- 'q' to quit
- Just press Enter to search again
""")
last_results = []
current_page = 1
results_per_page = 20
while True:
try:
# Get search query
query = input("\n🔎 Search (or 'q' to quit): ").strip()
if query.lower() == 'q':
print("👋 Goodbye!")
break
if not query:
continue
# Perform search
results = search.search(query, limit=100)
if not results:
print(f"❌ No results found for '{query}'")
continue
last_results = results
current_page = 1 # Reset to first page for new search
# Calculate paging info
total_pages = (len(results) + results_per_page - 1) // results_per_page
# Display current page results
while True:
start_idx = (current_page - 1) * results_per_page
end_idx = min(start_idx + results_per_page, len(results))
page_results = results[start_idx:end_idx]
# Display results
print(f"\n📋 Found {len(results)} results (Page {current_page} of {total_pages}):\n")
# Print header row
print(f"{'#':>2} {'File':<40} {'Symbol':<20} {'Type':<9} {'Line':<6} {'Score':<6}")
# Display in compact single-line format
file_map = {}
seen_files = set()
                for offset, r in enumerate(page_results):
                    # Calculate global index (1-based) from the page offset
                    global_idx = start_idx + offset + 1
file_path = r['file_path']
# Track unique files for opening
if file_path not in seen_files:
seen_files.add(file_path)
file_num = len(seen_files)
file_map[file_num] = file_path
# Truncate path for display
display_path = file_path
if len(display_path) > 40:
# Show start and end of path
display_path = display_path[:20] + '...' + display_path[-17:]
# Format: number, file path, symbol name (type) line score
name = r['name']
if len(name) > 20:
name = name[:17] + '...'
print(f"{global_idx:2d}. {display_path:<40} {name:<20} {r['type']:<9} {r.get('line_num', 0):<5} [{r['score']:.3f}]")
# Get user action
if total_pages > 1:
action = input("\nOpen files by: #, range: #-#, (a)ll, (n)ext, (p)rev, or <enter> new search: ").strip().lower()
else:
action = input("\nOpen files by: #, range: #-#, (a)ll, or <enter> new search: ").strip().lower()
# Handle paging navigation
if action == 'n' and current_page < total_pages:
current_page += 1
continue # Re-display current page
elif action == 'p' and current_page > 1:
current_page -= 1
continue # Re-display current page
else:
# Break inner loop to handle other actions
break
if not action:
continue
elif action == 'a':
# Open all unique files
files_to_open = list(seen_files)
else:
# Parse result numbers
files_to_open = []
result_indices = []
parts = action.replace(' ', '').split(',')
for part in parts:
if '-' in part:
# Handle range
try:
start, end = map(int, part.split('-'))
for n in range(start, min(end + 1, len(results) + 1)):
result_indices.append(n)
                        except ValueError:
continue
else:
# Single number
try:
n = int(part)
if 1 <= n <= len(results):
result_indices.append(n)
                        except ValueError:
continue
# Get files with specific line numbers from selected results
file_line_map = {}
for idx in result_indices:
if idx <= len(results):
result = results[idx - 1]
file_path = result['file_path']
line_num = result.get('line_num', 0)
# Keep first occurrence or lowest line number
if file_path not in file_line_map or line_num < file_line_map[file_path]:
file_line_map[file_path] = line_num
files_to_open = list(file_line_map.keys())
# Open in VSCode
if files_to_open:
try:
# Build VSCode command with line numbers using relative paths
vscode_args = ["code"]
for f in files_to_open:
# Use relative paths as stored in database
if f in file_line_map and file_line_map[f] > 0:
# Use -g flag for goto line
vscode_args.extend(["-g", f"{f}:{file_line_map[f]}"])
else:
vscode_args.append(f)
subprocess.run(vscode_args)
print(f"✅ Opened {len(files_to_open)} file(s) in VSCode")
except Exception as e:
print(f"❌ Error opening VSCode: {e}")
except KeyboardInterrupt:
print("\n\n👋 Goodbye!")
break
except EOFError:
print("\n\n👋 Goodbye!")
break
def show_help():
"""Display help information"""
print("""
🔍 Smart Code Search (scs) - Find code by meaning!
Usage:
scs index # Index your project
scs interactive # Interactive search mode
scs <query> # Search for code
scs show <id> # Show code for result
scs help # Show this help
Examples:
scs "validate email"
scs "handle user authentication"
scs "calculate tax" --docs
scs interactive
Options:
--json Output results in JSON format
--export=<type> Export results (vscode, list)
--docs Search only documentation
    --code Search only code
--config Search only config files (yaml, docker, json, etc)
--all Search code, docs, and config
--lang=<lang> Filter by language (python, js, etc)
--type=<ext> Filter by file extension
--force Force re-index all files
--incremental Only index changed files (default)
-h, --help Show this help message
""")
def main():
"""CLI interface"""
# Check for help flags anywhere in arguments
if '--help' in sys.argv or '-h' in sys.argv or (len(sys.argv) > 1 and sys.argv[1] == 'help'):
show_help()
return
# Check if JSON output is requested to suppress initialization message
quiet = "--json" in sys.argv or "--export" in sys.argv
search = SmartCodeSearch(quiet=quiet)
if len(sys.argv) < 2:
show_help()
return
command = sys.argv[1]
if command == "index":
force = "--force" in sys.argv
incremental = "--incremental" in sys.argv or not force
search.index_project(force_reindex=force, incremental=incremental)
elif command == "interactive":
interactive_mode(search)
elif command == "show" and len(sys.argv) > 2:
symbol_id = int(sys.argv[2])
search.show_code(symbol_id)
else:
# Search
args = [arg for arg in sys.argv[1:] if not arg.startswith("--")]
query = " ".join(args)
# Parse options
json_output = "--json" in sys.argv
export_type = None
file_type_filter = None
language_filter = None
# Check for export option
for arg in sys.argv:
if arg.startswith("--export="):
export_type = arg.split("=")[1]
elif arg.startswith("--lang="):
language_filter = arg.split("=")[1]
elif arg.startswith("--type="):
# Filter by file extension
ext = arg.split("=")[1]
if not ext.startswith('.'):
ext = '.' + ext
# Map extension to language
for lang, info in search.language_patterns.items():
if ext in info.get('extensions', []):
language_filter = lang
break
# Determine file type filter
if "--docs" in sys.argv:
file_type_filter = "document"
elif "--code" in sys.argv:
file_type_filter = "code"
elif "--config" in sys.argv:
file_type_filter = "config"
# --all means no filter
results = search.search(query, file_type=file_type_filter,
language_filter=language_filter)
# Handle different output formats
if export_type == "list":
# Just output file paths
unique_files = sorted(set(r['file_path'] for r in results))
for f in unique_files:
print(f)
elif export_type == "vscode":
# Open files in VSCode
unique_files = sorted(set(r['file_path'] for r in results))
            if unique_files:
                cmd = ["code"] + unique_files
subprocess.run(cmd)
print(f"✅ Opened {len(unique_files)} files in VSCode")
else:
print("❌ No files to open")
elif json_output:
# JSON output for machine consumption
output = {
"query": query,
"results": results
}
print(json.dumps(output, indent=2))
else:
# Human-readable output
if not results:
print(f"❌ No results found for '{query}'")
return
# Group by file type
code_results = [r for r in results if r.get('file_type') == 'code']
doc_results = [r for r in results if r.get('file_type') == 'document']
config_results = [r for r in results if r.get('file_type') == 'config']
print(f"\n🔍 Results for '{query}':")
if code_results:
print(f"\n📄 CODE ({len(code_results)} results):")
print(f"{'ID':<4} {'File':<40} {'Name':<25} {'Type':<12} {'Line':<6} {'Score':<6}")
print("-" * 100)
for r in code_results[:10]: # Show top 10
# Truncate file path for display
file_short = r['file_path']
if len(file_short) > 40:
file_short = file_short[:20] + '...' + file_short[-17:]
# Truncate name if too long
name = r['name']
if len(name) > 24:
name = name[:21] + '...'
print(f"{r['id']:<4} {file_short:<40} {name:<25} {r['type'][:11]:<12} "
f"{r.get('line_num', 0):<6} {r['score']:.3f}")
# Show signature or first line of docstring
if r.get('signature') and r['type'] in ['function', 'method']:
print(f" → {r['name']}{r['signature']}")
elif r.get('docstring'):
first_line = r['docstring'].split('\n')[0][:55]
if first_line:
print(f" 📝 {first_line}")
if doc_results:
print(f"\n📚 DOCUMENTATION ({len(doc_results)} results):")
print(f"{'ID':<4} {'Title':<40} {'Type':<15} {'File':<35} {'Score':<6}")
print("-" * 95)
for r in doc_results[:10]: # Show top 10
file_short = r['file_path']
if len(file_short) > 32:
file_short = "..." + file_short[-32:]
print(f"{r['id']:<4} {r['name'][:39]:<40} {r['type'][:14]:<15} "
f"{file_short:<35} {r['score']:.3f}")
if config_results:
print(f"\n⚙️ CONFIG ({len(config_results)} results):")
print(f"{'ID':<4} {'File':<40} {'Name':<25} {'Type':<12} {'Line':<6} {'Score':<6}")
print("-" * 100)
for r in config_results[:10]: # Show top 10
# Truncate file path for display
file_short = r['file_path']
if len(file_short) > 40:
file_short = file_short[:20] + '...' + file_short[-17:]
# Truncate name if too long
name_short = r['name'][:24] if len(r['name']) > 24 else r['name']
print(f"{r['id']:<4} {file_short:<40} {name_short:<25} "
f"{r['type']:<12} L{r.get('line_num', 0):<5} {r['score']:.3f}")
print(f"\n💡 Tip: Use 'scs show <id>' to see the full content")
if __name__ == "__main__":
main()