"""PDF parser using the Docling library for advanced document processing."""
import asyncio
import logging
import time
from pathlib import Path
from typing import Any, Dict, Optional
from .parser import DocumentParser, PageContent, ParseResult
logger = logging.getLogger(__name__)
# Default configuration following the design specification
DOCLING_DEFAULT_CONFIG = {
# OCR Configuration
"ocr_enabled": True,
"ocr_engine": "easyocr", # Options: easyocr, tesseract, rapidocr, ocrmac, onnxtr
"ocr_languages": ["en"],
"ocr_device": "auto", # auto, cpu, cuda, mps
# Table Processing
"table_processing_mode": "FAST", # FAST, ACCURATE
"table_extraction_enabled": True,
# Enrichment Features
"formula_enrichment": True,
"picture_description": False, # Requires additional models
"code_understanding": False, # Requires additional models
# Performance Settings
"processing_timeout": 300, # seconds
"max_pages": None, # None for no limit
"device_selection": "auto",
# Output Options
"export_format": "markdown",
"include_images": False,
"preserve_layout": True,
# Security Settings
"max_file_size": 100 * 1024 * 1024, # 100MB default
"allow_external_resources": False,
}
# Supported OCR engines with their import requirements
OCR_ENGINE_DEPENDENCIES = {
"easyocr": ["easyocr"],
"tesseract": ["pytesseract"],
"rapidocr": ["rapidocr_onnxruntime"],
"ocrmac": [], # System-dependent (macOS only)
"onnxtr": ["doctr"],
}
class DoclingParser(DocumentParser):
"""PDF parser using the Docling library for advanced document processing."""
def __init__(self, config: Optional[Dict[str, Any]] = None, cache_dir: Path = None):
"""Initialize the Docling parser with comprehensive configuration support.
Args:
config: Configuration options for docling processing.
cache_dir: Directory to cache parsed markdown files.
"""
super().__init__(cache_dir)
# Merge user config with defaults and validate
self.config = self._validate_config({**DOCLING_DEFAULT_CONFIG, **(config or {})})
# Determine available features
self.available_features = self._configure_available_features()
logger.debug(f"DoclingParser initialized with config: {self.config}")
logger.debug(f"Available features: {self.available_features}")
async def parse(self, file_path: Path) -> ParseResult:
"""Parse a PDF file using Docling library with advanced processing.
Args:
file_path: Path to the PDF file.
Returns:
ParseResult with markdown content and metadata.
"""
start_time = time.time()
try:
# Check cache first
cache_path = None
if self.cache_dir:
cache_path = self._get_cache_path(file_path)
if self._is_cache_valid(file_path, cache_path):
logger.debug(f"Loading parsed content from cache: {cache_path}")
markdown_content = self._load_from_cache(cache_path)
metadata = self._load_metadata_from_cache(cache_path)
if markdown_content is not None and metadata:
# Create page-aware result
# Since Docling doesn't provide per-page markdown, create single page
pages = [
PageContent(
page_number=1,
markdown_content=markdown_content,
metadata={"from_cache": True, "total_pages": metadata.get("page_count", 1)},
)
]
return ParseResult(pages=pages, metadata=metadata)
# Validate input file for security
self._validate_input_file(file_path)
logger.info(f"Starting Docling parsing: {file_path}")
logger.debug(f"Docling configuration: {self.config}")
# Lazy imports with comprehensive error handling
try:
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
except ImportError as e:
raise ImportError(
"Docling library not available. Install with: pip install docling[complete] "
"or pip install docling for basic functionality"
) from e
# Create pipeline options and converter with new v2 API
pipeline_options = self._build_pipeline_options(PdfPipelineOptions)
converter = self._create_safe_converter(DocumentConverter, PdfFormatOption, InputFormat, pipeline_options)
# Apply timeout wrapper for processing
conversion_result = await self._convert_with_timeout(converter, file_path)
# Handle any conversion errors or warnings
self._handle_conversion_errors(conversion_result)
# Extract markdown content
markdown_content = conversion_result.document.export_to_markdown()
if not markdown_content or not markdown_content.strip():
raise RuntimeError("Docling produced empty markdown content")
# Extract comprehensive metadata
metadata = self._extract_metadata_from_result(conversion_result, file_path)
# Add processing information
processing_time = time.time() - start_time
metadata.update(
{
"processing_timestamp": "N/A", # Will be set by PDFProcessor
"processor_version": "docling",
"source_filename": file_path.name,
"source_directory": str(file_path.parent),
"docling_processing_time": processing_time,
"docling_features_used": self._get_features_used(),
}
)
# Save to cache if enabled
if cache_path:
logger.debug(f"Saving parsed content to cache: {cache_path}")
self._save_to_cache(cache_path, markdown_content)
self._save_metadata_to_cache(cache_path, metadata)
# Log processing statistics
logger.info(f"Docling parsing completed in {processing_time:.2f}s: {file_path}")
logger.debug(f"Pages processed: {metadata.get('page_count', 'unknown')}")
logger.debug(f"OCR enabled: {self.available_features['ocr']}")
logger.debug(f"Tables extracted: {metadata.get('table_count', 0)}")
# Create page-aware result
# Since Docling export_to_markdown() returns combined markdown,
# we'll create a single page with all content for now
# TODO: In future, could parse conversion_result.document.pages individually
pages = [
PageContent(
page_number=1,
markdown_content=markdown_content,
metadata={"total_pages": metadata.get("page_count", 1)},
)
]
return ParseResult(pages=pages, metadata=metadata)
except ImportError:
# Re-raise ImportError as-is for parser fallback logic
raise
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Docling parsing failed after {processing_time:.2f}s: {file_path} - {e}")
raise RuntimeError(f"Failed to parse PDF with Docling: {e}") from e
def _validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and normalize docling configuration.
Args:
config: Configuration dictionary to validate.
Returns:
Validated and normalized configuration.
Raises:
ValueError: If configuration values are invalid.
"""
validated = config.copy()
# Validate OCR engine
if validated["ocr_engine"] not in OCR_ENGINE_DEPENDENCIES:
supported_engines = list(OCR_ENGINE_DEPENDENCIES.keys())
raise ValueError(
f"Unsupported OCR engine: {validated['ocr_engine']}. " f"Supported engines: {supported_engines}"
)
# Validate table processing mode
if validated["table_processing_mode"] not in ["FAST", "ACCURATE"]:
raise ValueError("table_processing_mode must be FAST or ACCURATE")
# Validate device selection
if validated["device_selection"] not in ["auto", "cpu", "cuda", "mps"]:
raise ValueError("device_selection must be auto, cpu, cuda, or mps")
# Validate timeout
if validated["processing_timeout"] <= 0:
raise ValueError("processing_timeout must be positive")
# Validate file size limit
if validated["max_file_size"] <= 0:
raise ValueError("max_file_size must be positive")
# Validate OCR languages (must be list)
if not isinstance(validated["ocr_languages"], list):
if isinstance(validated["ocr_languages"], str):
validated["ocr_languages"] = [validated["ocr_languages"]]
else:
raise ValueError("ocr_languages must be a list of language codes")
return validated
def _configure_available_features(self) -> Dict[str, bool]:
"""Determine which docling features are available based on dependencies.
Returns:
Dictionary mapping feature names to availability status.
"""
features = {
"ocr": False,
"table_extraction": True, # Always available with docling
"formula_enrichment": True, # Always available with docling
"picture_description": False,
"code_understanding": False,
}
# Check OCR availability
if self.config["ocr_enabled"]:
selected_engine = self._get_available_ocr_engine(self.config["ocr_engine"])
if selected_engine:
features["ocr"] = True
# Update config with available engine if different from requested
if selected_engine != self.config["ocr_engine"]:
self.config["ocr_engine"] = selected_engine
# Check enrichment model availability (simplified check)
try:
# These features require additional model downloads
# For now, we'll conservatively set them to False unless explicitly tested
features["picture_description"] = self.config.get("picture_description", False)
features["code_understanding"] = self.config.get("code_understanding", False)
except Exception:
logger.debug("Advanced enrichment features not available")
return features
def _get_available_ocr_engine(self, preferred_engine: str) -> Optional[str]:
"""Select best available OCR engine with fallback logic.
Args:
preferred_engine: The preferred OCR engine to use.
Returns:
Available OCR engine name or None if no engines available.
"""
# Check if preferred engine is available
if self._check_ocr_engine_available(preferred_engine):
return preferred_engine
# Fallback order: easyocr -> tesseract -> rapidocr -> ocrmac -> onnxtr
fallback_order = ["easyocr", "tesseract", "rapidocr", "ocrmac", "onnxtr"]
for engine in fallback_order:
if engine != preferred_engine and self._check_ocr_engine_available(engine):
logger.warning(f"Preferred OCR engine '{preferred_engine}' not available, using '{engine}'")
return engine
# No OCR engines available
logger.warning("No OCR engines available, proceeding without OCR")
return None
def _check_ocr_engine_available(self, engine: str) -> bool:
"""Check if a specific OCR engine is available.
Args:
engine: OCR engine name to check.
Returns:
True if engine is available, False otherwise.
"""
if engine == "ocrmac":
import platform
return platform.system() == "Darwin"
dependencies = OCR_ENGINE_DEPENDENCIES.get(engine, [])
for dep in dependencies:
try:
__import__(dep)
except ImportError:
logger.debug(f"OCR engine '{engine}' dependency '{dep}' not available")
return False
return True
def _build_pipeline_options(self, PdfPipelineOptions):
"""Build PdfPipelineOptions from configuration using Docling v2 API.
Args:
PdfPipelineOptions: The PdfPipelineOptions class from docling.
Returns:
Configured PdfPipelineOptions instance.
"""
try:
pipeline_options = PdfPipelineOptions()
# Configure OCR using new v2 API
if self.available_features["ocr"] and self.config["ocr_enabled"]:
pipeline_options.do_ocr = True
# Set OCR engine-specific options
ocr_engine = self.config["ocr_engine"]
if ocr_engine == "rapidocr":
from docling.datamodel.pipeline_options import RapidOcrOptions
pipeline_options.ocr_options = RapidOcrOptions()
elif ocr_engine == "easyocr":
from docling.datamodel.pipeline_options import EasyOcrOptions
pipeline_options.ocr_options = EasyOcrOptions()
if hasattr(pipeline_options.ocr_options, "lang"):
pipeline_options.ocr_options.lang = self.config["ocr_languages"]
elif ocr_engine == "tesseract":
from docling.datamodel.pipeline_options import TesseractOcrOptions
pipeline_options.ocr_options = TesseractOcrOptions()
elif ocr_engine == "ocrmac":
from docling.datamodel.pipeline_options import OcrMacOptions
pipeline_options.ocr_options = OcrMacOptions()
else:
# Fallback to base OcrOptions
from docling.datamodel.pipeline_options import OcrOptions
pipeline_options.ocr_options = OcrOptions()
else:
pipeline_options.do_ocr = False
# Configure table processing using new v2 API
pipeline_options.do_table_structure = self.config["table_extraction_enabled"]
if pipeline_options.do_table_structure:
# Set table mode based on config
if self.config["table_processing_mode"].upper() == "ACCURATE":
from docling.datamodel.pipeline_options import TableFormerMode
pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
# Configure enrichment features using new v2 API
pipeline_options.do_formula_enrichment = (
self.config["formula_enrichment"] and self.available_features["formula_enrichment"]
)
pipeline_options.do_picture_description = (
self.config["picture_description"] and self.available_features["picture_description"]
)
pipeline_options.do_code_enrichment = (
self.config["code_understanding"] and self.available_features["code_understanding"]
)
# Apply resource limits and timeouts
pipeline_options.document_timeout = self.config["processing_timeout"]
# Apply additional resource limits
pipeline_options = self._apply_resource_limits(pipeline_options)
return pipeline_options
except Exception as e:
logger.warning(f"Failed to configure pipeline options: {e}, using defaults")
return PdfPipelineOptions()
def _apply_resource_limits(self, pipeline_options):
"""Apply resource limits to prevent excessive resource usage.
Args:
pipeline_options: PdfPipelineOptions instance to modify.
Returns:
Modified pipeline_options with resource limits applied.
"""
# Limit maximum pages processed
if self.config.get("max_pages"):
if hasattr(pipeline_options, "page_range"):
pipeline_options.page_range = (1, self.config["max_pages"])
# Configure device usage
device = self.config.get("device_selection", "auto")
if device != "auto" and hasattr(pipeline_options, "device"):
pipeline_options.device = device
# Memory limits for table processing
if (
self.config["table_processing_mode"] == "FAST"
and hasattr(pipeline_options, "table_options")
and hasattr(pipeline_options.table_options, "max_table_size")
):
pipeline_options.table_options.max_table_size = 1000000 # 1MB limit
return pipeline_options
def _create_safe_converter(self, DocumentConverter, PdfFormatOption, InputFormat, pipeline_options):
"""Create DocumentConverter with security restrictions using Docling v2 API.
Args:
DocumentConverter: The DocumentConverter class from docling.
PdfFormatOption: The PdfFormatOption class from docling.
InputFormat: The InputFormat enum from docling.
pipeline_options: Configured PdfPipelineOptions instance.
Returns:
Configured DocumentConverter instance.
"""
try:
# Create converter with format options (new v2 API)
converter = DocumentConverter(
format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
)
return converter
except Exception as e:
logger.warning(f"Failed to configure converter: {e}, using defaults")
# Fallback to basic converter
return DocumentConverter()
def _optimize_memory_usage(self, converter):
"""Configure converter for optimal memory usage.
Args:
converter: DocumentConverter instance to optimize.
Returns:
Optimized converter instance.
"""
try:
# Configure batch processing for large documents
if self.config.get("max_pages") and hasattr(converter, "config"):
converter.config.max_pages = self.config["max_pages"]
# Configure memory-efficient table processing
if self.config["table_processing_mode"] == "FAST" and hasattr(converter, "config"):
# Use faster, less memory-intensive processing
pass
except Exception as e:
logger.debug(f"Memory optimization failed: {e}")
return converter
async def _convert_with_timeout(self, converter, file_path: Path):
"""Convert document with timeout handling using Docling v2 API.
Args:
converter: DocumentConverter instance.
file_path: Path to the PDF file.
Returns:
ConversionResult from docling.
Raises:
RuntimeError: If conversion times out or fails.
"""
timeout = self.config["processing_timeout"]
try:
# Create a coroutine wrapper for the synchronous conversion
async def convert_async():
# Use new v2 API - no pipeline_options parameter
return converter.convert(str(file_path))
# Apply timeout
return await asyncio.wait_for(convert_async(), timeout=timeout)
except asyncio.TimeoutError:
raise RuntimeError(f"Docling processing timed out after {timeout} seconds")
except Exception as e:
raise RuntimeError(f"Docling conversion failed: {e}") from e
def _handle_conversion_errors(self, conversion_result):
"""Handle partial conversion failures and log detailed error information.
Args:
conversion_result: ConversionResult from docling.
Raises:
RuntimeError: If conversion failed completely.
"""
try:
# Import ConversionStatus enum
from docling.datamodel.base_models import ConversionStatus
if conversion_result.status == ConversionStatus.FAILURE:
error_details = []
# Collect error information from result
if hasattr(conversion_result, "errors") and conversion_result.errors:
error_details.extend([str(error) for error in conversion_result.errors])
error_msg = (
f"Docling conversion failed: {'; '.join(error_details) if error_details else 'Unknown error'}"
)
raise RuntimeError(error_msg)
elif conversion_result.status == ConversionStatus.PARTIAL_SUCCESS:
warning_details = []
if hasattr(conversion_result, "warnings") and conversion_result.warnings:
warning_details.extend([str(warning) for warning in conversion_result.warnings])
logger.warning(f"Docling conversion partially successful: {'; '.join(warning_details)}")
except ImportError:
# If we can't import ConversionStatus, do basic checks
if not conversion_result or not hasattr(conversion_result, "document"):
raise RuntimeError("Docling conversion failed: Invalid result")
if not conversion_result.document:
raise RuntimeError("Docling conversion failed: No document produced")
def _extract_metadata_from_result(self, conversion_result, file_path: Path) -> Dict[str, Any]:
"""Extract comprehensive metadata from Docling's ConversionResult.
Args:
conversion_result: ConversionResult from docling.
file_path: Path to the original PDF file.
Returns:
Dictionary of extracted metadata.
"""
metadata = {}
try:
document = conversion_result.document
# Basic document information
metadata["page_count"] = len(document.pages) if document.pages else 1
# Count different element types
element_counts = {}
table_count = 0
image_count = 0
formula_count = 0
for page in document.pages:
for element in page.elements:
element_type = type(element).__name__
element_counts[element_type] = element_counts.get(element_type, 0) + 1
# Count specific elements of interest
if "table" in element_type.lower():
table_count += 1
elif "image" in element_type.lower() or "picture" in element_type.lower():
image_count += 1
elif "formula" in element_type.lower() or "equation" in element_type.lower():
formula_count += 1
metadata.update(
{
"element_types": element_counts,
"total_elements": sum(element_counts.values()),
"table_count": table_count,
"image_count": image_count,
"formula_count": formula_count,
}
)
# Document-level metadata if available
if hasattr(document, "metadata") and document.metadata:
doc_metadata = document.metadata
for key, value in doc_metadata.items():
if key not in metadata: # Don't override our counts
metadata[f"doc_{key}"] = value
# Processing statistics
if hasattr(conversion_result, "processing_stats"):
metadata["processing_stats"] = conversion_result.processing_stats
except Exception as e:
logger.warning(f"Failed to extract detailed metadata from Docling result: {e}")
# Provide minimal fallback metadata
metadata.update(
{
"page_count": 1,
"element_types": {},
"total_elements": 0,
"table_count": 0,
"image_count": 0,
"formula_count": 0,
}
)
return metadata
def _get_features_used(self) -> Dict[str, bool]:
"""Get a summary of which features were actually used during processing.
Returns:
Dictionary mapping feature names to usage status.
"""
return {
"ocr_enabled": self.available_features["ocr"] and self.config["ocr_enabled"],
"table_extraction": self.config["table_extraction_enabled"],
"formula_enrichment": (self.config["formula_enrichment"] and self.available_features["formula_enrichment"]),
"picture_description": (
self.config["picture_description"] and self.available_features["picture_description"]
),
"code_understanding": (self.config["code_understanding"] and self.available_features["code_understanding"]),
"ocr_engine": self.config["ocr_engine"] if self.available_features["ocr"] else None,
"table_mode": self.config["table_processing_mode"],
}
def _validate_input_file(self, file_path: Path) -> None:
"""Validate input file for security and basic requirements.
Args:
file_path: Path to the file to validate.
Raises:
RuntimeError: If file validation fails.
"""
# Check file exists
if not file_path.exists():
raise RuntimeError(f"File not found: {file_path}")
# Check file size limits
file_size = file_path.stat().st_size
max_size = self.config["max_file_size"]
if file_size > max_size:
raise RuntimeError(f"File too large: {file_size} bytes (max: {max_size})")
if file_size == 0:
raise RuntimeError("File is empty")
# Validate file extension
if file_path.suffix.lower() != ".pdf":
raise RuntimeError(f"Invalid file type: {file_path.suffix}")
# Basic PDF header validation
try:
with open(file_path, "rb") as f:
header = f.read(8)
if not header.startswith(b"%PDF-"):
# Allow non-PDF files for testing, but log a warning
logger.warning(f"File does not have PDF header: {file_path}")
except Exception as e:
raise RuntimeError(f"Cannot read file: {e}")