#!/usr/bin/env python3
import asyncio
import base64
import logging
import tempfile
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from mcp.server.fastmcp import FastMCP
from PIL import Image
from transformers import AutoTokenizer, AutoProcessor, AutoModelForImageTextToText
# Optional dependencies. Each flag records whether the imports for one
# document type succeeded; the processing code checks the flag before use.
# NOTE(review): `fitz` and `pd` are not referenced anywhere else in the
# visible file, yet a missing PyMuPDF/pandas install disables the whole
# feature -- confirm this coupling is intended.
try:
    import fitz # PyMuPDF
    import pdf2image
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False
try:
    import openpyxl
    import pandas as pd
    EXCEL_SUPPORT = True
except ImportError:
    EXCEL_SUPPORT = False
# Configure logging at INFO level and create the module logger used below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FastMCP server object; tools and resources register through its decorators.
mcp = FastMCP("Nanonets OCR")
class NanonetsOCR:
    """Lazily loaded wrapper around the Nanonets OCR vision-language model.

    ``model``, ``processor`` and ``tokenizer`` start as ``None`` and are
    created by :meth:`load_model` the first time an image is processed.
    """

    # Instruction text shared by every request; only ``{subject}`` differs
    # between a standalone image and a numbered page of a PDF.
    _PROMPT_TEMPLATE = (
        "Extract the text from {subject} as if you were reading it naturally "
        "and translate to english for none english document. "
        "Return the tables in html format. "
        "Return the equations in LaTeX representation. "
        "If there is an image in the document and image caption is not present, "
        "add a small description of the image inside the <img></img> tag; "
        "otherwise, add the image caption inside <img></img>. "
        "Watermarks should be wrapped in brackets. "
        "Ex: <watermark>OFFICIAL COPY</watermark>. "
        "Page numbers should be wrapped in brackets. "
        "Ex: <page_number>14</page_number> or <page_number>9/22</page_number>. "
        "Prefer using ☐ and ☑ for check boxes."
    )

    def __init__(self):
        self.model = None
        self.processor = None
        self.tokenizer = None
        self.model_name = "nanonets/Nanonets-OCR-s"

    @classmethod
    def _build_prompt(cls, page_num: Optional[int] = None) -> str:
        """Return the OCR instruction for a single image or for page *page_num*."""
        if page_num is not None:
            subject = f"page {page_num} of this document"
        else:
            subject = "the above document"
        return cls._PROMPT_TEMPLATE.format(subject=subject)

    async def load_model(self):
        """Load processor, tokenizer and model on first call; no-op afterwards.

        Raises:
            Exception: whatever ``from_pretrained`` raises; it is logged and
                re-raised, and the instance is left unloaded so that a later
                call retries.
        """
        if self.model is not None:
            return
        logger.info("Loading Nanonets OCR model...")
        try:
            processor = AutoProcessor.from_pretrained(self.model_name)
            tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            model = AutoModelForImageTextToText.from_pretrained(
                self.model_name,
                torch_dtype="auto",
                device_map="auto"
            )
            model.eval()
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise
        # Publish the attributes only once every component loaded, so a
        # failed load never leaves a half-initialised instance behind.
        self.processor = processor
        self.tokenizer = tokenizer
        self.model = model
        logger.info("Model loaded successfully")

    async def process_image(self, image: Image.Image, page_num: Optional[int] = None) -> str:
        """Run OCR on one image and return the decoded model output.

        Args:
            image: The page or picture to read.
            page_num: 1-based page number when the image is a page of a
                larger document; ``None`` for a standalone image.

        Returns:
            The generated text for the image.

        Raises:
            Exception: any error from model loading or inference, logged and
                re-raised unchanged.
        """
        await self.load_model()
        try:
            prompt = self._build_prompt(page_num)
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": prompt},
                ]},
            ]
            text = self.processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            inputs = self.processor(
                text=[text],
                images=[image],
                padding=True,
                return_tensors="pt"
            )
            inputs = inputs.to(self.model.device)
            # NOTE(review): generation is a synchronous call, so the event
            # loop is blocked for the duration of the inference.
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=4096,
                do_sample=False
            )
            # Drop the leading prompt tokens from each returned sequence so
            # that only newly generated tokens are decoded.
            generated_ids = [
                sequence[len(prompt_ids):]
                for prompt_ids, sequence in zip(inputs.input_ids, output_ids)
            ]
            output_text = self.processor.batch_decode(
                generated_ids,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True
            )
            return output_text[0]
        except Exception as e:
            logger.error(f"Error processing image: {e}")
            raise

    def _pdf_to_images(self, pdf_data: bytes) -> List[Image.Image]:
        """Convert PDF pages to images using pdf2image.

        The bytes are written to a temporary ``.pdf`` file that is removed
        again whether or not the conversion succeeds.

        Raises:
            RuntimeError: if PDF support is not installed.
            Exception: any conversion error, logged and re-raised.
        """
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support not available. Install pdf2image and PyMuPDF.")
        temp_pdf_path = None
        try:
            # delete=False so the file can be closed before it is read again
            # by path; cleanup is done explicitly in ``finally``.
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_pdf:
                temp_pdf_path = Path(temp_pdf.name)
                temp_pdf.write(pdf_data)
            # NOTE(review): 'RGB' is not one of the image formats pdf2image
            # documents for ``fmt`` -- confirm the intended value.
            return pdf2image.convert_from_path(
                str(temp_pdf_path),
                dpi=200,  # Good balance between quality and performance
                fmt='RGB'
            )
        except Exception as e:
            logger.error(f"Error converting PDF to images: {e}")
            raise
        finally:
            if temp_pdf_path is not None:
                temp_pdf_path.unlink(missing_ok=True)

    async def process_pdf(self, pdf_data: bytes) -> str:
        """Process entire PDF document and return combined markdown.

        Pages are processed one after another. A page that fails is replaced
        by an inline error note so the remaining pages are still returned.

        Raises:
            RuntimeError: if PDF support is not installed.
            ValueError: if the conversion produced no pages.
        """
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support not available. Install pdf2image and PyMuPDF.")
        logger.info("Converting PDF to images...")
        images = self._pdf_to_images(pdf_data)
        if not images:
            raise ValueError("No pages found in PDF or PDF conversion failed")
        logger.info(f"Processing {len(images)} pages with OCR...")
        page_results = []
        for i, image in enumerate(images, 1):
            logger.info(f"Processing page {i}/{len(images)}...")
            try:
                page_content = await self.process_image(image, page_num=i)
                # Separator plus heading in front of every page
                page_header = f"\n\n---\n\n# Page {i}\n\n"
                page_results.append(page_header + page_content)
            except Exception as e:
                logger.error(f"Error processing page {i}: {e}")
                # Continue with other pages, add error note
                error_note = f"\n\n---\n\n# Page {i}\n\n*Error processing this page: {str(e)}*\n\n"
                page_results.append(error_note)
        full_document = "".join(page_results)
        header = f"# PDF Document\n\n*Total pages: {len(images)}*\n\n"
        return header + full_document
