Skip to main content
Glama

MCP Document Indexer

by yairwein
parser.py9.11 kB
"""Document parsing for various file formats.""" import hashlib import mimetypes from pathlib import Path from typing import List, Dict, Any, Optional from datetime import datetime import fitz # PyMuPDF from docx import Document as DocxDocument import tiktoken class DocumentParser: """Parse documents of various formats.""" def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.tokenizer = tiktoken.get_encoding("cl100k_base") def parse_file(self, file_path: Path) -> Dict[str, Any]: """Parse a file and extract its content and metadata.""" import logging logger = logging.getLogger(__name__) if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") file_stats = file_path.stat() file_hash = self._calculate_file_hash(file_path) file_type = file_path.suffix.lower() logger.info(f" → Extracting text from {file_type} file...") metadata = { "file_path": str(file_path), "file_name": file_path.name, "file_size": file_stats.st_size, "modified_time": datetime.fromtimestamp(file_stats.st_mtime).isoformat(), "file_hash": file_hash, "file_type": file_type } # Extract text based on file type text = self._extract_text(file_path) logger.info(f" → Extracted {len(text):,} characters") logger.info(f" → Creating text chunks (size: {self.chunk_size}, overlap: {self.chunk_overlap})...") # Create chunks chunks = self._create_chunks(text) return { "metadata": metadata, "text": text, "chunks": chunks, "num_chunks": len(chunks), "total_chars": len(text), "total_tokens": len(self.tokenizer.encode(text)) } def _calculate_file_hash(self, file_path: Path) -> str: """Calculate SHA256 hash of file.""" sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def _extract_text(self, file_path: Path) -> str: """Extract text from various file formats.""" file_ext = file_path.suffix.lower() if file_ext == ".pdf": return self._extract_pdf_text(file_path) elif file_ext in [".docx", ".doc"]: return self._extract_docx_text(file_path) elif file_ext in [".txt", ".md", ".rtf"]: return self._extract_plain_text(file_path) else: # Try to read as plain text try: return self._extract_plain_text(file_path) except Exception: raise ValueError(f"Unsupported file type: {file_ext}") def _extract_pdf_text(self, file_path: Path) -> str: """Extract text from PDF file.""" text_parts = [] try: with fitz.open(str(file_path)) as pdf: for page_num, page in enumerate(pdf, 1): text = page.get_text() if text.strip(): text_parts.append(f"[Page {page_num}]\n{text}") except Exception as e: raise ValueError(f"Error parsing PDF: {e}") return "\n\n".join(text_parts) def _extract_docx_text(self, file_path: Path) -> str: """Extract text from Word document.""" try: doc = DocxDocument(str(file_path)) paragraphs = [] for para in doc.paragraphs: if para.text.strip(): paragraphs.append(para.text) # Also extract text from tables for table in doc.tables: for row in table.rows: row_text = [] for cell in row.cells: if cell.text.strip(): row_text.append(cell.text) if row_text: paragraphs.append(" | ".join(row_text)) return "\n\n".join(paragraphs) except Exception as e: raise ValueError(f"Error parsing Word document: {e}") def _extract_plain_text(self, file_path: Path) -> str: """Extract text from plain text file.""" try: # Try different encodings encodings = ['utf-8', 'latin-1', 'cp1252', 'ascii'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: return f.read() except UnicodeDecodeError: continue # If all encodings fail, try with errors='ignore' with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read() except Exception as e: raise ValueError(f"Error reading text file: {e}") def _create_chunks(self, text: str) -> List[Dict[str, Any]]: """Create overlapping chunks from text.""" import logging logger = logging.getLogger(__name__) if not text: return [] text_length = len(text) estimated_chunks = (text_length // self.chunk_size) + 1 logger.info(f" → Estimated {estimated_chunks} chunks for {text_length:,} characters") chunks = [] start = 0 chunk_id = 0 last_progress = 0 while start < text_length: # Progress logging every 10% or every 100 chunks progress = int((start / text_length) * 100) if progress >= last_progress + 10 or chunk_id % 100 == 0: logger.info(f" → Chunking progress: {progress}% ({chunk_id} chunks created)") last_progress = progress # Find the end of the chunk end = min(start + self.chunk_size, text_length) # Try to break at sentence or paragraph boundary if end < text_length: # Look for paragraph break first (better boundary) para_break = text.rfind('\n\n', start, end) if para_break > start: end = para_break else: # Look for sentence break sentence_breaks = ['. ', '! ', '? ', '\n'] for break_char in sentence_breaks: break_pos = text.rfind(break_char, start, end) if break_pos > start: end = break_pos + len(break_char) break chunk_text = text[start:end].strip() if chunk_text: # Only tokenize if chunk is not empty - this is the expensive operation token_count = len(self.tokenizer.encode(chunk_text)) chunks.append({ "chunk_id": chunk_id, "text": chunk_text, "start_pos": start, "end_pos": end, "char_count": len(chunk_text), "token_count": token_count }) chunk_id += 1 # Move to next chunk with overlap, but ensure we make progress new_start = max(end - self.chunk_overlap, start + 1) if new_start <= start: # Force progress to prevent infinite loop new_start = start + max(1, self.chunk_size // 2) start = new_start # Safety check to prevent infinite loops if chunk_id > 10000: # Reasonable limit logger.warning(f" → Chunk limit reached ({chunk_id}), stopping to prevent infinite loop") break logger.info(f" → Created {len(chunks)} chunks") return chunks def is_supported_file(file_path: Path, extensions: List[str]) -> bool: """Check if file is supported based on extension.""" return file_path.suffix.lower() in extensions def get_file_info(file_path: Path) -> Dict[str, Any]: """Get basic file information without parsing content.""" if not file_path.exists(): return None stats = file_path.stat() mime_type, _ = mimetypes.guess_type(str(file_path)) return { "path": str(file_path), "name": file_path.name, "size": stats.st_size, "size_mb": round(stats.st_size / (1024 * 1024), 2), "modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), "created": datetime.fromtimestamp(stats.st_ctime).isoformat(), "extension": file_path.suffix.lower(), "mime_type": mime_type }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yairwein/document-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server