from pathlib import Path
from typing import Optional, Dict, Any
import asyncio
import logging
import threading
import time
from enum import Enum
import watchdog.observers
from watchdog.events import FileSystemEventHandler
from code_flow_graph.core.python_extractor import PythonASTExtractor
from code_flow_graph.core.typescript_extractor import TypeScriptASTExtractor
from code_flow_graph.core.structured_extractor import StructuredDataExtractor
from code_flow_graph.core.call_graph_builder import CallGraphBuilder
from code_flow_graph.core.vector_store import CodeVectorStore
from code_flow_graph.mcp_server.llm import SummaryGenerator, SummaryProcessor
class AnalysisState(Enum):
"""Enum representing the current state of code analysis."""
NOT_STARTED = "not_started"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
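# Lifecycle of AnalysisState, as driven by MCPAnalyzer.start_analysis() below:
# NOT_STARTED -> IN_PROGRESS -> COMPLETED on success, or -> FAILED on error.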
class WatcherHandler(FileSystemEventHandler):
def __init__(self, analyzer):
self.analyzer = analyzer
# Get supported file extensions based on language
language = self.analyzer.config.get('language', 'python').lower()
if language == 'typescript':
self.supported_extensions = ['.py', '.ts', '.tsx', '.js', '.jsx', '.json', '.yaml', '.yml'] # Support both for mixed projects
else:
self.supported_extensions = ['.py', '.json', '.yaml', '.yml']
def on_modified(self, event):
if any(event.src_path.endswith(ext) for ext in self.supported_extensions):
logging.info(f"File modified: {event.src_path}")
asyncio.run_coroutine_threadsafe(self.analyzer._incremental_update(event.src_path), self.analyzer.loop)
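# Note: watchdog calls on_modified from the observer's worker thread, so the
# update coroutine is handed to the analyzer's asyncio loop via
# run_coroutine_threadsafe rather than awaited directly. A minimal sketch of
# the same thread-to-loop pattern (names here are illustrative, not part of
# this module):
#
#     future = asyncio.run_coroutine_threadsafe(some_coro(), loop)
#     future.result(timeout=5)  # optional: block the calling thread on completion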
class MCPAnalyzer:
"""Analyzer for MCP server that extracts, builds, and stores code graph data."""
def __init__(self, config: dict):
"""Initialize the analyzer with configuration.
Args:
config: Configuration dictionary containing watch_directories, chromadb_path, and optional language
"""
self.config = config
root = Path(config['watch_directories'][0]).resolve()
language = config.get('language', 'python').lower()
logging.info(f"Initializing MCPAnalyzer with root: {root}, language: {language}")
# Initialize appropriate extractor based on language
        if language == 'typescript':
            self.extractor = TypeScriptASTExtractor()
        else:
            self.extractor = PythonASTExtractor()
self.extractor.project_root = root
# Initialize structured data extractor
ignored_filenames = set(config.get('ignored_filenames', []))
self.structured_extractor = StructuredDataExtractor(ignored_filenames=ignored_filenames)
self.builder = CallGraphBuilder()
self.builder.project_root = root
self.vector_store: Optional[CodeVectorStore] = None
        self.observer = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
# Analysis state tracking
self.analysis_state = AnalysisState.NOT_STARTED
self.analysis_task: Optional[asyncio.Task] = None
self.analysis_error: Optional[Exception] = None
# Initialize vector store if path exists
if Path(config['chromadb_path']).exists():
self.vector_store = CodeVectorStore(
persist_directory=config['chromadb_path'],
embedding_model_name=config.get('embedding_model', 'all-MiniLM-L6-v2'),
max_tokens=config.get('max_tokens', 256)
)
else:
logging.warning(f"ChromaDB path {config['chromadb_path']} does not exist, skipping vector store initialization")
# Background cleanup configuration
self.cleanup_interval = config.get('cleanup_interval_minutes', 30) # Default: 30 minutes
self.cleanup_task = None
self.cleanup_shutdown_event = threading.Event()
# Initialize Summary Processor if enabled
self.summary_processor = None
if config.get('summary_generation_enabled', False):
llm_config = config.get('llm_config', {})
self.summary_generator = SummaryGenerator(config)
self.summary_processor = SummaryProcessor(
generator=self.summary_generator,
builder=self.builder,
vector_store=self.vector_store,
concurrency=llm_config.get('concurrency', 5),
prioritize_entry_points=llm_config.get('prioritize_entry_points', False)
)
else:
logging.info("Summary generation disabled")
def start_background_cleanup(self) -> None:
"""
Start the background cleanup task that periodically removes stale file references.
This runs in a separate thread to avoid blocking the main event loop.
"""
if not self.vector_store:
logging.info("Vector store not available, skipping background cleanup")
return
def cleanup_worker():
"""Background worker function that runs the cleanup periodically."""
while not self.cleanup_shutdown_event.is_set():
try:
# Run cleanup if vector store is available
if self.vector_store:
logging.info("Running background cleanup of stale file references")
cleanup_stats = self.vector_store.cleanup_stale_references()
if cleanup_stats['removed_documents'] > 0:
logging.info(f"Cleanup removed {cleanup_stats['removed_documents']} stale references")
# Wait for next cleanup interval or shutdown event
self.cleanup_shutdown_event.wait(timeout=self.cleanup_interval * 60)
except Exception as e:
logging.error(f"Error in background cleanup: {e}")
# Wait a bit before retrying on error
time.sleep(60)
# Start cleanup thread
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
cleanup_thread.start()
self.cleanup_task = cleanup_thread
logging.info(f"Background cleanup started (interval: {self.cleanup_interval} minutes)")
def stop_background_cleanup(self) -> None:
"""Stop the background cleanup task."""
if self.cleanup_task and self.cleanup_task.is_alive():
self.cleanup_shutdown_event.set()
self.cleanup_task.join(timeout=10) # Wait up to 10 seconds
logging.info("Background cleanup stopped")
def is_ready(self) -> bool:
"""Check if analysis is complete and ready for queries.
Returns:
True if analysis is completed, False otherwise
"""
return self.analysis_state == AnalysisState.COMPLETED
async def wait_for_analysis(self, timeout: Optional[float] = None) -> bool:
"""Wait for analysis to complete.
Args:
timeout: Maximum time to wait in seconds (None = wait indefinitely)
Returns:
True if analysis completed successfully, False if timeout or failed
"""
if self.analysis_state == AnalysisState.COMPLETED:
return True
if self.analysis_task is None:
return False
try:
await asyncio.wait_for(self.analysis_task, timeout=timeout)
return self.analysis_state == AnalysisState.COMPLETED
except asyncio.TimeoutError:
return False
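    # Example (sketch): gate a query handler on readiness.
    #
    #     if not analyzer.is_ready():
    #         done = await analyzer.wait_for_analysis(timeout=30.0)
    #         if not done:
    #             ...  # report that analysis is still in progress or failed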
async def start_analysis(self) -> None:
"""Start code analysis in the background.
This method starts the analysis process as a background task,
allowing the server to become available immediately.
"""
if self.analysis_state != AnalysisState.NOT_STARTED:
logging.warning(f"Analysis already started (state: {self.analysis_state.value})")
return
async def run_analysis():
"""Background task that runs the analysis."""
try:
self.analysis_state = AnalysisState.IN_PROGRESS
logging.info("Background analysis started")
await self.analyze()
self.analysis_state = AnalysisState.COMPLETED
logging.info(f"Background analysis completed: indexed {len(self.builder.functions)} functions")
except Exception as e:
self.analysis_state = AnalysisState.FAILED
self.analysis_error = e
logging.error(f"Background analysis failed: {e}", exc_info=True)
# Create the background task
self.analysis_task = asyncio.create_task(run_analysis())
# Start summary processor if enabled
if self.summary_processor:
self.summary_processor.start()
logging.info("Analysis task started in background")
async def analyze(self) -> None:
"""Analyze the codebase by extracting elements, building graph, and populating vector store."""
# Extract code elements from all watch directories
all_elements = []
for watch_dir in self.config['watch_directories']:
logging.info(f"Extracting elements from directory: {watch_dir}")
elements = await asyncio.to_thread(self.extractor.extract_from_directory, Path(watch_dir))
logging.info(f"Found {len(elements)} elements in {watch_dir}")
all_elements.extend(elements)
# Extract structured data
logging.info(f"Extracting structured data from directory: {watch_dir}")
structured_elements = await asyncio.to_thread(self.structured_extractor.extract_from_directory, Path(watch_dir))
logging.info(f"Found {len(structured_elements)} structured elements in {watch_dir}")
# Add structured elements to vector store immediately (they don't need graph building)
if self.vector_store and structured_elements:
await asyncio.to_thread(self.vector_store.add_structured_elements_batch, structured_elements)
logging.info(f"Total code elements extracted from {len(self.config['watch_directories'])} directories: {len(all_elements)}")
# Build call graph with all elements
self.builder.build_from_elements(all_elements)
# Populate vector store if available
if self.vector_store:
await asyncio.to_thread(self._populate_vector_store)
else:
logging.info("Vector store not available, skipping population")
# Start file watchers for all directories
self.loop = asyncio.get_running_loop()
observer = watchdog.observers.Observer()
for watch_dir in self.config['watch_directories']:
observer.schedule(WatcherHandler(analyzer=self), watch_dir, recursive=True)
logging.info(f"Started watching directory: {watch_dir}")
observer.start()
self.observer = observer
# Start background cleanup task
self.start_background_cleanup()
# Enqueue missing summaries if processor is active
if self.summary_processor and self.vector_store:
logging.info("Checking for nodes missing summaries...")
missing_fqns = await asyncio.to_thread(self.vector_store.get_nodes_missing_summary)
if missing_fqns:
logging.info(f"Found {len(missing_fqns)} nodes missing summaries, enqueueing...")
for fqn in missing_fqns:
self.summary_processor.enqueue(fqn)
        # Newly added functions from this run need no separate enqueue:
        # get_nodes_missing_summary() is the source of truth for what still
        # lacks a summary.
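    # One-shot usage (sketch): analyze() can also be awaited directly, e.g.
    # in tests, when no background task is wanted:
    #
    #     await analyzer.analyze()
    #     assert analyzer.builder.functions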
def _populate_vector_store(self) -> None:
"""Populate the vector store with functions and edges from the builder."""
graph_functions = list(self.builder.functions.values())
# Read all source files first
sources = {}
for node in graph_functions:
if node.file_path not in sources:
try:
with open(node.file_path, 'r', encoding='utf-8') as f:
sources[node.file_path] = f.read()
except Exception as e:
logging.warning(f"Could not read source file {node.file_path}: {e}")
sources[node.file_path] = ""
# Batch store functions
try:
self.vector_store.add_function_nodes_batch(graph_functions, sources, batch_size=512)
except Exception as e:
logging.warning(f"Batch function storage failed, falling back to individual: {e}")
for node in graph_functions:
try:
source = sources.get(node.file_path, "")
self.vector_store.add_function_node(node, source)
except Exception as e2:
logging.warning(f"Could not process/store node {node.fully_qualified_name}: {e2}")
# Batch store edges
try:
self.vector_store.add_edges_batch(self.builder.edges, batch_size=512)
except Exception as e:
logging.warning(f"Batch edge storage failed, falling back to individual: {e}")
for edge in self.builder.edges:
try:
self.vector_store.add_edge(edge)
except Exception as e2:
logging.warning(f"Could not add edge {edge.caller} -> {edge.callee}: {e2}")
async def _incremental_update(self, file_path: str):
logging.info(f"Starting incremental update for {file_path}")
await asyncio.sleep(1) # Debounce stub
# Handle structured data files
if any(file_path.endswith(ext) for ext in ['.json', '.yaml', '.yml']):
elements = await asyncio.to_thread(self.structured_extractor.extract_from_file, Path(file_path))
logging.info(f"Extracted {len(elements)} structured elements from {file_path}")
if self.vector_store:
await asyncio.to_thread(self.vector_store.add_structured_elements_batch, elements)
return
elements = await asyncio.to_thread(self.extractor.extract_from_file, Path(file_path))
logging.info(f"Extracted {len(elements)} elements from {file_path}")
        # Incremental strategy (current stub): re-extract the whole file and
        # add any functions not already known. True change detection (content
        # hashing, deletion by FQN) is not implemented yet.
        # Collect known vector-store FQNs once instead of scanning per element.
        known_fqns = set()
        if self.vector_store:
            known_fqns = {n.fqn for n in self.vector_store.get_all_nodes()}
        for element in elements:
            if element.fqn not in self.builder.functions:
                self.builder.functions[element.fqn] = element
            # For the vector store, add the element only if not already present
            if self.vector_store and element.fqn not in known_fqns:
                try:
                    with open(element.file_path, 'r', encoding='utf-8') as f:
                        source = f.read()
                except OSError as e:
                    logging.warning(f"Could not read source file {element.file_path}: {e}")
                    source = ""
                self.vector_store.add_function_node(element, source)
            # Enqueue for summarization if enabled
            if self.summary_processor:
                self.summary_processor.enqueue(element.fqn)
async def cleanup_stale_references(self) -> Dict[str, Any]:
"""
Manually trigger cleanup of stale file references.
Useful for immediate cleanup without waiting for the background task.
Returns:
Dict with cleanup statistics
"""
if not self.vector_store:
return {'removed_documents': 0, 'errors': 0, 'message': 'Vector store not available'}
        # Run the synchronous cleanup off the event loop, matching the
        # asyncio.to_thread pattern used elsewhere in this class.
        return await asyncio.to_thread(self.vector_store.cleanup_stale_references)
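    # Example (sketch): trigger an immediate cleanup from a request handler.
    #
    #     stats = await analyzer.cleanup_stale_references()
    #     logging.info(f"Removed {stats['removed_documents']} stale documents")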
def shutdown(self) -> None:
"""Shutdown the analyzer and cleanup resources."""
# Stop background cleanup
self.stop_background_cleanup()
# Stop file watcher
if self.observer:
self.observer.stop()
self.observer.join()
# Stop summary processor
if self.summary_processor:
            # shutdown() is synchronous, so schedule the async stop() on the
            # analyzer's loop if it is still running (fire-and-forget).
try:
if self.loop and self.loop.is_running():
asyncio.run_coroutine_threadsafe(self.summary_processor.stop(), self.loop)
except Exception as e:
logging.error(f"Error stopping summary processor: {e}")