"""
Clean SmartCodeSearch implementation for MCP server
No console output, fully thread-safe
"""
import sys
import os
import sqlite3
import hashlib
import re
import ast
import json
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
import numpy as np
# Suppress all output from sentence_transformers
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
# Redirect stderr to devnull during import
import io
old_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
from sentence_transformers import SentenceTransformer
finally:
sys.stderr = old_stderr
from src.core.db_wrapper import ThreadSafeDB
from src.core.dependency_analyzer import DependencyAnalyzer
from src.core.usage_analyzer import UsageAnalyzer
class CleanSmartCodeSearch:
"""Clean version of SmartCodeSearch with no console output"""
def __init__(self, project_root=".", quiet=True):
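        # NOTE: `quiet` is currently unused; console output is always suppressed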
self.root = Path(project_root)
self.db_path = self.root / ".claude-symbols" / "search.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Load model silently
self.model = SentenceTransformer('all-MiniLM-L6-v2')
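        # all-MiniLM-L6-v2 produces 384-dim float32 vectors; index_file() stores
        # them as raw bytes and search() decodes them with np.frombuffer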
# Thread-safe database
self.db = ThreadSafeDB(self.db_path)
self._init_db()
# Initialize language patterns (copied from scsold.py)
self._init_patterns()
# Initialize dependency analyzer
self.dependency_analyzer = DependencyAnalyzer(self.root)
# Initialize usage analyzer
self.usage_analyzer = UsageAnalyzer(self.root)
def _init_db(self):
"""Initialize database schema - compatible with scsold.py"""
        self.db.execute('''
            CREATE TABLE IF NOT EXISTS symbols (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                type TEXT NOT NULL,
                file_path TEXT NOT NULL,
                line_num INTEGER NOT NULL,
                end_line INTEGER,
                signature TEXT,
                docstring TEXT,
                code_context TEXT,
                embedding BLOB,
                last_updated TEXT,
                access_count INTEGER DEFAULT 0,
                language TEXT,
                file_type TEXT,
                -- enhanced metadata columns read by search() and written by index_file()
                todo_items TEXT DEFAULT '[]',
                inline_comments TEXT DEFAULT '[]',
                design_notes TEXT DEFAULT '[]',
                parameters TEXT DEFAULT '[]',
                return_type TEXT DEFAULT '',
                exceptions_raised TEXT DEFAULT '[]',
                type_annotations TEXT DEFAULT '{}',
                imports TEXT DEFAULT '[]',
                calls TEXT DEFAULT '[]',
                called_by TEXT DEFAULT '[]',
                inherits_from TEXT DEFAULT '',
                inherited_by TEXT DEFAULT '[]',
                usage_frequency INTEGER DEFAULT 0,
                test_files TEXT DEFAULT '[]',
                example_usage TEXT DEFAULT '',
                common_patterns TEXT DEFAULT '[]',
                test_coverage REAL DEFAULT 0.0
            )
        ''')
        # Create indexes (IF NOT EXISTS keeps this idempotent)
        try:
            self.db.execute('CREATE INDEX IF NOT EXISTS idx_file_path ON symbols(file_path)')
            self.db.execute('CREATE INDEX IF NOT EXISTS idx_name ON symbols(name)')
            self.db.execute('CREATE INDEX IF NOT EXISTS idx_type ON symbols(type)')
        except Exception:
            pass
def _init_patterns(self):
"""Initialize language patterns for symbol extraction"""
self.language_patterns = {
'python': {
'extensions': ['.py'],
'function': re.compile(r'^(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE),
'class': re.compile(r'^class\s+(\w+)(?:\((.*?)\))?:', re.MULTILINE),
'method': re.compile(r'^\s+(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE),
},
'javascript': {
'extensions': ['.js', '.jsx', '.mjs'],
'function': re.compile(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE),
'class': re.compile(r'class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE),
},
'typescript': {
'extensions': ['.ts', '.tsx'],
'function': re.compile(r'(?:export\s+)?(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*(?::\s*[^=]+)?\s*=)', re.MULTILINE),
'class': re.compile(r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE),
'interface': re.compile(r'(?:export\s+)?interface\s+(\w+)', re.MULTILINE),
},
}
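        # Illustrative matches (a sketch, not exhaustive):
        #   python 'function':      "async def fetch(url):"    -> ('fetch', 'url')
        #   javascript 'function':  "const load = async () =>" -> (None, 'load')
        #   typescript 'interface': "export interface Props"   -> ('Props',)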
def search(self, query: str, limit: int = 10) -> List[dict]:
"""Search for code by semantic similarity - returns dict format like scsold.py"""
try:
# Encode query silently
query_embedding = self.model.encode(query, show_progress_bar=False)
# Fetch all symbols including all enhanced fields
rows = self.db.fetchall(
'''SELECT id, name, type, file_path, line_num, signature, docstring,
code_context, embedding, language, file_type,
todo_items, inline_comments, design_notes,
parameters, return_type, exceptions_raised, type_annotations,
imports, calls, called_by, inherits_from, inherited_by,
usage_frequency, test_files, example_usage, common_patterns, test_coverage
FROM symbols WHERE embedding IS NOT NULL'''
)
if not rows:
return []
# Check if query is looking for specific metadata types
is_todo_search = any(word in query.lower() for word in ['todo', 'fixme', 'xxx', 'hack', 'note'])
is_doc_search = any(word in query.lower() for word in ['documented', 'comment', 'documentation', 'docs'])
is_type_search = any(word in query.lower() for word in ['returns', 'raises', 'exception', 'type', 'parameter', '->', 'optional', 'list', 'dict'])
is_dep_search = any(word in query.lower() for word in ['imports', 'calls', 'inherits', 'uses', 'depends', 'extends'])
is_usage_search = any(word in query.lower() for word in ['tested', 'test', 'coverage', 'example', 'frequently', 'used'])
results = []
for row in rows:
(sym_id, name, sym_type, file_path, line_num, signature,
docstring, content, embedding_blob, language, file_type,
todo_items, inline_comments, design_notes,
parameters, return_type, exceptions_raised, type_annotations,
imports, calls, called_by, inherits_from, inherited_by,
usage_frequency, test_files, example_usage, common_patterns, test_coverage) = row
if not content:
content = signature or ''
embedding = np.frombuffer(embedding_blob, dtype=np.float32)
# Cosine similarity
similarity = np.dot(query_embedding, embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(embedding) + 1e-10
)
# Text matching boost
text_score = 0
if query.lower() in name.lower():
text_score = 0.3
elif content and query.lower() in content.lower():
text_score = 0.1
# Documentation boost
doc_score = 0
if is_todo_search and todo_items and todo_items != '[]':
todos = json.loads(todo_items)
for todo in todos:
if query.lower() in todo.get('text', '').lower():
doc_score = 0.4
break
elif todo.get('type', '').lower() in query.lower():
doc_score = 0.2
if is_doc_search:
# Boost well-documented code
has_docs = False
if docstring and len(docstring) > 20:
has_docs = True
doc_score += 0.15
if inline_comments and inline_comments != '[]' and len(json.loads(inline_comments)) > 3:
has_docs = True
doc_score += 0.1
if design_notes and design_notes != '[]' and len(json.loads(design_notes)) > 0:
has_docs = True
doc_score += 0.15
# Check if query terms appear in documentation
if inline_comments and inline_comments != '[]':
comments = json.loads(inline_comments)
for comment in comments:
if query.lower() in comment.lower():
doc_score += 0.2
break
# Type information boost
type_score = 0
if is_type_search:
# Check return type matches
if return_type and query.lower() in return_type.lower():
type_score += 0.3
# Check exception matches
if 'raises' in query.lower() or 'exception' in query.lower():
if exceptions_raised and exceptions_raised != '[]':
exc_list = json.loads(exceptions_raised)
for exc in exc_list:
if query.lower() in exc.lower() or exc.lower() in query.lower():
type_score += 0.35
break
# Check parameter types
if parameters and parameters != '[]':
param_list = json.loads(parameters)
for param in param_list:
if 'type' in param and param['type']:
if query.lower() in param['type'].lower():
type_score += 0.2
break
# Boost functions with rich type annotations
if return_type and parameters != '[]':
type_score += 0.05
# Dependency information boost
dep_score = 0
if is_dep_search:
# Check imports
if 'import' in query.lower() and imports and imports != '[]':
import_list = json.loads(imports)
for imp in import_list:
if query.lower() in imp.lower() or imp.lower() in query.lower():
dep_score += 0.3
break
# Check function calls
if 'calls' in query.lower() and calls and calls != '[]':
call_list = json.loads(calls)
for call in call_list:
if query.lower() in call.lower():
dep_score += 0.25
break
# Check inheritance
if ('inherits' in query.lower() or 'extends' in query.lower()) and inherits_from:
if query.lower() in inherits_from.lower():
dep_score += 0.35
# Usage context boost
usage_score = 0
if is_usage_search:
# Boost well-tested code
if 'test' in query.lower():
if test_coverage and test_coverage > 0:
usage_score += test_coverage * 0.3
if test_files and test_files != '[]':
test_file_list = json.loads(test_files)
if test_file_list:
usage_score += 0.2
# Boost frequently used code
if 'frequently' in query.lower() or 'used' in query.lower():
if usage_frequency and usage_frequency > 5:
usage_score += min(0.3, usage_frequency * 0.02)
# Boost if has examples
if 'example' in query.lower() and example_usage:
usage_score += 0.15
# General boost for well-tested, frequently used code
if test_coverage and test_coverage > 0.5:
usage_score += 0.05
if usage_frequency and usage_frequency > 10:
usage_score += 0.05
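                # Final rank = cosine similarity plus additive keyword/metadata
                # boosts; scores are uncapped, so values above 1.0 are possible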
final_score = similarity + text_score + doc_score + type_score + dep_score + usage_score
# Return dict format compatible with scsold.py
results.append({
'id': sym_id,
'name': name,
'type': sym_type,
'file_path': file_path,
'line_num': line_num,
'signature': signature or '',
'docstring': docstring or '',
'content': content[:500] if content else '',
'score': final_score,
'match_type': 'semantic',
'language': language or 'unknown',
'file_type': file_type or 'code',
'todo_items': todo_items,
'inline_comments': inline_comments,
'design_notes': design_notes,
'parameters': parameters,
'return_type': return_type,
'exceptions_raised': exceptions_raised,
'type_annotations': type_annotations,
'imports': imports,
'calls': calls,
'called_by': called_by,
'inherits_from': inherits_from,
'inherited_by': inherited_by,
'usage_frequency': usage_frequency,
'test_files': test_files,
'example_usage': example_usage,
'common_patterns': common_patterns,
'test_coverage': test_coverage
})
# Sort and return top results
results.sort(key=lambda x: x['score'], reverse=True)
return results[:limit]
except Exception:
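            # Fail closed: the MCP server must never receive stray output or a traceback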
return []
def index_project(self, force: bool = False) -> None:
"""Index all files in the project"""
try:
# Get all code files
files_to_index = []
for pattern_info in self.language_patterns.values():
for ext in pattern_info['extensions']:
files_to_index.extend(self.root.rglob(f'*{ext}'))
# Filter out common directories to ignore
ignored_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', 'dist', 'build'}
files_to_index = [
f for f in files_to_index
if not any(ignored in f.parts for ignored in ignored_dirs)
]
for file_path in files_to_index:
self.index_file(file_path, force)
except Exception:
pass
def index_file(self, file_path: Path, force: bool = False) -> None:
"""Index a single file"""
try:
# Normalize path to relative
if file_path.is_absolute():
try:
file_path = file_path.relative_to(self.root)
except ValueError:
# Path is outside project root, use as is
pass
# Check if already indexed (check both relative and absolute paths)
if not force:
existing = self.db.fetchone(
'SELECT COUNT(*) FROM symbols WHERE file_path = ? OR file_path = ?',
(str(file_path), str(self.root / file_path))
)
if existing and existing[0] > 0:
return
# Delete old entries (both relative and absolute)
self.db.execute('DELETE FROM symbols WHERE file_path = ? OR file_path = ?',
(str(file_path), str(self.root / file_path)))
# Read file content for documentation extraction
full_path = self.root / file_path if not file_path.is_absolute() else file_path
try:
with open(full_path, 'r', encoding='utf-8') as f:
file_content = f.read()
except UnicodeDecodeError:
with open(full_path, 'r', encoding='latin-1') as f:
file_content = f.read()
            # Determine language from the file extension via the registered patterns
            ext = file_path.suffix
            language = next(
                (lang for lang, pats in self.language_patterns.items() if ext in pats['extensions']),
                'unknown'
            )
# Extract documentation for the entire file
file_docs = self._extract_documentation(file_content, language)
# Extract symbols
symbols = self._extract_symbols(file_path)
# Index each symbol
for symbol_name, symbol_type, content, node in symbols:
# Extract type information if we have an AST node
type_info = {'parameters': '[]', 'return_type': '', 'exceptions_raised': '[]', 'type_annotations': '{}'}
if node and language == 'python':
type_info = self._extract_type_info(node)
# Extract dependency information
dep_info = {'imports': '[]', 'calls': '[]', 'called_by': '[]', 'inherits_from': '', 'inherited_by': '[]'}
if language == 'python':
dep_info = self._extract_dependencies(file_path, node)
# Extract usage context
usage_info = {'usage_frequency': 0, 'test_files': '[]', 'example_usage': '', 'common_patterns': '[]', 'test_coverage': 0.0}
if language == 'python':
usage_info = self._extract_usage_context(symbol_name, file_path)
# Create enhanced content for embedding that includes documentation and types
enhanced_content = content
if file_docs.get('todo_items') and file_docs['todo_items'] != '[]':
todos = json.loads(file_docs['todo_items'])
todo_text = ' '.join([f"{t['type']}: {t['text']}" for t in todos[:5]])
enhanced_content = f"{content}\n{todo_text}"
# Add type information to enhanced content for better semantic search
if type_info['return_type']:
enhanced_content = f"{enhanced_content}\nReturns: {type_info['return_type']}"
if type_info['exceptions_raised'] != '[]':
exceptions = json.loads(type_info['exceptions_raised'])
if exceptions:
enhanced_content = f"{enhanced_content}\nRaises: {', '.join(exceptions)}"
embedding = self.model.encode(enhanced_content, show_progress_bar=False)
embedding_blob = embedding.tobytes()
# Get line number from AST node if available
line_num = getattr(node, 'lineno', 1) if node else 1
                # Build a simple signature if the symbol is a function
                signature = ''
                if node and isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    try:
                        signature = f"{node.name}({', '.join(a.arg for a in node.args.args)})"
                    except Exception:
                        signature = ''
self.db.execute(
'''INSERT INTO symbols
(name, type, file_path, line_num, signature, code_context, embedding,
last_updated, language, file_type, todo_items, inline_comments, design_notes,
parameters, return_type, exceptions_raised, type_annotations,
imports, calls, called_by, inherits_from, inherited_by,
usage_frequency, test_files, example_usage, common_patterns, test_coverage)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(symbol_name, symbol_type, str(file_path), line_num, signature, content,
embedding_blob, datetime.now().isoformat(), language, 'code',
file_docs.get('todo_items', '[]'),
file_docs.get('inline_comments', '[]'),
file_docs.get('design_notes', '[]'),
type_info['parameters'],
type_info['return_type'],
type_info['exceptions_raised'],
type_info['type_annotations'],
dep_info['imports'],
dep_info['calls'],
dep_info['called_by'],
dep_info['inherits_from'],
dep_info['inherited_by'],
usage_info['usage_frequency'],
usage_info['test_files'],
usage_info['example_usage'],
usage_info['common_patterns'],
usage_info['test_coverage'])
)
except Exception:
pass
    def _get_file_hash(self, file_path: Path) -> str:
        """Get the MD5 hash of a file's contents (empty string on read failure)"""
        try:
            with open(file_path, 'rb') as f:
                return hashlib.md5(f.read()).hexdigest()
        except OSError:
            return ""
def _extract_usage_context(self, symbol_name: str, file_path: Path) -> Dict[str, Any]:
"""Extract usage context and patterns for a symbol"""
# Get usage information from analyzer
usage_info = self.usage_analyzer.analyze_symbol_usage(symbol_name, file_path)
# Convert to database format
return {
'usage_frequency': usage_info.get('usage_frequency', 0),
'test_files': json.dumps(usage_info.get('test_files', [])),
'example_usage': usage_info.get('example_usage', '')[:500], # Limit size
'common_patterns': json.dumps(usage_info.get('common_patterns', [])),
'test_coverage': usage_info.get('test_coverage', 0.0)
}
    def _extract_dependencies(self, file_path: Path, node: Optional[ast.AST] = None) -> Dict[str, str]:
"""Extract dependency information for a symbol"""
dep_info = {
'imports': [],
'calls': [],
'called_by': [],
'inherits_from': '',
'inherited_by': []
}
# Analyze the file for dependencies
full_path = self.root / file_path if not file_path.is_absolute() else file_path
file_deps = self.dependency_analyzer.analyze_file(full_path)
if node and hasattr(node, 'name'):
symbol_name = node.name
if symbol_name in file_deps:
symbol_info = file_deps[symbol_name]
dep_info['imports'] = symbol_info.get('imports', [])
dep_info['calls'] = symbol_info.get('calls', [])
dep_info['inherits_from'] = symbol_info.get('inherits_from', '') or ''
# Convert to JSON for storage
return {
'imports': json.dumps(dep_info['imports']),
'calls': json.dumps(dep_info['calls']),
'called_by': json.dumps(dep_info['called_by']),
'inherits_from': dep_info['inherits_from'],
'inherited_by': json.dumps(dep_info['inherited_by'])
}
    def _extract_type_info(self, node: ast.AST) -> Dict[str, str]:
"""Extract type information from Python AST nodes"""
type_info = {
'parameters': [],
'return_type': None,
'exceptions_raised': [],
'type_annotations': {}
}
# Extract function/method parameters and types
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
# Extract parameters
for arg in node.args.args:
param_info = {'name': arg.arg}
# Get type annotation if present
if arg.annotation:
param_info['type'] = ast.unparse(arg.annotation) if hasattr(ast, 'unparse') else str(arg.annotation)
type_info['type_annotations'][arg.arg] = param_info['type']
type_info['parameters'].append(param_info)
# Extract return type
if node.returns:
type_info['return_type'] = ast.unparse(node.returns) if hasattr(ast, 'unparse') else str(node.returns)
# Find raised exceptions in function body
for child in ast.walk(node):
if isinstance(child, ast.Raise):
if child.exc:
if isinstance(child.exc, ast.Call) and isinstance(child.exc.func, ast.Name):
type_info['exceptions_raised'].append(child.exc.func.id)
elif isinstance(child.exc, ast.Name):
type_info['exceptions_raised'].append(child.exc.id)
# Extract class inheritance
elif isinstance(node, ast.ClassDef):
# Get base classes
base_classes = []
for base in node.bases:
if isinstance(base, ast.Name):
base_classes.append(base.id)
elif isinstance(base, ast.Attribute):
base_classes.append(ast.unparse(base) if hasattr(ast, 'unparse') else str(base))
if base_classes:
type_info['type_annotations']['inherits_from'] = base_classes
# Check for type hints in class attributes
for item in node.body:
if isinstance(item, ast.AnnAssign) and item.annotation:
attr_name = item.target.id if isinstance(item.target, ast.Name) else str(item.target)
attr_type = ast.unparse(item.annotation) if hasattr(ast, 'unparse') else str(item.annotation)
type_info['type_annotations'][attr_name] = attr_type
# Convert to JSON strings for database storage
return {
'parameters': json.dumps(type_info['parameters']),
'return_type': type_info['return_type'] or '',
'exceptions_raised': json.dumps(list(set(type_info['exceptions_raised']))), # Remove duplicates
'type_annotations': json.dumps(type_info['type_annotations'])
}
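    # Example: for "def parse(s: str) -> dict:" whose body raises ValueError,
    # this returns parameters='[{"name": "s", "type": "str"}]',
    # return_type='dict' and exceptions_raised='["ValueError"]'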
def _extract_documentation(self, content: str, language: str = 'python', node=None) -> Dict[str, str]:
"""Extract documentation, comments, and TODOs from source code"""
        # Extract TODO/FIXME/XXX/HACK/NOTE items (the marker is followed by ':' or whitespace)
        todo_pattern = r'(?:#|//|/\*|\*)\s*(TODO|FIXME|XXX|HACK|NOTE)[:\s]\s*(.+?)(?:\*/|\n|$)'
todos = re.findall(todo_pattern, content, re.MULTILINE | re.IGNORECASE)
# Extract inline comments based on language
inline_comments = []
if language == 'python':
# Python inline comments (excluding TODOs)
inline_pattern = r'#\s*(?!TODO|FIXME|XXX|HACK|NOTE)(.+?)$'
inline_comments = re.findall(inline_pattern, content, re.MULTILINE | re.IGNORECASE)
elif language in ['javascript', 'typescript']:
# JS/TS inline comments
inline_pattern = r'//\s*(?!TODO|FIXME|XXX|HACK|NOTE)(.+?)$'
inline_comments = re.findall(inline_pattern, content, re.MULTILINE | re.IGNORECASE)
# Extract block comments and design notes
design_notes = []
if language == 'python':
# Python docstrings and block comments
block_pattern = r'"""(.*?)"""|\'\'\'(.*?)\'\'\'|^\s*#\s*(.+(?:\n\s*#.+)*)'
blocks = re.findall(block_pattern, content, re.DOTALL | re.MULTILINE)
for block in blocks:
text = block[0] or block[1] or block[2]
if text and len(text.strip()) > 20: # Only meaningful blocks
design_notes.append(text.strip())
elif language in ['javascript', 'typescript']:
# JS/TS block comments
block_pattern = r'/\*\*(.*?)\*/|/\*(.*?)\*/'
blocks = re.findall(block_pattern, content, re.DOTALL)
for block in blocks:
text = block[0] or block[1]
if text and len(text.strip()) > 20:
design_notes.append(text.strip())
# Format as JSON strings for database storage
todo_items = [{"type": t[0].upper(), "text": t[1].strip()} for t in todos]
return {
"todo_items": json.dumps(todo_items[:50]), # Limit to 50 TODOs
"inline_comments": json.dumps([c.strip() for c in inline_comments[:30] if c.strip()]), # Limit to 30 comments
"design_notes": json.dumps(design_notes[:10]) # Limit to 10 design blocks
}
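    # Example: a line "# TODO: add caching" becomes
    # {"type": "TODO", "text": "add caching"} in the todo_items JSON list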
def _extract_symbols(self, file_path: Path) -> List[Tuple[str, str, str, Optional[ast.AST]]]:
"""Extract symbols from a file - returns (name, type, content, node)"""
symbols = []
try:
# Resolve full path for reading
full_path = self.root / file_path if not file_path.is_absolute() else file_path
# Read file content
try:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
with open(full_path, 'r', encoding='latin-1') as f:
content = f.read()
# Determine file type
ext = file_path.suffix
            # Special handling for Python files using AST
            if ext == '.py':
                try:
                    tree = ast.parse(content)
                    for node in ast.walk(tree):
                        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                            func_content = self._get_node_content(content, node)
                            symbols.append((node.name, 'function', func_content, node))
                        elif isinstance(node, ast.ClassDef):
                            class_content = self._get_node_content(content, node)
                            symbols.append((node.name, 'class', class_content, node))
                except (SyntaxError, ValueError):
                    pass
            # Use regex patterns for other languages; skip the Python regex pass
            # when the AST walk already produced symbols, to avoid duplicates
            for lang, patterns in self.language_patterns.items():
                if ext in patterns['extensions'] and not (ext == '.py' and symbols):
for pattern_name, pattern in patterns.items():
if pattern_name != 'extensions':
matches = pattern.finditer(content)
for match in matches:
symbol_name = match.group(1) or (match.group(2) if len(match.groups()) > 1 else None)
if symbol_name:
# Get surrounding context
start = max(0, match.start() - 100)
end = min(len(content), match.end() + 500)
context = content[start:end]
symbols.append((symbol_name, pattern_name, context, None))
# If no symbols found, index the whole file
if not symbols and len(content) > 0:
file_name = file_path.stem
symbols.append((file_name, 'file', content[:1000], None))
except Exception:
pass
return symbols
def _get_node_content(self, content: str, node) -> str:
"""Get content for an AST node"""
try:
lines = content.split('\n')
start_line = node.lineno - 1
end_line = getattr(node, 'end_lineno', start_line + 10)
return '\n'.join(lines[start_line:end_line])[:1000]
        except Exception:
            return ""
# Use this as the default export
SmartCodeSearch = CleanSmartCodeSearch
__all__ = ['SmartCodeSearch', 'CleanSmartCodeSearch']
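

# Minimal usage sketch (runs only when executed directly, so the import-time
# silence the MCP server relies on is preserved; assumes the src.core.*
# dependencies are importable and the project contains indexable source files):
if __name__ == '__main__':
    searcher = SmartCodeSearch(project_root='.')
    searcher.index_project()
    for hit in searcher.search('parse configuration file', limit=5):
        print(f"{hit['score']:.3f} {hit['file_path']}:{hit['line_num']} {hit['name']}")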