Skip to main content
Glama
server.py (23.5 kB)
#!/usr/bin/env python3
"""MCP server exposing Nanonets OCR and Word/Excel-to-markdown conversion.

Tools accept their input as a base64 string, a ``data:`` URL, or a file path
and return markdown. PDF, Word and Excel support are optional and depend on
the corresponding third-party packages being importable.
"""

import asyncio
import base64
import logging
import tempfile
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from mcp.server.fastmcp import FastMCP
from PIL import Image
from transformers import AutoTokenizer, AutoProcessor, AutoModelForImageTextToText

try:
    import fitz  # PyMuPDF
    import pdf2image
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False

try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False

try:
    import openpyxl
    import pandas as pd
    EXCEL_SUPPORT = True
except ImportError:
    EXCEL_SUPPORT = False

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP("Nanonets OCR")

# Instruction text shared by the single-image and per-page prompts; only the
# opening clause differs between the two.
_PROMPT_INSTRUCTIONS = (
    " as if you were reading it naturally and translate to english for none"
    " english document. Return the tables in html format. Return the"
    " equations in LaTeX representation. If there is an image in the document"
    " and image caption is not present, add a small description of the image"
    " inside the <img></img> tag; otherwise, add the image caption inside"
    " <img></img>. Watermarks should be wrapped in brackets. Ex:"
    " <watermark>OFFICIAL COPY</watermark>. Page numbers should be wrapped in"
    " brackets. Ex: <page_number>14</page_number> or"
    " <page_number>9/22</page_number>. Prefer using ☐ and ☑ for check boxes."
)

# Strings at least this long are never treated as file paths.
_MAX_PATH_LENGTH = 260