# Global OCR instance
# Shared by the OCR tools in this module. Construction only sets attributes
# to None; the model is loaded on the first process_image() call.
ocr_instance = NanonetsOCR()
def _open_as_rgb(source) -> Image.Image:
    """Open *source* (path or binary stream) and return an RGB copy.

    The source image is closed on exit, so a file opened from a path does
    not stay open after the converted copy has been made.
    """
    with Image.open(source) as img:
        return img.convert('RGB')


def _load_image_from_data(image_data: Union[str, bytes, Path]) -> Image.Image:
    """Load image from various input formats.

    Args:
        image_data: Raw image bytes, a ``Path``, or a string holding a data
            URL, a file path, or base64 text (whitespace/newlines allowed).

    Returns:
        The image converted to RGB.

    Raises:
        ValueError: for an unsupported type, empty input, a data URL with
            no ``,`` separator, or text that is not valid base64 image data.
        OSError: if a path is given and the file cannot be opened.
    """
    if isinstance(image_data, bytes):
        return _open_as_rgb(BytesIO(image_data))
    if isinstance(image_data, Path):
        return _open_as_rgb(image_data)
    if not isinstance(image_data, str):
        raise ValueError(f"Unsupported image data type: {type(image_data)}")
    if not image_data.strip():
        # Path("") resolves to the current directory, so an empty string
        # would otherwise be mistaken for an existing path.
        raise ValueError("Empty image data")

    if image_data.startswith('data:'):
        _, separator, payload = image_data.partition(',')
        if not separator:
            raise ValueError("Malformed image data URL: missing ',' separator")
        return _open_as_rgb(BytesIO(base64.b64decode(payload)))

    # SECURITY NOTE(review): a caller-supplied string may name any file this
    # process can read -- confirm that is acceptable for the deployment.
    looks_like_path = False
    if len(image_data) < 260:
        if image_data.startswith('/'):
            looks_like_path = True
        else:
            try:
                looks_like_path = Path(image_data).is_file()
            except (OSError, ValueError):
                # e.g. a single over-long name component: not a usable path
                looks_like_path = False
    if looks_like_path:
        return _open_as_rgb(image_data)

    # Assume base64. Strip whitespace, then decode strictly so that stray
    # text (such as a mistyped file name) is rejected instead of being
    # silently reduced to whatever base64 characters it happens to contain.
    try:
        image_bytes = base64.b64decode("".join(image_data.split()), validate=True)
        return _open_as_rgb(BytesIO(image_bytes))
    except Exception as e:
        raise ValueError(f"Failed to decode base64 image data: {e}") from e
