# mcp-mistral-ocr
# by everaldo
# - src
# - mcp_mistral_ocr
import os
import json
import base64
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime
from urllib.parse import urlparse
from mistralai import Mistral
class MistralOCRProcessor:
    """Run Mistral OCR on local files or URLs and persist the responses.

    Every successful ``process_*`` call returns the OCR response as a JSON
    string and also writes it to a timestamped ``.json`` file inside the
    caller-supplied output directory.

    Note: the ``process_*`` coroutines invoke the Mistral client without
    awaiting it, so the request runs synchronously inside the coroutine.
    """

    # Model identifier sent with every OCR request.
    _OCR_MODEL = "mistral-ocr-latest"

    # Extensions that are sent inline as base64 data URIs, mapped to the
    # MIME type advertised in the URI. Anything else is uploaded as a document.
    _IMAGE_MIME_TYPES = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }

    def __init__(self, api_key: str):
        """Store the API key and the maximum accepted local file size.

        Args:
            api_key: Key passed to the ``Mistral`` client on each request.
        """
        self.api_key = api_key
        self.max_file_size = 50 * 1024 * 1024  # 50MB in bytes

    def _save_result(self, result: Dict[str, Any], source_name: str, output_dir: Path) -> None:
        """Write *result* as ``<source_name>_<YYYYmmdd_HHMMSS>.json``.

        Args:
            result: JSON-serialisable OCR payload.
            source_name: Prefix of the output file name.
            output_dir: Directory to write into; created if it is missing.
        """
        output_dir = Path(output_dir)
        # Create the directory up front so a finished OCR call is not lost
        # merely because the destination does not exist yet.
        output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = output_dir / f"{source_name}_{timestamp}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

    def _process_response(self, response) -> str:
        """Serialise an OCR response (anything exposing ``model_dump()``) to JSON."""
        return json.dumps(response.model_dump(), ensure_ascii=False)

    async def process_local_file(self, file_path: Path, output_dir: Path) -> str:
        """Process a local file using Mistral's OCR capabilities.

        Files with a known image extension are sent inline as a base64 data
        URI; every other file is uploaded and processed through a signed URL.

        Args:
            file_path: File to process.
            output_dir: Directory that receives the saved JSON result.

        Returns:
            The OCR response as a JSON string.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
            ValueError: If the file is larger than ``self.max_file_size``.
            Exception: Any failure during encoding, upload, OCR or saving,
                wrapped with the original exception attached as ``__cause__``.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Check file size against the configured limit.
        file_size = file_path.stat().st_size
        if file_size > self.max_file_size:
            limit_mb = self.max_file_size / 1024 / 1024
            raise ValueError(
                f"File size exceeds {limit_mb:g}MB limit: "
                f"{file_size / 1024 / 1024:.2f}MB"
            )

        mime_type = self._IMAGE_MIME_TYPES.get(file_path.suffix.lower())
        client = Mistral(api_key=self.api_key)

        try:
            if mime_type is not None:
                # Handle image files with base64 encoding.
                base64_image = self._encode_image(file_path)
                if not base64_image:
                    raise ValueError("Failed to encode image")

                response = client.ocr.process(
                    model=self._OCR_MODEL,
                    document={
                        "type": "image_url",
                        # Advertise the real image type rather than always JPEG.
                        "image_url": f"data:{mime_type};base64,{base64_image}",
                    },
                )
            else:
                # Handle PDF and other document types. The context manager
                # guarantees the handle is closed even if the upload fails.
                with open(file_path, "rb") as document_file:
                    uploaded_file = client.files.upload(
                        file={
                            "file_name": file_path.name,
                            "content": document_file,
                        },
                        purpose="ocr",
                    )

                # Get signed URL for processing.
                signed_url = client.files.get_signed_url(file_id=uploaded_file.id)

                response = client.ocr.process(
                    model=self._OCR_MODEL,
                    document={
                        "type": "document_url",
                        "document_url": signed_url.url,
                    },
                )

            result = self._process_response(response)
            self._save_result(json.loads(result), file_path.stem, output_dir)
            return result
        except Exception as e:
            # Keep the original exception reachable for debugging.
            raise Exception(f"Error processing file with Mistral API: {str(e)}") from e

    async def process_url_file(self, url: str, file_type: str, output_dir: Path) -> str:
        """Process a file from a URL using Mistral's OCR capabilities.

        Args:
            url: Location of the image or PDF.
            file_type: Either ``"image"`` or ``"pdf"``.
            output_dir: Directory that receives the saved JSON result.

        Returns:
            The OCR response as a JSON string.

        Raises:
            Exception: On an invalid *file_type* or any OCR/saving failure,
                with the original exception attached as ``__cause__``.
        """
        try:
            if file_type not in ["image", "pdf"]:
                raise ValueError("file_type must be either 'image' or 'pdf'")

            url_kind = "image_url" if file_type == "image" else "document_url"
            client = Mistral(api_key=self.api_key)
            response = client.ocr.process(
                model=self._OCR_MODEL,
                # The payload key has the same name as the document type.
                document={"type": url_kind, url_kind: url},
            )

            result = self._process_response(response)

            # Derive the output file prefix from the last URL path segment.
            source_name = Path(urlparse(url).path).stem or "url_document"
            self._save_result(json.loads(result), source_name, output_dir)
            return result
        except Exception as e:
            raise Exception(f"Error processing URL with Mistral API: {str(e)}") from e

    def _encode_image(self, image_path: Path) -> Optional[str]:
        """Encode an image file to base64.

        Returns:
            The base64 text, or ``None`` if the file could not be read
            (the error is reported on stderr).
        """
        import sys  # local import: the file-level import block is left as-is

        try:
            with open(image_path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode("utf-8")
        except Exception as e:
            # Report on stderr so stdout stays clean; presumably this runs
            # inside an MCP server where stdout may carry protocol messages.
            print(f"Error encoding image: {e}", file=sys.stderr)
            return None