Skip to main content
Glama

MCP Orchestration Server

pdf_reader.py (28.5 kB)
"""PDF text extraction with optional OCR fallback and LLM question answering.

The module exposes:

* ``extract_text_from_pdf`` / ``save_extracted_text`` - plain helpers that
  only need PyPDF2 (plus pytesseract/Pillow/pdf2image for the OCR fallback).
* ``TogetherEmbeddings`` - a LangChain ``Embeddings`` implementation that
  calls the Together.ai embeddings HTTP endpoint.
* ``EnhancedPDFReader`` - builds a FAISS vector store from a PDF or raw text
  and answers questions through a ``ConversationalRetrievalChain``.

OCR, LangChain and Together.ai are optional; availability is recorded in the
``HAS_OCR``, ``HAS_LANGCHAIN`` and ``HAS_TOGETHER`` module flags.
"""

import os
import sys
import io
import tempfile
import subprocess
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any

from PyPDF2 import PdfReader
from PyPDF2.errors import PdfReadError
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Try to import optional OCR libraries
try:
    import pytesseract
    from PIL import Image
    HAS_OCR = True
except ImportError:
    HAS_OCR = False
    print("⚠️ OCR libraries not available. Install pytesseract and Pillow for OCR functionality.")

# Try to import LangChain libraries for LLM functionality
try:
    from langchain_community.document_loaders import PyPDFLoader
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    from langchain_core.documents import Document
    from langchain_community.vectorstores import FAISS
    from langchain.memory import ConversationBufferMemory
    from langchain.memory import ConversationBufferWindowMemory
    from langchain.chains import ConversationalRetrievalChain
    from langchain_core.prompts import PromptTemplate
    from langchain_core.embeddings import Embeddings
    import requests
    HAS_LANGCHAIN = True
    print("✅ LangChain libraries imported successfully, including ConversationBufferMemory")
except ImportError as e:
    HAS_LANGCHAIN = False
    # Fallback base class: without it the ``class TogetherEmbeddings(Embeddings)``
    # statement below raises NameError and the whole module fails to import,
    # even for callers that only want plain text extraction.
    Embeddings = object
    print(f"⚠️ LangChain libraries not available: {e}")
    print("💡 Install with: pip install langchain langchain-community langchain-core")

# Try to import Together.ai
try:
    from langchain_together import Together
    HAS_TOGETHER = True
except ImportError:
    try:
        from langchain_community.llms import Together
        HAS_TOGETHER = True
    except ImportError:
        HAS_TOGETHER = False
        print("⚠️ Together.ai not available. Install langchain-together for LLM functionality.")

# Set Tesseract path based on OS
if HAS_OCR:
    if sys.platform.startswith('win'):
        # Windows
        _tesseract_candidates = (
            r'C:\Program Files\Tesseract-OCR\tesseract.exe',
            r'C:\Program Files (x86)\Tesseract-OCR\tesseract.exe',
        )
    elif sys.platform.startswith('darwin'):
        # macOS: Homebrew installs under /usr/local on Intel and
        # /opt/homebrew on Apple Silicon.
        _tesseract_candidates = (
            '/usr/local/bin/tesseract',
            '/opt/homebrew/bin/tesseract',
        )
    else:
        # Linux/Unix
        _tesseract_candidates = ('/usr/bin/tesseract',)

    # Use the first candidate that exists; otherwise leave pytesseract's
    # default command untouched.
    for tesseract_cmd in _tesseract_candidates:
        if os.path.exists(tesseract_cmd):
            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
            print(f"✅ Tesseract found at {tesseract_cmd}")
            break


# Prompt text shared by the QA chains.
# NOTE(review): the exact line breaks inside the original templates could not
# be recovered from the collapsed source; confirm against the upstream file.
_QA_PROMPT_BASE = (
    "You are an expert assistant. Use the following document content to answer "
    "the user's question accurately and comprehensively.\n"
    "\n"
    "Context:\n"
    "{context}\n"
    "\n"
    "Question: {question}\n"
    "\n"
    "Answer: Provide a detailed and accurate answer based on the document content. "
    "If the information is not available in the document, clearly state that."
)
_QA_PROMPT_CONCISE_SUFFIX = " Keep your response focused and concise while being comprehensive."


