Skip to main content
Glama
juanqui
by juanqui
parser_docling.py (27.1 kB)
"""PDF parser using the Docling library for advanced document processing.""" import asyncio import logging import time from pathlib import Path from typing import Any, Dict, Optional from .parser import DocumentParser, PageContent, ParseResult logger = logging.getLogger(__name__) # Default configuration following the design specification DOCLING_DEFAULT_CONFIG = { # OCR Configuration "ocr_enabled": True, "ocr_engine": "easyocr", # Options: easyocr, tesseract, rapidocr, ocrmac, onnxtr "ocr_languages": ["en"], "ocr_device": "auto", # auto, cpu, cuda, mps # Table Processing "table_processing_mode": "FAST", # FAST, ACCURATE "table_extraction_enabled": True, # Enrichment Features "formula_enrichment": True, "picture_description": False, # Requires additional models "code_understanding": False, # Requires additional models # Performance Settings "processing_timeout": 300, # seconds "max_pages": None, # None for no limit "device_selection": "auto", # Output Options "export_format": "markdown", "include_images": False, "preserve_layout": True, # Security Settings "max_file_size": 100 * 1024 * 1024, # 100MB default "allow_external_resources": False, } # Supported OCR engines with their import requirements OCR_ENGINE_DEPENDENCIES = { "easyocr": ["easyocr"], "tesseract": ["pytesseract"], "rapidocr": ["rapidocr_onnxruntime"], "ocrmac": [], # System-dependent (macOS only) "onnxtr": ["doctr"], } class DoclingParser(DocumentParser): """PDF parser using the Docling library for advanced document processing.""" def __init__(self, config: Optional[Dict[str, Any]] = None, cache_dir: Path = None): """Initialize the Docling parser with comprehensive configuration support. Args: config: Configuration options for docling processing. cache_dir: Directory to cache parsed markdown files. 
""" super().__init__(cache_dir) # Merge user config with defaults and validate self.config = self._validate_config({**DOCLING_DEFAULT_CONFIG, **(config or {})}) # Determine available features self.available_features = self._configure_available_features() logger.debug(f"DoclingParser initialized with config: {self.config}") logger.debug(f"Available features: {self.available_features}") async def parse(self, file_path: Path) -> ParseResult: """Parse a PDF file using Docling library with advanced processing. Args: file_path: Path to the PDF file. Returns: ParseResult with markdown content and metadata. """ start_time = time.time() try: # Check cache first cache_path = None if self.cache_dir: cache_path = self._get_cache_path(file_path) if self._is_cache_valid(file_path, cache_path): logger.debug(f"Loading parsed content from cache: {cache_path}") markdown_content = self._load_from_cache(cache_path) metadata = self._load_metadata_from_cache(cache_path) if markdown_content is not None and metadata: # Create page-aware result # Since Docling doesn't provide per-page markdown, create single page pages = [ PageContent( page_number=1, markdown_content=markdown_content, metadata={"from_cache": True, "total_pages": metadata.get("page_count", 1)}, ) ] return ParseResult(pages=pages, metadata=metadata) # Validate input file for security self._validate_input_file(file_path) logger.info(f"Starting Docling parsing: {file_path}") logger.debug(f"Docling configuration: {self.config}") # Lazy imports with comprehensive error handling try: from docling.datamodel.base_models import InputFormat from docling.datamodel.pipeline_options import PdfPipelineOptions from docling.document_converter import DocumentConverter, PdfFormatOption except ImportError as e: raise ImportError( "Docling library not available. 
Install with: pip install docling[complete] " "or pip install docling for basic functionality" ) from e # Create pipeline options and converter with new v2 API pipeline_options = self._build_pipeline_options(PdfPipelineOptions) converter = self._create_safe_converter(DocumentConverter, PdfFormatOption, InputFormat, pipeline_options) # Apply timeout wrapper for processing conversion_result = await self._convert_with_timeout(converter, file_path) # Handle any conversion errors or warnings self._handle_conversion_errors(conversion_result) # Extract markdown content markdown_content = conversion_result.document.export_to_markdown() if not markdown_content or not markdown_content.strip(): raise RuntimeError("Docling produced empty markdown content") # Extract comprehensive metadata metadata = self._extract_metadata_from_result(conversion_result, file_path) # Add processing information processing_time = time.time() - start_time metadata.update( { "processing_timestamp": "N/A", # Will be set by PDFProcessor "processor_version": "docling", "source_filename": file_path.name, "source_directory": str(file_path.parent), "docling_processing_time": processing_time, "docling_features_used": self._get_features_used(), } ) # Save to cache if enabled if cache_path: logger.debug(f"Saving parsed content to cache: {cache_path}") self._save_to_cache(cache_path, markdown_content) self._save_metadata_to_cache(cache_path, metadata) # Log processing statistics logger.info(f"Docling parsing completed in {processing_time:.2f}s: {file_path}") logger.debug(f"Pages processed: {metadata.get('page_count', 'unknown')}") logger.debug(f"OCR enabled: {self.available_features['ocr']}") logger.debug(f"Tables extracted: {metadata.get('table_count', 0)}") # Create page-aware result # Since Docling export_to_markdown() returns combined markdown, # we'll create a single page with all content for now # TODO: In future, could parse conversion_result.document.pages individually pages = [ PageContent( 
page_number=1, markdown_content=markdown_content, metadata={"total_pages": metadata.get("page_count", 1)}, ) ] return ParseResult(pages=pages, metadata=metadata) except ImportError: # Re-raise ImportError as-is for parser fallback logic raise except Exception as e: processing_time = time.time() - start_time logger.error(f"Docling parsing failed after {processing_time:.2f}s: {file_path} - {e}") raise RuntimeError(f"Failed to parse PDF with Docling: {e}") from e def _validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]: """Validate and normalize docling configuration. Args: config: Configuration dictionary to validate. Returns: Validated and normalized configuration. Raises: ValueError: If configuration values are invalid. """ validated = config.copy() # Validate OCR engine if validated["ocr_engine"] not in OCR_ENGINE_DEPENDENCIES: supported_engines = list(OCR_ENGINE_DEPENDENCIES.keys()) raise ValueError( f"Unsupported OCR engine: {validated['ocr_engine']}. " f"Supported engines: {supported_engines}" ) # Validate table processing mode if validated["table_processing_mode"] not in ["FAST", "ACCURATE"]: raise ValueError("table_processing_mode must be FAST or ACCURATE") # Validate device selection if validated["device_selection"] not in ["auto", "cpu", "cuda", "mps"]: raise ValueError("device_selection must be auto, cpu, cuda, or mps") # Validate timeout if validated["processing_timeout"] <= 0: raise ValueError("processing_timeout must be positive") # Validate file size limit if validated["max_file_size"] <= 0: raise ValueError("max_file_size must be positive") # Validate OCR languages (must be list) if not isinstance(validated["ocr_languages"], list): if isinstance(validated["ocr_languages"], str): validated["ocr_languages"] = [validated["ocr_languages"]] else: raise ValueError("ocr_languages must be a list of language codes") return validated def _configure_available_features(self) -> Dict[str, bool]: """Determine which docling features are available based 
on dependencies. Returns: Dictionary mapping feature names to availability status. """ features = { "ocr": False, "table_extraction": True, # Always available with docling "formula_enrichment": True, # Always available with docling "picture_description": False, "code_understanding": False, } # Check OCR availability if self.config["ocr_enabled"]: selected_engine = self._get_available_ocr_engine(self.config["ocr_engine"]) if selected_engine: features["ocr"] = True # Update config with available engine if different from requested if selected_engine != self.config["ocr_engine"]: self.config["ocr_engine"] = selected_engine # Check enrichment model availability (simplified check) try: # These features require additional model downloads # For now, we'll conservatively set them to False unless explicitly tested features["picture_description"] = self.config.get("picture_description", False) features["code_understanding"] = self.config.get("code_understanding", False) except Exception: logger.debug("Advanced enrichment features not available") return features def _get_available_ocr_engine(self, preferred_engine: str) -> Optional[str]: """Select best available OCR engine with fallback logic. Args: preferred_engine: The preferred OCR engine to use. Returns: Available OCR engine name or None if no engines available. 
""" # Check if preferred engine is available if self._check_ocr_engine_available(preferred_engine): return preferred_engine # Fallback order: easyocr -> tesseract -> rapidocr -> ocrmac -> onnxtr fallback_order = ["easyocr", "tesseract", "rapidocr", "ocrmac", "onnxtr"] for engine in fallback_order: if engine != preferred_engine and self._check_ocr_engine_available(engine): logger.warning(f"Preferred OCR engine '{preferred_engine}' not available, using '{engine}'") return engine # No OCR engines available logger.warning("No OCR engines available, proceeding without OCR") return None def _check_ocr_engine_available(self, engine: str) -> bool: """Check if a specific OCR engine is available. Args: engine: OCR engine name to check. Returns: True if engine is available, False otherwise. """ if engine == "ocrmac": import platform return platform.system() == "Darwin" dependencies = OCR_ENGINE_DEPENDENCIES.get(engine, []) for dep in dependencies: try: __import__(dep) except ImportError: logger.debug(f"OCR engine '{engine}' dependency '{dep}' not available") return False return True def _build_pipeline_options(self, PdfPipelineOptions): """Build PdfPipelineOptions from configuration using Docling v2 API. Args: PdfPipelineOptions: The PdfPipelineOptions class from docling. Returns: Configured PdfPipelineOptions instance. 
""" try: pipeline_options = PdfPipelineOptions() # Configure OCR using new v2 API if self.available_features["ocr"] and self.config["ocr_enabled"]: pipeline_options.do_ocr = True # Set OCR engine-specific options ocr_engine = self.config["ocr_engine"] if ocr_engine == "rapidocr": from docling.datamodel.pipeline_options import RapidOcrOptions pipeline_options.ocr_options = RapidOcrOptions() elif ocr_engine == "easyocr": from docling.datamodel.pipeline_options import EasyOcrOptions pipeline_options.ocr_options = EasyOcrOptions() if hasattr(pipeline_options.ocr_options, "lang"): pipeline_options.ocr_options.lang = self.config["ocr_languages"] elif ocr_engine == "tesseract": from docling.datamodel.pipeline_options import TesseractOcrOptions pipeline_options.ocr_options = TesseractOcrOptions() elif ocr_engine == "ocrmac": from docling.datamodel.pipeline_options import OcrMacOptions pipeline_options.ocr_options = OcrMacOptions() else: # Fallback to base OcrOptions from docling.datamodel.pipeline_options import OcrOptions pipeline_options.ocr_options = OcrOptions() else: pipeline_options.do_ocr = False # Configure table processing using new v2 API pipeline_options.do_table_structure = self.config["table_extraction_enabled"] if pipeline_options.do_table_structure: # Set table mode based on config if self.config["table_processing_mode"].upper() == "ACCURATE": from docling.datamodel.pipeline_options import TableFormerMode pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE # Configure enrichment features using new v2 API pipeline_options.do_formula_enrichment = ( self.config["formula_enrichment"] and self.available_features["formula_enrichment"] ) pipeline_options.do_picture_description = ( self.config["picture_description"] and self.available_features["picture_description"] ) pipeline_options.do_code_enrichment = ( self.config["code_understanding"] and self.available_features["code_understanding"] ) # Apply resource limits and timeouts 
pipeline_options.document_timeout = self.config["processing_timeout"] # Apply additional resource limits pipeline_options = self._apply_resource_limits(pipeline_options) return pipeline_options except Exception as e: logger.warning(f"Failed to configure pipeline options: {e}, using defaults") return PdfPipelineOptions() def _apply_resource_limits(self, pipeline_options): """Apply resource limits to prevent excessive resource usage. Args: pipeline_options: PdfPipelineOptions instance to modify. Returns: Modified pipeline_options with resource limits applied. """ # Limit maximum pages processed if self.config.get("max_pages"): if hasattr(pipeline_options, "page_range"): pipeline_options.page_range = (1, self.config["max_pages"]) # Configure device usage device = self.config.get("device_selection", "auto") if device != "auto" and hasattr(pipeline_options, "device"): pipeline_options.device = device # Memory limits for table processing if ( self.config["table_processing_mode"] == "FAST" and hasattr(pipeline_options, "table_options") and hasattr(pipeline_options.table_options, "max_table_size") ): pipeline_options.table_options.max_table_size = 1000000 # 1MB limit return pipeline_options def _create_safe_converter(self, DocumentConverter, PdfFormatOption, InputFormat, pipeline_options): """Create DocumentConverter with security restrictions using Docling v2 API. Args: DocumentConverter: The DocumentConverter class from docling. PdfFormatOption: The PdfFormatOption class from docling. InputFormat: The InputFormat enum from docling. pipeline_options: Configured PdfPipelineOptions instance. Returns: Configured DocumentConverter instance. 
""" try: # Create converter with format options (new v2 API) converter = DocumentConverter( format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)} ) return converter except Exception as e: logger.warning(f"Failed to configure converter: {e}, using defaults") # Fallback to basic converter return DocumentConverter() def _optimize_memory_usage(self, converter): """Configure converter for optimal memory usage. Args: converter: DocumentConverter instance to optimize. Returns: Optimized converter instance. """ try: # Configure batch processing for large documents if self.config.get("max_pages") and hasattr(converter, "config"): converter.config.max_pages = self.config["max_pages"] # Configure memory-efficient table processing if self.config["table_processing_mode"] == "FAST" and hasattr(converter, "config"): # Use faster, less memory-intensive processing pass except Exception as e: logger.debug(f"Memory optimization failed: {e}") return converter async def _convert_with_timeout(self, converter, file_path: Path): """Convert document with timeout handling using Docling v2 API. Args: converter: DocumentConverter instance. file_path: Path to the PDF file. Returns: ConversionResult from docling. Raises: RuntimeError: If conversion times out or fails. """ timeout = self.config["processing_timeout"] try: # Create a coroutine wrapper for the synchronous conversion async def convert_async(): # Use new v2 API - no pipeline_options parameter return converter.convert(str(file_path)) # Apply timeout return await asyncio.wait_for(convert_async(), timeout=timeout) except asyncio.TimeoutError: raise RuntimeError(f"Docling processing timed out after {timeout} seconds") except Exception as e: raise RuntimeError(f"Docling conversion failed: {e}") from e def _handle_conversion_errors(self, conversion_result): """Handle partial conversion failures and log detailed error information. Args: conversion_result: ConversionResult from docling. 
Raises: RuntimeError: If conversion failed completely. """ try: # Import ConversionStatus enum from docling.datamodel.base_models import ConversionStatus if conversion_result.status == ConversionStatus.FAILURE: error_details = [] # Collect error information from result if hasattr(conversion_result, "errors") and conversion_result.errors: error_details.extend([str(error) for error in conversion_result.errors]) error_msg = ( f"Docling conversion failed: {'; '.join(error_details) if error_details else 'Unknown error'}" ) raise RuntimeError(error_msg) elif conversion_result.status == ConversionStatus.PARTIAL_SUCCESS: warning_details = [] if hasattr(conversion_result, "warnings") and conversion_result.warnings: warning_details.extend([str(warning) for warning in conversion_result.warnings]) logger.warning(f"Docling conversion partially successful: {'; '.join(warning_details)}") except ImportError: # If we can't import ConversionStatus, do basic checks if not conversion_result or not hasattr(conversion_result, "document"): raise RuntimeError("Docling conversion failed: Invalid result") if not conversion_result.document: raise RuntimeError("Docling conversion failed: No document produced") def _extract_metadata_from_result(self, conversion_result, file_path: Path) -> Dict[str, Any]: """Extract comprehensive metadata from Docling's ConversionResult. Args: conversion_result: ConversionResult from docling. file_path: Path to the original PDF file. Returns: Dictionary of extracted metadata. 
""" metadata = {} try: document = conversion_result.document # Basic document information metadata["page_count"] = len(document.pages) if document.pages else 1 # Count different element types element_counts = {} table_count = 0 image_count = 0 formula_count = 0 for page in document.pages: for element in page.elements: element_type = type(element).__name__ element_counts[element_type] = element_counts.get(element_type, 0) + 1 # Count specific elements of interest if "table" in element_type.lower(): table_count += 1 elif "image" in element_type.lower() or "picture" in element_type.lower(): image_count += 1 elif "formula" in element_type.lower() or "equation" in element_type.lower(): formula_count += 1 metadata.update( { "element_types": element_counts, "total_elements": sum(element_counts.values()), "table_count": table_count, "image_count": image_count, "formula_count": formula_count, } ) # Document-level metadata if available if hasattr(document, "metadata") and document.metadata: doc_metadata = document.metadata for key, value in doc_metadata.items(): if key not in metadata: # Don't override our counts metadata[f"doc_{key}"] = value # Processing statistics if hasattr(conversion_result, "processing_stats"): metadata["processing_stats"] = conversion_result.processing_stats except Exception as e: logger.warning(f"Failed to extract detailed metadata from Docling result: {e}") # Provide minimal fallback metadata metadata.update( { "page_count": 1, "element_types": {}, "total_elements": 0, "table_count": 0, "image_count": 0, "formula_count": 0, } ) return metadata def _get_features_used(self) -> Dict[str, bool]: """Get a summary of which features were actually used during processing. Returns: Dictionary mapping feature names to usage status. 
""" return { "ocr_enabled": self.available_features["ocr"] and self.config["ocr_enabled"], "table_extraction": self.config["table_extraction_enabled"], "formula_enrichment": (self.config["formula_enrichment"] and self.available_features["formula_enrichment"]), "picture_description": ( self.config["picture_description"] and self.available_features["picture_description"] ), "code_understanding": (self.config["code_understanding"] and self.available_features["code_understanding"]), "ocr_engine": self.config["ocr_engine"] if self.available_features["ocr"] else None, "table_mode": self.config["table_processing_mode"], } def _validate_input_file(self, file_path: Path) -> None: """Validate input file for security and basic requirements. Args: file_path: Path to the file to validate. Raises: RuntimeError: If file validation fails. """ # Check file exists if not file_path.exists(): raise RuntimeError(f"File not found: {file_path}") # Check file size limits file_size = file_path.stat().st_size max_size = self.config["max_file_size"] if file_size > max_size: raise RuntimeError(f"File too large: {file_size} bytes (max: {max_size})") if file_size == 0: raise RuntimeError("File is empty") # Validate file extension if file_path.suffix.lower() != ".pdf": raise RuntimeError(f"Invalid file type: {file_path.suffix}") # Basic PDF header validation try: with open(file_path, "rb") as f: header = f.read(8) if not header.startswith(b"%PDF-"): # Allow non-PDF files for testing, but log a warning logger.warning(f"File does not have PDF header: {file_path}") except Exception as e: raise RuntimeError(f"Cannot read file: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/juanqui/pdfkb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.