
RAG Database MCP Server

by Human-center
ingest_pdfs.py (25.4 kB)
#!/usr/bin/env python3
"""
PDF Ingestion Pipeline for RAG System

This module implements a comprehensive PDF ingestion pipeline that:
1. Extracts text from PDF files using PyPDF2/PyMuPDF
2. Implements intelligent text chunking for semantic coherence
3. Generates embeddings via ChromaDB's SentenceTransformer embedding function (all-MiniLM-L6-v2)
4. Stores processed documents in ChromaDB with metadata
5. Handles batch processing with progress tracking and error handling

Usage:
    python ingest_pdfs.py --input-dir ./documents
    python ingest_pdfs.py --input-file document.pdf
    python ingest_pdfs.py --input-dir ./documents --chunk-size 1000 --overlap 200
"""

import argparse
import logging
import os
import sys
import time
import uuid
import traceback
from pathlib import Path
from typing import Any, List, Dict, Tuple, Optional, Generator
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

# PDF processing
try:
    import PyPDF2
    import fitz  # PyMuPDF
except ImportError as e:
    print(f"Error: Missing PDF processing libraries. Please install: {e}", file=sys.stderr)
    sys.exit(1)

# Vector storage
try:
    import chromadb
    from chromadb.config import Settings
    from chromadb.utils import embedding_functions
except ImportError as e:
    print(f"Error: Missing ML/vector storage libraries. Please install: {e}", file=sys.stderr)
    sys.exit(1)

# Utilities
try:
    from tqdm import tqdm
    import nltk
    from nltk.tokenize import sent_tokenize
except ImportError as e:
    print(f"Error: Missing utility libraries. Please install: {e}", file=sys.stderr)
    sys.exit(1)


@dataclass
class DocumentChunk:
    """Represents a chunk of text extracted from a PDF document."""
    text: str
    page_number: int
    chunk_id: str
    filename: str
    file_path: str
    chunk_index: int
    char_start: int
    char_end: int
    metadata: Dict[str, Any]


@dataclass
class IngestionStats:
    """Statistics for the ingestion process."""
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_characters: int = 0
    processing_time: float = 0.0
    failed_file_paths: List[str] = None

    def __post_init__(self):
        if self.failed_file_paths is None:
            self.failed_file_paths = []


class PDFExtractor:
    """Handles PDF text extraction using multiple libraries for robustness."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def extract_text_pypdf2(self, pdf_path: str) -> List[Tuple[int, str]]:
        """Extract text using PyPDF2."""
        pages = []
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages, 1):
                    try:
                        text = page.extract_text()
                        if text.strip():  # Only add non-empty pages
                            pages.append((page_num, text))
                    except Exception as e:
                        self.logger.warning(f"Failed to extract page {page_num} from {pdf_path} with PyPDF2: {e}")
                        continue
        except Exception as e:
            self.logger.error(f"Failed to read {pdf_path} with PyPDF2: {e}")
            raise
        return pages

    def extract_text_pymupdf(self, pdf_path: str) -> List[Tuple[int, str]]:
        """Extract text using PyMuPDF (fitz)."""
        pages = []
        try:
            doc = fitz.open(pdf_path)
            for page_num in range(len(doc)):
                try:
                    page = doc[page_num]
                    text = page.get_text()
                    if text.strip():  # Only add non-empty pages
                        pages.append((page_num + 1, text))
                except Exception as e:
                    self.logger.warning(f"Failed to extract page {page_num + 1} from {pdf_path} with PyMuPDF: {e}")
                    continue
            doc.close()
        except Exception as e:
            self.logger.error(f"Failed to read {pdf_path} with PyMuPDF: {e}")
            raise
        return pages

    def extract_text(self, pdf_path: str) -> List[Tuple[int, str]]:
        """
        Extract text from PDF using fallback strategy.

        Args:
            pdf_path: Path to PDF file

        Returns:
            List of tuples (page_number, text)
        """
        self.logger.info(f"Extracting text from {pdf_path}")

        # Try PyMuPDF first (generally more robust)
        try:
            pages = self.extract_text_pymupdf(pdf_path)
            if pages:
                self.logger.debug(f"Successfully extracted {len(pages)} pages with PyMuPDF")
                return pages
        except Exception as e:
            self.logger.warning(f"PyMuPDF failed for {pdf_path}: {e}")

        # Fallback to PyPDF2
        try:
            pages = self.extract_text_pypdf2(pdf_path)
            if pages:
                self.logger.debug(f"Successfully extracted {len(pages)} pages with PyPDF2")
                return pages
        except Exception as e:
            self.logger.error(f"Both PDF extractors failed for {pdf_path}: {e}")
            raise

        # If we get here, both methods failed to extract any text
        raise ValueError(f"No text could be extracted from {pdf_path}")


class TextChunker:
    """Implements intelligent text chunking strategies."""

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        """
        Initialize chunker.

        Args:
            chunk_size: Target size of each chunk in characters
            overlap: Number of characters to overlap between chunks
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.logger = logging.getLogger(__name__)

        # Download NLTK data if needed
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            self.logger.info("Downloading NLTK punkt tokenizer...")
            nltk.download('punkt', quiet=True)

    def chunk_text_semantic(self, text: str, page_number: int, filename: str, file_path: str) -> List[DocumentChunk]:
        """
        Chunk text using sentence boundaries for better semantic coherence.

        Args:
            text: Text to chunk
            page_number: Page number the text came from
            filename: Name of the source file
            file_path: Full path to the source file

        Returns:
            List of DocumentChunk objects
        """
        if not text.strip():
            return []

        # Tokenize into sentences
        try:
            sentences = sent_tokenize(text)
        except Exception as e:
            self.logger.warning(f"Sentence tokenization failed, falling back to simple splitting: {e}")
            # Fallback to simple sentence splitting
            sentences = [s.strip() for s in text.split('.') if s.strip()]

        chunks = []
        current_chunk = ""
        current_start = 0
        char_end = 0  # End offset of the most recently saved chunk (avoids an undefined name when the first sentence already exceeds chunk_size)
        chunk_index = 0

        for sentence in sentences:
            # Check if adding this sentence would exceed chunk size
            potential_chunk = current_chunk + " " + sentence if current_chunk else sentence

            if len(potential_chunk) <= self.chunk_size:
                current_chunk = potential_chunk
            else:
                # Save current chunk if it's not empty
                if current_chunk.strip():
                    chunk_id = str(uuid.uuid4())
                    char_end = current_start + len(current_chunk)
                    chunk = DocumentChunk(
                        text=current_chunk.strip(),
                        page_number=page_number,
                        chunk_id=chunk_id,
                        filename=filename,
                        file_path=file_path,
                        chunk_index=chunk_index,
                        char_start=current_start,
                        char_end=char_end,
                        metadata={
                            'chunk_type': 'semantic',
                            'sentence_count': len(sent_tokenize(current_chunk.strip())) if current_chunk.strip() else 0,
                            'char_length': len(current_chunk.strip())
                        }
                    )
                    chunks.append(chunk)
                    chunk_index += 1

                # Start new chunk with overlap
                if self.overlap > 0 and current_chunk:
                    # Take last `overlap` characters for overlap
                    overlap_text = current_chunk[-self.overlap:] if len(current_chunk) > self.overlap else current_chunk
                    current_chunk = overlap_text + " " + sentence
                    current_start = char_end - len(overlap_text)
                else:
                    current_chunk = sentence
                    current_start = char_end

        # Don't forget the last chunk
        if current_chunk.strip():
            chunk_id = str(uuid.uuid4())
            char_end = current_start + len(current_chunk)
            chunk = DocumentChunk(
                text=current_chunk.strip(),
                page_number=page_number,
                chunk_id=chunk_id,
                filename=filename,
                file_path=file_path,
                chunk_index=chunk_index,
                char_start=current_start,
                char_end=char_end,
                metadata={
                    'chunk_type': 'semantic',
                    'sentence_count': len(sent_tokenize(current_chunk.strip())) if current_chunk.strip() else 0,
                    'char_length': len(current_chunk.strip())
                }
            )
            chunks.append(chunk)

        return chunks


class ChromaDBStorage:
    """Handles ChromaDB storage operations."""

    def __init__(self, persist_directory: str = "./chroma_db", collection_name: str = "pdf_documents"):
        """
        Initialize ChromaDB storage.

        Args:
            persist_directory: Directory to persist ChromaDB data
            collection_name: Name of the ChromaDB collection
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        self.logger = logging.getLogger(__name__)

        # Ensure persist directory exists
        Path(persist_directory).mkdir(parents=True, exist_ok=True)

        # Initialize ChromaDB
        try:
            self.client = chromadb.PersistentClient(
                path=persist_directory,
                settings=Settings(
                    anonymized_telemetry=False,
                    allow_reset=True
                )
            )

            # Use SentenceTransformer embedding function for consistent 384-dim embeddings
            self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
                model_name="all-MiniLM-L6-v2"
            )

            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=collection_name,
                metadata={"description": "PDF document chunks for RAG system"},
                embedding_function=self.embedding_function
            )

            self.logger.info(f"ChromaDB initialized at {persist_directory}, collection: {collection_name}")
            self.logger.info(f"Current collection size: {self.collection.count()}")
        except Exception as e:
            self.logger.error(f"Failed to initialize ChromaDB: {e}")
            raise

    def add_chunks(self, chunks: List[DocumentChunk]) -> None:
        """
        Add document chunks to ChromaDB.

        Args:
            chunks: List of DocumentChunk objects
        """
        try:
            ids = [chunk.chunk_id for chunk in chunks]
            documents = [chunk.text for chunk in chunks]
            metadatas = []

            for chunk in chunks:
                metadata = {
                    "filename": chunk.filename,
                    "file_path": chunk.file_path,
                    "page_number": chunk.page_number,
                    "chunk_index": chunk.chunk_index,
                    "char_start": chunk.char_start,
                    "char_end": chunk.char_end,
                    "char_length": len(chunk.text),
                    "chunk_type": chunk.metadata.get("chunk_type", "unknown"),
                    "sentence_count": chunk.metadata.get("sentence_count", 0)
                }
                metadatas.append(metadata)

            # Embeddings are generated by the collection's embedding_function
            self.collection.add(
                ids=ids,
                documents=documents,
                metadatas=metadatas
            )

            self.logger.debug(f"Added {len(chunks)} chunks to ChromaDB")
        except Exception as e:
            self.logger.error(f"Failed to add chunks to ChromaDB: {e}")
            raise

    def get_collection_info(self) -> Dict:
        """Get information about the current collection."""
        try:
            count = self.collection.count()
            peek = self.collection.peek()

            return {
                "name": self.collection_name,
                "count": count,
                "persist_directory": self.persist_directory,
                "sample_metadata": peek.get("metadatas", [{}])[0] if peek.get("metadatas") else {}
            }
        except Exception as e:
            self.logger.error(f"Failed to get collection info: {e}")
            return {"error": str(e)}


class PDFIngestionPipeline:
    """Main pipeline class that orchestrates the entire ingestion process."""

    def __init__(self, chunk_size: int = 1000, overlap: int = 200,
                 chroma_persist_dir: str = "./chroma_db",
                 collection_name: str = "pdf_documents",
                 max_workers: int = 4):
        """
        Initialize the ingestion pipeline.

        Args:
            chunk_size: Size of text chunks in characters
            overlap: Overlap between chunks in characters
            chroma_persist_dir: Directory for ChromaDB persistence
            collection_name: Name of ChromaDB collection
            max_workers: Maximum number of worker threads for parallel processing
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.max_workers = max_workers
        self.logger = logging.getLogger(__name__)

        # Initialize components
        self.pdf_extractor = PDFExtractor()
        self.text_chunker = TextChunker(chunk_size=chunk_size, overlap=overlap)
        self.storage = ChromaDBStorage(persist_directory=chroma_persist_dir, collection_name=collection_name)

    def process_single_pdf(self, pdf_path: str) -> Tuple[bool, int, str]:
        """
        Process a single PDF file.

        Args:
            pdf_path: Path to PDF file

        Returns:
            Tuple of (success, chunk_count, error_message)
        """
        try:
            pdf_path = str(Path(pdf_path).resolve())
            filename = Path(pdf_path).name

            self.logger.info(f"Processing {filename}")

            # Extract text from PDF
            pages = self.pdf_extractor.extract_text(pdf_path)
            if not pages:
                return False, 0, "No text extracted from PDF"

            # Process each page and create chunks
            all_chunks = []
            for page_num, text in pages:
                if not text.strip():
                    continue

                page_chunks = self.text_chunker.chunk_text_semantic(
                    text=text,
                    page_number=page_num,
                    filename=filename,
                    file_path=pdf_path
                )
                all_chunks.extend(page_chunks)

            if not all_chunks:
                return False, 0, "No chunks created from PDF"

            # Store in ChromaDB (embeddings computed by Chroma)
            self.storage.add_chunks(all_chunks)

            self.logger.info(f"Successfully processed {filename}: {len(all_chunks)} chunks")
            return True, len(all_chunks), ""
        except Exception as e:
            error_msg = f"Failed to process {pdf_path}: {str(e)}"
            self.logger.error(error_msg)
            self.logger.debug(traceback.format_exc())
            return False, 0, error_msg

    def process_pdf_batch(self, pdf_paths: List[str]) -> IngestionStats:
        """
        Process a batch of PDF files with parallel processing.

        Args:
            pdf_paths: List of paths to PDF files

        Returns:
            IngestionStats object with processing statistics
        """
        stats = IngestionStats()
        stats.total_files = len(pdf_paths)
        start_time = time.time()

        self.logger.info(f"Starting batch processing of {len(pdf_paths)} PDF files")

        # Process files in parallel
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(self.process_single_pdf, pdf_path): pdf_path
                for pdf_path in pdf_paths
            }

            # Process completed tasks with progress bar
            with tqdm(total=len(pdf_paths), desc="Processing PDFs") as pbar:
                for future in as_completed(future_to_path):
                    pdf_path = future_to_path[future]
                    try:
                        success, chunk_count, error_msg = future.result()
                        if success:
                            stats.processed_files += 1
                            stats.total_chunks += chunk_count
                        else:
                            stats.failed_files += 1
                            stats.failed_file_paths.append(pdf_path)
                            self.logger.error(f"Failed to process {pdf_path}: {error_msg}")
                    except Exception as e:
                        stats.failed_files += 1
                        stats.failed_file_paths.append(pdf_path)
                        self.logger.error(f"Exception processing {pdf_path}: {e}")

                    pbar.update(1)

        stats.processing_time = time.time() - start_time

        # Log final statistics
        self.logger.info(f"Batch processing completed in {stats.processing_time:.2f} seconds")
        self.logger.info(f"Successfully processed: {stats.processed_files}/{stats.total_files} files")
        self.logger.info(f"Total chunks created: {stats.total_chunks}")

        if stats.failed_files > 0:
            self.logger.warning(f"Failed to process {stats.failed_files} files:")
            for failed_path in stats.failed_file_paths:
                self.logger.warning(f"  - {failed_path}")

        return stats

    def get_storage_info(self) -> Dict:
        """Get information about the storage backend."""
        return self.storage.get_collection_info()


def setup_logging(verbose: bool = False):
    """Set up logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO

    # Configure logging to stderr (important for MCP compatibility)
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stderr)]
    )


def find_pdf_files(input_path: str) -> List[str]:
    """
    Find PDF files in the given path.

    Args:
        input_path: Path to file or directory

    Returns:
        List of PDF file paths
    """
    path = Path(input_path)

    if path.is_file():
        if path.suffix.lower() == '.pdf':
            return [str(path)]
        else:
            raise ValueError(f"Input file {input_path} is not a PDF")
    elif path.is_dir():
        pdf_files = []
        for pdf_path in path.rglob("*.pdf"):
            if pdf_path.is_file():
                pdf_files.append(str(pdf_path))

        if not pdf_files:
            raise ValueError(f"No PDF files found in directory {input_path}")

        return pdf_files
    else:
        raise ValueError(f"Input path {input_path} does not exist")


def main():
    """Main entry point for the PDF ingestion pipeline."""
    parser = argparse.ArgumentParser(
        description="PDF Ingestion Pipeline for RAG System",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process a single PDF file
  python ingest_pdfs.py --input-file document.pdf

  # Process all PDFs in a directory
  python ingest_pdfs.py --input-dir ./documents

  # Custom chunk size and overlap
  python ingest_pdfs.py --input-dir ./documents --chunk-size 1500 --overlap 300

  # Use custom ChromaDB location
  python ingest_pdfs.py --input-dir ./documents --chroma-dir ./my_chroma_db
        """
    )

    # Input options
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument(
        "--input-dir",
        help="Directory containing PDF files to process"
    )
    input_group.add_argument(
        "--input-file",
        help="Single PDF file to process"
    )

    # Processing options
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=1000,
        help="Target size of text chunks in characters (default: 1000)"
    )
    parser.add_argument(
        "--overlap",
        type=int,
        default=200,
        help="Overlap between chunks in characters (default: 200)"
    )
    parser.add_argument(
        "--max-workers",
        type=int,
        default=4,
        help="Maximum number of parallel worker threads (default: 4)"
    )

    # Storage options
    parser.add_argument(
        "--chroma-dir",
        default="./chroma_db",
        help="Directory for ChromaDB persistence (default: ./chroma_db)"
    )
    parser.add_argument(
        "--collection-name",
        default="pdf_documents",
        help="Name of ChromaDB collection (default: pdf_documents)"
    )

    # (No embedding model option; uses MiniLM via Chroma)

    # Utility options
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--info",
        action="store_true",
        help="Show ChromaDB collection info and exit"
    )

    args = parser.parse_args()

    # Setup logging
    setup_logging(args.verbose)
    logger = logging.getLogger(__name__)

    try:
        # Initialize pipeline
        pipeline = PDFIngestionPipeline(
            chunk_size=args.chunk_size,
            overlap=args.overlap,
            chroma_persist_dir=args.chroma_dir,
            collection_name=args.collection_name,
            max_workers=args.max_workers
        )

        # Handle info request
        if args.info:
            info = pipeline.get_storage_info()
            print("\n=== ChromaDB Collection Info ===")
            for key, value in info.items():
                print(f"{key}: {value}")
            return

        # Find PDF files to process
        input_path = args.input_dir or args.input_file
        pdf_files = find_pdf_files(input_path)

        logger.info(f"Found {len(pdf_files)} PDF files to process")

        # Process PDFs
        stats = pipeline.process_pdf_batch(pdf_files)

        # Print final report
        print("\n=== Ingestion Complete ===")
        print(f"Total files: {stats.total_files}")
        print(f"Successfully processed: {stats.processed_files}")
        print(f"Failed: {stats.failed_files}")
        print(f"Total chunks created: {stats.total_chunks}")
        print(f"Processing time: {stats.processing_time:.2f} seconds")

        if stats.failed_files > 0:
            print(f"\nFailed files ({stats.failed_files}):")
            for failed_path in stats.failed_file_paths:
                print(f"  - {Path(failed_path).name}")

        # Show final collection info
        info = pipeline.get_storage_info()
        print(f"\nChromaDB Collection: {info['name']}")
        print(f"Total documents: {info['count']}")
        print(f"Storage location: {info['persist_directory']}")

        # Exit with appropriate code
        sys.exit(0 if stats.failed_files == 0 else 1)

    except KeyboardInterrupt:
        logger.info("Processing interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        if args.verbose:
            logger.debug(traceback.format_exc())
        sys.exit(1)


if __name__ == "__main__":
    main()
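
As a companion to the listing above, here is a minimal retrieval sketch showing how the ingested chunks could be queried back out of ChromaDB. It is not part of the repository: it assumes the defaults used in ingest_pdfs.py (persist directory ./chroma_db, collection pdf_documents, the all-MiniLM-L6-v2 SentenceTransformer embedding function), and the query string is purely illustrative.

# retrieval_sketch.py -- assumes chromadb and sentence-transformers are installed
# and that ingest_pdfs.py has already populated ./chroma_db (default settings).
import chromadb
from chromadb.utils import embedding_functions

# Reopen the persisted collection with the same embedding function used at ingest time,
# so query_texts are embedded in the same 384-dim space as the stored chunks.
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection(
    name="pdf_documents",
    embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name="all-MiniLM-L6-v2"
    ),
)

# Semantic search over the ingested chunks; the query string is a placeholder example.
results = collection.query(query_texts=["What does the report say about revenue?"], n_results=3)
for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
    print(f"{meta['filename']} p.{meta['page_number']}: {doc[:120]}...")

Reopening the collection with the same embedding function matters: query texts are embedded with whatever function the collection handle was given, so a mismatch would silently degrade retrieval quality.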

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Human-center/RAG-MCP-HCSRL'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.