def _load_bytes_from_data(data: Union[str, bytes, Path], kind: str) -> bytes:
    """Return the raw bytes behind *data*; shared by the PDF/DOCX/Excel loaders.

    Args:
        data: Raw bytes, a ``Path``, or a string holding a data URL, a file
            path, or base64 text (whitespace/newlines allowed).
        kind: Label used in error messages, e.g. ``"PDF"``.

    Raises:
        ValueError: for an unsupported type, empty input, a data URL with
            no ``,`` separator, or text that is not valid base64.
        OSError: if a path is given and the file cannot be read.
    """
    if isinstance(data, bytes):
        return data
    if isinstance(data, Path):
        return data.read_bytes()
    if not isinstance(data, str):
        raise ValueError(f"Unsupported {kind} data type: {type(data)}")
    if not data.strip():
        # Path("") resolves to the current directory, so an empty string
        # would otherwise be mistaken for an existing path.
        raise ValueError(f"Empty {kind} data")

    if data.startswith('data:'):
        _, separator, payload = data.partition(',')
        if not separator:
            raise ValueError(f"Malformed {kind} data URL: missing ',' separator")
        return base64.b64decode(payload)

    # SECURITY NOTE(review): a caller-supplied string may name any file this
    # process can read -- confirm that is acceptable for the deployment.
    looks_like_path = False
    if len(data) < 260:
        if data.startswith('/'):
            looks_like_path = True
        else:
            try:
                # is_file() rather than exists(): a directory is not input
                looks_like_path = Path(data).is_file()
            except (OSError, ValueError):
                # e.g. a single over-long name component: not a usable path
                looks_like_path = False
    if looks_like_path:
        return Path(data).read_bytes()

    # Assume base64. Strip whitespace, then decode strictly so that stray
    # text (such as a mistyped file name) is rejected instead of being
    # silently reduced to whatever base64 characters it happens to contain.
    try:
        return base64.b64decode("".join(data.split()), validate=True)
    except Exception as e:
        raise ValueError(f"Failed to decode base64 {kind} data: {e}") from e


def _load_pdf_from_data(pdf_data: Union[str, bytes, Path]) -> bytes:
    """Load PDF from various input formats"""
    return _load_bytes_from_data(pdf_data, "PDF")