class NanonetsOCR:
    """Lazily loaded wrapper around the Nanonets OCR vision-language model."""

    def __init__(self):
        self.model = None
        self.processor = None
        self.tokenizer = None
        self.model_name = "nanonets/Nanonets-OCR-s"
        # Serializes model loading and inference. Blocking work runs in worker
        # threads so the event loop stays responsive; the lock keeps those
        # calls one-at-a-time, as they were when they blocked the loop.
        # NOTE(review): constructed at import time, which assumes Python 3.10+
        # (asyncio.Lock no longer binds to a loop on construction) — confirm.
        self._model_lock = asyncio.Lock()

    def _load_model_sync(self) -> None:
        """Load processor, tokenizer and model (blocking)."""
        processor = AutoProcessor.from_pretrained(self.model_name)
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        import torch  # noqa: F401  # kept from the original; not referenced directly

        model = AutoModelForImageTextToText.from_pretrained(
            self.model_name,
            torch_dtype="auto",
            device_map="auto",
        )
        model.eval()
        # Publish the model last: ``self.model is None`` is the "not loaded"
        # signal, so a failed load is retried on the next call.
        self.processor = processor
        self.tokenizer = tokenizer
        self.model = model

    async def load_model(self):
        """Load the model on first use; later calls return immediately.

        Raises:
            Exception: whatever the underlying ``from_pretrained`` calls
                raise; the error is logged and re-raised unchanged.
        """
        if self.model is not None:
            return
        async with self._model_lock:
            # Re-check: another task may have finished loading while this one
            # waited for the lock.
            if self.model is not None:
                return
            logger.info("Loading Nanonets OCR model...")
            try:
                await asyncio.to_thread(self._load_model_sync)
                logger.info("Model loaded successfully")
            except Exception as e:
                logger.error(f"Failed to load model: {e}")
                raise

    @staticmethod
    def _build_prompt(page_num: Optional[int]) -> str:
        """Return the OCR instruction prompt, page-specific if page_num is given."""
        if page_num is not None:
            opening = f"Extract the text from page {page_num} of this document"
        else:
            opening = "Extract the text from the above document"
        return opening + _PROMPT_INSTRUCTIONS

    def _run_inference(self, image: Image.Image, prompt: str) -> str:
        """Run one generate/decode pass for a single image (blocking)."""
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": prompt},
            ]},
        ]
        text = self.processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )
        inputs = self.processor(
            text=[text],
            images=[image],
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(self.model.device)

        output_ids = self.model.generate(
            **inputs,
            max_new_tokens=4096,
            do_sample=False,
        )
        # Each output sequence starts with its prompt tokens; slice them off
        # so only newly generated tokens are decoded.
        generated_ids = [
            sequence[len(prompt_ids):]
            for prompt_ids, sequence in zip(inputs.input_ids, output_ids)
        ]
        output_text = self.processor.batch_decode(
            generated_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )
        return output_text[0]

    async def process_image(self, image: Image.Image, page_num: Optional[int] = None) -> str:
        """OCR a single image and return the model's text output.

        Args:
            image: The page or picture to transcribe.
            page_num: 1-based page number when the image is part of a
                multi-page document; changes the wording of the prompt.

        Returns:
            The decoded text generated by the model.

        Raises:
            Exception: any error from model loading or inference, logged and
                re-raised unchanged.
        """
        await self.load_model()
        prompt = self._build_prompt(page_num)
        try:
            async with self._model_lock:
                return await asyncio.to_thread(self._run_inference, image, prompt)
        except Exception as e:
            logger.error(f"Error processing image: {e}")
            raise

    def _pdf_to_images(self, pdf_data: bytes) -> List[Image.Image]:
        """Convert PDF pages to images using pdf2image.

        Raises:
            RuntimeError: if PDF support is not installed.
            Exception: any conversion error, logged and re-raised unchanged.
        """
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support not available. Install pdf2image and PyMuPDF.")

        temp_pdf_path = None
        try:
            # pdf2image.convert_from_path needs a file on disk.
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_pdf:
                temp_pdf.write(pdf_data)
                temp_pdf_path = temp_pdf.name

            # NOTE(review): 'RGB' does not look like a pdf2image output
            # format name (ppm/jpeg/png/tiff) — confirm what is intended.
            return pdf2image.convert_from_path(
                temp_pdf_path,
                dpi=200,  # Good balance between quality and performance
                fmt='RGB',
            )
        except Exception as e:
            logger.error(f"Error converting PDF to images: {e}")
            raise
        finally:
            # Remove the temp file even when conversion fails.
            if temp_pdf_path is not None:
                Path(temp_pdf_path).unlink(missing_ok=True)

    async def process_pdf(self, pdf_data: bytes) -> str:
        """Process an entire PDF document and return combined markdown.

        Each page is rasterized and OCR'd in order. A page that fails is
        replaced by an error note so the remaining pages are still returned.

        Raises:
            RuntimeError: if PDF support is not installed.
            ValueError: if the PDF yields no pages.
        """
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support not available. Install pdf2image and PyMuPDF.")

        logger.info("Converting PDF to images...")
        # Rasterization is blocking; keep it off the event loop.
        images = await asyncio.to_thread(self._pdf_to_images, pdf_data)

        if not images:
            raise ValueError("No pages found in PDF or PDF conversion failed")

        total_pages = len(images)
        logger.info(f"Processing {total_pages} pages with OCR...")

        page_results = []
        for i, image in enumerate(images, 1):
            logger.info(f"Processing page {i}/{total_pages}...")
            page_header = f"\n\n---\n\n# Page {i}\n\n"
            try:
                page_content = await self.process_image(image, page_num=i)
                page_results.append(page_header + page_content)
            except Exception as e:
                logger.error(f"Error processing page {i}: {e}")
                # Continue with other pages, add error note
                page_results.append(f"{page_header}*Error processing this page: {e}*\n\n")

        header = f"# PDF Document\n\n*Total pages: {total_pages}*\n\n"
        return header + "".join(page_results)


# Global OCR instance
ocr_instance = NanonetsOCR()


def _decode_base64(encoded: str, label: str) -> bytes:
    """Decode base64 text strictly, ignoring embedded whitespace.

    Raises:
        ValueError: if the text is not valid base64.
    """
    # Line-wrapped base64 is common; drop whitespace, then validate so that
    # junk input fails here instead of decoding to garbage bytes.
    compact = "".join(encoded.split())
    try:
        return base64.b64decode(compact, validate=True)
    except ValueError as e:
        raise ValueError(f"Failed to decode base64 {label} data: {e}") from e


def _read_payload_bytes(data: Union[str, bytes, Path], label: str) -> bytes:
    """Resolve any supported input form to raw bytes.

    Args:
        data: Raw bytes, a ``Path``, or a string holding a ``data:`` URL, a
            file path, or base64 text.
        label: Human-readable kind ("image", "PDF", ...) used in error messages.

    Raises:
        ValueError: for unsupported types, malformed data URLs or invalid base64.
        OSError: if a file path cannot be read.
    """
    if isinstance(data, bytes):
        return data
    if isinstance(data, Path):
        return data.read_bytes()
    if not isinstance(data, str):
        raise ValueError(f"Unsupported {label} data type: {type(data)}")

    if data.startswith('data:'):
        # data:<mediatype>;base64,<payload>
        _, separator, encoded = data.partition(',')
        if not separator:
            raise ValueError(f"Malformed {label} data URL: missing ',' separator")
        return _decode_base64(encoded, label)

    # Treat as a file path only if reasonably short and absolute-looking or
    # existing.
    # NOTE(security): this reads any file the server process can access based
    # on client-supplied input; restrict to an allow-listed directory if the
    # server is reachable by untrusted clients.
    if len(data) < _MAX_PATH_LENGTH and (data.startswith('/') or Path(data).exists()):
        return Path(data).read_bytes()

    # Assume base64 encoded
    return _decode_base64(data, label)


def _load_image_from_data(image_data: Union[str, bytes, Path]) -> Image.Image:
    """Load an RGB image from bytes, a Path, a data URL, a file path or base64.

    Raises:
        ValueError: if the input type is unsupported or the data cannot be
            decoded as an image.
        OSError: if a file path cannot be read.
    """
    raw = _read_payload_bytes(image_data, "image")
    try:
        return Image.open(BytesIO(raw)).convert('RGB')
    except Exception as e:
        raise ValueError(f"Failed to decode image data: {e}") from e


def _load_pdf_from_data(pdf_data: Union[str, bytes, Path]) -> bytes:
    """Load PDF bytes from bytes, a Path, a data URL, a file path or base64."""
    return _read_payload_bytes(pdf_data, "PDF")


def _load_docx_from_data(docx_data: Union[str, bytes, Path]) -> bytes:
    """Load Word document bytes from bytes, a Path, a data URL, a file path or base64."""
    return _read_payload_bytes(docx_data, "DOCX")


def _load_excel_from_data(excel_data: Union[str, bytes, Path]) -> bytes:
    """Load Excel file bytes from bytes, a Path, a data URL, a file path or base64."""
    return _read_payload_bytes(excel_data, "Excel")


def _escape_table_cell(text: str) -> str:
    """Make text safe for a single markdown table cell."""
    # A raw pipe ends the cell and a raw newline ends the row.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    return normalized.replace("|", "\\|").replace("\n", "<br>")


def _markdown_row(cells: List[str]) -> str:
    """Format already-escaped cell texts as one markdown table row."""
    return "| " + " | ".join(cells) + " |"


def _process_docx_to_markdown(docx_bytes: bytes) -> str:
    """Convert a Word document to markdown.

    All paragraphs are emitted first (heading styles become ``#`` headings),
    followed by all tables; the first row of each table is its header.

    Raises:
        RuntimeError: if python-docx is not installed or processing fails.
    """
    if not DOCX_SUPPORT:
        raise RuntimeError("Word document support not available. Install python-docx package.")

    try:
        # Save to temporary file and process
        with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as temp_file:
            temp_file.write(docx_bytes)
            temp_file_path = temp_file.name

        try:
            doc = Document(temp_file_path)

            markdown_content = ["# Word Document\n"]

            for paragraph in doc.paragraphs:
                text = paragraph.text.strip()
                if not text:
                    continue
                style_name = paragraph.style.name
                if style_name.startswith('Heading'):
                    # "Heading 2" -> level 2; default to 1 without a trailing digit.
                    level = int(style_name[-1]) if style_name[-1].isdigit() else 1
                    markdown_content.append(f"{'#' * level} {text}\n")
                else:
                    markdown_content.append(f"{text}\n")

            for table in doc.tables:
                markdown_content.append("\n")
                for i, row in enumerate(table.rows):
                    cells = [_escape_table_cell(cell.text.strip()) for cell in row.cells]
                    markdown_content.append(_markdown_row(cells))
                    if i == 0:
                        # Separator line after the header row
                        markdown_content.append(_markdown_row(["---"] * len(cells)))
                markdown_content.append("\n")

            return "\n".join(markdown_content)
        finally:
            # Clean up temporary file
            Path(temp_file_path).unlink(missing_ok=True)

    except Exception as e:
        raise RuntimeError(f"Failed to process Word document: {e}") from e


def _process_excel_to_markdown(excel_bytes: bytes) -> str:
    """Convert an Excel workbook to markdown, one table per non-empty sheet.

    The first non-empty row of each sheet is used as the table header.

    Raises:
        RuntimeError: if Excel support is not installed or processing fails.
    """
    if not EXCEL_SUPPORT:
        raise RuntimeError("Excel support not available. Install openpyxl and pandas packages.")

    try:
        # Save to temporary file and process
        with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as temp_file:
            temp_file.write(excel_bytes)
            temp_file_path = temp_file.name

        try:
            workbook = openpyxl.load_workbook(temp_file_path, data_only=True)

            markdown_content = ["# Excel Workbook\n"]

            for sheet_name in workbook.sheetnames:
                worksheet = workbook[sheet_name]

                # Skip empty sheets: a 1x1 sheet whose only cell has no value.
                if (
                    worksheet.max_row == 1
                    and worksheet.max_column == 1
                    and worksheet.cell(1, 1).value is None
                ):
                    continue

                markdown_content.append(f"## Sheet: {sheet_name}\n")

                # Collect rows as escaped strings, skipping completely empty rows.
                data = [
                    [_escape_table_cell(str(cell)) if cell is not None else "" for cell in row]
                    for row in worksheet.iter_rows(values_only=True)
                    if any(cell is not None for cell in row)
                ]

                if data:
                    if len(data) > 1:
                        headers = data[0]
                        width = len(headers)
                        markdown_content.append(_markdown_row(headers))
                        markdown_content.append(_markdown_row(["---"] * width))

                        for row in data[1:]:
                            # Pad or trim the row to match the header length
                            padded_row = row + [""] * (width - len(row))
                            markdown_content.append(_markdown_row(padded_row[:width]))
                    else:
                        # Single row
                        markdown_content.append(_markdown_row(data[0]))

                    markdown_content.append("\n")

            return "\n".join(markdown_content)
        finally:
            # Clean up temporary file
            Path(temp_file_path).unlink(missing_ok=True)

    except Exception as e:
        raise RuntimeError(f"Failed to process Excel file: {e}") from e


@mcp.tool()
async def ocr_image_to_markdown(
    image_data: str,
    image_format: Optional[str] = None
) -> str:
    """
    Convert an image containing text/documents to structured markdown using Nanonets OCR.

    Args:
        image_data: Image data as base64 string, data URL, or file path
        image_format: Optional format hint (png, jpg, etc.); currently unused

    Returns:
        Structured markdown representation of the document
    """
    try:
        image = _load_image_from_data(image_data)
        return await ocr_instance.process_image(image)
    except Exception as e:
        error_msg = f"OCR processing failed: {str(e)}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e


@mcp.tool()
async def ocr_pdf_to_markdown(pdf_data: str) -> str:
    """
    Convert an entire PDF document to structured markdown using Nanonets OCR.
    Processes all pages and combines them into a single markdown document.

    Args:
        pdf_data: PDF data as base64 string, data URL, or file path

    Returns:
        Structured markdown representation of the entire PDF document
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF support not available. Install pdf2image and PyMuPDF packages.")

    try:
        pdf_bytes = _load_pdf_from_data(pdf_data)
        return await ocr_instance.process_pdf(pdf_bytes)
    except Exception as e:
        error_msg = f"PDF OCR processing failed: {str(e)}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e


@mcp.tool()
async def process_word_to_markdown(docx_data: str) -> str:
    """
    Convert a Word document (.docx) to structured markdown.
    Extracts text, headings, and tables from the document.

    Args:
        docx_data: Word document data as base64 string, data URL, or file path

    Returns:
        Structured markdown representation of the Word document
    """
    if not DOCX_SUPPORT:
        raise RuntimeError("Word document support not available. Install python-docx package.")

    try:
        docx_bytes = _load_docx_from_data(docx_data)
        return _process_docx_to_markdown(docx_bytes)
    except Exception as e:
        error_msg = f"Word document processing failed: {str(e)}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e


@mcp.tool()
async def process_excel_to_markdown(excel_data: str) -> str:
    """
    Convert an Excel file (.xlsx) to structured markdown.
    Extracts data from all worksheets and converts tables to markdown format.

    Args:
        excel_data: Excel file data as base64 string, data URL, or file path

    Returns:
        Structured markdown representation of the Excel workbook
    """
    if not EXCEL_SUPPORT:
        raise RuntimeError("Excel support not available. Install openpyxl and pandas packages.")

    try:
        excel_bytes = _load_excel_from_data(excel_data)
        return _process_excel_to_markdown(excel_bytes)
    except Exception as e:
        error_msg = f"Excel file processing failed: {str(e)}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e


@mcp.resource("nanonets://model-info")
async def get_model_info() -> str:
    """Get information about the Nanonets OCR model and document processing capabilities"""
    pdf_status = "✅ Available" if PDF_SUPPORT else "❌ Not installed (requires pdf2image and PyMuPDF)"
    docx_status = "✅ Available" if DOCX_SUPPORT else "❌ Not installed (requires python-docx)"
    excel_status = "✅ Available" if EXCEL_SUPPORT else "❌ Not installed (requires openpyxl and pandas)"

    return f"""# Nanonets MCP Server Information

**OCR Model**: nanonets/Nanonets-OCR-s
**Parameters**: 3.75B (based on Qwen2.5-VL-3B-Instruct)

**OCR Capabilities**:
- Document-to-markdown conversion
- LaTeX equation recognition
- Table structure preservation
- Image description generation
- Signature and watermark detection
- Checkbox recognition
- Complex layout understanding
- Multi-page PDF processing

**Document Processing Capabilities**:
- Word document (.docx) text and table extraction
- Excel spreadsheet (.xlsx) data extraction
- PDF document OCR processing

**Input Formats**:
- Images: PNG, JPG, BMP, TIFF, WEBP
- Documents: PDF, DOCX, XLSX

**Output**: Structured markdown with semantic tagging

**Support Status**:
- **PDF Support**: {pdf_status}
- **Word Support**: {docx_status}
- **Excel Support**: {excel_status}
"""


@mcp.tool()
async def get_supported_formats() -> Dict[str, Any]:
    """Get list of supported formats and capabilities"""
    formats = ["PNG", "JPEG", "JPG", "BMP", "TIFF", "WEBP"]
    capabilities = [
        "text_extraction",
        "table_recognition",
        "equation_conversion",
        "layout_preservation",
        "semantic_tagging",
    ]

    # Formats are listed in PDF, DOCX, XLSX order; capabilities likewise.
    optional_features = (
        (PDF_SUPPORT, "PDF", "multi_page_processing"),
        (DOCX_SUPPORT, "DOCX", "word_document_processing"),
        (EXCEL_SUPPORT, "XLSX", "spreadsheet_processing"),
    )
    for available, format_name, capability in optional_features:
        if available:
            formats.append(format_name)
            capabilities.append(capability)

    return {
        "supported_formats": formats,
        "input_methods": [
            "base64_string",
            "data_url",
            "file_path",
        ],
        "capabilities": capabilities,
        "max_resolution": "Recommended: 2048x2048 pixels for images",
        "processing_options": {
            "pdf_processing": {
                "supported": PDF_SUPPORT,
                "dpi": "200 DPI conversion",
                "max_pages": "No limit (processes all pages)",
                "method": "OCR with vision model",
            },
            "word_processing": {
                "supported": DOCX_SUPPORT,
                "extracts": ["text", "headings", "tables"],
                "method": "Direct text extraction",
            },
            "excel_processing": {
                "supported": EXCEL_SUPPORT,
                "extracts": ["all_worksheets", "tables", "data"],
                "method": "Direct data extraction",
            },
        },
        "output_format": "Structured markdown with appropriate separators",
    }


def main():
    """Main entry point for the MCP server"""
    import uvicorn

    # sse_app must be called to obtain the ASGI application; passing the
    # bound method itself makes uvicorn fail on the first request.
    app = mcp.sse_app()

    # Run the HTTP server
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ArneJanning/nanonets-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.