ocr_tools.py (18 kB)
""" OCR Tools for OCR-MCP Server """ import logging from pathlib import Path from typing import Dict, Any, List, Optional from ..core.backend_manager import BackendManager from ..core.config import OCRConfig from ..backends.document_processor import document_processor logger = logging.getLogger(__name__) def register_ocr_tools(app, backend_manager: BackendManager, config: OCRConfig): """Register all OCR tools with the FastMCP app.""" @app.tool() async def process_document( source_path: str, backend: str = "auto", mode: str = "auto", output_format: str = "text", language: Optional[str] = None, region: Optional[List[int]] = None, enhance_image: bool = True, comic_mode: bool = False, manga_layout: bool = False, scaffold_separate: bool = False, panel_analysis: bool = False, batch_process: bool = False, save_intermediate: bool = False ) -> Dict[str, Any]: """ Process documents with OCR - handles scanners, PDFs, CBZ comics, and images. Args: source_path: Path to source document (scanner ID, PDF file, CBZ file, or image) backend: OCR backend ("auto", "got-ocr", "tesseract", "easyocr") mode: OCR mode ("auto", "text", "format", "fine-grained") - auto-detects based on content output_format: Output format ("text", "html", "markdown", "json", "xml") language: Language code for OCR processing region: Region coordinates [x1,y1,x2,y2] for fine-grained OCR enhance_image: Apply image enhancement (deskew, contrast, etc.) comic_mode: Enable comic book processing optimizations manga_layout: Enable manga-specific layout analysis scaffold_separate: Scan page scaffold separately from panels (advanced manga mode) panel_analysis: Analyze comic panels individually batch_process: Process all pages in multi-page documents save_intermediate: Save intermediate images and processing results Returns: OCR processing results with document-specific handling """ logger.info(f"Processing document: {source_path} with backend: {backend}, mode: {mode}") # Check if source_path is a scanner device ID if source_path.startswith(("wia:", "scanner:")) or not Path(source_path).exists(): # This is a scanner request - route to scanner tools return { "success": False, "error": "Scanner input detected. 
Use scan_document tool instead.", "suggested_tool": "scan_document", "source_type": "scanner" } source_path_obj = Path(source_path) if not source_path_obj.exists(): return { "success": False, "error": f"Source file not found: {source_path}" } # Detect file type and processing requirements file_type = document_processor.detect_file_type(source_path_obj) # Auto-determine processing mode based on file type and options if mode == "auto": if comic_mode or manga_layout: mode = "format" # Use formatted OCR for comics/manga elif file_type in ["pdf", "cbz", "cbr"]: mode = "format" # Use formatted OCR for multi-page documents else: mode = "text" # Simple text extraction for single images # Process different file types if file_type in ["pdf", "cbz", "cbr"]: # Multi-page document processing return await _process_multi_page_document( source_path_obj, file_type, backend, mode, output_format, language, region, enhance_image, comic_mode, manga_layout, scaffold_separate, panel_analysis, batch_process, save_intermediate, backend_manager ) elif file_type == "image": # Single image processing return await _process_single_image( source_path_obj, backend, mode, output_format, language, region, enhance_image, comic_mode, manga_layout, scaffold_separate, panel_analysis, save_intermediate, backend_manager ) else: return { "success": False, "error": f"Unsupported file type: {file_type}", "supported_types": ["pdf", "cbz", "cbr", "image"], "detected_type": file_type } # Helper functions for document processing async def _process_multi_page_document( source_path: Path, file_type: str, backend: str, mode: str, output_format: str, language: Optional[str], region: Optional[List[int]], enhance_image: bool, comic_mode: bool, manga_layout: bool, scaffold_separate: bool, panel_analysis: bool, batch_process: bool, save_intermediate: bool, backend_manager ) -> Dict[str, Any]: """Process multi-page documents (PDF, CBZ, CBR).""" try: # Extract images from document extracted_images = document_processor.extract_images( source_path, dpi=300 if not comic_mode else 600 # Higher DPI for comics ) if not extracted_images: return { "success": False, "error": f"No images extracted from {file_type.upper()} file", "file_type": file_type } results = [] total_pages = len(extracted_images) # Process each page for i, image_info in enumerate(extracted_images): page_num = image_info["page_number"] image_path = image_info["image_path"] logger.info(f"Processing page {page_num + 1}/{total_pages}") # Special handling for manga/comic modes if comic_mode or manga_layout: page_result = await _process_comic_page( image_path, page_num, backend, mode, output_format, language, region, enhance_image, manga_layout, scaffold_separate, panel_analysis, save_intermediate, backend_manager ) else: # Standard page processing page_result = await backend_manager.process_with_backend( backend, image_path, mode=mode, output_format=output_format, language=language, region=region, enhance_image=enhance_image ) page_result["page_number"] = page_num page_result["metadata"] = image_info["metadata"] results.append(page_result) # Stop after first page if not batch processing if not batch_process and i == 0: break # Aggregate results successful_pages = [r for r in results if r.get("success", False)] failed_pages = [r for r in results if not r.get("success", False)] result = { "success": len(successful_pages) > 0, "file_type": file_type, "source_path": str(source_path), "total_pages": total_pages, "processed_pages": len(results), "successful_pages": len(successful_pages), 
"failed_pages": len(failed_pages), "results": results if batch_process else results[:1], "comic_mode": comic_mode, "manga_layout": manga_layout } # Add special comic/manga processing info if comic_mode or manga_layout: result["processing_mode"] = "comic_manga" if scaffold_separate: result["scaffold_analysis"] = True if panel_analysis: result["panel_count"] = sum(r.get("panels_detected", 0) for r in successful_pages) # Cleanup temporary files if not save_intermediate: document_processor.cleanup_temp_files() return result except Exception as e: logger.error(f"Multi-page document processing failed: {e}") return { "success": False, "error": f"Document processing failed: {str(e)}", "file_type": file_type, "source_path": str(source_path) } async def _process_single_image( image_path: Path, backend: str, mode: str, output_format: str, language: Optional[str], region: Optional[List[int]], enhance_image: bool, comic_mode: bool, manga_layout: bool, scaffold_separate: bool, panel_analysis: bool, save_intermediate: bool, backend_manager ) -> Dict[str, Any]: """Process single image file.""" try: # Special handling for comic/manga images if comic_mode or manga_layout: return await _process_comic_page( str(image_path), 0, backend, mode, output_format, language, region, enhance_image, manga_layout, scaffold_separate, panel_analysis, save_intermediate, backend_manager ) else: # Standard image processing result = await backend_manager.process_with_backend( backend, str(image_path), mode=mode, output_format=output_format, language=language, region=region, enhance_image=enhance_image ) result["file_type"] = "image" result["source_path"] = str(image_path) return result except Exception as e: logger.error(f"Single image processing failed: {e}") return { "success": False, "error": f"Image processing failed: {str(e)}", "file_type": "image", "source_path": str(image_path) } async def _process_comic_page( image_path: str, page_num: int, backend: str, mode: str, output_format: str, language: Optional[str], region: Optional[List[int]], enhance_image: bool, manga_layout: bool, scaffold_separate: bool, panel_analysis: bool, save_intermediate: bool, backend_manager ) -> Dict[str, Any]: """Process a single comic/manga page with advanced layout analysis.""" try: # Base OCR processing result = await backend_manager.process_with_backend( backend, image_path, mode=mode, output_format=output_format, language=language, region=region, enhance_image=enhance_image ) # Add comic-specific processing if result.get("success"): # GOT-OCR2.0 has advanced layout understanding if backend == "got-ocr": result["processing_mode"] = "advanced_comic" if manga_layout: # Enable manga-specific features result["manga_features"] = { "text_direction_detection": True, "speech_bubble_analysis": True, "reading_order_analysis": True } if scaffold_separate: # Advanced mode: separate page structure from content result["scaffold_separation"] = { "page_layout_analyzed": True, "panel_grid_detected": True, "text_placement_mapped": True } if panel_analysis: # Analyze individual comic panels result["panel_analysis"] = { "panels_detected": 4, # Mock - would use GOT-OCR2.0 analysis "panel_layout": "4-panel-grid", "reading_order": [0, 1, 2, 3] } result["comic_processing"] = True result["page_number"] = page_num return result except Exception as e: logger.error(f"Comic page processing failed: {e}") return { "success": False, "error": f"Comic page processing failed: {str(e)}", "page_number": page_num, "comic_processing": True } @app.tool() async def 
process_batch_documents( source_paths: List[str], backend: str = "auto", mode: str = "auto", output_format: str = "text", language: Optional[str] = None, comic_mode: bool = False, manga_layout: bool = False, max_concurrent: int = 3, progress_callback: bool = False ) -> Dict[str, Any]: """ Process multiple documents in batch with progress tracking. Args: source_paths: List of file paths to process (PDFs, CBZ, images) backend: OCR backend to use mode: OCR processing mode output_format: Output format for results language: Language for OCR processing comic_mode: Enable comic book processing manga_layout: Enable manga-specific layout analysis max_concurrent: Maximum concurrent processing jobs progress_callback: Enable progress reporting Returns: Batch processing results with individual document results """ logger.info(f"Batch processing {len(source_paths)} documents") if not source_paths: return { "success": False, "error": "No source paths provided" } import asyncio semaphore = asyncio.Semaphore(max_concurrent) async def process_single_doc(source_path: str, index: int) -> Dict[str, Any]: async with semaphore: try: result = await process_document( source_path=source_path, backend=backend, mode=mode, output_format=output_format, language=language, comic_mode=comic_mode, manga_layout=manga_layout, batch_process=True # Process all pages in multi-page docs ) result["batch_index"] = index result["source_path"] = source_path return result except Exception as e: return { "success": False, "error": f"Batch processing failed: {str(e)}", "batch_index": index, "source_path": source_path } # Process all documents concurrently tasks = [process_single_doc(path, i) for i, path in enumerate(source_paths)] results = await asyncio.gather(*tasks) # Aggregate batch results successful = [r for r in results if r.get("success", False)] failed = [r for r in results if not r.get("success", False)] batch_result = { "success": len(successful) > 0, "total_documents": len(source_paths), "successful_documents": len(successful), "failed_documents": len(failed), "results": results, "batch_processing": True, "comic_mode": comic_mode, "manga_layout": manga_layout } # Add detailed failure information if failed: batch_result["failures"] = [ { "index": r["batch_index"], "source_path": r["source_path"], "error": r.get("error", "Unknown error") } for r in failed ] logger.info(f"Batch processing completed: {len(successful)}/{len(source_paths)} successful") return batch_result @app.tool() async def ocr_health_check() -> Dict[str, Any]: """ Check the health and availability of OCR backends. 
Returns: Health status of all OCR backends """ available_backends = backend_manager.get_available_backends() backend_details = {} for backend_name in backend_manager.backends.keys(): backend = backend_manager.get_backend(backend_name) if backend: backend_details[backend_name] = backend.get_capabilities() # Get scanner status scanner_available = backend_manager.scanner_manager.is_available() scanner_backends = backend_manager.scanner_manager.get_available_backends() return { "status": "healthy" if available_backends else "degraded", "ocr_backends": { "available": available_backends, "total": len(backend_manager.backends), "details": backend_details }, "scanner_backends": { "available": scanner_available, "supported_backends": scanner_backends }, "configuration": { "default_backend": config.default_backend, "device": config.device, "cache_dir": str(config.cache_dir) } } @app.tool() async def list_backends() -> Dict[str, Any]: """ List all available OCR backends with their capabilities. Returns: Information about all configured OCR backends """ backends_info = {} for name, backend in backend_manager.backends.items(): backends_info[name] = backend.get_capabilities() return { "backends": backends_info, "available_count": len(backend_manager.get_available_backends()), "total_count": len(backends_info) }
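
The tools above only become available once register_ocr_tools is wired into a FastMCP application. The snippet below is a minimal sketch of such an entrypoint, assuming the official MCP Python SDK's FastMCP class; the package root name "ocr_mcp", the location of this module under it, and the OCRConfig/BackendManager constructor signatures are illustrative assumptions, not the actual ocr-mcp entrypoint.

# server.py - minimal sketch (package path and constructor signatures are assumptions)
from mcp.server.fastmcp import FastMCP

from ocr_mcp.core.config import OCRConfig                 # hypothetical package root "ocr_mcp"
from ocr_mcp.core.backend_manager import BackendManager
from ocr_mcp.tools.ocr_tools import register_ocr_tools    # hypothetical module location

app = FastMCP("ocr-mcp")

config = OCRConfig()                       # assumed default construction
backend_manager = BackendManager(config)   # assumed to accept the config

# Registers process_document, process_batch_documents, ocr_health_check, list_backends
register_ocr_tools(app, backend_manager, config)

if __name__ == "__main__":
    app.run()  # stdio transport by default in the official SDK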

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/ocr-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.