MCP Document Indexer

by yairwein
main.py (13.1 kB)
#!/usr/bin/env python3
"""MCP Document Indexer - Main entry point."""

import asyncio
import logging
import signal
import sys
import subprocess
import time
import os
from pathlib import Path
from typing import Optional

from fastmcp import FastMCP

from .config import get_config
from .parser import DocumentParser, is_supported_file
from .llm import LocalLLM, DocumentProcessor
from .indexer import DocumentIndexer
from .monitor import FileMonitor, IndexingQueue
from .tools import DocumentTools

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class DocumentIndexerService:
    """Main service orchestrating document indexing."""

    def __init__(self):
        self.config = get_config()
        self.parser = None
        self.llm = None
        self.processor = None
        self.indexer = None
        self.monitor = None
        self.indexing_queue = None
        self.tools = None
        self.mcp = None
        self.running = False
        self._indexing_task = None

    async def initialize(self):
        """Initialize all components."""
        logger.info("Initializing Document Indexer Service...")

        # Initialize components
        self.parser = DocumentParser(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap
        )

        self.llm = LocalLLM(
            model=self.config.llm_model,
            base_url=self.config.ollama_base_url
        )
        await self.llm.initialize()

        self.processor = DocumentProcessor(self.llm)

        self.indexer = DocumentIndexer(
            db_path=self.config.lancedb_path,
            embedding_model=self.config.embedding_model
        )
        await self.indexer.initialize()

        self.indexing_queue = IndexingQueue(max_concurrent=5)

        # Initialize MCP tools
        self.tools = DocumentTools(self.indexer, self.parser, self.processor)

        # Initialize file monitor if folders configured
        if self.config.watch_folders:
            self.monitor = FileMonitor(
                watch_folders=self.config.watch_folders,
                file_extensions=self.config.file_extensions,
                callback=self.handle_file_event
            )

        logger.info("Service initialized successfully")

    async def handle_file_event(self, event_type: str, file_path: str):
        """Handle file system events."""
        file_path = Path(file_path)

        if event_type == 'delete':
            # Remove from index
            await self.indexer.remove_document(str(file_path))
        elif event_type in ['create', 'modify']:
            # Add to indexing queue
            priority = 3 if event_type == 'create' else 5
            await self.indexing_queue.add_file(file_path, priority)

    async def index_file(self, file_path: Path) -> bool:
        """Index a single file."""
        try:
            logger.info(f" → Starting processing: {file_path.name}")

            # Check file size
            file_size_mb = file_path.stat().st_size / (1024 * 1024)
            if file_size_mb > self.config.max_file_size_mb:
                logger.warning(f" → Skipping: File too large ({file_size_mb:.1f}MB > {self.config.max_file_size_mb}MB)")
                return False

            logger.info(f" → [1/3] Parsing document ({file_size_mb:.2f}MB)...")

            # Parse document
            doc_data = self.parser.parse_file(file_path)
            logger.info(f" → [1/3] ✓ Parsed: {doc_data['total_chars']:,} chars, {doc_data['num_chunks']} chunks, {doc_data['total_tokens']:,} tokens")

            # Check if document is already indexed before expensive LLM processing
            metadata = doc_data['metadata']
            if await self.indexer.is_document_indexed(metadata['file_path'], metadata['file_hash']):
                logger.info(f" → [2/3] ⟳ Document unchanged (already indexed)")
                return False

            # Process with LLM (only if not already indexed)
            logger.info(f" → [2/3] Processing with LLM ({self.config.llm_model})...")
            doc_data = await self.processor.process_document(doc_data)
            logger.info(f" → [2/3] ✓ Generated summary ({len(doc_data.get('summary', ''))} chars) and {len(doc_data.get('keywords', []))} keywords")

            # Index document
            logger.info(f" → [3/3] Generating embeddings and indexing to LanceDB...")
            success = await self.indexer.index_document(doc_data)
            if success:
                logger.info(f" → [3/3] ✓ Successfully indexed document with {doc_data['num_chunks']} chunks")
            else:
                logger.info(f" → [3/3] ⟳ Document unchanged (already indexed)")
            return success

        except Exception as e:
            logger.error(f"Error indexing {file_path}: {e}")
            self.indexing_queue.mark_failed(file_path, str(e))
            return False

    async def process_indexing_queue(self):
        """Process files in the indexing queue."""
        processed_count = 0
        total_to_process = 0

        while self.running:
            file_path = None  # ensure defined for the except handler below
            try:
                # Get next file from queue
                file_path = await self.indexing_queue.get_next()

                if file_path:
                    # Log progress
                    queue_stats = self.indexing_queue.get_stats()
                    if processed_count == 0:
                        total_to_process = queue_stats['queued'] + queue_stats['processing']

                    processed_count += 1
                    logger.info(f"[{processed_count}/{total_to_process}] Processing: {file_path.name}")

                    success = await self.index_file(file_path)
                    if success:
                        self.indexing_queue.mark_complete(file_path)
                        logger.info(f"[{processed_count}/{total_to_process}] ✓ Indexed: {file_path.name}")
                    else:
                        self.indexing_queue.mark_complete(file_path)
                        logger.info(f"[{processed_count}/{total_to_process}] ⟳ Unchanged: {file_path.name}")

                    # Log queue status every 10 files
                    if processed_count % 10 == 0:
                        logger.info(f"Progress: {processed_count} processed, {queue_stats['queued']} remaining in queue")
                else:
                    # No files to process, wait a bit
                    await asyncio.sleep(1)

            except Exception as e:
                logger.error(f"Error processing file: {e}")
                if file_path:
                    self.indexing_queue.mark_failed(file_path, str(e))
                await asyncio.sleep(1)

    async def initial_scan(self):
        """Perform initial scan of watched folders."""
        if not self.monitor:
            return

        logger.info("Starting initial document scan...")
        existing_files = await self.monitor.scan_existing_files()

        # Limit initial scan for large folders
        max_initial_files = int(os.getenv("MAX_INITIAL_FILES", "50"))
        if len(existing_files) > max_initial_files:
            logger.warning(f"Found {len(existing_files)} files, limiting initial scan to {max_initial_files} most recent files")
            logger.info("Set MAX_INITIAL_FILES env var to increase this limit")
            # Sort by modification time and take most recent
            existing_files = sorted(
                existing_files, key=lambda x: x.stat().st_mtime, reverse=True
            )[:max_initial_files]

        # Add files to indexing queue with lower priority
        added_count = 0
        for file_path in existing_files:
            if is_supported_file(file_path, self.config.file_extensions):
                await self.indexing_queue.add_file(file_path, priority=8)
                added_count += 1

        logger.info(f"Added {added_count} files to indexing queue")

    async def start(self):
        """Start the service."""
        if self.running:
            return

        self.running = True

        # Start file monitor
        if self.monitor:
            self.monitor.start()
            # Start monitor event processing
            asyncio.create_task(self.monitor.process_events())

        # Perform initial scan
        await self.initial_scan()

        # Start indexing queue processor
        self._indexing_task = asyncio.create_task(self.process_indexing_queue())

        logger.info("Document Indexer Service started")

    async def stop(self):
        """Stop the service."""
        if not self.running:
            return

        self.running = False

        # Stop monitor
        if self.monitor:
            self.monitor.stop()

        # Cancel indexing task
        if self._indexing_task:
            self._indexing_task.cancel()
            try:
                await self._indexing_task
            except asyncio.CancelledError:
                pass

        # Clean up resources
        if self.llm:
            await self.llm.close()
        if self.indexer:
            await self.indexer.close()

        logger.info("Document Indexer Service stopped")

    def setup_mcp_server(self) -> FastMCP:
        """Set up the MCP server with tools."""
        mcp = FastMCP(
            "mcp-doc-indexer",
            instructions="Local document indexing and search with LanceDB"
        )

        # Register tools
        mcp.tool(self.tools.search_documents)
        mcp.tool(self.tools.get_catalog)
        mcp.tool(self.tools.get_document_info)
        mcp.tool(self.tools.reindex_document)
        mcp.tool(self.tools.get_indexing_stats)

        return mcp


def ensure_ollama_running():
    """Ensure Ollama is running, start it if not."""
    try:
        # Check if Ollama is running
        result = subprocess.run(
            ["curl", "-s", "http://localhost:11434/api/tags"],
            capture_output=True,
            timeout=2
        )
        if result.returncode == 0:
            logger.info("Ollama is already running")
            return True
    except Exception:
        pass

    # Try to start Ollama
    logger.info("Starting Ollama serve in background...")
    try:
        # Start ollama serve in background
        process = subprocess.Popen(
            ["ollama", "serve"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True
        )
        # Wait a bit for it to start
        time.sleep(3)

        # Check if it's running now
        result = subprocess.run(
            ["curl", "-s", "http://localhost:11434/api/tags"],
            capture_output=True,
            timeout=2
        )
        if result.returncode == 0:
            logger.info("Ollama serve started successfully")
            return True
        else:
            logger.warning("Ollama serve started but not responding yet")
            return False
    except FileNotFoundError:
        logger.error("Ollama not found. Please install from https://ollama.com/download")
        return False
    except Exception as e:
        logger.error(f"Failed to start Ollama: {e}")
        return False


async def main():
    """Main entry point."""
    # Ensure Ollama is running
    ensure_ollama_running()

    # Create service
    service = DocumentIndexerService()

    try:
        # Initialize service
        await service.initialize()

        # Start service
        await service.start()

        # Set up MCP server
        mcp = service.setup_mcp_server()

        # Setup signal handlers
        def signal_handler(sig, frame):
            logger.info("Shutting down...")
            asyncio.create_task(service.stop())
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        logger.info("MCP Document Indexer running...")
        logger.info(f"Watching folders: {service.config.watch_folders}")
        logger.info(f"Database path: {service.config.lancedb_path}")

        # Run the MCP server in stdio mode for Claude Desktop
        await mcp.run_stdio_async()

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Service error: {e}", exc_info=True)
    finally:
        await service.stop()


def run():
    """Run the service."""
    asyncio.run(main())


if __name__ == "__main__":
    run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yairwein/document-mcp'
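
The same endpoint can also be queried from Python using only the standard library. A minimal sketch (the response schema is whatever the API returns, so this simply pretty-prints it):

import json
import urllib.request

# Fetch this server's entry from the Glama MCP directory API.
url = "https://glama.ai/api/mcp/v1/servers/yairwein/document-mcp"
with urllib.request.urlopen(url) as response:
    server_info = json.load(response)

print(json.dumps(server_info, indent=2))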

If you have feedback or need assistance with the MCP directory API, please join our Discord server.