def _load_docx_from_data(docx_data: Union[str, bytes, Path]) -> bytes:
    """Load Word document from various input formats"""
    return _load_bytes_from_data(docx_data, "DOCX")


def _load_excel_from_data(excel_data: Union[str, bytes, Path]) -> bytes:
    """Load Excel file from various input formats"""
    return _load_bytes_from_data(excel_data, "Excel")
def _docx_cell_to_markdown(text: str) -> str:
    """Make table-cell text safe to place inside one markdown table row."""
    # A literal pipe would end the cell and a newline would end the row.
    return text.strip().replace("|", "\\|").replace("\n", "<br>")


def _process_docx_to_markdown(docx_bytes: bytes) -> str:
    """Convert Word document to markdown.

    Non-empty paragraphs are emitted in order; paragraphs whose style name
    starts with ``Heading`` become markdown headings whose level is the
    trailing digit of the style name (1 when there is none). All tables
    follow after the paragraphs, each with its first row as the header.

    Args:
        docx_bytes: Contents of a ``.docx`` file.

    Returns:
        The markdown text, starting with a ``# Word Document`` title.

    Raises:
        RuntimeError: if python-docx is not installed or the document
            cannot be processed.
    """
    if not DOCX_SUPPORT:
        raise RuntimeError("Word document support not available. Install python-docx package.")
    try:
        # Parse straight from memory; no temporary file to create or clean up.
        doc = Document(BytesIO(docx_bytes))
        markdown_content = ["# Word Document\n"]

        for paragraph in doc.paragraphs:
            text = paragraph.text.strip()
            if not text:
                continue
            style_name = getattr(paragraph.style, "name", None) or ""
            if style_name.startswith('Heading'):
                suffix = style_name[-1]
                level = int(suffix) if suffix in "123456789" else 1
                markdown_content.append(f"{'#' * level} {text}\n")
            else:
                markdown_content.append(f"{text}\n")

        for table in doc.tables:
            markdown_content.append("\n")
            for i, row in enumerate(table.rows):
                cells = [_docx_cell_to_markdown(cell.text) for cell in row.cells]
                markdown_content.append("| " + " | ".join(cells) + " |")
                if i == 0:
                    # Delimiter row that marks the first row as the header
                    markdown_content.append("| " + " | ".join(["---"] * len(cells)) + " |")
            markdown_content.append("\n")

        return "\n".join(markdown_content)
    except Exception as e:
        raise RuntimeError(f"Failed to process Word document: {e}") from e
def _excel_cell_to_markdown(value) -> str:
    """Render one cell value as text safe to place inside a markdown table row."""
    if value is None:
        return ""
    # A literal pipe would end the cell and a newline would end the row.
    return str(value).replace("|", "\\|").replace("\n", "<br>")


def _process_excel_to_markdown(excel_bytes: bytes) -> str:
    """Convert Excel file to markdown.

    Every worksheet becomes a ``## Sheet: <name>`` section followed by a
    markdown table built from its non-empty rows, the first of which is
    used as the header. A sheet with a single non-empty row is written as
    one table row without a delimiter line.

    Args:
        excel_bytes: Contents of an ``.xlsx`` file.

    Returns:
        The markdown text, starting with a ``# Excel Workbook`` title.

    Raises:
        RuntimeError: if Excel support is not installed or the workbook
            cannot be processed.
    """
    if not EXCEL_SUPPORT:
        raise RuntimeError("Excel support not available. Install openpyxl and pandas packages.")
    try:
        # Parse straight from memory; no temporary file to create or clean up.
        workbook = openpyxl.load_workbook(BytesIO(excel_bytes), data_only=True)
        markdown_content = ["# Excel Workbook\n"]

        # ``worksheets`` leaves out chart sheets, which have no cell grid.
        for worksheet in workbook.worksheets:
            # Skip empty sheets
            if (
                worksheet.max_row == 1
                and worksheet.max_column == 1
                and worksheet.cell(1, 1).value is None
            ):
                continue
            markdown_content.append(f"## Sheet: {worksheet.title}\n")

            data = [
                [_excel_cell_to_markdown(cell) for cell in row]
                for row in worksheet.iter_rows(values_only=True)
                if any(cell is not None for cell in row)  # Skip completely empty rows
            ]
            if not data:
                continue

            headers, *body = data
            markdown_content.append("| " + " | ".join(headers) + " |")
            if body:
                markdown_content.append("| " + " | ".join(["---"] * len(headers)) + " |")
                for row in body:
                    # Pad or trim the row to the header width
                    padded_row = row + [""] * (len(headers) - len(row))
                    markdown_content.append("| " + " | ".join(padded_row[:len(headers)]) + " |")
            markdown_content.append("\n")

        return "\n".join(markdown_content)
    except Exception as e:
        raise RuntimeError(f"Failed to process Excel file: {e}") from e
