"""
Code Indexer - Multi-language AST parsing for code understanding.
Phase 2 of Cognitive Architecture: Daem0n understands project structure
and can answer "what depends on X?"
Uses tree-sitter-language-pack for cross-language parsing without compilation
(pre-built wheels support Python 3.14+).
"""
import hashlib
import logging
from pathlib import Path
from typing import Generator, List, Optional, Dict, Any, Tuple
from datetime import datetime, timezone
from .config import settings
logger = logging.getLogger(__name__)
# Language configuration: file extension -> tree-sitter language name
LANGUAGE_CONFIG = {
'.py': 'python',
'.js': 'javascript',
'.mjs': 'javascript',
'.cjs': 'javascript',
'.ts': 'typescript',
'.tsx': 'tsx',
'.go': 'go',
'.rs': 'rust',
'.java': 'java',
'.kt': 'kotlin',
'.kts': 'kotlin',
'.rb': 'ruby',
'.php': 'php',
'.c': 'c',
'.h': 'c',
'.cpp': 'cpp',
'.hpp': 'cpp',
'.cc': 'cpp',
'.cs': 'c_sharp',
}
# Tree-sitter queries for extracting entities (language-specific)
# These patterns capture function definitions, class definitions, etc.
ENTITY_QUERIES = {
'python': """
(class_definition
name: (identifier) @class.name) @class.def
(function_definition
name: (identifier) @function.name) @function.def
""",
'typescript': """
(class_declaration
name: (type_identifier) @class.name) @class.def
(function_declaration
name: (identifier) @function.name) @function.def
(method_definition
name: (property_identifier) @method.name) @method.def
(interface_declaration
name: (type_identifier) @interface.name) @interface.def
""",
'javascript': """
(class_declaration
name: (identifier) @class.name) @class.def
(function_declaration
name: (identifier) @function.name) @function.def
(method_definition
name: (property_identifier) @method.name) @method.def
""",
'tsx': """
(class_declaration
name: (type_identifier) @class.name) @class.def
(function_declaration
name: (identifier) @function.name) @function.def
(method_definition
name: (property_identifier) @method.name) @method.def
""",
'go': """
(type_declaration
(type_spec
name: (type_identifier) @class.name)) @class.def
(function_declaration
name: (identifier) @function.name) @function.def
(method_declaration
name: (field_identifier) @method.name) @method.def
""",
'rust': """
(struct_item
name: (type_identifier) @class.name) @class.def
(enum_item
name: (type_identifier) @enum.name) @enum.def
(impl_item
type: (type_identifier) @impl.name) @impl.def
(function_item
name: (identifier) @function.name) @function.def
(trait_item
name: (type_identifier) @trait.name) @trait.def
""",
'java': """
(class_declaration
name: (identifier) @class.name) @class.def
(interface_declaration
name: (identifier) @interface.name) @interface.def
(method_declaration
name: (identifier) @method.name) @method.def
""",
'c': """
(function_definition
declarator: (function_declarator
declarator: (identifier) @function.name)) @function.def
(struct_specifier
name: (type_identifier) @struct.name) @struct.def
""",
'cpp': """
(function_definition
declarator: (function_declarator
declarator: (identifier) @function.name)) @function.def
(class_specifier
name: (type_identifier) @class.name) @class.def
(struct_specifier
name: (type_identifier) @struct.name) @struct.def
""",
'c_sharp': """
(class_declaration
name: (identifier) @class.name) @class.def
(interface_declaration
name: (identifier) @interface.name) @interface.def
(method_declaration
name: (identifier) @method.name) @method.def
""",
'ruby': """
(class
name: (constant) @class.name) @class.def
(method
name: (identifier) @method.name) @method.def
(singleton_method
name: (identifier) @method.name) @method.def
""",
'php': """
(class_declaration
name: (name) @class.name) @class.def
(function_definition
name: (name) @function.name) @function.def
(method_declaration
name: (name) @method.name) @method.def
""",
'kotlin': """
(class_declaration
(type_identifier) @class.name) @class.def
(object_declaration
(type_identifier) @class.name) @class.def
(function_declaration
(simple_identifier) @function.name) @function.def
""",
}
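# Minimal sketch of how these queries are consumed (tree-sitter 0.25+ API, the
# same calls used in _extract_entities below; illustrative only):
#
#     import tree_sitter
#     from tree_sitter_language_pack import get_language, get_parser
#
#     language = get_language("python")
#     tree = get_parser("python").parse(b"def f():\n    pass\n")
#     query = tree_sitter.Query(language, ENTITY_QUERIES["python"])
#     matches = tree_sitter.QueryCursor(query).matches(tree.root_node)
#     # Each match is (pattern_index, captures): captures maps a capture name
#     # such as "function.def" or "function.name" to a list of matching nodes.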
# Import extraction queries per language
IMPORT_QUERIES = {
'python': """
(import_statement
name: (dotted_name) @import.name) @import.def
(import_from_statement
module_name: (dotted_name) @import.module) @import.def
""",
'typescript': """
(import_statement
source: (string) @import.source) @import.def
""",
'javascript': """
(import_statement
source: (string) @import.source) @import.def
""",
}
def _check_tree_sitter_available() -> bool:
"""Check if tree-sitter-language-pack is available."""
import importlib.util
return importlib.util.find_spec("tree_sitter_language_pack") is not None
def is_available() -> bool:
"""Check if code indexer is available (tree-sitter installed)."""
return _check_tree_sitter_available()
class TreeSitterIndexer:
"""
Universal code indexer using tree-sitter.
    Supports multiple languages through the tree-sitter-language-pack package.
Extracts code entities (classes, functions, methods) for indexing.
"""
def __init__(self):
self._parsers: Dict[str, Any] = {}
self._languages: Dict[str, Any] = {}
self._available = _check_tree_sitter_available()
# Parse tree cache
self._parse_cache: Dict[str, Tuple[str, Any]] = {} # path -> (hash, tree)
self._cache_maxsize: int = settings.parse_tree_cache_maxsize
self._cache_hits: int = 0
self._cache_misses: int = 0
@property
def available(self) -> bool:
"""Check if tree-sitter is available."""
return self._available
def get_parser(self, lang: str):
"""Get or create parser and language for the given language."""
if not self._available:
return None, None
if lang not in self._parsers:
try:
from tree_sitter_language_pack import get_parser, get_language
self._parsers[lang] = get_parser(lang)
self._languages[lang] = get_language(lang)
except Exception as e:
logger.warning(f"Failed to get parser for {lang}: {e}")
return None, None
return self._parsers.get(lang), self._languages.get(lang)
def _get_cached_tree(self, file_path: Path, source: bytes, lang: str):
"""Get parse tree from cache or parse and cache."""
content_hash = hashlib.md5(source).hexdigest()
cache_key = str(file_path)
# Check cache
if cache_key in self._parse_cache:
cached_hash, cached_tree = self._parse_cache[cache_key]
if cached_hash == content_hash:
self._cache_hits += 1
return cached_tree
# Cache miss - parse
self._cache_misses += 1
parser, language = self.get_parser(lang)
if parser is None:
return None
tree = parser.parse(source)
        # Evict the oldest entry (insertion order) when adding a new key at capacity
        if cache_key not in self._parse_cache and len(self._parse_cache) >= self._cache_maxsize:
oldest_key = next(iter(self._parse_cache))
del self._parse_cache[oldest_key]
self._parse_cache[cache_key] = (content_hash, tree)
return tree
@property
def cache_stats(self) -> Dict[str, Any]:
"""Return cache statistics."""
total = self._cache_hits + self._cache_misses
return {
"size": len(self._parse_cache),
"maxsize": self._cache_maxsize,
"hits": self._cache_hits,
"misses": self._cache_misses,
"hit_rate": self._cache_hits / total if total > 0 else 0.0
}
def clear_cache(self) -> int:
"""Clear the parse tree cache."""
count = len(self._parse_cache)
self._parse_cache.clear()
return count
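    # Cache behavior sketch: indexing the same unchanged file twice parses it
    # once; cache_stats would then report hits=1, misses=1, hit_rate=0.5.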
def _extract_imports(self, tree, language, lang: str, source: bytes) -> List[str]:
"""Extract import statements from a parsed file."""
import tree_sitter
query_text = IMPORT_QUERIES.get(lang)
if not query_text:
return []
try:
query = tree_sitter.Query(language, query_text)
cursor = tree_sitter.QueryCursor(query)
matches = list(cursor.matches(tree.root_node))
except Exception as e:
logger.debug(f"Import query failed for {lang}: {e}")
return []
imports = []
for pattern_index, captures_dict in matches:
for capture_name, nodes in captures_dict.items():
if capture_name in ('import.name', 'import.module', 'import.source'):
for node in nodes:
text = source[node.start_byte:node.end_byte].decode('utf-8', errors='replace')
text = text.strip('"\'')
if text and text not in imports:
imports.append(text)
return imports
def get_supported_extensions(self) -> List[str]:
"""Get list of supported file extensions."""
return list(LANGUAGE_CONFIG.keys())
def index_file(self, file_path: Path, project_path: Path) -> Generator[Dict[str, Any], None, None]:
"""
Index a single file and yield code entities.
Args:
file_path: Absolute path to the file
project_path: Project root for relative path calculation
Yields:
CodeEntity dictionaries ready for database insertion
"""
if not self._available:
return
suffix = file_path.suffix.lower()
if suffix not in LANGUAGE_CONFIG:
return
lang = LANGUAGE_CONFIG[suffix]
try:
source = file_path.read_bytes()
# Use cached tree if available
tree = self._get_cached_tree(file_path, source, lang)
if tree is None:
return
# Get language for query operations
_, language = self.get_parser(lang)
if language is None:
return
except Exception as e:
logger.debug(f"Failed to parse {file_path}: {e}")
return
try:
relative_path = file_path.relative_to(project_path)
except ValueError:
# File is not under project path
relative_path = file_path
# Extract file-level imports
file_imports = self._extract_imports(tree, language, lang, source)
# Extract entities using tree-sitter queries
for entity in self._extract_entities(tree, language, lang, source, str(relative_path)):
entity['project_path'] = str(project_path)
entity['file_path'] = str(relative_path)
entity['imports'] = file_imports # Attach imports to all entities
yield self._make_entity_dict(**entity)
def _extract_entities(
self,
tree,
language,
lang: str,
source: bytes,
file_path: str = ""
) -> Generator[Dict[str, Any], None, None]:
"""Extract entities using language-specific queries."""
import tree_sitter
query_text = ENTITY_QUERIES.get(lang)
if not query_text:
# Fallback: walk tree manually for basic extraction
yield from self._walk_tree_fallback(tree.root_node, source, lang, file_path)
return
try:
# Use new tree-sitter 0.25+ API with Query constructor and QueryCursor
query = tree_sitter.Query(language, query_text)
cursor = tree_sitter.QueryCursor(query)
matches = list(cursor.matches(tree.root_node))
except Exception as e:
logger.debug(f"Query failed for {lang}: {e}")
yield from self._walk_tree_fallback(tree.root_node, source, lang, file_path)
return
# Process matches - each match is (pattern_index, captures_dict)
processed_defs = set()
for pattern_index, captures_dict in matches:
# Find the definition capture (ends with .def)
def_capture = None
def_nodes = []
name_nodes = []
for capture_name, nodes in captures_dict.items():
if capture_name.endswith('.def'):
def_capture = capture_name
def_nodes = nodes
elif capture_name.endswith('.name'):
name_nodes = nodes
if not def_nodes:
continue
for def_node in def_nodes:
# Skip if already processed
node_id = (def_node.start_byte, def_node.end_byte)
if node_id in processed_defs:
continue
processed_defs.add(node_id)
entity_type = def_capture.split('.')[0] if def_capture else 'unknown'
# Find the corresponding name node
name = "anonymous"
for name_node in name_nodes:
if self._is_descendant(def_node, name_node):
name = name_node.text.decode('utf-8', errors='replace')
break
# Get first line as signature (up to 200 chars)
signature = self._extract_signature(def_node, source)
docstring = self._extract_docstring(def_node, source, lang)
yield {
'entity_type': entity_type,
'name': name,
'qualified_name': self._compute_qualified_name(def_node, source, lang, file_path),
'line_start': def_node.start_point[0] + 1, # 1-indexed
'line_end': def_node.end_point[0] + 1,
'signature': signature,
'docstring': docstring,
}
def _is_descendant(self, ancestor, node) -> bool:
"""Check if node is a descendant of ancestor."""
current = node
while current is not None:
if current == ancestor:
return True
current = current.parent
return False
def _extract_signature(self, node, source: bytes) -> str:
"""Extract the first line of a definition as signature."""
try:
start = node.start_byte
end = min(start + 500, node.end_byte) # Get enough for first line
text = source[start:end].decode('utf-8', errors='replace')
# Get first line, limit to 200 chars
first_line = text.split('\n')[0]
return first_line[:200]
except Exception:
return ""
def _extract_docstring(self, node, source: bytes, lang: str) -> Optional[str]:
"""Extract docstring from a definition node."""
try:
# Language-specific docstring extraction
if lang == 'python':
return self._extract_python_docstring(node, source)
elif lang in ('javascript', 'typescript', 'tsx', 'java', 'c_sharp'):
return self._extract_jsdoc(node, source)
elif lang == 'go':
return self._extract_go_comment(node, source)
return None
except Exception:
return None
def _extract_python_docstring(self, node, source: bytes) -> Optional[str]:
"""Extract Python docstring (first string literal in function/class body)."""
for child in node.children:
if child.type == 'block':
for block_child in child.children:
if block_child.type == 'expression_statement':
for expr_child in block_child.children:
if expr_child.type == 'string':
text = source[expr_child.start_byte:expr_child.end_byte]
return text.decode('utf-8', errors='replace').strip('"\' \n\r')
break
return None
def _extract_jsdoc(self, node, source: bytes) -> Optional[str]:
"""Extract JSDoc comment preceding a definition."""
# Look at previous sibling for comment
prev = node.prev_sibling
while prev and prev.type == 'comment':
text = source[prev.start_byte:prev.end_byte].decode('utf-8', errors='replace')
if text.startswith('/**'):
# Clean up JSDoc
lines = text.split('\n')
cleaned = []
for line in lines:
line = line.strip()
if line.startswith('/**') or line.startswith('*/'):
continue
if line.startswith('*'):
line = line[1:].strip()
cleaned.append(line)
return ' '.join(cleaned)
prev = prev.prev_sibling
return None
def _extract_go_comment(self, node, source: bytes) -> Optional[str]:
"""Extract Go comment preceding a definition."""
prev = node.prev_sibling
comments = []
while prev and prev.type == 'comment':
text = source[prev.start_byte:prev.end_byte].decode('utf-8', errors='replace')
# Remove // prefix
if text.startswith('//'):
text = text[2:].strip()
comments.insert(0, text)
prev = prev.prev_sibling
return ' '.join(comments) if comments else None
def _walk_tree_fallback(
self, node, source: bytes, lang: str = "", file_path: str = ""
) -> Generator[Dict[str, Any], None, None]:
"""
Fallback tree walker for languages without specific queries.
Looks for common node types that typically represent definitions.
"""
definition_types = {
'function_definition', 'function_declaration', 'method_definition',
'class_definition', 'class_declaration', 'class_specifier',
'struct_specifier', 'interface_declaration', 'trait_item',
'impl_item', 'enum_item', 'method_declaration',
}
if node.type in definition_types:
name = self._extract_name_from_node(node, source)
if name:
entity_type = 'function' if 'function' in node.type or 'method' in node.type else 'class'
yield {
'entity_type': entity_type,
'name': name,
'qualified_name': self._compute_qualified_name(node, source, lang, file_path),
'line_start': node.start_point[0] + 1,
'line_end': node.end_point[0] + 1,
'signature': self._extract_signature(node, source),
'docstring': None,
}
for child in node.children:
yield from self._walk_tree_fallback(child, source, lang, file_path)
def _extract_name_from_node(self, node, source: bytes) -> Optional[str]:
"""Try to extract a name from a node by looking for identifier children."""
name_types = {'identifier', 'type_identifier', 'field_identifier',
'property_identifier', 'constant', 'name'}
for child in node.children:
if child.type in name_types:
return source[child.start_byte:child.end_byte].decode('utf-8', errors='replace')
# Some languages nest the name
if child.type in ('declarator', 'function_declarator', 'type_spec'):
for grandchild in child.children:
if grandchild.type in name_types:
return source[grandchild.start_byte:grandchild.end_byte].decode('utf-8', errors='replace')
return None
def _compute_qualified_name(self, node, source: bytes, lang: str, file_path: str) -> str:
"""
Compute fully qualified name by walking parent scopes.
Examples:
- Python class method: module.ClassName.method_name
- Nested class: module.Outer.Inner.method
- Top-level function: module.function_name
"""
parts = []
# Walk up the tree collecting scope names
current = node.parent
while current is not None:
scope_name = None
if lang == 'python':
if current.type == 'class_definition':
scope_name = self._extract_name_from_node(current, source)
elif lang in ('typescript', 'javascript', 'tsx'):
if current.type == 'class_declaration':
scope_name = self._extract_name_from_node(current, source)
elif lang == 'go':
if current.type == 'type_declaration':
scope_name = self._extract_name_from_node(current, source)
if scope_name:
parts.insert(0, scope_name)
current = current.parent
# Add the entity's own name
entity_name = self._extract_name_from_node(node, source)
if entity_name:
parts.append(entity_name)
# Prepend module name from file path
module_name = self._file_path_to_module(file_path)
if module_name:
parts.insert(0, module_name)
return '.'.join(parts) if parts else entity_name or "anonymous"
def _file_path_to_module(self, file_path: str) -> str:
"""Convert file path to module name."""
p = Path(file_path)
stem = p.stem
        if stem == '__init__':
            # Packages are identified by their directory name ('' at project root)
            return p.parent.name
return stem
def _make_entity_dict(self, **kwargs) -> Dict[str, Any]:
"""Create a CodeEntity-compatible dictionary."""
# Use qualified_name for stable IDs that don't change when lines shift
identifier = kwargs.get('qualified_name') or kwargs['name']
id_string = f"{kwargs['project_path']}:{kwargs['file_path']}:{identifier}:{kwargs['entity_type']}"
entity_id = hashlib.sha256(id_string.encode()).hexdigest()[:16]
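        # For example, a function "run" defined on class "Handler" in src/app.py
        # yields id_string "<project>:src/app.py:app.Handler.run:function";
        # edits that only shift line numbers leave the ID unchanged.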
return {
'id': entity_id,
'project_path': kwargs['project_path'],
'entity_type': kwargs['entity_type'],
'name': kwargs['name'],
'qualified_name': kwargs.get('qualified_name'),
'file_path': kwargs['file_path'],
'line_start': kwargs.get('line_start'),
'line_end': kwargs.get('line_end'),
'signature': kwargs.get('signature'),
'docstring': kwargs.get('docstring'),
'calls': [],
'called_by': [],
'imports': kwargs.get('imports', []),
'inherits': [],
'indexed_at': datetime.now(timezone.utc),
}
class CodeIndexManager:
"""
Manages code indexing across a project.
Orchestrates the TreeSitterIndexer, stores results in SQLite,
and indexes embeddings in Qdrant for semantic search.
"""
    # Default patterns for all supported languages
    DEFAULT_PATTERNS = [
        '**/*.py', '**/*.js', '**/*.mjs', '**/*.cjs', '**/*.ts', '**/*.tsx',
        '**/*.go', '**/*.rs', '**/*.java', '**/*.kt', '**/*.kts',
        '**/*.rb', '**/*.php', '**/*.c', '**/*.h', '**/*.cpp', '**/*.hpp',
        '**/*.cc', '**/*.cs',
    ]
# Directories to skip during indexing
SKIP_DIRS = {
'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.tox', '.eggs', '*.egg-info',
'target', '.cargo', '.rustup',
'vendor', '.bundle',
'.next', '.nuxt', '.output',
'coverage', '.nyc_output',
'.daem0nmcp', '.devilmcp',
}
def __init__(self, db=None, qdrant=None):
"""
Initialize CodeIndexManager.
Args:
db: DatabaseManager instance (optional)
qdrant: QdrantVectorStore instance (optional)
"""
self.db = db
self.qdrant = qdrant
self.indexer = TreeSitterIndexer()
@property
def available(self) -> bool:
"""Check if code indexing is available."""
return self.indexer.available
def _should_skip(self, path: Path) -> bool:
"""Check if a path should be skipped during indexing."""
parts = set(path.parts)
for skip_dir in self.SKIP_DIRS:
if skip_dir in parts:
return True
# Handle wildcards
if skip_dir.startswith('*'):
suffix = skip_dir[1:]
if any(p.endswith(suffix) for p in parts):
return True
return False
async def index_project(
self,
project_path: str,
patterns: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Index all code entities in a project.
Args:
project_path: Root path of the project
patterns: Glob patterns for files to index (default: all supported)
Returns:
Dict with indexing statistics
"""
if not self.indexer.available:
return {
                'error': 'tree-sitter-language-pack not installed',
'indexed': 0,
'project': project_path,
}
project = Path(project_path).resolve()
patterns = patterns or self.DEFAULT_PATTERNS
entities = []
files_processed = 0
files_skipped = 0
for pattern in patterns:
for file_path in project.glob(pattern):
if self._should_skip(file_path):
files_skipped += 1
continue
if not file_path.is_file():
continue
for entity in self.indexer.index_file(file_path, project):
entities.append(entity)
files_processed += 1
# Store in database if available
if self.db is not None:
await self._store_entities(entities, str(project))
# Index in Qdrant if available
if self.qdrant is not None:
await self._index_in_qdrant(entities)
return {
'indexed': len(entities),
'files_processed': files_processed,
'files_skipped': files_skipped,
'project': str(project),
}
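    # Usage sketch (db and qdrant are optional; with both None the entities are
    # extracted and counted but not persisted):
    #
    #     import asyncio
    #
    #     manager = CodeIndexManager()
    #     stats = asyncio.run(manager.index_project("/path/to/project"))
    #     print(stats["indexed"], stats["files_processed"], stats["files_skipped"])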
async def _store_entities(self, entities: List[Dict], project_path: str):
"""Store entities in SQLite database."""
from .models import CodeEntity
from sqlalchemy import delete
async with self.db.get_session() as session:
# Clear existing entities for this project
await session.execute(
delete(CodeEntity).where(CodeEntity.project_path == project_path)
)
# Insert new entities
for entity_dict in entities:
entity = CodeEntity(
id=entity_dict['id'],
project_path=entity_dict['project_path'],
entity_type=entity_dict['entity_type'],
name=entity_dict['name'],
qualified_name=entity_dict.get('qualified_name'),
file_path=entity_dict['file_path'],
line_start=entity_dict.get('line_start'),
line_end=entity_dict.get('line_end'),
signature=entity_dict.get('signature'),
docstring=entity_dict.get('docstring'),
calls=entity_dict.get('calls', []),
called_by=entity_dict.get('called_by', []),
imports=entity_dict.get('imports', []),
inherits=entity_dict.get('inherits', []),
indexed_at=entity_dict.get('indexed_at'),
)
session.add(entity)
await session.commit()
async def _index_in_qdrant(self, entities: List[Dict]):
"""Index entities in Qdrant for semantic search."""
from . import vectors
if not vectors.is_available():
return
points = []
for entity in entities:
# Create searchable text from signature and docstring
text = f"{entity['name']} {entity.get('signature', '')} {entity.get('docstring', '')}"
text = text.strip()
if not text:
continue
embedding = vectors.encode(text)
if embedding is None:
continue
# Decode the embedding bytes to list
embedding_list = vectors.decode(embedding)
if embedding_list is None:
continue
points.append({
"id": entity['id'],
"vector": embedding_list,
"payload": {
"entity_type": entity['entity_type'],
"name": entity['name'],
"file_path": entity['file_path'],
"project_path": entity['project_path'],
"signature": entity.get('signature', ''),
}
})
if points and self.qdrant is not None:
try:
self.qdrant.client.upsert(
collection_name="daem0n_code_entities",
points=points,
)
except Exception as e:
logger.warning(f"Failed to index in Qdrant: {e}")
async def find_entity(
self,
name: str,
project_path: Optional[str] = None,
entity_type: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""
Find a code entity by name.
Args:
name: Entity name to find
project_path: Limit search to a specific project
entity_type: Limit search to a specific type
Returns:
Entity dictionary or None
"""
if self.db is None:
return None
from .models import CodeEntity
from sqlalchemy import select
async with self.db.get_session() as session:
query = select(CodeEntity).where(CodeEntity.name == name)
if project_path:
query = query.where(CodeEntity.project_path == project_path)
if entity_type:
query = query.where(CodeEntity.entity_type == entity_type)
result = await session.execute(query)
entity = result.scalars().first()
if entity:
return {
'id': entity.id,
'name': entity.name,
'entity_type': entity.entity_type,
'file_path': entity.file_path,
'line_start': entity.line_start,
'line_end': entity.line_end,
'signature': entity.signature,
'docstring': entity.docstring,
}
return None
async def search_entities(
self,
query: str,
project_path: Optional[str] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""
Search across code entities.
Uses semantic search via Qdrant if available, falls back to
SQLite text matching on name and signature.
Args:
query: Search query
project_path: Limit search to a specific project
limit: Maximum results to return
Returns:
List of matching entities with scores
"""
from . import vectors
# Try semantic search first if Qdrant is available
if vectors.is_available() and self.qdrant is not None:
results = await self._semantic_search(query, project_path, limit)
if results:
return results
# Fall back to SQLite text search
return await self._text_search(query, project_path, limit)
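    # Continuing the sketch above: with Qdrant available, hits carry similarity
    # scores; the SQLite fallback matches name/signature/docstring via ILIKE and
    # reports a flat score of 1.0.
    #
    #     results = await manager.search_entities("parse tree cache", limit=5)
    #     for r in results:
    #         print(r["name"], r["score"])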
async def _text_search(
self,
query: str,
project_path: Optional[str] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""Text-based search using SQLite LIKE queries."""
if self.db is None:
return []
from .models import CodeEntity
from sqlalchemy import select, or_
async with self.db.get_session() as session:
# Search name and signature for query terms
search_pattern = f"%{query}%"
stmt = select(CodeEntity).where(
or_(
CodeEntity.name.ilike(search_pattern),
CodeEntity.signature.ilike(search_pattern),
CodeEntity.docstring.ilike(search_pattern),
)
)
if project_path:
stmt = stmt.where(CodeEntity.project_path == project_path)
stmt = stmt.limit(limit)
result = await session.execute(stmt)
entities = result.scalars().all()
return [
{
'id': e.id,
'name': e.name,
'entity_type': e.entity_type,
'file_path': e.file_path,
'line_start': e.line_start,
'signature': e.signature,
'score': 1.0, # No relevance score for text search
}
for e in entities
]
async def _semantic_search(
self,
query: str,
project_path: Optional[str] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""Semantic search using Qdrant vector similarity."""
from . import vectors
if not vectors.is_available() or self.qdrant is None:
return []
# Encode query
embedding = vectors.encode(query)
if embedding is None:
return []
embedding_list = vectors.decode(embedding)
if embedding_list is None:
return []
try:
# Build filter if project_path specified
filter_conditions = None
if project_path:
filter_conditions = {
"must": [
{"key": "project_path", "match": {"value": project_path}}
]
}
results = self.qdrant.client.search(
collection_name="daem0n_code_entities",
query_vector=embedding_list,
limit=limit,
query_filter=filter_conditions,
)
return [
{
'id': r.id,
'score': r.score,
**r.payload,
}
for r in results
]
except Exception as e:
logger.warning(f"Qdrant search failed: {e}")
return []
async def analyze_impact(
self,
entity_name: str,
project_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
Analyze what would be affected by changing a code entity.
Args:
entity_name: Name of the entity to analyze
project_path: Limit analysis to a specific project
Returns:
Impact analysis with affected files and entities
"""
if self.db is None:
return {'error': 'Database not initialized'}
from .models import CodeEntity
from sqlalchemy import select
async with self.db.get_session() as session:
# Find the entity
query = select(CodeEntity).where(CodeEntity.name == entity_name)
if project_path:
query = query.where(CodeEntity.project_path == project_path)
result = await session.execute(query)
entity = result.scalars().first()
if not entity:
return {
'entity': entity_name,
'found': False,
'affected_files': [],
'affected_entities': [],
}
# Find entities that call this one or import it
affected = []
affected_files = set()
# Query for entities that reference this one in their calls or imports
# This is a simple implementation - in production, we'd want to analyze
# the actual code to find call sites
all_entities_query = select(CodeEntity)
if project_path:
all_entities_query = all_entities_query.where(
CodeEntity.project_path == project_path
)
all_result = await session.execute(all_entities_query)
all_entities = all_result.scalars().all()
for other in all_entities:
if other.id == entity.id:
continue
# Check if this entity's name appears in the other's calls
calls = other.calls or []
imports = other.imports or []
if entity_name in calls or entity_name in imports:
affected.append({
'name': other.name,
'type': other.entity_type,
'file': other.file_path,
'line': other.line_start,
})
affected_files.add(other.file_path)
return {
'entity': entity_name,
'found': True,
'entity_type': entity.entity_type,
'file_path': entity.file_path,
'line_start': entity.line_start,
'affected_files': list(affected_files),
'affected_entities': affected,
'message': f"Found {len(affected)} entities that may be affected",
}
async def index_file_if_changed(
self,
file_path: Path,
project_path: Path,
force: bool = False
) -> Dict[str, Any]:
"""
Index a single file only if its content has changed.
Returns:
Dict with changed, entities_count, reason, or error
"""
if file_path.suffix.lower() not in LANGUAGE_CONFIG:
return {'changed': False, 'reason': 'unsupported_extension'}
if self._should_skip(file_path):
return {'changed': False, 'reason': 'excluded_directory'}
try:
rel_path = str(file_path.relative_to(project_path))
except ValueError:
rel_path = str(file_path)
project_str = str(project_path.resolve())
# Compute current hash
try:
current_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
        except OSError as e:  # IOError is an alias of OSError on Python 3
return {'changed': False, 'error': str(e)}
# Check stored hash
if not force and self.db is not None:
stored_hash = await self._get_stored_hash(project_str, rel_path)
if stored_hash == current_hash:
return {'changed': False, 'reason': 'unchanged'}
# Re-index
entities = list(self.indexer.index_file(file_path, project_path))
if self.db is not None:
await self._store_file_entities(entities, project_str, rel_path, current_hash)
if self.qdrant is not None:
await self._index_in_qdrant(entities)
return {'changed': True, 'entities_count': len(entities)}
async def _get_stored_hash(self, project_path: str, file_path: str) -> Optional[str]:
"""Get stored content hash for a file."""
from .models import FileHash
from sqlalchemy import select, and_
async with self.db.get_session() as session:
result = await session.execute(
select(FileHash.content_hash).where(
and_(
FileHash.project_path == project_path,
FileHash.file_path == file_path
)
)
)
row = result.scalar_one_or_none()
return row
async def _store_file_entities(
self,
entities: List[Dict],
project_path: str,
file_path: str,
content_hash: str
) -> None:
"""Store entities for a single file, replacing existing."""
from .models import CodeEntity, FileHash
from sqlalchemy import delete, and_, select
async with self.db.get_session() as session:
# Delete existing entities for this file only
await session.execute(
delete(CodeEntity).where(
and_(
CodeEntity.project_path == project_path,
CodeEntity.file_path == file_path
)
)
)
# Insert new entities
for entity_dict in entities:
entity = CodeEntity(
id=entity_dict['id'],
project_path=entity_dict['project_path'],
entity_type=entity_dict['entity_type'],
name=entity_dict['name'],
qualified_name=entity_dict.get('qualified_name'),
file_path=entity_dict['file_path'],
line_start=entity_dict.get('line_start'),
line_end=entity_dict.get('line_end'),
signature=entity_dict.get('signature'),
docstring=entity_dict.get('docstring'),
calls=entity_dict.get('calls', []),
called_by=entity_dict.get('called_by', []),
imports=entity_dict.get('imports', []),
inherits=entity_dict.get('inherits', []),
indexed_at=entity_dict.get('indexed_at'),
)
session.add(entity)
# Upsert file hash
existing = await session.execute(
select(FileHash).where(
and_(
FileHash.project_path == project_path,
FileHash.file_path == file_path
)
)
)
fh = existing.scalar_one_or_none()
if fh:
fh.content_hash = content_hash
fh.indexed_at = datetime.now(timezone.utc)
            else:
                session.add(FileHash(
                    project_path=project_path,
                    file_path=file_path,
                    content_hash=content_hash,
                    indexed_at=datetime.now(timezone.utc)
                ))
await session.commit()