Skip to main content
Glama
main.py5.44 kB
import json import os from pathlib import Path from typing import Any, List import httpx from mcp.server.fastmcp import FastMCP from mistralai import Mistral, OCRResponse import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("lizeur") class Lizeur: def __init__(self): if os.getenv("MISTRAL_API_KEY") is None: raise ValueError("MISTRAL_API_KEY is not set") self.mistral = Mistral(api_key=os.getenv("MISTRAL_API_KEY")) self.cache_path = ( Path.home() / ".cache/lizeur" if os.getenv("CACHE_PATH") is None else Path(os.getenv("CACHE_PATH")) ) self.cache_path.mkdir(parents=True, exist_ok=True) # Representing a list of OCR'ed documents. The value being the name of the document situated in the cache_path. self.cached_documents: List[str] = [ file.name for file in self.cache_path.iterdir() if file.is_file() ] def read_document(self, path: Path) -> OCRResponse | None: """Read a document and return the OCRResponse.""" logging.info(f"read_document: Reading document {path.name}") # Check if the document is already cached cached_document_path = self.cache_path / path.name if cached_document_path.exists(): logging.info(f"read_document: Document {path.name} is already cached.") try: with open(cached_document_path, "r") as f: cached_json = f.read() # Parse JSON and reconstruct OCRResponse cached_data = json.loads(cached_json) return OCRResponse.model_validate(cached_data) except (json.JSONDecodeError, ValueError) as e: logging.warning(f"Failed to load cached document {path.name}: {e}") # Remove corrupted cache file cached_document_path.unlink(missing_ok=True) # OCR the document ocr_response = self._ocr_document(path) if ocr_response is None: return None # Cache the document using model_dump_json() for direct JSON serialization try: with open(cached_document_path, "w") as f: f.write(ocr_response.model_dump_json(indent=2)) logging.info(f"Successfully cached document {path.name}") except Exception as e: logging.error(f"Failed to cache document {path.name}: {e}") return ocr_response def _ocr_document(self, path: Path) -> OCRResponse | None: """OCR a document and return the OCRResponse.""" try: # Upload the file to MistralAI uploaded_file = self.mistral.files.upload( file={ "file_name": path.stem, "content": path.read_bytes(), }, purpose="ocr", ) # Process the uploaded file with OCR ocr_response = self.mistral.ocr.process( document={ "type": "file", "file_id": uploaded_file.id, }, model="mistral-ocr-latest", include_image_base64=True, ) # Clean up the uploaded file try: self.mistral.files.delete(uploaded_file.id) except Exception as e: logging.warning( f"Failed to delete uploaded file {uploaded_file.id}: {e}" ) return ocr_response except Exception as e: logging.error(f"OCR processing failed for {path}: {e}") return None @mcp.tool() def read_pdf(absolute_path: str) -> dict: """Read a PDF document and return the complete OCRResponse as a dictionary. Returns the full OCR response including all pages, not just the first page. The response includes pages with markdown content, bounding boxes, and other OCR metadata. """ ocr_response = Lizeur().read_document(Path(absolute_path)) if ocr_response is None: return {"error": "Failed to process document"} response_data = ocr_response.model_dump() return { "status": "success", "document_path": absolute_path, "total_pages": len(response_data.get("pages", [])), "pages": response_data.get("pages", []), "metadata": { k: v for k, v in response_data.items() if k != "pages" and k != "model_dump_json" }, } @mcp.tool() def read_pdf_text(absolute_path: str) -> str: """Read a PDF document and return only the markdown text content from all pages. This is a simpler alternative to read_pdf that returns just the text content without the full OCR metadata, which can be easier for agents to process. """ ocr_response = Lizeur().read_document(Path(absolute_path)) if ocr_response is None: return "Error: Failed to process document" # Combine all pages' markdown content all_text = [] for i, page in enumerate(ocr_response.pages): if hasattr(page, "markdown") and page.markdown: all_text.append(f"--- Page {i+1} ---\n{page.markdown}") return "\n\n".join(all_text) if all_text else "No text content found" def main(): mcp.run("stdio") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SilverBzH/lizeur'

If you have feedback or need assistance with the MCP directory API, please join our Discord server