@mcp.tool()
async def ocr_image_to_markdown(
    image_data: str,
    image_format: Optional[str] = None
) -> str:
    """
    Convert an image containing text/documents to structured markdown using Nanonets OCR.

    Args:
        image_data: Image data as base64 string, data URL, or file path
        image_format: Optional format hint (png, jpg, etc.)

    Returns:
        Structured markdown representation of the document
    """
    # image_format is part of the tool signature but is not read here; the
    # image is loaded from image_data alone.
    try:
        image = _load_image_from_data(image_data)
        return await ocr_instance.process_image(image)
    except Exception as e:
        error_msg = f"OCR processing failed: {str(e)}"
        logger.error(error_msg)
        # Chain the original exception so its traceback is kept as the cause.
        raise RuntimeError(error_msg) from e
@mcp.tool()
async def ocr_pdf_to_markdown(pdf_data: str) -> str:
    """
    Convert an entire PDF document to structured markdown using Nanonets OCR.
    Processes all pages and combines them into a single markdown document.

    Args:
        pdf_data: PDF data as base64 string, data URL, or file path

    Returns:
        Structured markdown representation of the entire PDF document
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF support not available. Install pdf2image and PyMuPDF packages.")
    try:
        pdf_bytes = _load_pdf_from_data(pdf_data)
        return await ocr_instance.process_pdf(pdf_bytes)
    except Exception as e:
        error_msg = f"PDF OCR processing failed: {str(e)}"
        logger.error(error_msg)
        # Chain the original exception so its traceback is kept as the cause.
        raise RuntimeError(error_msg) from e
@mcp.tool()
async def process_word_to_markdown(docx_data: str) -> str:
    """
    Convert a Word document (.docx) to structured markdown.
    Extracts text, headings, and tables from the document.

    Args:
        docx_data: Word document data as base64 string, data URL, or file path

    Returns:
        Structured markdown representation of the Word document
    """
    if not DOCX_SUPPORT:
        raise RuntimeError("Word document support not available. Install python-docx package.")
    try:
        docx_bytes = _load_docx_from_data(docx_data)
        return _process_docx_to_markdown(docx_bytes)
    except Exception as e:
        error_msg = f"Word document processing failed: {str(e)}"
        logger.error(error_msg)
        # Chain the original exception so its traceback is kept as the cause.
        raise RuntimeError(error_msg) from e
@mcp.tool()
async def process_excel_to_markdown(excel_data: str) -> str:
    """
    Convert an Excel file (.xlsx) to structured markdown.
    Extracts data from all worksheets and converts tables to markdown format.

    Args:
        excel_data: Excel file data as base64 string, data URL, or file path

    Returns:
        Structured markdown representation of the Excel workbook
    """
    if not EXCEL_SUPPORT:
        raise RuntimeError("Excel support not available. Install openpyxl and pandas packages.")
    try:
        excel_bytes = _load_excel_from_data(excel_data)
        return _process_excel_to_markdown(excel_bytes)
    except Exception as e:
        error_msg = f"Excel file processing failed: {str(e)}"
        logger.error(error_msg)
        # Chain the original exception so its traceback is kept as the cause.
        raise RuntimeError(error_msg) from e
@mcp.resource("nanonets://model-info")
async def get_model_info() -> str:
    """Get information about the Nanonets OCR model and document processing capabilities"""

    def describe(available: bool, requirement: str) -> str:
        # One status line per optional feature: installed, or what is missing.
        if available:
            return "✅ Available"
        return f"❌ Not installed (requires {requirement})"

    pdf_status = describe(PDF_SUPPORT, "pdf2image and PyMuPDF")
    docx_status = describe(DOCX_SUPPORT, "python-docx")
    excel_status = describe(EXCEL_SUPPORT, "openpyxl and pandas")

    report = f"""# Nanonets MCP Server Information
