Skip to main content
Glama
juanqui
by juanqui
main.py (75.3 kB)
"""Main MCP server implementation using FastMCP.""" import asyncio import json import logging import time from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Optional from fastmcp import FastMCP from .config import ServerConfig from .document_processor import DocumentProcessor from .embeddings import EmbeddingService from .exceptions import ( ConfigurationError, DocumentNotFoundError, EmbeddingError, PDFProcessingError, ValidationError, VectorStoreError, ) from .file_monitor import FileMonitor from .intelligent_cache import IntelligentCacheManager from .models import Document, SearchQuery from .vector_store import VectorStore if TYPE_CHECKING: from .background_queue import BackgroundProcessingQueue logger = logging.getLogger(__name__) class PDFKnowledgebaseServer: """Main MCP server implementation for PDF knowledgebase management.""" def __init__( self, config: Optional[ServerConfig] = None, background_queue: Optional["BackgroundProcessingQueue"] = None ): """Initialize the PDF knowledgebase server. Args: config: Server configuration. If None, loads from environment. background_queue: Optional background processing queue for non-blocking operations. 
""" self.config = config or ServerConfig.from_env() self.app = FastMCP("PDF Knowledgebase") self.document_processor: Optional[DocumentProcessor] = None self.vector_store: Optional[VectorStore] = None self.embedding_service: Optional[EmbeddingService] = None self.reranker_service = None self.summarizer_service = None self.file_monitor: Optional[FileMonitor] = None self.cache_manager: Optional[IntelligentCacheManager] = None self.background_queue = background_queue self._web_document_service = None # Optional reference to web document service # Document metadata cache self._document_cache: Dict[str, Document] = {} self._cache_file = self.config.metadata_path / "documents.json" # Semaphores for controlling parallel processing self._parsing_semaphore = asyncio.Semaphore(self.config.max_parallel_parsing) self._embedding_semaphore = asyncio.Semaphore(self.config.max_parallel_embedding) self._setup_tools() self._setup_resources() async def initialize_core(self) -> None: """Initialize core components (excluding FileMonitor) asynchronously.""" try: logger.info("Initializing PDF Knowledgebase server core components...") # Initialize cache manager self.cache_manager = IntelligentCacheManager(self.config, self.config.cache_dir) # Check for configuration changes and handle selective cache invalidation await self._handle_intelligent_config_changes() # Initialize components in order self.embedding_service = EmbeddingService(self.config) await self.embedding_service.initialize() # Initialize reranker service if enabled if self.config.enable_reranker: from .reranker_factory import get_reranker_service try: self.reranker_service = get_reranker_service(self.config) if self.reranker_service: await self.reranker_service.initialize() logger.info("Reranker service initialized successfully") except Exception as e: logger.error(f"Failed to initialize reranker service: {e}") logger.warning("Continuing without reranker") self.reranker_service = None # Initialize summarizer service if enabled 
self.summarizer_service = None if self.config.enable_summarizer: try: from .summarizer_factory import create_summarizer_service self.summarizer_service = create_summarizer_service(self.config) if self.summarizer_service: await self.summarizer_service.initialize() logger.info("Summarizer service initialized successfully") except Exception as e: logger.error(f"Failed to initialize summarizer service: {e}") logger.warning("Continuing without summarizer") self.summarizer_service = None self.vector_store = VectorStore(self.config) self.vector_store.set_embedding_service(self.embedding_service) self.vector_store.set_reranker_service(self.reranker_service) await self.vector_store.initialize() self.document_processor = DocumentProcessor( self.config, self.embedding_service, self.cache_manager, self._embedding_semaphore, self.summarizer_service, ) # Log startup configuration summary for diagnostics try: parser_name = ( type(self.document_processor.parser).__name__ if self.document_processor else str(self.config.pdf_parser) ) chunker_name = ( type(self.document_processor.chunker).__name__ if self.document_processor else str(self.config.document_chunker) ) except Exception: parser_name = str(self.config.pdf_parser) chunker_name = str(self.config.pdf_chunker) reranker_info = f"Enabled ({self.config.reranker_model})" if self.config.enable_reranker else "Disabled" logger.info( "Startup configuration: Parser=%s, Chunker=%s, EmbeddingModel=%s, Reranker=%s, " "KnowledgebasePath=%s, CacheDir=%s", parser_name, chunker_name, self.config.embedding_model, reranker_info, self.config.knowledgebase_path, self.config.cache_dir, ) # Load document metadata cache BEFORE re-processing (needed for re-summarization) await self._load_document_cache() # Handle re-processing of cached documents after components are initialized await self._handle_post_initialization_reprocessing() # Update intelligent cache fingerprints self.cache_manager.update_fingerprints() logger.info("PDF Knowledgebase server 
core components initialized successfully") except Exception as e: logger.error(f"Failed to initialize core components: {e}") raise ConfigurationError(f"Core initialization failed: {e}") async def initialize_file_monitor(self, web_document_service=None) -> None: """Initialize file monitor with optional web document service. Args: web_document_service: Optional web document service for in-progress tracking """ try: logger.info("Initializing file monitor...") # Store web document service reference self._web_document_service = web_document_service self.file_monitor = FileMonitor( self.config, self.document_processor, self.vector_store, self._update_document_cache, background_queue=self.background_queue, web_document_service=self._web_document_service, ) await self.file_monitor.start_monitoring() logger.info("File monitor initialized successfully") except Exception as e: logger.error(f"Failed to initialize file monitor: {e}") raise ConfigurationError(f"File monitor initialization failed: {e}") async def initialize(self, web_document_service=None) -> None: """Initialize all components asynchronously. 
Args: web_document_service: Optional web document service for in-progress tracking """ await self.initialize_core() await self.initialize_file_monitor(web_document_service) async def _handle_intelligent_config_changes(self) -> None: """Handle configuration changes using intelligent cache management for selective invalidation.""" try: changes = self.cache_manager.detect_config_changes() # Check if any changes detected if not any(changes.values()): logger.info("No configuration changes detected, using existing caches") self._config_changes = None return logger.info(f"Configuration changes detected: {changes}") # Store changes for later re-processing after components are initialized self._config_changes = changes # Handle parsing changes - most destructive, affects everything downstream if changes["parsing"]: logger.warning("Parsing configuration changed - full database reset required") await self._reset_all_caches() return # Handle chunking changes - affects chunking and embedding stages if changes["chunking"]: logger.warning("Chunking configuration changed - clearing chunking and embedding caches") await self._reset_chunking_and_embedding_caches() # Handle embedding-only changes - least destructive elif changes["embedding"]: logger.info("Embedding configuration changed - clearing embedding caches only") await self._reset_embedding_caches() # Handle summarizer-only changes - no cache operations needed elif changes["summarizer"]: logger.info("Summarizer configuration changed - will re-summarize documents") # No cache clearing needed for summarizer changes logger.info("Selective cache invalidation complete") except Exception as e: logger.error(f"Error handling intelligent configuration changes: {e}") # Fallback to full reset on error logger.warning("Falling back to full cache reset due to error") await self._reset_all_caches() self._config_changes = None async def _reset_all_caches(self) -> None: """Reset all caches and vector database (full reset).""" try: 
logger.info("Performing full cache reset...") # Initialize vector store temporarily to reset it temp_vector_store = VectorStore(self.config) await temp_vector_store.initialize() await temp_vector_store.reset_database() await temp_vector_store.close() # Clear document cache if self._cache_file.exists(): self._cache_file.unlink() logger.info("Cleared document cache") # Clear file monitor metadata file_index_path = self.config.metadata_path / "file_index.json" if file_index_path.exists(): file_index_path.unlink() logger.info("Cleared file monitor index") # Clear processing caches if self.config.processing_path.exists(): import shutil shutil.rmtree(self.config.processing_path) self.config.processing_path.mkdir(parents=True, exist_ok=True) logger.info("Cleared processing caches") # Clear intelligent cache fingerprints self.cache_manager.clear_all_fingerprints() logger.info("Full cache reset complete. All PDFs will be re-processed.") except Exception as e: logger.error(f"Error during full cache reset: {e}") raise ConfigurationError(f"Failed to reset caches: {e}") async def _reset_chunking_and_embedding_caches(self) -> None: """Reset chunking and embedding caches, preserve parsing results.""" try: logger.info("Resetting chunking and embedding caches...") # Reset vector database (contains embeddings) temp_vector_store = VectorStore(self.config) await temp_vector_store.initialize() await temp_vector_store.reset_database() await temp_vector_store.close() # Clear document cache (will be repopulated) if self._cache_file.exists(): self._cache_file.unlink() logger.info("Cleared document cache") # Clear chunking and embedding stage fingerprints self.cache_manager.clear_stage_fingerprint("chunking") self.cache_manager.clear_stage_fingerprint("embedding") logger.info("Chunking and embedding caches reset complete") except Exception as e: logger.error(f"Error resetting chunking and embedding caches: {e}") raise ConfigurationError(f"Failed to reset chunking/embedding caches: {e}") 
async def _reprocess_existing_documents(self) -> None: """Re-process existing cached documents after configuration changes. This method is called after cache clearing to re-process cached markdown files with new chunking/embedding configurations, avoiding expensive PDF parsing while ensuring documents are processed with current settings. """ try: logger.info("Re-processing existing cached documents with new configuration...") # Re-process cached markdown files (skips expensive parsing) documents = await self.document_processor.reprocess_cached_documents() if documents: # Add all documents to vector store for document in documents: await self.vector_store.add_document(document) self._document_cache[document.id] = document # Save updated document cache await self._save_document_cache() logger.info(f"✓ Successfully re-processed {len(documents)} cached documents") else: logger.info("No cached documents found to re-process") except Exception as e: logger.error(f"Error re-processing cached documents: {e}") # Don't raise - let system continue with empty state # This allows the system to recover gracefully if re-processing fails async def _reset_embedding_caches(self) -> None: """Reset only embedding caches, preserve parsing and chunking results.""" try: logger.info("Resetting embedding caches only...") # Reset vector database (contains embeddings) temp_vector_store = VectorStore(self.config) await temp_vector_store.initialize() await temp_vector_store.reset_database() await temp_vector_store.close() # Clear document cache (will be repopulated with new embeddings) if self._cache_file.exists(): self._cache_file.unlink() logger.info("Cleared document cache") # Clear only embedding stage fingerprint self.cache_manager.clear_stage_fingerprint("embedding") logger.info("Embedding caches reset complete") except Exception as e: logger.error(f"Error resetting embedding caches: {e}") raise ConfigurationError(f"Failed to reset embedding caches: {e}") async def 
_resummarize_existing_documents(self) -> None: """Re-summarize existing documents with new summarizer configuration. This method only generates new summaries using cached parsing results, without re-parsing, re-chunking, or re-embedding documents. """ try: logger.info("Re-summarizing existing documents with new summarizer configuration...") # Skip if no summarizer service is available if not self.document_processor or not self.document_processor.summarizer_service: logger.info("No summarizer service available, skipping re-summarization") return # Ensure document cache is synchronized with vector store before re-summarization logger.info("Synchronizing document cache with vector store before re-summarization...") await self._synchronize_document_cache_with_vector_store() # Get all documents directly from the vector store (not from cache) documents_data = await self.vector_store.list_documents() if not documents_data: logger.info("No documents found to re-summarize") return logger.info(f"Found {len(documents_data)} documents to re-summarize") resummarized_count = 0 for doc_data in documents_data: try: doc_id = doc_data["id"] doc_title = doc_data["title"] doc_path = doc_data["path"] logger.info(f"Processing document for re-summarization: {doc_id} ({doc_title}) at {doc_path}") # Skip if we don't have a valid path if not doc_path or doc_path == "": logger.warning(f"Skipping document {doc_id} - no valid path") continue file_path = Path(doc_path) # Check if file still exists if not file_path.exists(): logger.warning(f"Skipping document {doc_id} - file no longer exists: {doc_path}") continue # Try to load cached parsing result instead of re-parsing logger.info(f"Loading cached parsing result for: {file_path}") cached_parse_result = await self._load_cached_parsing_result(file_path) if not cached_parse_result: logger.warning(f"No cached parsing result found for {doc_path}, skipping summarization") continue logger.info(f"Found cached parsing result, generating summary for: 
{file_path.name}") # Generate new summary using cached content summary_data = await self.document_processor._generate_document_summary( cached_parse_result, file_path.name ) if summary_data: logger.info(f"Summary generated successfully for {doc_title}") # Find the document in cache by path (since IDs might not match) cached_document = None cached_doc_id = None # First try by document ID if doc_id in self._document_cache: cached_document = self._document_cache[doc_id] cached_doc_id = doc_id logger.debug(f"Found document by ID: {doc_id}") else: # Try to find by path if ID doesn't match for cache_doc_id, cache_doc in self._document_cache.items(): if cache_doc.path == doc_path: cached_document = cache_doc cached_doc_id = cache_doc_id logger.info(f"Found document by path match: {doc_path} -> {cache_doc_id}") break if cached_document and cached_doc_id: old_title = cached_document.title # Update summary metadata if summary_data.title and len(summary_data.title) > 5: cached_document.title = summary_data.title logger.info(f"Updated title from '{old_title}' to '{cached_document.title}'") cached_document.metadata.update( { "short_description": summary_data.short_description, "long_description": summary_data.long_description, "summary_generated": True, } ) resummarized_count += 1 logger.info(f"Successfully re-summarized document: {doc_title} (cache ID: {cached_doc_id})") else: logger.warning(f"Document not found in cache by ID ({doc_id}) or path ({doc_path})") logger.debug(f"Available cache IDs: {list(self._document_cache.keys())}") else: logger.warning(f"No summary generated for document: {doc_title}") except Exception as e: logger.error(f"Failed to re-summarize document {doc_data.get('id', 'unknown')}: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") # Save the updated document cache if we have any documents there if resummarized_count > 0 and self._document_cache: await self._save_document_cache() logger.info(f"Completed re-summarizing 
{resummarized_count} out of {len(documents_data)} documents") except Exception as e: logger.error(f"Error re-summarizing documents: {e}") # Don't raise - let system continue async def _load_cached_parsing_result(self, file_path: Path): """Load cached parsing result for a file without re-parsing. For PDF files, loads from cache. For Markdown files, re-parses on the fly since Markdown parsing is fast and not cached during normal processing. Args: file_path: Path to the document file. Returns: ParseResult if cached result exists or can be generated, None otherwise. """ try: logger.debug(f"Attempting to load cached parsing result for: {file_path}") # Calculate file checksum checksum = await self.document_processor._calculate_checksum(file_path) logger.debug(f"Calculated checksum for {file_path}: {checksum}") # Handle different file types suffix = file_path.suffix.lower() if suffix == ".pdf": # For PDF files, try to load from cache parse_result = await self.document_processor._load_parsing_result(file_path, checksum) if parse_result: logger.info(f"Successfully loaded cached parsing result for PDF: {file_path}") page_count = len(parse_result.pages) if hasattr(parse_result, "pages") else "unknown" logger.debug(f"Parse result has {page_count} pages") return parse_result else: logger.warning(f"No cached parsing result found for PDF: {file_path}") return None elif suffix in [".md", ".markdown"]: # For Markdown files, re-parse on the fly since parsing is fast logger.info(f"Re-parsing Markdown file for summarization: {file_path}") # Import MarkdownParser and create parse result from .parsers.parser_markdown import MarkdownParser markdown_config = { "parse_frontmatter": getattr(self.config, "markdown_parse_frontmatter", True), "extract_title": getattr(self.config, "markdown_extract_title", True), "page_boundary_pattern": getattr( self.config, "markdown_page_boundary_pattern", r"--\[PAGE:\s*(\d+)\]--" ), "split_on_page_boundaries": getattr(self.config, 
"markdown_split_on_page_boundaries", True), } markdown_parser = MarkdownParser(config=markdown_config) parse_result = await markdown_parser.parse(file_path) if parse_result: logger.info(f"Successfully re-parsed Markdown file for summarization: {file_path}") page_count = len(parse_result.pages) if hasattr(parse_result, "pages") else "unknown" logger.debug(f"Parse result has {page_count} pages") return parse_result else: logger.warning(f"Failed to re-parse Markdown file: {file_path}") return None else: logger.warning(f"Unsupported file type for re-summarization: {suffix}") return None except Exception as e: logger.error(f"Failed to load cached parsing result for {file_path}: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return None async def _handle_post_initialization_reprocessing(self) -> None: """Handle re-processing of cached documents after components are initialized.""" try: # Check if we detected config changes that require re-processing if not hasattr(self, "_config_changes") or not self._config_changes: return changes = self._config_changes # Only re-process if chunking or embedding changed (parsing changes do full reset) if changes.get("chunking") or changes.get("embedding"): await self._reprocess_existing_documents() # Re-summarize documents if summarizer configuration changed if changes.get("summarizer"): await self._resummarize_existing_documents() # Clear the stored changes self._config_changes = None except Exception as e: logger.error(f"Error in post-initialization re-processing: {e}") # Don't raise - let system continue def _setup_tools(self) -> None: """Set up MCP tools.""" @self.app.tool() async def add_document(path: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Add a document (PDF or Markdown) to the knowledgebase for searching and analysis. Use this tool to ingest new documents. Supports both PDF and Markdown (.md, .markdown) files. 
Once added, the document content will be automatically processed, chunked, and made searchable via search_documents. You do not need to call any other tools after adding - the document becomes immediately available for searching. Args: path: Path to the document file (PDF or Markdown) to add to the knowledgebase. metadata: Optional metadata to associate with the document (e.g., tags, categories). Returns: Processing result with document information, success status, and processing time. """ try: # Validate input if not path or not path.strip(): raise ValidationError("Path cannot be empty", "path") file_path = Path(path) if not file_path.exists(): raise ValidationError(f"File does not exist: {path}", "path") supported_extensions = [".pdf", ".md", ".markdown"] if not file_path.suffix.lower() in supported_extensions: raise ValidationError( f"File type not supported. Must be one of {supported_extensions}: {path}", "path" ) logger.info(f"Adding document: {path}") start_time = time.time() # Process the PDF with semaphore to limit parallelism async with self._parsing_semaphore: result = await self.document_processor.process_document(file_path, metadata) if not result.success: logger.error(f"Failed to process PDF {path}: {result.error}") return { "success": False, "error": result.error, "processing_time": time.time() - start_time, } # Add document to vector store if result.document: await self.vector_store.add_document(result.document) # Cache document metadata self._document_cache[result.document.id] = result.document await self._save_document_cache() # Note: Don't call file_monitor.process_new_file() here to avoid double processing # The file monitor will pick up the file through its normal scanning process processing_time = time.time() - start_time result.processing_time = processing_time logger.info(f"Successfully added document {path} in {processing_time:.2f}s") return result.to_dict() except ValidationError as e: logger.error(f"Validation error adding document: {e}") 
return {"success": False, "error": str(e)} except PDFProcessingError as e: logger.error(f"PDF processing error: {e}") return {"success": False, "error": str(e)} except VectorStoreError as e: logger.error(f"Vector store error: {e}") return {"success": False, "error": str(e)} except Exception as e: logger.error(f"Unexpected error adding document {path}: {e}") return {"success": False, "error": f"Unexpected error: {e}"} @self.app.tool() async def search_documents( query: str, limit: int = 5, metadata_filter: Optional[Dict[str, Any]] = None, search_type: Optional[str] = None, ) -> Dict[str, Any]: """Search for relevant content across the entire PDF knowledgebase. This is the primary tool for finding information. By default, it uses hybrid search combining semantic similarity (vector search) with keyword matching (BM25) for optimal results. You do NOT need to call list_documents first - simply provide your search query and this tool will find the most relevant content across all PDFs. Args: query: Search query text describing what you're looking for. limit: Maximum number of results to return (default: 5). metadata_filter: Optional metadata filters to apply to narrow results. search_type: Optional search type - "hybrid" (default), "vector", or "text". Returns: Search results with relevant document chunks, similarity scores, and metadata. 
""" try: # Validate input if not query or not query.strip(): raise ValidationError("Query cannot be empty", "query") if limit <= 0: raise ValidationError("Limit must be positive", "limit") # Validate search_type if search_type and search_type not in ["hybrid", "vector", "text"]: raise ValidationError("search_type must be 'hybrid', 'vector', or 'text'", "search_type") # Use hybrid by default if enabled, otherwise fall back to vector if search_type is None: search_type = "hybrid" if self.config.enable_hybrid_search else "vector" logger.info(f"Searching for: {query} (limit: {limit}, type: {search_type})") start_time = time.time() # Create search query object search_query = SearchQuery( query=query.strip(), limit=limit, metadata_filter=metadata_filter, search_type=search_type ) # Generate query embedding query_embedding = await self.embedding_service.generate_embedding(query) if not query_embedding: raise EmbeddingError("Failed to generate query embedding") # Search vector store search_results = await self.vector_store.search(search_query, query_embedding) # Format results results_data = [] for result in search_results: results_data.append(result.to_dict()) search_time = time.time() - start_time response = { "success": True, "results": results_data, "total_results": len(results_data), "query": query, "search_time": search_time, "metadata": {"limit": limit, "metadata_filter": metadata_filter}, } logger.info(f"Search completed: {len(results_data)} results in {search_time:.2f}s") return response except ValidationError as e: logger.error(f"Validation error in search: {e}") return {"success": False, "error": str(e)} except EmbeddingError as e: logger.error(f"Embedding error in search: {e}") return {"success": False, "error": str(e)} except VectorStoreError as e: logger.error(f"Vector store error in search: {e}") return {"success": False, "error": str(e)} except Exception as e: logger.error(f"Unexpected error in search: {e}") return {"success": False, "error": f"Unexpected 
error: {e}"} @self.app.tool() async def list_documents( metadata_filter: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """List all documents in the knowledgebase for management and browsing purposes. Use this tool ONLY when you need to: - Browse available documents and their metadata - Get document management information (file sizes, page counts, etc.) - Remove or manage specific documents by ID DO NOT use this tool before searching - use search_documents directly instead, as it automatically searches across all documents without requiring a list first. Args: metadata_filter: Optional metadata filters to apply. Returns: List of document metadata and statistics (titles, paths, page counts, etc.). """ try: logger.info("Listing documents") start_time = time.time() # Get all documents from cache documents = list(self._document_cache.values()) # If cache is empty, try to populate from vector store if not documents: await self._populate_document_cache_from_vector_store() documents = list(self._document_cache.values()) # Apply metadata filter if provided if metadata_filter: filtered_docs = [] for doc in documents: # Simple metadata filtering matches = True for key, value in metadata_filter.items(): if key not in doc.metadata or doc.metadata[key] != value: matches = False break if matches: filtered_docs.append(doc) documents = filtered_docs # Format document list document_list = [] for doc in documents: document_list.append(doc.to_dict(include_chunks=False)) # Get vector store statistics total_chunks = await self.vector_store.get_chunk_count() list_time = time.time() - start_time response = { "success": True, "documents": document_list, "total_count": len(document_list), "total_chunks": total_chunks, "list_time": list_time, "metadata": {"metadata_filter": metadata_filter}, } logger.info(f"Listed {len(document_list)} documents in {list_time:.2f}s") return response except Exception as e: logger.error(f"Error listing documents: {e}") return {"success": False, "error": 
f"Error listing documents: {e}"} def _is_file_watcher_managed_document(self, document: Document) -> bool: """Check if a document is managed by the file watcher (exists in knowledgebase directory). Args: document: Document to check Returns: True if the document is managed by file watcher and should not be removed via API """ try: if not document.path: return False doc_path = Path(document.path).resolve() kb_path = self.config.knowledgebase_path.resolve() uploads_path = (kb_path / "uploads").resolve() # Check if document is within knowledgebase directory try: doc_path.relative_to(kb_path) except ValueError: # Document path is not within knowledgebase directory return False # Check if document is NOT in uploads directory (uploads are user-managed) try: doc_path.relative_to(uploads_path) # Document is in uploads directory, so it's user-managed return False except ValueError: # Document is not in uploads, so it could be file-watcher-managed pass # Check if the original file still exists if doc_path.exists(): return True return False except Exception as e: logger.error(f"Error checking if document is file-watcher-managed: {e}") return False @self.app.tool() async def rescan_documents() -> Dict[str, Any]: """Manually trigger a directory rescan to detect new, modified, or deleted documents. This tool is useful when: - Files were added/removed outside of normal monitoring - The automatic monitoring missed changes (e.g., in containerized environments) - You want to force a complete refresh of the document index The rescan will: 1. Scan the documents directory for all supported files (.pdf, .md, .markdown) 2. Compare with the internal file index to detect changes 3. Process new and modified files 4. Remove deleted files from the knowledgebase 5. Return detailed statistics about the operation Returns: Detailed scan results including files processed, errors, and timing. 
""" try: if not self.config.enable_manual_rescan: return { "success": False, "error": "Manual rescan is disabled. Set PDFKB_ENABLE_MANUAL_RESCAN=true to enable.", } if not hasattr(self, "file_monitor") or not self.file_monitor: return {"success": False, "error": "File monitor is not available"} logger.info("🔄 Manual document rescan requested via MCP") # Perform the manual rescan result = await self.file_monitor.manual_rescan() # Update our document cache after rescan await self._populate_document_cache_from_vector_store() # Add success flag and format response result["success"] = True result["message"] = ( f"Rescan completed: {result['changes_processed']['new_files_processed']} new, " f"{result['changes_processed']['modified_files_processed']} modified, " f"{result['changes_processed']['deleted_files_processed']} deleted files processed" ) logger.info(f"🔄 Manual document rescan completed: {result['message']}") return result except Exception as e: logger.error(f"Manual document rescan failed: {e}") return {"success": False, "error": f"Rescan failed: {e}"} @self.app.tool() async def remove_document(document_id: str) -> Dict[str, Any]: """Remove a specific document from the knowledgebase. Use this tool to permanently delete a document and all its associated data. If you need to find the document ID first, use list_documents to browse available documents and get their IDs. The document will be completely removed from search results after deletion. Args: document_id: Unique ID of the document to remove (get this from list_documents). Returns: Removal confirmation with document details or error information. 
""" try: # Validate input if not document_id or not document_id.strip(): raise ValidationError("Document ID cannot be empty", "document_id") document_id = document_id.strip() # Check if document exists if document_id not in self._document_cache: raise DocumentNotFoundError(document_id) document = self._document_cache[document_id] # Check if document is managed by file watcher if self._is_file_watcher_managed_document(document): error_msg = ( f"Cannot remove document '{document.filename or document_id}' as it exists in the " f"knowledgebase directory ({document.path}). To remove this document, delete the " f"file from the filesystem directly." ) logger.warning(f"Attempted to remove file-watcher-managed document: {document_id}") return { "success": False, "error": error_msg, "error_type": "file_watcher_managed", "document_path": document.path, } logger.info(f"Removing document: {document_id}") start_time = time.time() # Remove from vector store await self.vector_store.delete_document(document_id) # For user-uploaded documents, also remove the physical file document_path = document.path try: file_path = Path(document_path) uploads_dir = self.config.knowledgebase_path / "uploads" # Check if this is an uploaded file (in uploads directory) try: file_path.relative_to(uploads_dir) # It's in uploads directory, safe to delete if file_path.exists(): file_path.unlink() logger.info(f"Deleted uploaded file: {file_path}") else: logger.warning(f"Uploaded file not found for deletion: {file_path}") except ValueError: # File is not in uploads directory, don't delete it logger.debug(f"Document file not in uploads directory, preserving: {file_path}") except Exception as e: logger.warning(f"Failed to delete uploaded file {document_path}: {e}") # Remove from document cache del self._document_cache[document_id] await self._save_document_cache() removal_time = time.time() - start_time response = { "success": True, "document_id": document_id, "document_path": document.path, "removal_time": 
removal_time, "message": f"Document {document_id} removed successfully", } logger.info(f"Successfully removed document {document_id} in {removal_time:.2f}s") return response except ValidationError as e: logger.error(f"Validation error removing document: {e}") return {"success": False, "error": str(e)} except DocumentNotFoundError as e: logger.error(f"Document not found: {e}") return {"success": False, "error": str(e)} except VectorStoreError as e: logger.error(f"Vector store error removing document: {e}") return {"success": False, "error": str(e)} except Exception as e: logger.error(f"Unexpected error removing document {document_id}: {e}") return {"success": False, "error": f"Unexpected error: {e}"} async def _resolve_document_identifier(self, document_identifier: str) -> Optional[str]: """Resolve a document identifier to an internal document ID. Args: document_identifier: Either an internal document ID (doc_xxxxx) or a file path Returns: Internal document ID if found, None otherwise """ # If it looks like an internal ID, check if it exists directly if document_identifier.startswith("doc_"): if document_identifier in self._document_cache: return document_identifier return None # Otherwise, treat as a file path and try to find by path return await self._find_document_by_path(document_identifier) async def _find_document_by_path(self, file_path: str) -> Optional[str]: """Find a document by its file path. 
Args: file_path: File path to search for (can be absolute or relative) Returns: Internal document ID if found, None otherwise """ # Normalize the path path_obj = Path(file_path) # If not absolute, resolve against knowledgebase path if not path_obj.is_absolute(): path_obj = self.config.knowledgebase_path / path_obj # Convert to string for comparison normalized_path = str(path_obj.resolve()) # Search through document cache for doc_id, document in self._document_cache.items(): # Compare resolved paths try: doc_path = Path(document.path).resolve() if str(doc_path) == normalized_path: return doc_id except Exception: # If path resolution fails, try direct string comparison if document.path == file_path or document.path == str(path_obj): return doc_id return None def _setup_resources(self) -> None: """Set up MCP resources.""" @self.app.resource("doc://{document_identifier}") async def get_document(document_identifier: str) -> str: """Get a document (PDF or Markdown) by ID or file path. Args: document_identifier: Either an internal document ID (e.g., 'doc_4939b2617e65034a') or a file path (e.g., '/app/documents/a121.md' or 'a121.md') Returns: Document content as JSON string. 
""" try: # Resolve the identifier to an internal document ID document_id = await self._resolve_document_identifier(document_identifier) if not document_id: return json.dumps( { "error": f"Document not found: {document_identifier}", "suggestion": "Use doc://list to see all available documents", "identifier_type": "internal_id" if document_identifier.startswith("doc_") else "file_path", } ) document = self._document_cache[document_id] document_data = document.to_dict(include_chunks=True) logger.info(f"Retrieved document: {document_identifier} -> {document_id}") return json.dumps(document_data, indent=2) except Exception as e: logger.error(f"Error retrieving document {document_identifier}: {e}") return json.dumps( { "error": f"Error retrieving document: {e}", "suggestion": "Use doc://list to see all available documents", } ) @self.app.resource("doc://{document_identifier}/chunk/{chunk_indices}") async def get_document_chunks(document_identifier: str, chunk_indices: str) -> str: """Get specific chunks of a document by chunk index. Args: document_identifier: Either an internal document ID (e.g., 'doc_4939b2617e65034a') or a file path (e.g., '/app/documents/a121.md' or 'a121.md') chunk_indices: Chunk index or comma-separated indices (e.g., '0', '1,2,5', '0,3,4,7') Returns: Chunk content as JSON or plain text for single chunks. 
""" try: # Resolve the identifier to an internal document ID document_id = await self._resolve_document_identifier(document_identifier) if not document_id: return json.dumps( { "error": f"Document not found: {document_identifier}", "suggestion": "Use doc://list to see all available documents", "identifier_type": "internal_id" if document_identifier.startswith("doc_") else "file_path", } ) document = self._document_cache[document_id] # Parse chunk indices try: requested_indices = [int(idx.strip()) for idx in chunk_indices.split(",") if idx.strip().isdigit()] if not requested_indices: raise ValueError("No valid chunk indices provided") except ValueError as e: return json.dumps( { "error": f"Invalid chunk indices: {chunk_indices}", "suggestion": "Use comma-separated integers like '0', '1,2,5', or '0,3,4,7'", "details": str(e), } ) # Get chunks - first try from document, then from vector store if needed chunks_to_search = document.chunks if not chunks_to_search: try: chunks_to_search = await self.vector_store.get_document_chunks(document_id) logger.info( f"Fetched {len(chunks_to_search)} chunks from vector store for document {document_id}" ) except Exception as e: logger.warning(f"Failed to fetch chunks from vector store for {document_id}: {e}") chunks_to_search = [] if not chunks_to_search: return json.dumps( { "error": f"No chunks found for document {document_identifier}", "suggestion": "Document may not be processed yet or may have processing errors", } ) # Create a mapping of chunk_index to chunk for efficient lookup chunk_map = {chunk.chunk_index: chunk for chunk in chunks_to_search} max_chunk_index = max(chunk_map.keys()) if chunk_map else -1 # Find requested chunks found_chunks = [] missing_indices = [] for idx in requested_indices: if idx in chunk_map: found_chunks.append(chunk_map[idx]) else: missing_indices.append(idx) if not found_chunks: return json.dumps( { "error": f"No chunks found for indices: {requested_indices}", "available_chunk_count": 
len(chunks_to_search), "available_chunk_indices": sorted(chunk_map.keys()), "max_chunk_index": max_chunk_index, "suggestion": f"Use chunk indices between 0 and {max_chunk_index}", } ) # Sort chunks by chunk_index for consistent ordering found_chunks.sort(key=lambda c: c.chunk_index) # Return format depends on whether single or multiple chunks requested if len(requested_indices) == 1 and len(found_chunks) == 1: # Single chunk - return plain text chunk = found_chunks[0] logger.info(f"Retrieved chunk {chunk.chunk_index} from document {document_identifier}") return chunk.text else: # Multiple chunks - return structured JSON result = { "document_id": document_id, "document_identifier": document_identifier, "requested_indices": requested_indices, "found_chunks": [ { "chunk_index": chunk.chunk_index, "chunk_id": chunk.id, "text": chunk.text, "page_number": chunk.page_number, "metadata": chunk.metadata, } for chunk in found_chunks ], "total_found": len(found_chunks), } if missing_indices: result["missing_indices"] = missing_indices result["warning"] = f"Some requested chunks were not found: {missing_indices}" logger.info( f"Retrieved {len(found_chunks)} chunks (indices: {[c.chunk_index for c in found_chunks]}) " f"from document {document_identifier}" ) return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error retrieving chunks {chunk_indices} from document {document_identifier}: {e}") return json.dumps( { "error": f"Error retrieving chunks: {e}", "suggestion": "Use doc://list to see all available documents", } ) @self.app.resource("doc://list") async def list_all_documents() -> str: """List all available documents (PDFs and Markdown files). Returns: JSON string with document list and metadata. 
""" try: documents = list(self._document_cache.values()) # If cache is empty, try to populate from vector store if not documents: await self._populate_document_cache_from_vector_store() documents = list(self._document_cache.values()) # Create summary list document_summaries = [] for doc in documents: summary = { "id": doc.id, "title": doc.title or doc.filename, "path": doc.path, "page_count": doc.page_count, "chunk_count": doc.chunk_count, "file_size": doc.file_size, "added_at": doc.added_at.isoformat() if doc.added_at else None, "has_embeddings": doc.has_embeddings, } document_summaries.append(summary) # Get additional statistics total_chunks = await self.vector_store.get_chunk_count() response = { "documents": document_summaries, "total_documents": len(document_summaries), "total_chunks": total_chunks, "knowledgebase_path": str(self.config.knowledgebase_path), "cache_dir": str(self.config.cache_dir), } logger.info(f"Listed {len(document_summaries)} documents via resource") return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error listing documents via resource: {e}") return json.dumps({"error": f"Error listing documents: {e}"}) async def _on_file_change(self, file_path: Path, change_type: str) -> None: """Handle file system changes. Args: file_path: Path to the changed file. change_type: Type of change ('added', 'modified', 'deleted'). 
""" try: logger.info(f"File {change_type}: {file_path}") if change_type == "added" or change_type == "modified": # Automatically process new or modified PDFs if file_path.suffix.lower() == ".pdf": logger.info(f"Auto-processing {change_type} PDF: {file_path}") # Process the PDF result = await self.document_processor.process_document(file_path) if result.success and result.document: # Update vector store await self.vector_store.add_document(result.document) # Update document cache self._document_cache[result.document.id] = result.document await self._save_document_cache() logger.info(f"Successfully auto-processed: {file_path}") else: logger.error(f"Failed to auto-process {file_path}: {result.error}") elif change_type == "deleted": # Find and remove document from cache doc_to_remove = None for doc_id, doc in self._document_cache.items(): if doc.path == str(file_path): doc_to_remove = doc_id break if doc_to_remove: await self.vector_store.delete_document(doc_to_remove) del self._document_cache[doc_to_remove] await self._save_document_cache() logger.info(f"Removed deleted document: {file_path}") except Exception as e: logger.error(f"Error handling file change {file_path}: {e}") async def _load_document_cache(self) -> None: """Load document metadata from cache file.""" try: if self._cache_file.exists(): with open(self._cache_file, "r", encoding="utf-8") as f: cache_data = json.load(f) self._document_cache = {} for doc_id, doc_data in cache_data.items(): try: document = Document.from_dict(doc_data) self._document_cache[doc_id] = document except Exception as e: logger.warning(f"Failed to load document {doc_id} from cache: {e}") logger.info(f"Loaded {len(self._document_cache)} documents from cache") else: self._document_cache = {} logger.info("No document cache found, starting fresh") # Try to populate from vector store await self._populate_document_cache_from_vector_store() except Exception as e: logger.error(f"Failed to load document cache: {e}") self._document_cache = {} 
# Try to populate from vector store even if cache loading failed await self._populate_document_cache_from_vector_store() async def _populate_document_cache_from_vector_store(self) -> None: """Populate document cache from vector store when cache is empty.""" try: # Only populate if cache is empty if not self._document_cache: logger.info("Populating document cache from vector store...") documents_info = await self.vector_store.list_documents() # Create minimal Document objects from vector store info for doc_info in documents_info: # Create a minimal document with basic info doc = Document(id=doc_info["id"], path=doc_info["path"], title=doc_info["title"]) self._document_cache[doc.id] = doc logger.info(f"Populated document cache with {len(self._document_cache)} documents from vector store") except Exception as e: logger.error(f"Failed to populate document cache from vector store: {e}") async def _synchronize_document_cache_with_vector_store(self) -> None: """Synchronize document cache with vector store to ensure consistency. This method ensures that all documents in the vector store are also present in the document cache, adding missing ones with basic information. 
""" try: logger.debug("Synchronizing document cache with vector store...") # Get all documents from vector store vector_store_docs = await self.vector_store.list_documents() vector_store_ids = {doc["id"] for doc in vector_store_docs} # Get all document IDs from cache cache_ids = set(self._document_cache.keys()) # Find missing documents (in vector store but not in cache) missing_ids = vector_store_ids - cache_ids if missing_ids: logger.info(f"Found {len(missing_ids)} documents in vector store that are missing from cache") # Add missing documents to cache for doc_info in vector_store_docs: if doc_info["id"] in missing_ids: try: # Create document with available info from vector store doc = Document( id=doc_info["id"], path=doc_info["path"], title=doc_info["title"], # Add other fields if available checksum=doc_info.get("checksum", ""), file_size=doc_info.get("file_size", 0), page_count=doc_info.get("page_count", 0), metadata=doc_info.get("metadata", {}), ) self._document_cache[doc.id] = doc logger.debug(f"Added missing document to cache: {doc.id} ({doc.title})") except Exception as e: logger.warning(f"Failed to add document {doc_info['id']} to cache: {e}") # Save updated cache await self._save_document_cache() logger.info(f"Synchronized document cache: added {len(missing_ids)} missing documents") else: logger.debug("Document cache is already synchronized with vector store") # Also check for documents in cache but not in vector store (cleanup) orphaned_ids = cache_ids - vector_store_ids if orphaned_ids: logger.warning(f"Found {len(orphaned_ids)} documents in cache that are not in vector store") for orphaned_id in orphaned_ids: logger.debug(f"Removing orphaned document from cache: {orphaned_id}") del self._document_cache[orphaned_id] # Save cleaned cache await self._save_document_cache() logger.info(f"Cleaned document cache: removed {len(orphaned_ids)} orphaned documents") except Exception as e: logger.error(f"Failed to synchronize document cache with vector store: {e}") 
async def _update_document_cache(self, document: Document) -> None: """Callback function to update the document cache when file monitor processes a document. Args: document: Document to add to cache. """ try: self._document_cache[document.id] = document await self._save_document_cache() logger.debug(f"Updated document cache with document {document.id}") except Exception as e: logger.error(f"Failed to update document cache: {e}") async def _save_document_cache(self) -> None: """Save document metadata to cache file.""" try: # Ensure metadata directory exists self._cache_file.parent.mkdir(parents=True, exist_ok=True) cache_data = {} for doc_id, document in self._document_cache.items(): cache_data[doc_id] = document.to_dict(include_chunks=False) with open(self._cache_file, "w", encoding="utf-8") as f: json.dump(cache_data, f, indent=2) logger.debug(f"Saved {len(cache_data)} documents to cache") except Exception as e: logger.error(f"Failed to save document cache: {e}") def get_http_app(self, path: str = "/"): """Get the FastMCP HTTP ASGI application for integration into FastAPI. Args: path: The internal path prefix for FastMCP routes (default: "/"). When mounting in FastAPI, use path="/" and let FastAPI handle the mount prefix to avoid double path prefixes. 
Returns: ASGI application instance that can be mounted in FastAPI """ return self.app.http_app(path=path) async def run(self) -> None: """Run the MCP server.""" await self.initialize() if self.config.transport in ["http", "sse"]: transport_name = "HTTP" if self.config.transport == "http" else "SSE" logger.info( f"Running MCP server in {transport_name} mode on {self.config.server_host}:{self.config.server_port}" ) await self.app.run_http_async( transport=self.config.transport, host=self.config.server_host, port=self.config.server_port, show_banner=True, ) else: logger.info("Running MCP server in stdio mode") # Use run_async() instead of run() to work within existing event loop await self.app.run_async() def sync_run(self) -> None: """Synchronous wrapper for run() - runs in a separate thread with its own event loop.""" logger.info("Starting MCP sync_run in thread") try: asyncio.run(self.run()) except Exception as e: logger.error(f"MCP sync_run error: {e}") raise finally: logger.info("MCP sync_run completed in thread") async def shutdown(self) -> None: """Shutdown the server gracefully.""" try: logger.info("Shutting down PDF Knowledgebase server...") if self.file_monitor: await self.file_monitor.stop_monitoring() if self.vector_store: await self.vector_store.close() # Save document cache await self._save_document_cache() logger.info("PDF Knowledgebase server shutdown complete") except Exception as e: logger.error(f"Error during shutdown: {e}") def main(): """Entry point for the MCP server.""" import argparse import signal import sys # Parse command line arguments parser = argparse.ArgumentParser( description="PDF Knowledgebase MCP Server", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Environment Variables: OPENAI_API_KEY OpenAI API key (required for OpenAI embeddings) PDFKB_KNOWLEDGEBASE_PATH Path to PDF directory (default: ./documents) PDFKB_CACHE_DIR Cache directory (default: <KNOWLEDGEBASE_PATH>/.cache) PDFKB_WEB_ENABLE Enable web interface 
(true/false, default: false) PDFKB_WEB_PORT Unified server port (default: 8000) PDFKB_WEB_HOST Server host (default: localhost) PDFKB_PDF_PARSER PDF parser to use (default: pymupdf4llm) PDFKB_DOCUMENT_CHUNKER Text chunker to use (default: langchain) PDFKB_LOG_LEVEL Logging level (default: INFO) Examples: pdfkb-mcp # Run MCP-only, stdio transport pdfkb-mcp --transport http # Run with HTTP transport (for Cline, modern clients) pdfkb-mcp --transport sse # Run with SSE transport (for Roo, legacy clients) PDFKB_WEB_ENABLE=true pdfkb-mcp # Run with web interface enabled pdfkb-mcp --enable-web # Run unified server (web + MCP) pdfkb-mcp --config myconfig.env # Use custom config file Endpoints: Unified Mode (PDFKB_WEB_ENABLE=true): Web interface: http://localhost:8000/ MCP (HTTP): http://localhost:8000/mcp/ MCP (SSE): http://localhost:8000/sse/ API docs: http://localhost:8000/docs MCP-only Mode (stdio transport is used by MCP clients like Claude Desktop) """, ) parser.add_argument("--config", type=str, help="Path to environment configuration file") parser.add_argument( "--transport", choices=["stdio", "http", "sse"], default="stdio", help="MCP transport mode (default: stdio, use http/sse for remote connections)", ) parser.add_argument("--enable-web", action="store_true", help="Enable web interface") parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Override logging level") parser.add_argument("--version", action="version", version=f'pdfkb-mcp {__import__("pdfkb").__version__}') args = parser.parse_args() # Load configuration from custom file if specified if args.config: from dotenv import load_dotenv load_dotenv(args.config, override=True) logger.info(f"Loaded configuration from: {args.config}") # Load main configuration config = ServerConfig.from_env() # Log full configuration for debugging logger.info("Loaded configuration details:") for key, value in config.__dict__.items(): if isinstance(value, Path): logger.info(f" {key}: {value}") 
elif isinstance(value, (list, dict)): logger.info(f" {key}: {value}") else: logger.info(f" {key}: {value}") # Override configuration from command line arguments if args.enable_web: config.web_enabled = True if args.transport: config.transport = args.transport if args.log_level: config.log_level = args.log_level # Configure logging with the configured level log_level = getattr(logging, config.log_level.upper(), logging.INFO) logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger.info(f"Configuration: {config.knowledgebase_path}") logger.info(f"Cache directory: {config.cache_dir}") logger.info(f"Web interface: {'enabled' if config.web_enabled else 'disabled'}") # Set up signal handlers for graceful shutdown def signal_handler(signum, frame): logger.info(f"Received signal {signum}, initiating graceful shutdown...") # This will cause KeyboardInterrupt to be raised in the event loop raise KeyboardInterrupt(f"Signal {signum}") signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: if config.web_enabled: # Run integrated server (MCP + Web) logger.info("Running in unified server mode (MCP + Web via Hypercorn)") logger.info(f"Web interface will be available at: http://{config.web_host}:{config.web_port}") if config.transport in ["http", "sse"]: endpoint = "mcp" if config.transport == "http" else "sse" logger.info( f"MCP endpoints will be available at: http://{config.web_host}:{config.web_port}/{endpoint}/" ) logger.info(f"API documentation will be available at: http://{config.web_host}:{config.web_port}/docs") # Import here to avoid circular imports and ensure web dependencies are only required when needed from .web_server import IntegratedPDFKnowledgebaseServer integrated_server = IntegratedPDFKnowledgebaseServer(config) asyncio.run(integrated_server.run_integrated()) else: # Run MCP server only logger.info("Running in MCP-only mode") server = PDFKnowledgebaseServer(config) 
asyncio.run(server.run()) except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down gracefully...") except Exception as e: logger.error(f"Server error: {e}") sys.exit(1) finally: logger.info("Shutdown complete") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/juanqui/pdfkb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.