parser_mineru.py
"""PDF parser using MinerU CLI tool.""" import json import logging import subprocess import tempfile from pathlib import Path from typing import Any, Dict, List, Optional from .parser import DocumentParser, PageContent, ParseResult logger = logging.getLogger(__name__) class MinerUPDFParser(DocumentParser): """PDF parser using MinerU CLI tool.""" def __init__(self, config: Optional[Dict[str, Any]] = None, cache_dir: Optional[Path] = None): """Initialize the MinerU parser. Args: config: Configuration options for MinerU CLI. cache_dir: Directory to cache parsed markdown files. """ super().__init__(cache_dir) # Default configuration with basic settings self.default_config = { "backend": "pipeline", "method": "auto", "lang": "en", "formula": True, "table": True, "vram": 16, } # Merge user config with defaults self.config = {**self.default_config, **(config or {})} # Check if mineru is available self._check_mineru_availability() def _check_mineru_availability(self) -> None: """Check if mineru CLI tool is available.""" try: result = subprocess.run(["mineru", "--version"], capture_output=True, text=True, timeout=10) if result.returncode == 0: logger.debug(f"MinerU CLI available: {result.stdout.strip()}") else: raise RuntimeError("MinerU CLI returned non-zero exit code") except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError) as e: raise ImportError( "MinerU CLI not available. Please ensure MinerU is installed and accessible via 'mineru' command." ) from e async def parse(self, file_path: Path) -> ParseResult: """Parse a PDF file using MinerU CLI. Args: file_path: Path to the PDF file. Returns: ParseResult with markdown content and metadata. """ try: # Check cache first cache_path = None if self.cache_dir: cache_path = self._get_cache_path(file_path) if self._is_cache_valid(file_path, cache_path): logger.debug(f"Loading parsed content from cache: {cache_path}") markdown_content = self._load_from_cache(cache_path) metadata = self._load_metadata_from_cache(cache_path) if markdown_content is not None: # Create page-aware result # TODO: Implement proper page extraction for this parser pages = [PageContent(page_number=1, markdown_content=markdown_content, metadata={})] return ParseResult(pages=pages, metadata=metadata) logger.debug(f"Parsing PDF with MinerU CLI: {file_path}") # Create temporary directory for MinerU output with tempfile.TemporaryDirectory() as temp_dir: temp_output_dir = Path(temp_dir) / "mineru_output" temp_output_dir.mkdir() # Execute MinerU CLI markdown_content, metadata = self._execute_mineru(file_path, temp_output_dir) # Add processing information metadata["processing_timestamp"] = "N/A" # Will be set by PDFProcessor metadata["processor_version"] = "mineru" metadata["source_filename"] = file_path.name metadata["source_directory"] = str(file_path.parent) # Save to cache if enabled if cache_path: logger.debug(f"Saving parsed content to cache: {cache_path}") self._save_to_cache(cache_path, markdown_content) self._save_metadata_to_cache(cache_path, metadata) logger.debug("Successfully extracted content from PDF using MinerU") # Create page-aware result # TODO: Implement proper page extraction for this parser pages = [PageContent(page_number=1, markdown_content=markdown_content, metadata={})] return ParseResult(pages=pages, metadata=metadata) except Exception as e: raise RuntimeError(f"Failed to parse PDF with MinerU: {e}") from e def _execute_mineru(self, file_path: Path, output_dir: Path) -> tuple[str, Dict[str, Any]]: """Execute MinerU CLI and process 
results. Args: file_path: Path to the PDF file. output_dir: Directory for MinerU output. Returns: Tuple of (markdown_content, metadata). """ # Build MinerU command cmd = self._build_mineru_command(file_path, output_dir) logger.info(f"Executing MinerU command: {' '.join(cmd)}") # Execute command without timeout try: # Use Popen for real-time output capture process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Combine stderr with stdout text=True, cwd=Path.cwd(), bufsize=1, universal_newlines=True, ) # Capture and log output in real-time output_lines = [] # Read output line by line for real-time logging if process.stdout: for line in iter(process.stdout.readline, ""): if line: output_lines.append(line) logger.info(f"MinerU: {line.rstrip()}") # Wait for process to complete and get return code return_code = process.wait() # Check return code if return_code != 0: error_msg = f"MinerU CLI failed with exit code {return_code}" if output_lines: error_msg += f": {''.join(output_lines[-10:])}" # Last 10 lines of output raise RuntimeError(error_msg) logger.info("MinerU CLI execution completed successfully") except Exception as e: raise RuntimeError(f"MinerU CLI execution failed: {e}") # Process output files return self._process_mineru_output(file_path, output_dir) def _build_mineru_command(self, file_path: Path, output_dir: Path) -> List[str]: """Build MinerU CLI command with configuration. Args: file_path: Path to the PDF file. output_dir: Directory for MinerU output. Returns: List of command arguments. """ cmd = ["mineru", "-p", str(file_path), "-o", str(output_dir)] # Add configuration options if "backend" in self.config: cmd.extend(["-b", self.config["backend"]]) if "method" in self.config: cmd.extend(["-m", self.config["method"]]) if "lang" in self.config: cmd.extend(["-l", self.config["lang"]]) if "formula" in self.config: cmd.extend(["-f", str(self.config["formula"]).lower()]) if "table" in self.config: cmd.extend(["-t", str(self.config["table"]).lower()]) # Add other optional parameters for param in ["start", "end", "device", "vram", "url", "source"]: if param in self.config: flag = f"-{param[0]}" if param in ["start", "end", "device"] else f"--{param}" cmd.extend([flag, str(self.config[param])]) return cmd def _process_mineru_output(self, file_path: Path, output_dir: Path) -> tuple[str, Dict[str, Any]]: """Process MinerU output files and extract content and metadata. Args: file_path: Original PDF file path. output_dir: MinerU output directory. Returns: Tuple of (markdown_content, metadata). """ # Find markdown output file markdown_content = self._find_and_read_markdown(file_path, output_dir) # Extract metadata from JSON files metadata = self._extract_metadata_from_json_files(file_path, output_dir) return markdown_content, metadata def _find_and_read_markdown(self, file_path: Path, output_dir: Path) -> str: """Find and read the markdown output file from MinerU. Args: file_path: Original PDF file path. output_dir: MinerU output directory. Returns: Markdown content as string. 
""" # MinerU typically creates output with the same name as input but .md extension expected_md_name = file_path.stem + ".md" # Look for markdown files in output directory (recursively to handle MinerU's nested structure) markdown_files = list(output_dir.glob("**/*.md")) if not markdown_files: raise RuntimeError(f"No markdown output found in {output_dir}") # Prefer file with matching name, otherwise use first found target_file = None for md_file in markdown_files: if md_file.name == expected_md_name: target_file = md_file break if not target_file: target_file = markdown_files[0] logger.warning(f"Expected {expected_md_name} but using {target_file.name}") try: with open(target_file, "r", encoding="utf-8") as f: content = f.read() if not content.strip(): raise RuntimeError("Markdown output file is empty") logger.debug(f"Read {len(content)} characters from {target_file.name}") return content except Exception as e: raise RuntimeError(f"Failed to read markdown output {target_file}: {e}") from e def _extract_metadata_from_json_files(self, file_path: Path, output_dir: Path) -> Dict[str, Any]: """Extract metadata from MinerU JSON output files. Args: file_path: Original PDF file path. output_dir: MinerU output directory. Returns: Dictionary of extracted metadata. """ metadata = {} # Look for middle.json and model.json files json_files = list(output_dir.glob("*.json")) for json_file in json_files: try: with open(json_file, "r", encoding="utf-8") as f: json_data = json.load(f) if "middle" in json_file.name: # Extract metadata from middle.json middle_metadata = self._extract_from_middle_json(json_data) metadata.update(middle_metadata) elif "model" in json_file.name: # Extract metadata from model.json model_metadata = self._extract_from_model_json(json_data) metadata.update(model_metadata) logger.debug(f"Extracted metadata from {json_file.name}") except Exception as e: logger.warning(f"Failed to extract metadata from {json_file}: {e}") # Add basic fallback metadata if no JSON files were processed if not metadata: metadata = { "page_count": 1, # Default fallback "element_types": {}, "total_elements": 0, } return metadata def _extract_from_middle_json(self, json_data: Dict[str, Any]) -> Dict[str, Any]: """Extract metadata from middle.json file. Args: json_data: Parsed JSON data from middle.json. Returns: Dictionary of extracted metadata. 
""" metadata = {} # Extract parse type and version if "_parse_type" in json_data: metadata["parse_type"] = json_data["_parse_type"] if "_version_name" in json_data: metadata["mineru_version"] = json_data["_version_name"] # Extract page information pdf_info = json_data.get("pdf_info", []) if pdf_info: metadata["page_count"] = len(pdf_info) # Count elements and types total_elements = 0 element_types = {} for page in pdf_info: # Count para_blocks para_blocks = page.get("para_blocks", []) total_elements += len(para_blocks) # Count block types for block in para_blocks: block_type = block.get("type", "unknown") element_types[block_type] = element_types.get(block_type, 0) + 1 # Count images and tables images = page.get("images", []) tables = page.get("tables", []) equations = page.get("interline_equations", []) total_elements += len(images) + len(tables) + len(equations) if images: element_types["image"] = element_types.get("image", 0) + len(images) if tables: element_types["table"] = element_types.get("table", 0) + len(tables) if equations: element_types["equation"] = element_types.get("equation", 0) + len(equations) metadata["total_elements"] = total_elements metadata["element_types"] = element_types return metadata def _extract_from_model_json(self, json_data: Dict[str, Any]) -> Dict[str, Any]: """Extract metadata from model.json file. Args: json_data: Parsed JSON data from model.json. Returns: Dictionary of extracted metadata. """ metadata = {} # Extract inference results information if isinstance(json_data, list): metadata["inference_pages"] = len(json_data) # Count layout detection results total_detections = 0 category_counts = {} for page_result in json_data: layout_dets = page_result.get("layout_dets", []) total_detections += len(layout_dets) for detection in layout_dets: category_id = detection.get("category_id") if category_id is not None: category_counts[category_id] = category_counts.get(category_id, 0) + 1 # Extract page info page_info = page_result.get("page_info", {}) if page_info and "page_count" not in metadata: # Use the highest page number + 1 as page count page_no = page_info.get("page_no", 0) metadata["page_count"] = max(metadata.get("page_count", 0), page_no + 1) metadata["total_detections"] = total_detections metadata["category_counts"] = category_counts return metadata