**OCR Model**: nanonets/Nanonets-OCR-s
**Parameters**: 3.75B (based on Qwen2.5-VL-3B-Instruct)
**OCR Capabilities**:
- Document-to-markdown conversion
- LaTeX equation recognition
- Table structure preservation
- Image description generation
- Signature and watermark detection
- Checkbox recognition
- Complex layout understanding
- Multi-page PDF processing
**Document Processing Capabilities**:
- Word document (.docx) text and table extraction
- Excel spreadsheet (.xlsx) data extraction
- PDF document OCR processing
**Input Formats**:
- Images: PNG, JPG, BMP, TIFF, WEBP
- Documents: PDF, DOCX, XLSX
**Output**: Structured markdown with semantic tagging
**Support Status**:
- **PDF Support**: {pdf_status}
- **Word Support**: {docx_status}
- **Excel Support**: {excel_status}
"""
    return report
@mcp.tool()
async def get_supported_formats() -> Dict[str, Any]:
    """Get list of supported formats and capabilities"""
    # (feature flag, format name, capability name) for each optional feature,
    # in the order they are reported.
    optional_features = (
        (PDF_SUPPORT, "PDF", "multi_page_processing"),
        (DOCX_SUPPORT, "DOCX", "word_document_processing"),
        (EXCEL_SUPPORT, "XLSX", "spreadsheet_processing"),
    )
    enabled = [(fmt, capability) for flag, fmt, capability in optional_features if flag]

    formats = ["PNG", "JPEG", "JPG", "BMP", "TIFF", "WEBP"]
    formats.extend(fmt for fmt, _ in enabled)

    capabilities = [
        "text_extraction",
        "table_recognition",
        "equation_conversion",
        "layout_preservation",
        "semantic_tagging",
    ]
    capabilities.extend(capability for _, capability in enabled)

    pdf_options = {
        "supported": PDF_SUPPORT,
        "dpi": "200 DPI conversion",
        "max_pages": "No limit (processes all pages)",
        "method": "OCR with vision model",
    }
    word_options = {
        "supported": DOCX_SUPPORT,
        "extracts": ["text", "headings", "tables"],
        "method": "Direct text extraction",
    }
    excel_options = {
        "supported": EXCEL_SUPPORT,
        "extracts": ["all_worksheets", "tables", "data"],
        "method": "Direct data extraction",
    }

    summary: Dict[str, Any] = {}
    summary["supported_formats"] = formats
    summary["input_methods"] = ["base64_string", "data_url", "file_path"]
    summary["capabilities"] = capabilities
    summary["max_resolution"] = "Recommended: 2048x2048 pixels for images"
    summary["processing_options"] = {
        "pdf_processing": pdf_options,
        "word_processing": word_options,
        "excel_processing": excel_options,
    }
    summary["output_format"] = "Structured markdown with appropriate separators"
    return summary
def main(host: str = "0.0.0.0", port: int = 8000):
    """Main entry point for the MCP server.

    Serves the FastMCP SSE application with uvicorn.

    Args:
        host: Interface to bind. The default listens on all interfaces.
        port: TCP port to listen on.
    """
    import uvicorn

    # sse_app is a method that builds the ASGI application; it has to be
    # called, otherwise uvicorn is handed the bound method instead of an app.
    app = mcp.sse_app()

    uvicorn.run(
        app,
        host=host,
        port=port,
        log_level="info"
    )


if __name__ == "__main__":
    main()