analyzer.py (17 kB)
from pathlib import Path
from typing import Optional, Dict, Any
import asyncio
import logging
import threading
import time
from enum import Enum

import watchdog.observers
from watchdog.events import FileSystemEventHandler

from code_flow_graph.core.python_extractor import PythonASTExtractor
from code_flow_graph.core.typescript_extractor import TypeScriptASTExtractor
from code_flow_graph.core.structured_extractor import StructuredDataExtractor
from code_flow_graph.core.call_graph_builder import CallGraphBuilder
from code_flow_graph.core.vector_store import CodeVectorStore
from code_flow_graph.mcp_server.llm import SummaryGenerator, SummaryProcessor


class AnalysisState(Enum):
    """Enum representing the current state of code analysis."""
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


class WatcherHandler(FileSystemEventHandler):
    def __init__(self, analyzer):
        self.analyzer = analyzer
        # Get supported file extensions based on language
        language = self.analyzer.config.get('language', 'python').lower()
        if language == 'typescript':
            # Support both Python and TypeScript for mixed projects
            self.supported_extensions = ['.py', '.ts', '.tsx', '.js', '.jsx', '.json', '.yaml', '.yml']
        else:
            self.supported_extensions = ['.py', '.json', '.yaml', '.yml']

    def on_modified(self, event):
        if any(event.src_path.endswith(ext) for ext in self.supported_extensions):
            logging.info(f"File modified: {event.src_path}")
            asyncio.run_coroutine_threadsafe(
                self.analyzer._incremental_update(event.src_path),
                self.analyzer.loop
            )
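
# Note: watchdog invokes on_modified on its own observer thread, not on the
# asyncio event loop. run_coroutine_threadsafe is the standard bridge for
# scheduling a coroutine onto a loop from another thread, e.g.:
#
#     future = asyncio.run_coroutine_threadsafe(coro(), loop)
#
# The analyzer's loop is captured in analyze() via asyncio.get_running_loop().
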
class MCPAnalyzer:
    """Analyzer for MCP server that extracts, builds, and stores code graph data."""

    def __init__(self, config: dict):
        """Initialize the analyzer with configuration.

        Args:
            config: Configuration dictionary containing watch_directories,
                chromadb_path, and optional language.
        """
        self.config = config
        root = Path(config['watch_directories'][0]).resolve()
        language = config.get('language', 'python').lower()
        logging.info(f"Initializing MCPAnalyzer with root: {root}, language: {language}")

        # Initialize appropriate extractor based on language
        if language == 'typescript':
            self.extractor = TypeScriptASTExtractor()
        else:
            self.extractor = PythonASTExtractor()
        self.extractor.project_root = root

        # Initialize structured data extractor
        ignored_filenames = set(config.get('ignored_filenames', []))
        self.structured_extractor = StructuredDataExtractor(ignored_filenames=ignored_filenames)

        self.builder = CallGraphBuilder()
        self.builder.project_root = root
        self.vector_store: Optional[CodeVectorStore] = None
        self.observer = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None

        # Analysis state tracking
        self.analysis_state = AnalysisState.NOT_STARTED
        self.analysis_task: Optional[asyncio.Task] = None
        self.analysis_error: Optional[Exception] = None

        # Initialize vector store if path exists
        if Path(config['chromadb_path']).exists():
            self.vector_store = CodeVectorStore(
                persist_directory=config['chromadb_path'],
                embedding_model_name=config.get('embedding_model', 'all-MiniLM-L6-v2'),
                max_tokens=config.get('max_tokens', 256)
            )
        else:
            logging.warning(f"ChromaDB path {config['chromadb_path']} does not exist, skipping vector store initialization")

        # Background cleanup configuration
        self.cleanup_interval = config.get('cleanup_interval_minutes', 30)  # Default: 30 minutes
        self.cleanup_task = None
        self.cleanup_shutdown_event = threading.Event()

        # Initialize summary processor if enabled
        self.summary_processor = None
        if config.get('summary_generation_enabled', False):
            llm_config = config.get('llm_config', {})
            self.summary_generator = SummaryGenerator(config)
            self.summary_processor = SummaryProcessor(
                generator=self.summary_generator,
                builder=self.builder,
                vector_store=self.vector_store,
                concurrency=llm_config.get('concurrency', 5),
                prioritize_entry_points=llm_config.get('prioritize_entry_points', False)
            )
        else:
            logging.info("Summary generation disabled")
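
    # Illustrative example (not from the source): a minimal config dict as
    # read by __init__ above. Only 'watch_directories' and 'chromadb_path'
    # are required; the rest fall back to the defaults shown.
    #
    #     config = {
    #         'watch_directories': ['/path/to/project'],
    #         'chromadb_path': '/path/to/chroma',
    #         'language': 'python',                  # or 'typescript'
    #         'embedding_model': 'all-MiniLM-L6-v2',
    #         'max_tokens': 256,
    #         'cleanup_interval_minutes': 30,
    #         'ignored_filenames': [],
    #         'summary_generation_enabled': False,
    #         'llm_config': {'concurrency': 5, 'prioritize_entry_points': False},
    #     }
    #     analyzer = MCPAnalyzer(config)
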
""" if not self.vector_store: logging.info("Vector store not available, skipping background cleanup") return def cleanup_worker(): """Background worker function that runs the cleanup periodically.""" while not self.cleanup_shutdown_event.is_set(): try: # Run cleanup if vector store is available if self.vector_store: logging.info("Running background cleanup of stale file references") cleanup_stats = self.vector_store.cleanup_stale_references() if cleanup_stats['removed_documents'] > 0: logging.info(f"Cleanup removed {cleanup_stats['removed_documents']} stale references") # Wait for next cleanup interval or shutdown event self.cleanup_shutdown_event.wait(timeout=self.cleanup_interval * 60) except Exception as e: logging.error(f"Error in background cleanup: {e}") # Wait a bit before retrying on error time.sleep(60) # Start cleanup thread cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) cleanup_thread.start() self.cleanup_task = cleanup_thread logging.info(f"Background cleanup started (interval: {self.cleanup_interval} minutes)") def stop_background_cleanup(self) -> None: """Stop the background cleanup task.""" if self.cleanup_task and self.cleanup_task.is_alive(): self.cleanup_shutdown_event.set() self.cleanup_task.join(timeout=10) # Wait up to 10 seconds logging.info("Background cleanup stopped") def is_ready(self) -> bool: """Check if analysis is complete and ready for queries. Returns: True if analysis is completed, False otherwise """ return self.analysis_state == AnalysisState.COMPLETED async def wait_for_analysis(self, timeout: Optional[float] = None) -> bool: """Wait for analysis to complete. Args: timeout: Maximum time to wait in seconds (None = wait indefinitely) Returns: True if analysis completed successfully, False if timeout or failed """ if self.analysis_state == AnalysisState.COMPLETED: return True if self.analysis_task is None: return False try: await asyncio.wait_for(self.analysis_task, timeout=timeout) return self.analysis_state == AnalysisState.COMPLETED except asyncio.TimeoutError: return False async def start_analysis(self) -> None: """Start code analysis in the background. This method starts the analysis process as a background task, allowing the server to become available immediately. 
""" if self.analysis_state != AnalysisState.NOT_STARTED: logging.warning(f"Analysis already started (state: {self.analysis_state.value})") return async def run_analysis(): """Background task that runs the analysis.""" try: self.analysis_state = AnalysisState.IN_PROGRESS logging.info("Background analysis started") await self.analyze() self.analysis_state = AnalysisState.COMPLETED logging.info(f"Background analysis completed: indexed {len(self.builder.functions)} functions") except Exception as e: self.analysis_state = AnalysisState.FAILED self.analysis_error = e logging.error(f"Background analysis failed: {e}", exc_info=True) # Create the background task self.analysis_task = asyncio.create_task(run_analysis()) # Start summary processor if enabled if self.summary_processor: self.summary_processor.start() logging.info("Analysis task started in background") async def analyze(self) -> None: """Analyze the codebase by extracting elements, building graph, and populating vector store.""" # Extract code elements from all watch directories all_elements = [] for watch_dir in self.config['watch_directories']: logging.info(f"Extracting elements from directory: {watch_dir}") elements = await asyncio.to_thread(self.extractor.extract_from_directory, Path(watch_dir)) logging.info(f"Found {len(elements)} elements in {watch_dir}") all_elements.extend(elements) # Extract structured data logging.info(f"Extracting structured data from directory: {watch_dir}") structured_elements = await asyncio.to_thread(self.structured_extractor.extract_from_directory, Path(watch_dir)) logging.info(f"Found {len(structured_elements)} structured elements in {watch_dir}") # Add structured elements to vector store immediately (they don't need graph building) if self.vector_store and structured_elements: await asyncio.to_thread(self.vector_store.add_structured_elements_batch, structured_elements) logging.info(f"Total code elements extracted from {len(self.config['watch_directories'])} directories: {len(all_elements)}") # Build call graph with all elements self.builder.build_from_elements(all_elements) # Populate vector store if available if self.vector_store: await asyncio.to_thread(self._populate_vector_store) else: logging.info("Vector store not available, skipping population") # Start file watchers for all directories self.loop = asyncio.get_running_loop() observer = watchdog.observers.Observer() for watch_dir in self.config['watch_directories']: observer.schedule(WatcherHandler(analyzer=self), watch_dir, recursive=True) logging.info(f"Started watching directory: {watch_dir}") observer.start() self.observer = observer # Start background cleanup task self.start_background_cleanup() # Enqueue missing summaries if processor is active if self.summary_processor and self.vector_store: logging.info("Checking for nodes missing summaries...") missing_fqns = await asyncio.to_thread(self.vector_store.get_nodes_missing_summary) if missing_fqns: logging.info(f"Found {len(missing_fqns)} nodes missing summaries, enqueueing...") for fqn in missing_fqns: self.summary_processor.enqueue(fqn) # Also enqueue all newly added functions from this analysis run # (Optimization: get_nodes_missing_summary might cover this if we did it after population) # But since we just populated, they are likely missing summaries unless they were already there. # Actually, get_nodes_missing_summary is the source of truth. 
    def _populate_vector_store(self) -> None:
        """Populate the vector store with functions and edges from the builder."""
        graph_functions = list(self.builder.functions.values())

        # Read all source files first
        sources = {}
        for node in graph_functions:
            if node.file_path not in sources:
                try:
                    with open(node.file_path, 'r', encoding='utf-8') as f:
                        sources[node.file_path] = f.read()
                except Exception as e:
                    logging.warning(f"Could not read source file {node.file_path}: {e}")
                    sources[node.file_path] = ""

        # Batch store functions
        try:
            self.vector_store.add_function_nodes_batch(graph_functions, sources, batch_size=512)
        except Exception as e:
            logging.warning(f"Batch function storage failed, falling back to individual: {e}")
            for node in graph_functions:
                try:
                    source = sources.get(node.file_path, "")
                    self.vector_store.add_function_node(node, source)
                except Exception as e2:
                    logging.warning(f"Could not process/store node {node.fully_qualified_name}: {e2}")

        # Batch store edges
        try:
            self.vector_store.add_edges_batch(self.builder.edges, batch_size=512)
        except Exception as e:
            logging.warning(f"Batch edge storage failed, falling back to individual: {e}")
            for edge in self.builder.edges:
                try:
                    self.vector_store.add_edge(edge)
                except Exception as e2:
                    logging.warning(f"Could not add edge {edge.caller} -> {edge.callee}: {e2}")

    async def _incremental_update(self, file_path: str):
        logging.info(f"Starting incremental update for {file_path}")
        await asyncio.sleep(1)  # Debounce stub

        # Handle structured data files
        if any(file_path.endswith(ext) for ext in ['.json', '.yaml', '.yml']):
            elements = await asyncio.to_thread(self.structured_extractor.extract_from_file, Path(file_path))
            logging.info(f"Extracted {len(elements)} structured elements from {file_path}")
            if self.vector_store:
                await asyncio.to_thread(self.vector_store.add_structured_elements_batch, elements)
            return

        elements = await asyncio.to_thread(self.extractor.extract_from_file, Path(file_path))
        logging.info(f"Extracted {len(elements)} elements from {file_path}")

        # Update builder/store incrementally. A full implementation would diff
        # by content hash (skip unchanged elements, remove deleted FQNs); for
        # now, simply add elements that are not yet known.
        for element in elements:
            if element.fqn not in self.builder.functions:
                self.builder.functions[element.fqn] = element
            # For the vector store, if available, add if not already present
            if self.vector_store and element.fqn not in [n.fqn for n in self.vector_store.get_all_nodes()]:
                with open(element.file_path, 'r', encoding='utf-8') as f:
                    source = f.read()
                self.vector_store.add_function_node(element, source)
            # Enqueue for summarization if enabled
            if self.summary_processor:
                self.summary_processor.enqueue(element.fqn)
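
    # Sketch of the content-hash diffing hinted at above (an assumption, not
    # the project's implementation): hash each element's source and skip
    # re-indexing when the hash is unchanged. `_known_hashes` and
    # `element_source` are hypothetical names for illustration.
    #
    #     import hashlib
    #     digest = hashlib.sha256(element_source.encode('utf-8')).hexdigest()
    #     if self._known_hashes.get(element.fqn) != digest:
    #         self._known_hashes[element.fqn] = digest
    #         # ...re-extract and upsert the node...
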
    async def cleanup_stale_references(self) -> Dict[str, Any]:
        """Manually trigger cleanup of stale file references.

        Useful for immediate cleanup without waiting for the background task.

        Returns:
            Dict with cleanup statistics.
        """
        if not self.vector_store:
            return {'removed_documents': 0, 'errors': 0, 'message': 'Vector store not available'}
        return self.vector_store.cleanup_stale_references()

    def shutdown(self) -> None:
        """Shutdown the analyzer and cleanup resources."""
        # Stop background cleanup
        self.stop_background_cleanup()

        # Stop file watcher
        if self.observer:
            self.observer.stop()
            self.observer.join()

        # Stop summary processor. shutdown() is synchronous, so hand the async
        # stop() to the running loop (fire and forget) if one is available.
        if self.summary_processor:
            try:
                if self.loop and self.loop.is_running():
                    asyncio.run_coroutine_threadsafe(self.summary_processor.stop(), self.loop)
            except Exception as e:
                logging.error(f"Error stopping summary processor: {e}")
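
A minimal end-to-end usage sketch (illustrative, not from the source; the surrounding MCP server wiring is assumed): construct the analyzer, kick off background analysis, wait for it to finish, then shut down.

    import asyncio

    async def main():
        config = {
            'watch_directories': ['/path/to/project'],
            'chromadb_path': '/path/to/chroma',
        }
        analyzer = MCPAnalyzer(config)
        await analyzer.start_analysis()                        # returns immediately
        ready = await analyzer.wait_for_analysis(timeout=300)  # block up to 5 minutes
        print("ready" if ready else f"not ready: {analyzer.analysis_error}")
        analyzer.shutdown()

    asyncio.run(main())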
