"""
Clean SmartCodeSearch implementation for MCP server
No console output, fully thread-safe
"""
import sys
import os
import sqlite3
import hashlib
import re
import ast
import json
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
import numpy as np
# Suppress all output from sentence_transformers
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
# Capture stderr in an in-memory buffer during import to silence noisy library output
import io
old_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
from sentence_transformers import SentenceTransformer
finally:
sys.stderr = old_stderr
from src.core.db_wrapper import ThreadSafeDB
from src.core.dependency_analyzer import DependencyAnalyzer
from src.core.usage_analyzer import UsageAnalyzer
class CleanSmartCodeSearch:
"""Clean version of SmartCodeSearch with no console output"""
def __init__(self, project_root=".", quiet=True):
self.root = Path(project_root)
self.db_path = self.root / ".claude-symbols" / "search.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Create README.md in .claude-symbols directory if it doesn't exist
self._create_symbols_readme()
# Load model silently
self.model = SentenceTransformer('all-MiniLM-L6-v2')
# Thread-safe database
self.db = ThreadSafeDB(self.db_path)
self._init_db()
# Initialize language patterns (copied from scsold.py)
self._init_patterns()
# Initialize dependency analyzer
self.dependency_analyzer = DependencyAnalyzer(self.root)
# Initialize usage analyzer
self.usage_analyzer = UsageAnalyzer(self.root)
def _init_db(self):
"""Initialize database schema - compatible with scsold.py"""
        # Includes the enhanced metadata columns referenced by search() and index_file()
        self.db.execute('''
            CREATE TABLE IF NOT EXISTS symbols (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                type TEXT NOT NULL,
                file_path TEXT NOT NULL,
                line_num INTEGER NOT NULL,
                end_line INTEGER,
                signature TEXT,
                docstring TEXT,
                code_context TEXT,
                embedding BLOB,
                last_updated TEXT,
                access_count INTEGER DEFAULT 0,
                language TEXT,
                file_type TEXT,
                todo_items TEXT DEFAULT '[]',
                inline_comments TEXT DEFAULT '[]',
                design_notes TEXT DEFAULT '[]',
                parameters TEXT DEFAULT '[]',
                return_type TEXT DEFAULT '',
                exceptions_raised TEXT DEFAULT '[]',
                type_annotations TEXT DEFAULT '{}',
                imports TEXT DEFAULT '[]',
                calls TEXT DEFAULT '[]',
                called_by TEXT DEFAULT '[]',
                inherits_from TEXT DEFAULT '',
                inherited_by TEXT DEFAULT '[]',
                usage_frequency INTEGER DEFAULT 0,
                test_files TEXT DEFAULT '[]',
                example_usage TEXT DEFAULT '',
                common_patterns TEXT DEFAULT '[]',
                test_coverage REAL DEFAULT 0.0
            )
        ''')
# Create indexes
try:
self.db.execute('CREATE INDEX IF NOT EXISTS idx_file_path ON symbols(file_path)')
self.db.execute('CREATE INDEX IF NOT EXISTS idx_name ON symbols(name)')
self.db.execute('CREATE INDEX IF NOT EXISTS idx_type ON symbols(type)')
        except sqlite3.Error:
            pass
def _create_symbols_readme(self):
"""Create README.md in .claude-symbols directory to explain its purpose"""
readme_path = self.db_path.parent / "README.md"
# Only create if it doesn't exist
if not readme_path.exists():
readme_content = """# .claude-symbols Directory
This directory is automatically created and managed by the SCS-MCP (Smart Code Search) system.
## Purpose
The `.claude-symbols` folder contains the semantic search index for your project, enabling Claude Desktop and other MCP clients to intelligently search and understand your codebase.
## Contents
- **search.db**: SQLite database containing:
- Indexed code symbols (functions, classes, methods, variables)
- Documentation sections (markdown headers, config keys)
- Semantic embeddings for intelligent search
- Code relationships and dependencies
- Usage patterns and metrics
## Important Notes
- **DO NOT DELETE** this directory while using SCS-MCP
- **DO NOT EDIT** files in this directory manually
- **DO NOT COMMIT** this directory to version control (add to .gitignore)
- The index is automatically updated when files change
- Deleting this directory will require a full reindex
## Storage Size
The database size depends on your project:
- Small projects (<1000 files): ~10-50 MB
- Medium projects (1000-5000 files): ~50-200 MB
- Large projects (>5000 files): ~200 MB+
## Maintenance
To rebuild the index:
```bash
python scripts/reindex_all.py
```
To verify index integrity:
```bash
python scripts/verify_index.py
```
## Privacy
- All data stays local on your machine
- No code or data is sent to external services
- Embeddings are generated locally using sentence transformers
For more information, see: https://github.com/StevenJJobson/scs-mcp
"""
try:
readme_path.write_text(readme_content, encoding='utf-8')
except Exception:
# Silently fail if we can't write the README
pass
def _init_patterns(self):
"""Initialize language patterns for symbol extraction"""
self.language_patterns = {
'python': {
'extensions': ['.py'],
'function': re.compile(r'^(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE),
'class': re.compile(r'^class\s+(\w+)(?:\((.*?)\))?:', re.MULTILINE),
'method': re.compile(r'^\s+(?:async\s+)?def\s+(\w+)\s*\((.*?)\):', re.MULTILINE),
},
'javascript': {
'extensions': ['.js', '.jsx', '.mjs'],
'function': re.compile(r'(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s+)?(?:\([^)]*\)|[^=]+)\s*=>)', re.MULTILINE),
'class': re.compile(r'class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE),
},
'typescript': {
'extensions': ['.ts', '.tsx'],
'function': re.compile(r'(?:export\s+)?(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*(?::\s*[^=]+)?\s*=)', re.MULTILINE),
'class': re.compile(r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+\w+)?', re.MULTILINE),
'interface': re.compile(r'(?:export\s+)?interface\s+(\w+)', re.MULTILINE),
},
}
# Add documentation and configuration file patterns
self.doc_patterns = {
'markdown': {
'extensions': ['.md'],
'section': re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE),
},
'json': {
'extensions': ['.json'],
'key': re.compile(r'"([^"]+)":', re.MULTILINE),
},
'yaml': {
'extensions': ['.yaml', '.yml'],
'key': re.compile(r'^(\w+):', re.MULTILINE),
},
'toml': {
'extensions': ['.toml'],
'section': re.compile(r'^\[([^\]]+)\]', re.MULTILINE),
},
'shell': {
'extensions': ['.sh'],
'function': re.compile(r'^(?:function\s+)?(\w+)\s*\(\)', re.MULTILINE),
},
'text': {
'extensions': ['.txt'],
'special_files': ['LICENSE', 'SECURITY', 'AUTHORS', 'CONTRIBUTORS'],
},
}
def search(self, query: str, limit: int = 10) -> List[dict]:
"""Search for code by semantic similarity - returns dict format like scsold.py"""
try:
# Encode query silently
query_embedding = self.model.encode(query, show_progress_bar=False)
# Fetch all symbols including all enhanced fields
rows = self.db.fetchall(
'''SELECT id, name, type, file_path, line_num, signature, docstring,
code_context, embedding, language, file_type,
todo_items, inline_comments, design_notes,
parameters, return_type, exceptions_raised, type_annotations,
imports, calls, called_by, inherits_from, inherited_by,
usage_frequency, test_files, example_usage, common_patterns, test_coverage
FROM symbols WHERE embedding IS NOT NULL'''
)
if not rows:
return []
# Check if query is looking for specific metadata types
is_todo_search = any(word in query.lower() for word in ['todo', 'fixme', 'xxx', 'hack', 'note'])
is_doc_search = any(word in query.lower() for word in ['documented', 'comment', 'documentation', 'docs'])
is_type_search = any(word in query.lower() for word in ['returns', 'raises', 'exception', 'type', 'parameter', '->', 'optional', 'list', 'dict'])
is_dep_search = any(word in query.lower() for word in ['imports', 'calls', 'inherits', 'uses', 'depends', 'extends'])
is_usage_search = any(word in query.lower() for word in ['tested', 'test', 'coverage', 'example', 'frequently', 'used'])
results = []
for row in rows:
(sym_id, name, sym_type, file_path, line_num, signature,
docstring, content, embedding_blob, language, file_type,
todo_items, inline_comments, design_notes,
parameters, return_type, exceptions_raised, type_annotations,
imports, calls, called_by, inherits_from, inherited_by,
usage_frequency, test_files, example_usage, common_patterns, test_coverage) = row
if not content:
content = signature or ''
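                # Embeddings are stored as raw float32 bytes (see embedding.tobytes() in
                # index_file), so they must be decoded here with the same dtype.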
embedding = np.frombuffer(embedding_blob, dtype=np.float32)
# Cosine similarity
similarity = np.dot(query_embedding, embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(embedding) + 1e-10
)
# Text matching boost
text_score = 0
if query.lower() in name.lower():
text_score = 0.3
elif content and query.lower() in content.lower():
text_score = 0.1
# Documentation boost
doc_score = 0
if is_todo_search and todo_items and todo_items != '[]':
todos = json.loads(todo_items)
for todo in todos:
if query.lower() in todo.get('text', '').lower():
doc_score = 0.4
break
elif todo.get('type', '').lower() in query.lower():
doc_score = 0.2
if is_doc_search:
# Boost well-documented code
has_docs = False
if docstring and len(docstring) > 20:
has_docs = True
doc_score += 0.15
if inline_comments and inline_comments != '[]' and len(json.loads(inline_comments)) > 3:
has_docs = True
doc_score += 0.1
if design_notes and design_notes != '[]' and len(json.loads(design_notes)) > 0:
has_docs = True
doc_score += 0.15
# Check if query terms appear in documentation
if inline_comments and inline_comments != '[]':
comments = json.loads(inline_comments)
for comment in comments:
if query.lower() in comment.lower():
doc_score += 0.2
break
# Type information boost
type_score = 0
if is_type_search:
# Check return type matches
if return_type and query.lower() in return_type.lower():
type_score += 0.3
# Check exception matches
if 'raises' in query.lower() or 'exception' in query.lower():
if exceptions_raised and exceptions_raised != '[]':
exc_list = json.loads(exceptions_raised)
for exc in exc_list:
if query.lower() in exc.lower() or exc.lower() in query.lower():
type_score += 0.35
break
# Check parameter types
if parameters and parameters != '[]':
param_list = json.loads(parameters)
for param in param_list:
if 'type' in param and param['type']:
if query.lower() in param['type'].lower():
type_score += 0.2
break
# Boost functions with rich type annotations
if return_type and parameters != '[]':
type_score += 0.05
# Dependency information boost
dep_score = 0
if is_dep_search:
# Check imports
if 'import' in query.lower() and imports and imports != '[]':
import_list = json.loads(imports)
for imp in import_list:
if query.lower() in imp.lower() or imp.lower() in query.lower():
dep_score += 0.3
break
# Check function calls
if 'calls' in query.lower() and calls and calls != '[]':
call_list = json.loads(calls)
for call in call_list:
if query.lower() in call.lower():
dep_score += 0.25
break
# Check inheritance
if ('inherits' in query.lower() or 'extends' in query.lower()) and inherits_from:
if query.lower() in inherits_from.lower():
dep_score += 0.35
# Usage context boost
usage_score = 0
if is_usage_search:
# Boost well-tested code
if 'test' in query.lower():
if test_coverage and test_coverage > 0:
usage_score += test_coverage * 0.3
if test_files and test_files != '[]':
test_file_list = json.loads(test_files)
if test_file_list:
usage_score += 0.2
# Boost frequently used code
if 'frequently' in query.lower() or 'used' in query.lower():
if usage_frequency and usage_frequency > 5:
usage_score += min(0.3, usage_frequency * 0.02)
# Boost if has examples
if 'example' in query.lower() and example_usage:
usage_score += 0.15
# General boost for well-tested, frequently used code
if test_coverage and test_coverage > 0.5:
usage_score += 0.05
if usage_frequency and usage_frequency > 10:
usage_score += 0.05
final_score = similarity + text_score + doc_score + type_score + dep_score + usage_score
# Return dict format compatible with scsold.py
results.append({
'id': sym_id,
'name': name,
'type': sym_type,
'file_path': file_path,
'line_num': line_num,
'signature': signature or '',
'docstring': docstring or '',
'content': content[:500] if content else '',
'score': final_score,
'match_type': 'semantic',
'language': language or 'unknown',
'file_type': file_type or 'code',
'todo_items': todo_items,
'inline_comments': inline_comments,
'design_notes': design_notes,
'parameters': parameters,
'return_type': return_type,
'exceptions_raised': exceptions_raised,
'type_annotations': type_annotations,
'imports': imports,
'calls': calls,
'called_by': called_by,
'inherits_from': inherits_from,
'inherited_by': inherited_by,
'usage_frequency': usage_frequency,
'test_files': test_files,
'example_usage': example_usage,
'common_patterns': common_patterns,
'test_coverage': test_coverage
})
# Sort and return top results
results.sort(key=lambda x: x['score'], reverse=True)
return results[:limit]
except Exception:
return []
def index_project(self, force: bool = False) -> None:
"""Index all files in the project"""
try:
# Get all code files
files_to_index = []
for pattern_info in self.language_patterns.values():
for ext in pattern_info['extensions']:
files_to_index.extend(self.root.rglob(f'*{ext}'))
# Get all documentation files
for pattern_info in self.doc_patterns.values():
if 'extensions' in pattern_info:
for ext in pattern_info['extensions']:
files_to_index.extend(self.root.rglob(f'*{ext}'))
# Add special files without extensions
if 'special_files' in pattern_info:
for special_file in pattern_info['special_files']:
files_to_index.extend(self.root.rglob(special_file))
            # Filter out common directories to ignore (including the search index itself)
            ignored_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', 'dist', 'build', 'dev-archive', '.claude-symbols'}
files_to_index = [
f for f in files_to_index
if not any(ignored in f.parts for ignored in ignored_dirs)
]
for file_path in files_to_index:
self.index_file(file_path, force)
except Exception:
pass
def index_file(self, file_path: Path, force: bool = False) -> None:
"""Index a single file"""
try:
# Normalize path to relative
if file_path.is_absolute():
try:
file_path = file_path.relative_to(self.root)
except ValueError:
# Path is outside project root, use as is
pass
            # Compute the content hash (currently unused; kept for change detection)
            file_hash = self._get_file_hash(self.root / file_path if not file_path.is_absolute() else file_path)
# Check if already indexed (check both relative and absolute paths)
if not force:
existing = self.db.fetchone(
'SELECT COUNT(*) FROM symbols WHERE file_path = ? OR file_path = ?',
(str(file_path), str(self.root / file_path))
)
if existing and existing[0] > 0:
return
# Delete old entries (both relative and absolute)
self.db.execute('DELETE FROM symbols WHERE file_path = ? OR file_path = ?',
(str(file_path), str(self.root / file_path)))
# Read file content for documentation extraction
full_path = self.root / file_path if not file_path.is_absolute() else file_path
try:
with open(full_path, 'r', encoding='utf-8') as f:
file_content = f.read()
except UnicodeDecodeError:
with open(full_path, 'r', encoding='latin-1') as f:
file_content = f.read()
# Determine language and file type from extension
ext = file_path.suffix
file_name = file_path.name
# Check if it's a code file
language = None
for lang, patterns in self.language_patterns.items():
if ext in patterns['extensions']:
language = lang
break
# Check if it's a documentation file
doc_type = None
for doc, patterns in self.doc_patterns.items():
if 'extensions' in patterns and ext in patterns['extensions']:
doc_type = doc
break
if 'special_files' in patterns and file_name in patterns['special_files']:
doc_type = 'text'
break
# Set file type
if language:
file_type = 'code'
elif doc_type:
file_type = 'documentation'
language = doc_type # Use doc type as language for documentation files
else:
file_type = 'unknown'
language = 'unknown'
# Extract documentation for the entire file
file_docs = self._extract_documentation(file_content, language)
# Extract symbols
symbols = self._extract_symbols(file_path)
# Index each symbol
from datetime import datetime
for symbol_name, symbol_type, content, node in symbols:
# Extract type information if we have an AST node
type_info = {'parameters': '[]', 'return_type': '', 'exceptions_raised': '[]', 'type_annotations': '{}'}
if node and language == 'python':
type_info = self._extract_type_info(node)
# Extract dependency information
dep_info = {'imports': '[]', 'calls': '[]', 'called_by': '[]', 'inherits_from': '', 'inherited_by': '[]'}
if language == 'python':
dep_info = self._extract_dependencies(file_path, node)
# Extract usage context
usage_info = {'usage_frequency': 0, 'test_files': '[]', 'example_usage': '', 'common_patterns': '[]', 'test_coverage': 0.0}
if language == 'python':
usage_info = self._extract_usage_context(symbol_name, file_path)
# Create enhanced content for embedding that includes documentation and types
enhanced_content = content
if file_docs.get('todo_items') and file_docs['todo_items'] != '[]':
todos = json.loads(file_docs['todo_items'])
todo_text = ' '.join([f"{t['type']}: {t['text']}" for t in todos[:5]])
enhanced_content = f"{content}\n{todo_text}"
# Add type information to enhanced content for better semantic search
if type_info['return_type']:
enhanced_content = f"{enhanced_content}\nReturns: {type_info['return_type']}"
if type_info['exceptions_raised'] != '[]':
exceptions = json.loads(type_info['exceptions_raised'])
if exceptions:
enhanced_content = f"{enhanced_content}\nRaises: {', '.join(exceptions)}"
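                # The enhanced text (code plus extracted TODOs and type info) is what gets
                # embedded; the float32 vector is stored as raw bytes and decoded in search().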
embedding = self.model.encode(enhanced_content, show_progress_bar=False)
embedding_blob = embedding.tobytes()
# Get line number from AST node if available
line_num = getattr(node, 'lineno', 1) if node else 1
# Get signature if it's a function
signature = ''
if node and isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
try:
signature = f"{node.name}({', '.join([a.arg for a in node.args.args])})"
                    except Exception:
signature = ''
self.db.execute(
'''INSERT INTO symbols
(name, type, file_path, line_num, signature, code_context, embedding,
last_updated, language, file_type, todo_items, inline_comments, design_notes,
parameters, return_type, exceptions_raised, type_annotations,
imports, calls, called_by, inherits_from, inherited_by,
usage_frequency, test_files, example_usage, common_patterns, test_coverage)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(symbol_name, symbol_type, str(file_path), line_num, signature, content,
                     embedding_blob, datetime.now().isoformat(), language, file_type,
file_docs.get('todo_items', '[]'),
file_docs.get('inline_comments', '[]'),
file_docs.get('design_notes', '[]'),
type_info['parameters'],
type_info['return_type'],
type_info['exceptions_raised'],
type_info['type_annotations'],
dep_info['imports'],
dep_info['calls'],
dep_info['called_by'],
dep_info['inherits_from'],
dep_info['inherited_by'],
usage_info['usage_frequency'],
usage_info['test_files'],
usage_info['example_usage'],
usage_info['common_patterns'],
usage_info['test_coverage'])
)
except Exception:
pass
def _get_file_hash(self, file_path: Path) -> str:
"""Get hash of file contents"""
try:
with open(file_path, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
        except OSError:
            return ""
def _extract_usage_context(self, symbol_name: str, file_path: Path) -> Dict[str, Any]:
"""Extract usage context and patterns for a symbol"""
# Get usage information from analyzer
usage_info = self.usage_analyzer.analyze_symbol_usage(symbol_name, file_path)
# Convert to database format
return {
'usage_frequency': usage_info.get('usage_frequency', 0),
'test_files': json.dumps(usage_info.get('test_files', [])),
'example_usage': usage_info.get('example_usage', '')[:500], # Limit size
'common_patterns': json.dumps(usage_info.get('common_patterns', [])),
'test_coverage': usage_info.get('test_coverage', 0.0)
}
    def _extract_dependencies(self, file_path: Path, node: Optional[ast.AST] = None, full_tree: Optional[ast.AST] = None) -> Dict[str, str]:
"""Extract dependency information for a symbol"""
dep_info = {
'imports': [],
'calls': [],
'called_by': [],
'inherits_from': '',
'inherited_by': []
}
# Analyze the file for dependencies
full_path = self.root / file_path if not file_path.is_absolute() else file_path
file_deps = self.dependency_analyzer.analyze_file(full_path)
if node and hasattr(node, 'name'):
symbol_name = node.name
if symbol_name in file_deps:
symbol_info = file_deps[symbol_name]
dep_info['imports'] = symbol_info.get('imports', [])
dep_info['calls'] = symbol_info.get('calls', [])
dep_info['inherits_from'] = symbol_info.get('inherits_from', '') or ''
# Convert to JSON for storage
return {
'imports': json.dumps(dep_info['imports']),
'calls': json.dumps(dep_info['calls']),
'called_by': json.dumps(dep_info['called_by']),
'inherits_from': dep_info['inherits_from'],
'inherited_by': json.dumps(dep_info['inherited_by'])
}
    def _extract_type_info(self, node: ast.AST, source_lines: Optional[List[str]] = None) -> Dict[str, str]:
"""Extract type information from Python AST nodes"""
type_info = {
'parameters': [],
'return_type': None,
'exceptions_raised': [],
'type_annotations': {}
}
# Extract function/method parameters and types
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
# Extract parameters
for arg in node.args.args:
param_info = {'name': arg.arg}
# Get type annotation if present
if arg.annotation:
param_info['type'] = ast.unparse(arg.annotation) if hasattr(ast, 'unparse') else str(arg.annotation)
type_info['type_annotations'][arg.arg] = param_info['type']
type_info['parameters'].append(param_info)
# Extract return type
if node.returns:
type_info['return_type'] = ast.unparse(node.returns) if hasattr(ast, 'unparse') else str(node.returns)
# Find raised exceptions in function body
for child in ast.walk(node):
if isinstance(child, ast.Raise):
if child.exc:
if isinstance(child.exc, ast.Call) and isinstance(child.exc.func, ast.Name):
type_info['exceptions_raised'].append(child.exc.func.id)
elif isinstance(child.exc, ast.Name):
type_info['exceptions_raised'].append(child.exc.id)
# Extract class inheritance
elif isinstance(node, ast.ClassDef):
# Get base classes
base_classes = []
for base in node.bases:
if isinstance(base, ast.Name):
base_classes.append(base.id)
elif isinstance(base, ast.Attribute):
base_classes.append(ast.unparse(base) if hasattr(ast, 'unparse') else str(base))
if base_classes:
type_info['type_annotations']['inherits_from'] = base_classes
# Check for type hints in class attributes
for item in node.body:
if isinstance(item, ast.AnnAssign) and item.annotation:
attr_name = item.target.id if isinstance(item.target, ast.Name) else str(item.target)
attr_type = ast.unparse(item.annotation) if hasattr(ast, 'unparse') else str(item.annotation)
type_info['type_annotations'][attr_name] = attr_type
# Convert to JSON strings for database storage
return {
'parameters': json.dumps(type_info['parameters']),
'return_type': type_info['return_type'] or '',
'exceptions_raised': json.dumps(list(set(type_info['exceptions_raised']))), # Remove duplicates
'type_annotations': json.dumps(type_info['type_annotations'])
}
def _extract_documentation(self, content: str, language: str = 'python', node=None) -> Dict[str, str]:
"""Extract documentation, comments, and TODOs from source code"""
# Extract TODO/FIXME/XXX/HACK/NOTE items
        todo_pattern = r'(?:#|//|/\*|\*)\s*(TODO|FIXME|XXX|HACK|NOTE)[:\s]\s*(.+?)(?:\*/|\n|$)'
todos = re.findall(todo_pattern, content, re.MULTILINE | re.IGNORECASE)
# Extract inline comments based on language
inline_comments = []
if language == 'python':
# Python inline comments (excluding TODOs)
inline_pattern = r'#\s*(?!TODO|FIXME|XXX|HACK|NOTE)(.+?)$'
inline_comments = re.findall(inline_pattern, content, re.MULTILINE | re.IGNORECASE)
elif language in ['javascript', 'typescript']:
# JS/TS inline comments
inline_pattern = r'//\s*(?!TODO|FIXME|XXX|HACK|NOTE)(.+?)$'
inline_comments = re.findall(inline_pattern, content, re.MULTILINE | re.IGNORECASE)
# Extract block comments and design notes
design_notes = []
if language == 'python':
# Python docstrings and block comments
block_pattern = r'"""(.*?)"""|\'\'\'(.*?)\'\'\'|^\s*#\s*(.+(?:\n\s*#.+)*)'
blocks = re.findall(block_pattern, content, re.DOTALL | re.MULTILINE)
for block in blocks:
text = block[0] or block[1] or block[2]
if text and len(text.strip()) > 20: # Only meaningful blocks
design_notes.append(text.strip())
elif language in ['javascript', 'typescript']:
# JS/TS block comments
block_pattern = r'/\*\*(.*?)\*/|/\*(.*?)\*/'
blocks = re.findall(block_pattern, content, re.DOTALL)
for block in blocks:
text = block[0] or block[1]
if text and len(text.strip()) > 20:
design_notes.append(text.strip())
# Format as JSON strings for database storage
todo_items = [{"type": t[0].upper(), "text": t[1].strip()} for t in todos]
return {
"todo_items": json.dumps(todo_items[:50]), # Limit to 50 TODOs
"inline_comments": json.dumps([c.strip() for c in inline_comments[:30] if c.strip()]), # Limit to 30 comments
"design_notes": json.dumps(design_notes[:10]) # Limit to 10 design blocks
}
def _extract_symbols(self, file_path: Path) -> List[Tuple[str, str, str, Optional[ast.AST]]]:
"""Extract symbols from a file - returns (name, type, content, node)"""
symbols = []
try:
# Resolve full path for reading
full_path = self.root / file_path if not file_path.is_absolute() else file_path
# Read file content
try:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
with open(full_path, 'r', encoding='latin-1') as f:
content = f.read()
# Determine file type
ext = file_path.suffix
file_name = file_path.name
# Special handling for Python files using AST
if ext == '.py':
try:
tree = ast.parse(content)
for node in ast.walk(tree):
                        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
func_content = self._get_node_content(content, node)
symbols.append((node.name, 'function', func_content, node))
elif isinstance(node, ast.ClassDef):
class_content = self._get_node_content(content, node)
symbols.append((node.name, 'class', class_content, node))
                except (SyntaxError, ValueError):
pass
            # Use regex patterns for other code languages (skip Python when the AST pass already found symbols)
            for lang, patterns in self.language_patterns.items():
                if ext in patterns['extensions'] and not (lang == 'python' and symbols):
for pattern_name, pattern in patterns.items():
if pattern_name != 'extensions':
matches = pattern.finditer(content)
for match in matches:
symbol_name = match.group(1) or (match.group(2) if len(match.groups()) > 1 else None)
if symbol_name:
# Get surrounding context
start = max(0, match.start() - 100)
end = min(len(content), match.end() + 500)
context = content[start:end]
symbols.append((symbol_name, pattern_name, context, None))
# Handle documentation files
for doc_type, patterns in self.doc_patterns.items():
is_doc_file = False
if 'extensions' in patterns and ext in patterns['extensions']:
is_doc_file = True
elif 'special_files' in patterns and file_name in patterns['special_files']:
is_doc_file = True
if is_doc_file:
# Extract sections/headers from documentation files
if doc_type == 'markdown' and 'section' in patterns:
matches = patterns['section'].finditer(content)
for match in matches:
header_level = len(match.group(1)) # Number of # symbols
header_text = match.group(2).strip()
# Get content until next header of same or higher level
start = match.end()
end = len(content)
# Find next section
next_pattern = re.compile(f'^#{{{1},{header_level}}}\\s+', re.MULTILINE)
next_match = next_pattern.search(content, start)
if next_match:
end = next_match.start()
section_content = content[start:end][:1000] # Limit content size
symbols.append((header_text, f'h{header_level}', section_content, None))
elif doc_type in ['json', 'yaml'] and 'key' in patterns:
# For config files, extract top-level keys
matches = patterns['key'].finditer(content)
for i, match in enumerate(matches):
if i < 20: # Limit to first 20 keys
key_name = match.group(1)
# Get some context around the key
start = match.start()
end = min(len(content), match.end() + 200)
context = content[start:end]
symbols.append((key_name, 'config_key', context, None))
elif doc_type == 'shell' and 'function' in patterns:
# Extract shell functions
matches = patterns['function'].finditer(content)
for match in matches:
func_name = match.group(1)
# Get function body
start = match.start()
end = min(len(content), match.end() + 500)
context = content[start:end]
symbols.append((func_name, 'shell_function', context, None))
# Always index the whole file for documentation
if not symbols or doc_type in ['text', 'markdown']:
file_title = file_path.stem.replace('_', ' ').replace('-', ' ').title()
# Include more content for documentation files
symbols.append((file_title, 'document', content[:2000], None))
break
# If no symbols found, index the whole file
if not symbols and len(content) > 0:
file_name = file_path.stem
symbols.append((file_name, 'file', content[:1000], None))
except Exception:
pass
return symbols
def _get_node_content(self, content: str, node) -> str:
"""Get content for an AST node"""
try:
lines = content.split('\n')
start_line = node.lineno - 1
end_line = getattr(node, 'end_lineno', start_line + 10)
return '\n'.join(lines[start_line:end_line])[:1000]
        except Exception:
return ""
# Use this as the default export
SmartCodeSearch = CleanSmartCodeSearch
__all__ = ['SmartCodeSearch', 'CleanSmartCodeSearch']
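# Minimal usage sketch (not part of the library): it assumes the module is run from the
# project root so the src.core imports above resolve; the query string and limit are
# illustrative only. Adjust project_root and the query as needed.
if __name__ == "__main__":
    searcher = SmartCodeSearch(project_root=".")
    searcher.index_project()  # builds or updates .claude-symbols/search.db
    for hit in searcher.search("extract markdown sections", limit=5):
        print(f"{hit['score']:.3f}  {hit['file_path']}:{hit['line_num']}  {hit['name']} ({hit['type']})")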