class TogetherEmbeddings(Embeddings):
    """Together.ai embeddings implementation for LangChain."""

    def __init__(
        self,
        model_name: Optional[str] = None,
        api_key: Optional[str] = None,
        dimensions: int = 768,
        timeout: float = 30.0,
    ):
        """Initialize Together.ai embeddings.

        Args:
            model_name: Embedding model id. Falls back to the
                ``TOGETHER_EMBEDDING_MODEL`` environment variable, then to
                ``togethercomputer/m2-bert-80M-8k-retrieval``.
            api_key: Together.ai API key. Falls back to ``TOGETHER_API_KEY``.
            dimensions: Stored on the instance; not sent to the API.
            timeout: Seconds to wait for each HTTP request before giving up.

        Raises:
            ValueError: If no API key is given and none is in the environment.
        """
        self.model_name = model_name or os.getenv(
            "TOGETHER_EMBEDDING_MODEL", "togethercomputer/m2-bert-80M-8k-retrieval"
        )
        self.api_key = api_key or os.getenv("TOGETHER_API_KEY")
        if not self.api_key:
            raise ValueError("TOGETHER_API_KEY not found in environment variables")
        self.dimensions = dimensions
        self.timeout = timeout
        self.base_url = "https://api.together.xyz/v1/embeddings"

    def _get_embedding(self, text: str) -> List[float]:
        """Get embedding for a single text.

        Raises:
            ValueError: If the API answers with a non-200 status code.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": self.model_name,
            "input": text
        }
        # requests has no default timeout; without one a stalled connection
        # would block the caller forever.
        response = requests.post(
            self.base_url, headers=headers, json=data, timeout=self.timeout
        )
        if response.status_code != 200:
            raise ValueError(f"Error from Together.ai API: {response.text}")
        result = response.json()
        return result["data"][0]["embedding"]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of documents (one HTTP request per text)."""
        return [self._get_embedding(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        """Embed a query."""
        return self._get_embedding(text)


class EnhancedPDFReader:
    """Enhanced PDF reader with LLM question-answering capabilities."""

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the enhanced PDF reader.

        Reads its settings from environment variables, creates the storage
        directories under ``data/multimodal`` and, when LangChain, Together.ai
        and an API key are all available, initializes the LLM components.

        Args:
            api_key: Together.ai API key. Falls back to ``TOGETHER_API_KEY``.
        """
        self.api_key = api_key or os.getenv("TOGETHER_API_KEY")
        self.model_name = os.getenv("TOGETHER_MODEL_NAME", "deepseek-ai/DeepSeek-V3")
        self.embedding_model = os.getenv(
            "TOGETHER_EMBEDDING_MODEL", "togethercomputer/m2-bert-80M-8k-retrieval"
        )

        # Text processing settings - optimized for token compatibility
        self.chunk_size = int(os.getenv("CHUNK_SIZE", "1000"))  # Increased for better context
        self.chunk_overlap = int(os.getenv("CHUNK_OVERLAP", "200"))  # Increased overlap for continuity

        # Token estimation settings (rough estimate: 1 token ≈ 4 characters)
        self.max_chunk_tokens = self.chunk_size // 4  # ~250 tokens per chunk
        self.max_context_tokens = int(os.getenv("MAX_CONTEXT_TOKENS", "3000"))  # Max context for retrieval

        # LLM settings - optimized for better responses
        self.temperature = float(os.getenv("LLM_TEMPERATURE", "0.3"))  # Lower for more focused answers
        self.max_tokens = int(os.getenv("LLM_MAX_TOKENS", "1024"))  # Reasonable response length
        self.memory_k = int(os.getenv("LLM_MEMORY_K", "3"))  # Reduced for token efficiency

        # Storage paths for organized file management
        self.uploaded_pdfs_dir = Path("data/multimodal/uploaded_pdfs")
        self.processed_outputs_dir = Path("data/multimodal/processed_outputs")

        # Create directories if they don't exist
        self.uploaded_pdfs_dir.mkdir(parents=True, exist_ok=True)
        self.processed_outputs_dir.mkdir(parents=True, exist_ok=True)

        # Initialize components
        self.llm = None
        self.embeddings = None
        self.text_splitter = None
        self.vectorstore = None
        self.qa_chain = None
        self.memory = None

        # Initialize if LangChain is available
        if HAS_LANGCHAIN and HAS_TOGETHER and self.api_key:
            self._initialize_llm_components()
        else:
            print("⚠️ LLM functionality not available. Missing dependencies or API key.")

    def _initialize_llm_components(self):
        """Initialize LLM, embeddings, text splitter and conversation memory.

        Any failure is reported on stdout and leaves ``self.llm`` as ``None``,
        which the public methods treat as "LLM functionality not available".
        """
        try:
            # Set API key in environment
            os.environ["TOGETHER_API_KEY"] = self.api_key

            # Initialize LLM
            self.llm = Together(
                model=self.model_name,
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )

            # Initialize embeddings
            self.embeddings = TogetherEmbeddings(
                model_name=self.embedding_model,
                api_key=self.api_key
            )

            # Initialize text splitter with token-aware settings
            self.text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                length_function=len,  # Use character count
                separators=["\n\n", "\n", ". ", " ", ""],  # Better separation
                keep_separator=True  # Keep separators for context
            )

            # Initialize memory. The windowed variant is used because plain
            # ConversationBufferMemory has no ``k`` field, so ``memory_k``
            # was never applied.
            self.memory = ConversationBufferWindowMemory(
                memory_key="chat_history",
                return_messages=True,
                k=self.memory_k
            )

            print("✅ LLM components initialized successfully")
        except Exception as e:
            print(f"⚠️ Error initializing LLM components: {e}")
            self.llm = None

    def _make_retriever(self):
        """Return the similarity retriever used for freshly built vector stores."""
        return self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={
                "k": 4,  # Retrieve more chunks for better context
                "fetch_k": 8  # Consider more candidates
            }
        )

    def estimate_tokens(self, text: str) -> int:
        """Estimate the number of tokens in text (rough approximation)."""
        # Rough estimation: 1 token ≈ 4 characters for English text
        return len(text) // 4

    def optimize_context_for_tokens(self, retrieved_docs: list, max_tokens: Optional[int] = None) -> str:
        """Concatenate retrieved documents until the token budget is used up.

        Args:
            retrieved_docs: Objects exposing a ``page_content`` string.
            max_tokens: Token budget; defaults to ``self.max_context_tokens``.

        Returns:
            The documents' text joined by blank lines. The first document that
            does not fit ends the scan; it is included truncated (with a
            trailing ``...``) only if more than 100 characters of budget remain.
        """
        if max_tokens is None:
            max_tokens = self.max_context_tokens

        parts = []
        current_tokens = 0
        for doc in retrieved_docs:
            doc_text = doc.page_content
            doc_tokens = self.estimate_tokens(doc_text)
            if current_tokens + doc_tokens <= max_tokens:
                parts.append(doc_text + "\n\n")
                current_tokens += doc_tokens
                continue

            # Add partial content if it fits
            remaining_chars = (max_tokens - current_tokens) * 4
            if remaining_chars > 100:  # Only add if meaningful content
                parts.append(doc_text[:remaining_chars] + "...\n\n")
            break

        return "".join(parts).strip()

    def extract_text_from_pdf(self, pdf_path: str, include_page_numbers: bool = True,
                              verbose: bool = True, try_ocr: bool = True) -> str:
        """Extract text from PDF; delegates to the module-level function."""
        return extract_text_from_pdf(pdf_path, include_page_numbers, verbose, try_ocr)

    def load_and_process_pdf(self, pdf_path: str, verbose: bool = True) -> bool:
        """Load a PDF, index it in FAISS and build the QA chain.

        Args:
            pdf_path: Path to the PDF file.
            verbose: Whether to print progress messages.

        Returns:
            True when the PDF is ready for questions, False otherwise.
        """
        if not HAS_LANGCHAIN or not self.llm:
            print("⚠️ LLM functionality not available")
            return False

        try:
            if verbose:
                print(f"📄 Loading PDF: {pdf_path}")

            # Load PDF using LangChain
            loader = PyPDFLoader(pdf_path)
            documents = loader.load()
            if not documents:
                print("❌ No content found in PDF")
                return False
            if verbose:
                print(f"✅ Loaded {len(documents)} pages from PDF")

            # Split documents into chunks
            split_docs = self.text_splitter.split_documents(documents)
            if verbose:
                print(f"✅ Split into {len(split_docs)} chunks")

            # Create vector store
            self.vectorstore = FAISS.from_documents(split_docs, self.embeddings)

            # Create QA chain with optimized retrieval
            prompt_template = PromptTemplate(
                input_variables=["context", "question"],
                template=_QA_PROMPT_BASE + _QA_PROMPT_CONCISE_SUFFIX
            )
            self.qa_chain = ConversationalRetrievalChain.from_llm(
                llm=self.llm,
                retriever=self._make_retriever(),
                memory=self.memory,
                return_source_documents=False,  # Disable source documents to fix memory issue
                output_key="answer",  # Specify output key for memory
                combine_docs_chain_kwargs={"prompt": prompt_template}
            )

            if verbose:
                print("✅ PDF processed and ready for questions")
            return True
        except Exception as e:
            print(f"❌ Error processing PDF: {e}")
            return False

    def load_and_process_text(self, text_content: str, document_name: str = "document",
                              verbose: bool = True) -> bool:
        """Index raw text in FAISS and build the QA chain.

        Unlike ``load_and_process_pdf`` this chain uses the default combine
        prompt rather than the custom template.

        Args:
            text_content: The text to index.
            document_name: Stored as the ``source`` metadata of the document.
            verbose: Whether to print progress and error messages.

        Returns:
            True when the text is ready for questions, False otherwise.
        """
        if not HAS_LANGCHAIN or not self.llm:
            if verbose:
                print("⚠️ LLM functionality not available")
            return False

        try:
            if verbose:
                print(f"📝 Processing text content: {document_name}")

            # Create document from text
            documents = [Document(page_content=text_content, metadata={"source": document_name})]
            if verbose:
                print(f"📄 Created document with {len(text_content)} characters")

            # Split text into chunks
            text_chunks = self.text_splitter.split_documents(documents)
            if verbose:
                print(f"📝 Split into {len(text_chunks)} chunks")

            # Create vector store
            self.vectorstore = FAISS.from_documents(text_chunks, self.embeddings)
            if verbose:
                print("🔍 Vector store created")

            # Create QA chain with optimized retrieval
            self.qa_chain = ConversationalRetrievalChain.from_llm(
                llm=self.llm,
                retriever=self._make_retriever(),
                memory=self.memory,
                return_source_documents=False,  # Disable source documents to fix memory issue
                verbose=False,
                output_key="answer"  # Specify output key for memory
            )

            if verbose:
                print("✅ Text processing complete - ready for questions")
            return True
        except Exception as e:
            if verbose:
                print(f"❌ Error processing text: {e}")
            return False

    def ask_question(self, question: str, verbose: bool = True) -> str:
        """Ask a question about the loaded document.

        Returns:
            The chain's answer, or a ``❌``-prefixed message when no document
            is loaded or the chain raises.
        """
        if not self.qa_chain:
            return "❌ No PDF loaded. Please load a PDF first using load_and_process_pdf()."

        try:
            if verbose:
                print(f"🤔 Processing question: {question}")
            response = self.qa_chain.invoke({"question": question})
            answer = response.get("answer", "I couldn't find an answer to that question in the document.")
            if verbose:
                print("✅ Answer generated")
            return answer
        except Exception as e:
            error_msg = f"❌ Error generating answer: {str(e)}"
            if verbose:
                print(error_msg)
            return error_msg

    def get_document_summary(self, max_length: int = 500) -> str:
        """Ask the QA chain for a summary of at most ``max_length`` words."""
        if not self.qa_chain:
            return "❌ No PDF loaded."

        summary_question = f"Please provide a comprehensive summary of this document in no more than {max_length} words. Include the main topics, key points, and overall purpose of the document."
        return self.ask_question(summary_question, verbose=False)

    def search_document(self, query: str, k: int = 3) -> List[str]:
        """Return up to ``k`` matching chunks, each truncated to 300 characters."""
        if not self.vectorstore:
            return ["❌ No PDF loaded."]

        try:
            # Perform similarity search
            docs = self.vectorstore.similarity_search(query, k=k)

            # Extract content from documents
            results = []
            for i, doc in enumerate(docs, 1):
                content = doc.page_content[:300] + "..." if len(doc.page_content) > 300 else doc.page_content
                results.append(f"Result {i}: {content}")
            return results
        except Exception as e:
            return [f"❌ Error searching document: {str(e)}"]

    def clear_memory(self):
        """Clear the conversation memory."""
        if self.memory:
            self.memory.clear()
            print("✅ Conversation memory cleared")

    def save_vectorstore(self, directory: Optional[str] = None) -> bool:
        """Save the vector store to disk.

        Args:
            directory: Target directory. When omitted, a timestamped
                ``vectorstore_<YYYYmmdd_HHMMSS>`` folder is created under
                ``self.processed_outputs_dir``.

        Returns:
            True on success, False if there is nothing to save or saving fails.
        """
        if not self.vectorstore:
            print("❌ No vector store to save")
            return False

        try:
            # Use organized folder structure if no directory specified
            if directory is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                directory = self.processed_outputs_dir / f"vectorstore_{timestamp}"

            os.makedirs(directory, exist_ok=True)
            self.vectorstore.save_local(str(directory))
            print(f"✅ Vector store saved to {directory}")
            return True
        except Exception as e:
            print(f"❌ Error saving vector store: {e}")
            return False

    def save_extracted_text(self, text: str, pdf_filename: str) -> str:
        """Save extracted text under ``self.processed_outputs_dir``.

        The file is named ``<timestamp>_<pdf stem>_extracted.txt``.

        Returns:
            The path written, or an empty string on failure.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            base_name = Path(pdf_filename).stem
            output_filename = f"{timestamp}_{base_name}_extracted.txt"
            output_path = self.processed_outputs_dir / output_filename

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(text)

            print(f"✅ Extracted text saved to {output_path}")
            return str(output_path)
        except Exception as e:
            print(f"❌ Error saving extracted text: {e}")
            return ""

    def load_vectorstore(self, directory: str = "vectorstore") -> bool:
        """Load a vector store from disk and rebuild the QA chain.

        Returns:
            True on success, False if dependencies are missing, the directory
            does not exist, or loading fails.
        """
        if not HAS_LANGCHAIN or not self.embeddings:
            print("⚠️ LLM functionality not available")
            return False

        try:
            if not os.path.exists(directory):
                print(f"❌ Vector store directory {directory} not found")
                return False

            # SECURITY NOTE(review): FAISS.load_local unpickles the stored
            # docstore, so only load directories this application wrote.
            # Newer langchain-community releases refuse to load unless
            # allow_dangerous_deserialization=True is passed - confirm which
            # version is pinned before changing this call.
            self.vectorstore = FAISS.load_local(directory, self.embeddings)

            # Recreate QA chain
            if self.llm:
                prompt_template = PromptTemplate(
                    input_variables=["context", "question"],
                    template=_QA_PROMPT_BASE
                )
                self.qa_chain = ConversationalRetrievalChain.from_llm(
                    llm=self.llm,
                    retriever=self.vectorstore.as_retriever(search_type="similarity"),
                    memory=self.memory,
                    return_source_documents=False,  # Disable source documents to fix memory issue
                    output_key="answer",  # Specify output key for memory
                    combine_docs_chain_kwargs={"prompt": prompt_template}
                )

            print(f"✅ Vector store loaded from {directory}")
            return True
        except Exception as e:
            print(f"❌ Error loading vector store: {e}")
            return False


def _ocr_pdf_pages(pdf_path, page_count, include_page_numbers, verbose):
    """Run Tesseract OCR over pages 1..page_count and return the joined text.

    Failures (missing pdf2image, per-page OCR errors) are reported when
    ``verbose`` is set and never raised; whatever text was collected so far
    is returned.
    """
    chunks = []
    try:
        # Import pdf2image here to avoid import errors if it's not installed
        from pdf2image import convert_from_path

        # Process each page with OCR
        for page_number in range(1, page_count + 1):
            if verbose:
                print(f"🔍 Processing page {page_number} with OCR...")
            try:
                # Extract the page as an image
                images = convert_from_path(pdf_path, first_page=page_number, last_page=page_number)
                if not images:
                    continue
                # Process the image with OCR
                page_text = pytesseract.image_to_string(images[0])
                if page_text:
                    if include_page_numbers:
                        chunks.append(f"\n--- Page {page_number} (OCR) ---\n{page_text}\n")
                    else:
                        chunks.append(f"{page_text}\n")
            except Exception as ocr_err:
                if verbose:
                    print(f"⚠️ OCR failed for page {page_number}: {str(ocr_err)}")
    except ImportError:
        if verbose:
            print("⚠️ pdf2image module not found. OCR processing requires pdf2image.")
    except Exception as e:
        if verbose:
            print(f"⚠️ OCR processing failed: {str(e)}")
    return "".join(chunks)


def extract_text_from_pdf(pdf_path, include_page_numbers=True, verbose=True, try_ocr=True):
    """
    Extract text from a PDF file with enhanced capabilities.

    Args:
        pdf_path (str | os.PathLike): Path to the PDF file
        include_page_numbers (bool): Whether to include page numbers in the output
        verbose (bool): Whether to print status messages
        try_ocr (bool): Whether to try OCR if no text is found in the PDF

    Returns:
        str: Extracted text or error message. Errors are returned (prefixed
        with ❌ or ⚠️), not raised.
    """
    # Accept pathlib.Path as well as str; the checks below need a string.
    pdf_path = os.fspath(pdf_path)

    # Check file extension
    supported_formats = ('.pdf',)
    if not pdf_path.lower().endswith(supported_formats):
        return f"❌ Unsupported file type: {os.path.splitext(pdf_path)[-1]}"

    # Check if file exists
    if not os.path.exists(pdf_path):
        return f"❌ File not found: {pdf_path}"

    try:
        if verbose:
            print(f"📄 Reading PDF from: {pdf_path}")

        # Try to read the PDF with PyPDF2
        reader = PdfReader(pdf_path)
        page_count = len(reader.pages)
        if page_count == 0:
            return "⚠️ PDF has no pages."

        chunks = []
        empty_pages = 0

        # Extract text from each page
        for page_number, page in enumerate(reader.pages, start=1):
            text = page.extract_text()

            # Count pages that yielded nothing so OCR can be triggered below
            if not text or text.isspace():
                empty_pages += 1
                if verbose:
                    print(f"⚠️ No text found on page {page_number} using standard extraction.")

            # Add the text to the result
            if text:
                if include_page_numbers:
                    chunks.append(f"\n--- Page {page_number} ---\n{text}\n")
                else:
                    chunks.append(f"{text}\n")

        # Check if we got any text
        result = "".join(chunks).strip()

        # If no text was found and OCR is available, try OCR
        if (not result or empty_pages == page_count) and try_ocr and HAS_OCR:
            if verbose:
                print("🔍 No text found using standard extraction. Trying OCR...")

            ocr_text = _ocr_pdf_pages(pdf_path, page_count, include_page_numbers, verbose)

            # If OCR found text, use it
            if ocr_text.strip():
                result = ocr_text.strip()
                if verbose:
                    print("✅ Successfully extracted text using OCR.")

        # If still no text, return a warning
        if not result:
            return "⚠️ No text found in PDF. The file may contain only images or be protected."

        return result

    except PdfReadError as e:
        return f"❌ Error reading PDF: {str(e)}. The file may be corrupted or password-protected."
    except PermissionError:
        return f"❌ Permission denied: Cannot access {pdf_path}"
    except Exception as e:
        return f"❌ Error processing PDF: {str(e)}"


def save_extracted_text(text, output_path):
    """
    Save extracted text to a file.

    Args:
        text (str): Text to save
        output_path (str): Path to save the text to

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(text)
        print(f"✅ Text saved to: {output_path}")
        return True
    except Exception as e:
        print(f"❌ Error saving text: {str(e)}")
        return False


def main():
    """Main function demonstrating the enhanced PDF reader capabilities.

    Usage: ``python pdf_reader.py [path/to/file.pdf [--save]]``. Without a
    path, the most recently created PDF in ``data/multimodal/uploaded_pdfs``
    is used and, if the LLM is available, an interactive Q&A loop is started.
    """
    print("🚀 Enhanced PDF Reader with LLM Integration")
    print("=" * 50)

    # Use command line argument if provided
    if len(sys.argv) > 1:
        pdf_path = sys.argv[1]
    else:
        # Check for user-uploaded PDFs in the organized folder
        uploaded_pdfs_dir = Path("data/multimodal/uploaded_pdfs")
        if uploaded_pdfs_dir.exists():
            pdf_files = list(uploaded_pdfs_dir.glob("*.pdf"))
            if pdf_files:
                # Use the most recent uploaded PDF
                pdf_path = str(max(pdf_files, key=os.path.getctime))
                print(f"📁 Using most recent uploaded PDF: {pdf_path}")
            else:
                print("❌ No PDF files found in uploaded_pdfs folder.")
                print("Please upload a PDF file first or provide a PDF file path as an argument.")
                print("Usage: python pdf_reader.py path/to/your/file.pdf")
                sys.exit(1)
        else:
            print("❌ Please provide a PDF file path as an argument.")
            print("Usage: python pdf_reader.py path/to/your/file.pdf")
            sys.exit(1)

    # Initialize the enhanced PDF reader
    print(f"\n📚 Initializing Enhanced PDF Reader for: {pdf_path}")
    reader = EnhancedPDFReader()

    # Extract text using traditional method
    print("\n📄 Extracting text using traditional method...")
    text = reader.extract_text_from_pdf(pdf_path, verbose=True)

    if text:
        print("\n" + "="*50)
        print("EXTRACTED TEXT (First 500 characters)")
        print("="*50)
        print(text[:500] + "..." if len(text) > 500 else text)

        # Optionally save to file if requested
        if len(sys.argv) > 2 and sys.argv[2] == "--save":
            output_path = os.path.splitext(pdf_path)[0] + ".txt"
            save_extracted_text(text, output_path)

    # If LLM functionality is available, demonstrate it
    if reader.llm:
        print("\n🤖 LLM functionality is available! Demonstrating advanced features...")

        # Load and process PDF for question answering
        print("\n📚 Loading PDF for question answering...")
        if reader.load_and_process_pdf(pdf_path):
            # Get document summary
            print("\n📋 Generating document summary...")
            summary = reader.get_document_summary()
            print("Summary:", summary)

            # Interactive question answering if no command line args
            if len(sys.argv) <= 1:
                print("\n❓ Interactive Question Answering Mode")
                print("Type your questions (or 'quit' to exit):")
                while True:
                    try:
                        question = input("\nQ: ").strip()
                    except (EOFError, KeyboardInterrupt):
                        # Closed stdin or Ctrl-C: leave the loop instead of
                        # dying with a traceback, so the store still gets saved.
                        print()
                        break
                    if question.lower() in ['quit', 'exit', 'q']:
                        break
                    if question:
                        answer = reader.ask_question(question)
                        print(f"A: {answer}")

            # Save vector store for future use
            print("\n💾 Saving vector store...")
            reader.save_vectorstore("pdf_vectorstore")
        else:
            print("❌ Failed to process PDF for question answering")
    else:
        print("\n⚠️ LLM functionality not available.")
        print("To enable LLM features:")
        print("1. Install required packages: pip install langchain langchain-together langchain-community")
        print("2. Set TOGETHER_API_KEY in your .env file")
        print("3. Ensure all dependencies are properly installed")

    print("\n✅ Processing completed!")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.