
Smart Code Search MCP Server

clean_search_backup.py (33.1 kB)
""" Clean SmartCodeSearch implementation for MCP server No console output, fully thread-safe """ import sys import os import sqlite3 import hashlib import re import ast import json from pathlib import Path from typing import List, Tuple, Optional, Dict, Any import numpy as np # Suppress all output from sentence_transformers os.environ['TRANSFORMERS_VERBOSITY'] = 'error' os.environ['TOKENIZERS_PARALLELISM'] = 'false' # Redirect stderr to devnull during import import io old_stderr = sys.stderr sys.stderr = io.StringIO() try: from sentence_transformers import SentenceTransformer finally: sys.stderr = old_stderr from src.core.db_wrapper import ThreadSafeDB from src.core.dependency_analyzer import DependencyAnalyzer from src.core.usage_analyzer import UsageAnalyzer class CleanSmartCodeSearch: """Clean version of SmartCodeSearch with no console output""" def __init__(self, project_root=".", quiet=True): self.root = Path(project_root) self.db_path = self.root / ".claude-symbols" / "search.db" self.db_path.parent.mkdir(exist_ok=True) # Load model silently self.model = SentenceTransformer('all-MiniLM-L6-v2') # Thread-safe database self.db = ThreadSafeDB(self.db_path) self._init_db() # Initialize language patterns (copied from scsold.py) self._init_patterns() # Initialize dependency analyzer self.dependency_analyzer = DependencyAnalyzer(self.root) # Initialize usage analyzer self.usage_analyzer = UsageAnalyzer(self.root) def _init_db(self): """Initialize database schema - compatible with scsold.py""" self.db.execute(''' CREATE TABLE IF NOT EXISTS symbols ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, type TEXT NOT NULL, file_path TEXT NOT NULL, line_num INTEGER NOT NULL, end_line INTEGER, signature TEXT, docstring TEXT, code_context TEXT, embedding BLOB, last_updated TEXT, access_count INTEGER DEFAULT 0, language TEXT, file_type TEXT ) ''') # Create indexes try: self.db.execute('CREATE INDEX IF NOT EXISTS idx_file_path ON symbols(file_path)') self.db.execute('CREATE INDEX IF NOT EXISTS idx_name ON symbols(name)') self.db.execute('CREATE INDEX IF NOT EXISTS idx_type ON symbols(type)') except: pass def _init_patterns(self): """Initialize language patterns for symbol extraction""" self.language_patterns = { 'python': { 'extensions': ['.py'], 'function': re.compile(r'^(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE), 'class': re.compile(r'^class\s+(\w+)(?:\((.*?)\))?:', re.MULTILINE), 'method': re.compile(r'^\s+(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE), }, 'javascript': { 'extensions': ['.js', '.jsx', '.mjs'], 'function': re.compile(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE), 'class': re.compile(r'class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE), }, 'typescript': { 'extensions': ['.ts', '.tsx'], 'function': re.compile(r'(?:export\s+)?(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*(?::\s*[^=]+)?\s*=)', re.MULTILINE), 'class': re.compile(r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE), 'interface': re.compile(r'(?:export\s+)?interface\s+(\w+)', re.MULTILINE), }, } def search(self, query: str, limit: int = 10) -> List[dict]: """Search for code by semantic similarity - returns dict format like scsold.py""" try: # Encode query silently query_embedding = self.model.encode(query, show_progress_bar=False) # Fetch all symbols including all enhanced fields rows = self.db.fetchall( '''SELECT id, name, type, file_path, line_num, signature, docstring, code_context, embedding, language, file_type, todo_items, 
inline_comments, design_notes, parameters, return_type, exceptions_raised, type_annotations, imports, calls, called_by, inherits_from, inherited_by, usage_frequency, test_files, example_usage, common_patterns, test_coverage FROM symbols WHERE embedding IS NOT NULL''' ) if not rows: return [] # Check if query is looking for specific metadata types is_todo_search = any(word in query.lower() for word in ['todo', 'fixme', 'xxx', 'hack', 'note']) is_doc_search = any(word in query.lower() for word in ['documented', 'comment', 'documentation', 'docs']) is_type_search = any(word in query.lower() for word in ['returns', 'raises', 'exception', 'type', 'parameter', '->', 'optional', 'list', 'dict']) is_dep_search = any(word in query.lower() for word in ['imports', 'calls', 'inherits', 'uses', 'depends', 'extends']) is_usage_search = any(word in query.lower() for word in ['tested', 'test', 'coverage', 'example', 'frequently', 'used']) results = [] for row in rows: (sym_id, name, sym_type, file_path, line_num, signature, docstring, content, embedding_blob, language, file_type, todo_items, inline_comments, design_notes, parameters, return_type, exceptions_raised, type_annotations, imports, calls, called_by, inherits_from, inherited_by, usage_frequency, test_files, example_usage, common_patterns, test_coverage) = row if not content: content = signature or '' embedding = np.frombuffer(embedding_blob, dtype=np.float32) # Cosine similarity similarity = np.dot(query_embedding, embedding) / ( np.linalg.norm(query_embedding) * np.linalg.norm(embedding) + 1e-10 ) # Text matching boost text_score = 0 if query.lower() in name.lower(): text_score = 0.3 elif content and query.lower() in content.lower(): text_score = 0.1 # Documentation boost doc_score = 0 if is_todo_search and todo_items and todo_items != '[]': todos = json.loads(todo_items) for todo in todos: if query.lower() in todo.get('text', '').lower(): doc_score = 0.4 break elif todo.get('type', '').lower() in query.lower(): doc_score = 0.2 if is_doc_search: # Boost well-documented code has_docs = False if docstring and len(docstring) > 20: has_docs = True doc_score += 0.15 if inline_comments and inline_comments != '[]' and len(json.loads(inline_comments)) > 3: has_docs = True doc_score += 0.1 if design_notes and design_notes != '[]' and len(json.loads(design_notes)) > 0: has_docs = True doc_score += 0.15 # Check if query terms appear in documentation if inline_comments and inline_comments != '[]': comments = json.loads(inline_comments) for comment in comments: if query.lower() in comment.lower(): doc_score += 0.2 break # Type information boost type_score = 0 if is_type_search: # Check return type matches if return_type and query.lower() in return_type.lower(): type_score += 0.3 # Check exception matches if 'raises' in query.lower() or 'exception' in query.lower(): if exceptions_raised and exceptions_raised != '[]': exc_list = json.loads(exceptions_raised) for exc in exc_list: if query.lower() in exc.lower() or exc.lower() in query.lower(): type_score += 0.35 break # Check parameter types if parameters and parameters != '[]': param_list = json.loads(parameters) for param in param_list: if 'type' in param and param['type']: if query.lower() in param['type'].lower(): type_score += 0.2 break # Boost functions with rich type annotations if return_type and parameters != '[]': type_score += 0.05 # Dependency information boost dep_score = 0 if is_dep_search: # Check imports if 'import' in query.lower() and imports and imports != '[]': import_list = 
json.loads(imports) for imp in import_list: if query.lower() in imp.lower() or imp.lower() in query.lower(): dep_score += 0.3 break # Check function calls if 'calls' in query.lower() and calls and calls != '[]': call_list = json.loads(calls) for call in call_list: if query.lower() in call.lower(): dep_score += 0.25 break # Check inheritance if ('inherits' in query.lower() or 'extends' in query.lower()) and inherits_from: if query.lower() in inherits_from.lower(): dep_score += 0.35 # Usage context boost usage_score = 0 if is_usage_search: # Boost well-tested code if 'test' in query.lower(): if test_coverage and test_coverage > 0: usage_score += test_coverage * 0.3 if test_files and test_files != '[]': test_file_list = json.loads(test_files) if test_file_list: usage_score += 0.2 # Boost frequently used code if 'frequently' in query.lower() or 'used' in query.lower(): if usage_frequency and usage_frequency > 5: usage_score += min(0.3, usage_frequency * 0.02) # Boost if has examples if 'example' in query.lower() and example_usage: usage_score += 0.15 # General boost for well-tested, frequently used code if test_coverage and test_coverage > 0.5: usage_score += 0.05 if usage_frequency and usage_frequency > 10: usage_score += 0.05 final_score = similarity + text_score + doc_score + type_score + dep_score + usage_score # Return dict format compatible with scsold.py results.append({ 'id': sym_id, 'name': name, 'type': sym_type, 'file_path': file_path, 'line_num': line_num, 'signature': signature or '', 'docstring': docstring or '', 'content': content[:500] if content else '', 'score': final_score, 'match_type': 'semantic', 'language': language or 'unknown', 'file_type': file_type or 'code', 'todo_items': todo_items, 'inline_comments': inline_comments, 'design_notes': design_notes, 'parameters': parameters, 'return_type': return_type, 'exceptions_raised': exceptions_raised, 'type_annotations': type_annotations, 'imports': imports, 'calls': calls, 'called_by': called_by, 'inherits_from': inherits_from, 'inherited_by': inherited_by, 'usage_frequency': usage_frequency, 'test_files': test_files, 'example_usage': example_usage, 'common_patterns': common_patterns, 'test_coverage': test_coverage }) # Sort and return top results results.sort(key=lambda x: x['score'], reverse=True) return results[:limit] except Exception: return [] def index_project(self, force: bool = False) -> None: """Index all files in the project""" try: # Get all code files files_to_index = [] for pattern_info in self.language_patterns.values(): for ext in pattern_info['extensions']: files_to_index.extend(self.root.rglob(f'*{ext}')) # Filter out common directories to ignore ignored_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', 'dist', 'build'} files_to_index = [ f for f in files_to_index if not any(ignored in f.parts for ignored in ignored_dirs) ] for file_path in files_to_index: self.index_file(file_path, force) except Exception: pass def index_file(self, file_path: Path, force: bool = False) -> None: """Index a single file""" try: # Normalize path to relative if file_path.is_absolute(): try: file_path = file_path.relative_to(self.root) except ValueError: # Path is outside project root, use as is pass # Get file hash file_hash = self._get_file_hash(self.root / file_path if not file_path.is_absolute() else file_path) # Check if already indexed (check both relative and absolute paths) if not force: existing = self.db.fetchone( 'SELECT COUNT(*) FROM symbols WHERE file_path = ? 
OR file_path = ?', (str(file_path), str(self.root / file_path)) ) if existing and existing[0] > 0: return # Delete old entries (both relative and absolute) self.db.execute('DELETE FROM symbols WHERE file_path = ? OR file_path = ?', (str(file_path), str(self.root / file_path))) # Read file content for documentation extraction full_path = self.root / file_path if not file_path.is_absolute() else file_path try: with open(full_path, 'r', encoding='utf-8') as f: file_content = f.read() except UnicodeDecodeError: with open(full_path, 'r', encoding='latin-1') as f: file_content = f.read() # Determine language from file extension ext = file_path.suffix language = 'python' if ext == '.py' else 'javascript' if ext in ['.js', '.jsx'] else 'typescript' if ext in ['.ts', '.tsx'] else 'unknown' # Extract documentation for the entire file file_docs = self._extract_documentation(file_content, language) # Extract symbols symbols = self._extract_symbols(file_path) # Index each symbol from datetime import datetime for symbol_name, symbol_type, content, node in symbols: # Extract type information if we have an AST node type_info = {'parameters': '[]', 'return_type': '', 'exceptions_raised': '[]', 'type_annotations': '{}'} if node and language == 'python': type_info = self._extract_type_info(node) # Extract dependency information dep_info = {'imports': '[]', 'calls': '[]', 'called_by': '[]', 'inherits_from': '', 'inherited_by': '[]'} if language == 'python': dep_info = self._extract_dependencies(file_path, node) # Extract usage context usage_info = {'usage_frequency': 0, 'test_files': '[]', 'example_usage': '', 'common_patterns': '[]', 'test_coverage': 0.0} if language == 'python': usage_info = self._extract_usage_context(symbol_name, file_path) # Create enhanced content for embedding that includes documentation and types enhanced_content = content if file_docs.get('todo_items') and file_docs['todo_items'] != '[]': todos = json.loads(file_docs['todo_items']) todo_text = ' '.join([f"{t['type']}: {t['text']}" for t in todos[:5]]) enhanced_content = f"{content}\n{todo_text}" # Add type information to enhanced content for better semantic search if type_info['return_type']: enhanced_content = f"{enhanced_content}\nReturns: {type_info['return_type']}" if type_info['exceptions_raised'] != '[]': exceptions = json.loads(type_info['exceptions_raised']) if exceptions: enhanced_content = f"{enhanced_content}\nRaises: {', '.join(exceptions)}" embedding = self.model.encode(enhanced_content, show_progress_bar=False) embedding_blob = embedding.tobytes() # Get line number from AST node if available line_num = getattr(node, 'lineno', 1) if node else 1 # Get signature if it's a function signature = '' if node and isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): try: signature = f"{node.name}({', '.join([a.arg for a in node.args.args])})" except: signature = '' self.db.execute( '''INSERT INTO symbols (name, type, file_path, line_num, signature, code_context, embedding, last_updated, language, file_type, todo_items, inline_comments, design_notes, parameters, return_type, exceptions_raised, type_annotations, imports, calls, called_by, inherits_from, inherited_by, usage_frequency, test_files, example_usage, common_patterns, test_coverage) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', (symbol_name, symbol_type, str(file_path), line_num, signature, content, embedding_blob, datetime.now().isoformat(), language, 'code', file_docs.get('todo_items', '[]'), 
file_docs.get('inline_comments', '[]'), file_docs.get('design_notes', '[]'), type_info['parameters'], type_info['return_type'], type_info['exceptions_raised'], type_info['type_annotations'], dep_info['imports'], dep_info['calls'], dep_info['called_by'], dep_info['inherits_from'], dep_info['inherited_by'], usage_info['usage_frequency'], usage_info['test_files'], usage_info['example_usage'], usage_info['common_patterns'], usage_info['test_coverage']) ) except Exception: pass def _get_file_hash(self, file_path: Path) -> str: """Get hash of file contents""" try: with open(file_path, 'rb') as f: return hashlib.md5(f.read()).hexdigest() except: return "" def _extract_usage_context(self, symbol_name: str, file_path: Path) -> Dict[str, Any]: """Extract usage context and patterns for a symbol""" # Get usage information from analyzer usage_info = self.usage_analyzer.analyze_symbol_usage(symbol_name, file_path) # Convert to database format return { 'usage_frequency': usage_info.get('usage_frequency', 0), 'test_files': json.dumps(usage_info.get('test_files', [])), 'example_usage': usage_info.get('example_usage', '')[:500], # Limit size 'common_patterns': json.dumps(usage_info.get('common_patterns', [])), 'test_coverage': usage_info.get('test_coverage', 0.0) } def _extract_dependencies(self, file_path: Path, node: ast.AST = None, full_tree: ast.AST = None) -> Dict[str, str]: """Extract dependency information for a symbol""" dep_info = { 'imports': [], 'calls': [], 'called_by': [], 'inherits_from': '', 'inherited_by': [] } # Analyze the file for dependencies full_path = self.root / file_path if not file_path.is_absolute() else file_path file_deps = self.dependency_analyzer.analyze_file(full_path) if node and hasattr(node, 'name'): symbol_name = node.name if symbol_name in file_deps: symbol_info = file_deps[symbol_name] dep_info['imports'] = symbol_info.get('imports', []) dep_info['calls'] = symbol_info.get('calls', []) dep_info['inherits_from'] = symbol_info.get('inherits_from', '') or '' # Convert to JSON for storage return { 'imports': json.dumps(dep_info['imports']), 'calls': json.dumps(dep_info['calls']), 'called_by': json.dumps(dep_info['called_by']), 'inherits_from': dep_info['inherits_from'], 'inherited_by': json.dumps(dep_info['inherited_by']) } def _extract_type_info(self, node: ast.AST, source_lines: List[str] = None) -> Dict[str, str]: """Extract type information from Python AST nodes""" type_info = { 'parameters': [], 'return_type': None, 'exceptions_raised': [], 'type_annotations': {} } # Extract function/method parameters and types if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): # Extract parameters for arg in node.args.args: param_info = {'name': arg.arg} # Get type annotation if present if arg.annotation: param_info['type'] = ast.unparse(arg.annotation) if hasattr(ast, 'unparse') else str(arg.annotation) type_info['type_annotations'][arg.arg] = param_info['type'] type_info['parameters'].append(param_info) # Extract return type if node.returns: type_info['return_type'] = ast.unparse(node.returns) if hasattr(ast, 'unparse') else str(node.returns) # Find raised exceptions in function body for child in ast.walk(node): if isinstance(child, ast.Raise): if child.exc: if isinstance(child.exc, ast.Call) and isinstance(child.exc.func, ast.Name): type_info['exceptions_raised'].append(child.exc.func.id) elif isinstance(child.exc, ast.Name): type_info['exceptions_raised'].append(child.exc.id) # Extract class inheritance elif isinstance(node, ast.ClassDef): # Get base classes 
base_classes = [] for base in node.bases: if isinstance(base, ast.Name): base_classes.append(base.id) elif isinstance(base, ast.Attribute): base_classes.append(ast.unparse(base) if hasattr(ast, 'unparse') else str(base)) if base_classes: type_info['type_annotations']['inherits_from'] = base_classes # Check for type hints in class attributes for item in node.body: if isinstance(item, ast.AnnAssign) and item.annotation: attr_name = item.target.id if isinstance(item.target, ast.Name) else str(item.target) attr_type = ast.unparse(item.annotation) if hasattr(ast, 'unparse') else str(item.annotation) type_info['type_annotations'][attr_name] = attr_type # Convert to JSON strings for database storage return { 'parameters': json.dumps(type_info['parameters']), 'return_type': type_info['return_type'] or '', 'exceptions_raised': json.dumps(list(set(type_info['exceptions_raised']))), # Remove duplicates 'type_annotations': json.dumps(type_info['type_annotations']) } def _extract_documentation(self, content: str, language: str = 'python', node=None) -> Dict[str, str]: """Extract documentation, comments, and TODOs from source code""" # Extract TODO/FIXME/XXX/HACK/NOTE items todo_pattern = r'(?:#|//|/\*|\*)\s*(TODO|FIXME|XXX|HACK|NOTE)[:|\s]\s*(.+?)(?:\*/|\n|$)' todos = re.findall(todo_pattern, content, re.MULTILINE | re.IGNORECASE) # Extract inline comments based on language inline_comments = [] if language == 'python': # Python inline comments (excluding TODOs) inline_pattern = r'#\s*(?!TODO|FIXME|XXX|HACK|NOTE)(.+?)$' inline_comments = re.findall(inline_pattern, content, re.MULTILINE | re.IGNORECASE) elif language in ['javascript', 'typescript']: # JS/TS inline comments inline_pattern = r'//\s*(?!TODO|FIXME|XXX|HACK|NOTE)(.+?)$' inline_comments = re.findall(inline_pattern, content, re.MULTILINE | re.IGNORECASE) # Extract block comments and design notes design_notes = [] if language == 'python': # Python docstrings and block comments block_pattern = r'"""(.*?)"""|\'\'\'(.*?)\'\'\'|^\s*#\s*(.+(?:\n\s*#.+)*)' blocks = re.findall(block_pattern, content, re.DOTALL | re.MULTILINE) for block in blocks: text = block[0] or block[1] or block[2] if text and len(text.strip()) > 20: # Only meaningful blocks design_notes.append(text.strip()) elif language in ['javascript', 'typescript']: # JS/TS block comments block_pattern = r'/\*\*(.*?)\*/|/\*(.*?)\*/' blocks = re.findall(block_pattern, content, re.DOTALL) for block in blocks: text = block[0] or block[1] if text and len(text.strip()) > 20: design_notes.append(text.strip()) # Format as JSON strings for database storage todo_items = [{"type": t[0].upper(), "text": t[1].strip()} for t in todos] return { "todo_items": json.dumps(todo_items[:50]), # Limit to 50 TODOs "inline_comments": json.dumps([c.strip() for c in inline_comments[:30] if c.strip()]), # Limit to 30 comments "design_notes": json.dumps(design_notes[:10]) # Limit to 10 design blocks } def _extract_symbols(self, file_path: Path) -> List[Tuple[str, str, str, Optional[ast.AST]]]: """Extract symbols from a file - returns (name, type, content, node)""" symbols = [] try: # Resolve full path for reading full_path = self.root / file_path if not file_path.is_absolute() else file_path # Read file content try: with open(full_path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: with open(full_path, 'r', encoding='latin-1') as f: content = f.read() # Determine file type ext = file_path.suffix # Special handling for Python files using AST if ext == '.py': try: tree = ast.parse(content) for 
node in ast.walk(tree): if isinstance(node, ast.FunctionDef): func_content = self._get_node_content(content, node) symbols.append((node.name, 'function', func_content, node)) elif isinstance(node, ast.ClassDef): class_content = self._get_node_content(content, node) symbols.append((node.name, 'class', class_content, node)) except: pass # Use regex patterns for other languages for lang, patterns in self.language_patterns.items(): if ext in patterns['extensions']: for pattern_name, pattern in patterns.items(): if pattern_name != 'extensions': matches = pattern.finditer(content) for match in matches: symbol_name = match.group(1) or (match.group(2) if len(match.groups()) > 1 else None) if symbol_name: # Get surrounding context start = max(0, match.start() - 100) end = min(len(content), match.end() + 500) context = content[start:end] symbols.append((symbol_name, pattern_name, context, None)) # If no symbols found, index the whole file if not symbols and len(content) > 0: file_name = file_path.stem symbols.append((file_name, 'file', content[:1000], None)) except Exception: pass return symbols def _get_node_content(self, content: str, node) -> str: """Get content for an AST node""" try: lines = content.split('\n') start_line = node.lineno - 1 end_line = getattr(node, 'end_lineno', start_line + 10) return '\n'.join(lines[start_line:end_line])[:1000] except: return "" # Use this as the default export SmartCodeSearch = CleanSmartCodeSearch __all__ = ['SmartCodeSearch', 'CleanSmartCodeSearch']
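
A minimal usage sketch of the class above. This is hypothetical driver code, not part of the repository: it assumes the src.core helper modules and the sentence-transformers package are importable, that the file is importable as clean_search_backup, and the query string and printed fields are illustrative only.

# Hypothetical example: index the current project and run a semantic query.
from clean_search_backup import SmartCodeSearch  # alias of CleanSmartCodeSearch

searcher = SmartCodeSearch(project_root=".")   # creates/opens .claude-symbols/search.db
searcher.index_project(force=False)            # walks *.py/*.js/*.ts sources and stores embeddings

for hit in searcher.search("function that raises ValueError", limit=5):
    # Each hit is a dict; 'score' combines cosine similarity with metadata boosts.
    print(f"{hit['score']:.3f}  {hit['file_path']}:{hit['line_num']}  {hit['name']} ({hit['type']})")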
