pdf_processor.py (25.6 kB)
"""PDF processing module for extracting text and converting to Markdown.""" import logging import tempfile import os import re from typing import Dict, List, Any, Optional from pathlib import Path from urllib.parse import urlparse import asyncio import aiohttp from .enhanced_pdf_processor import EnhancedPDFProcessor # 延迟导入 PDF 处理库,避免启动时的 SWIG 警告 def _import_fitz(): """延迟导入 PyMuPDF (fitz) 以避免启动时警告""" try: import fitz return fitz except ImportError as e: raise ImportError(f"PyMuPDF (fitz) is required for PDF processing: {e}") def _import_pypdf(): """延迟导入 pypdf""" try: import pypdf return pypdf except ImportError as e: raise ImportError(f"pypdf is required for PDF processing: {e}") logger = logging.getLogger(__name__) class PDFProcessor: """PDF processor for extracting text and converting to Markdown.""" def __init__( self, enable_enhanced_features: bool = True, output_dir: Optional[str] = None ): """ Initialize the PDF processor. Args: enable_enhanced_features: Whether to enable enhanced extraction features output_dir: Directory to save extracted images and assets """ self.supported_methods = ["pymupdf", "pypdf", "auto"] self.temp_dir = tempfile.mkdtemp(prefix="pdf_extractor_") self.enable_enhanced_features = enable_enhanced_features # Initialize enhanced processor for images, tables, and formulas if self.enable_enhanced_features: self.enhanced_processor = EnhancedPDFProcessor(output_dir) else: self.enhanced_processor = None async def process_pdf( self, pdf_source: str, method: str = "auto", include_metadata: bool = True, page_range: Optional[tuple] = None, output_format: str = "markdown", *, extract_images: bool = True, extract_tables: bool = True, extract_formulas: bool = True, embed_images: bool = False, enhanced_options: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Process a PDF file from URL or local path. 

        Args:
            pdf_source: URL or local file path to PDF
            method: Extraction method: auto, pymupdf, pypdf (default: auto)
            include_metadata: Include PDF metadata in result (default: True)
            page_range: Tuple of (start_page, end_page) for partial extraction (optional)
            output_format: Output format: markdown, text (default: markdown)
            extract_images: Whether to extract images (default: True)
            extract_tables: Whether to extract tables (default: True)
            extract_formulas: Whether to extract mathematical formulas (default: True)
            embed_images: Whether to embed images as base64 in markdown (default: False)
            enhanced_options: Additional options for enhanced processing (optional)

        Returns:
            Dict containing extracted text/markdown and metadata, including enhanced assets
        """
        pdf_path = None
        try:
            # Validate method
            if method not in self.supported_methods:
                return {
                    "success": False,
                    "error": f"Method must be one of: {', '.join(self.supported_methods)}",
                    "source": pdf_source,
                }

            # Check if source is URL or local path
            if self._is_url(pdf_source):
                pdf_path = await self._download_pdf(pdf_source)
                if not pdf_path:
                    return {
                        "success": False,
                        "error": "Failed to download PDF from URL",
                        "source": pdf_source,
                    }
            else:
                pdf_path = Path(pdf_source)
                if not pdf_path.exists():
                    return {
                        "success": False,
                        "error": "PDF file does not exist",
                        "source": pdf_source,
                    }

            # Extract text using selected method
            extraction_result = None
            if method == "auto":
                extraction_result = await self._auto_extract(
                    pdf_path, page_range, include_metadata
                )
            elif method == "pymupdf":
                extraction_result = await self._extract_with_pymupdf(
                    pdf_path, page_range, include_metadata
                )
            elif method == "pypdf":
                extraction_result = await self._extract_with_pypdf(
                    pdf_path, page_range, include_metadata
                )

            if not extraction_result or not extraction_result.get("success"):
                return extraction_result or {
                    "success": False,
                    "error": "Unknown extraction error",
                    "source": pdf_source,
                }

            # Enhanced processing for images, tables, and formulas
            enhanced_assets = None
            if self.enable_enhanced_features and self.enhanced_processor:
                enhanced_assets = await self._extract_enhanced_assets(
                    pdf_path,
                    page_range,
                    extract_images,
                    extract_tables,
                    extract_formulas,
                )

            # Convert to markdown if requested
            if output_format == "markdown":
                markdown_content = self._convert_to_markdown(extraction_result["text"])

                # Enhance markdown with extracted assets
                if enhanced_assets:
                    enhanced_options = enhanced_options or {}
                    embed_images_setting = enhanced_options.get(
                        "embed_images", embed_images
                    )
                    image_size = enhanced_options.get("image_size")

                    markdown_content = (
                        self.enhanced_processor.enhance_markdown_with_assets(
                            markdown_content,
                            embed_images=embed_images_setting,
                            image_size=image_size,
                        )
                    )

                    # Add enhanced assets summary to result
                    extraction_result["enhanced_assets"] = (
                        self.enhanced_processor.get_extraction_summary()
                    )

                extraction_result["markdown"] = markdown_content

            # Add processing info
            extraction_result.update(
                {
                    "source": pdf_source,
                    "method_used": extraction_result.get("method_used", method),
                    "output_format": output_format,
                    "pages_processed": extraction_result.get("pages_processed", 0),
                    "word_count": len(extraction_result["text"].split()),
                    "character_count": len(extraction_result["text"]),
                }
            )

            return extraction_result

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_source}: {str(e)}")
            return {"success": False, "error": str(e), "source": pdf_source}
        finally:
            # Clean up downloaded files if they're in temp directory
            if pdf_path and str(pdf_path).startswith(self.temp_dir):
                try:
                    os.unlink(pdf_path)
                except (FileNotFoundError, PermissionError, OSError):
                    # Ignore cleanup errors - file might already be deleted or inaccessible
                    pass

    async def batch_process_pdfs(
        self,
        pdf_sources: List[str],
        method: str = "auto",
        include_metadata: bool = True,
        page_range: Optional[tuple] = None,
        output_format: str = "markdown",
        extract_images: bool = True,
        extract_tables: bool = True,
        extract_formulas: bool = True,
        embed_images: bool = False,
        enhanced_options: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Process multiple PDF files concurrently.

        Args:
            pdf_sources: List of URLs or local file paths
            method: Extraction method for all PDFs
            include_metadata: Include metadata for all PDFs
            page_range: Page range for all PDFs (if applicable)
            output_format: Output format for all PDFs
            extract_images: Extract images from all PDFs
            extract_tables: Extract tables from all PDFs
            extract_formulas: Extract formulas from all PDFs
            embed_images: Embed images as base64 instead of saving as files
            enhanced_options: Enhanced processing options for all PDFs

        Returns:
            Dict containing batch processing results and summary
        """
        if not pdf_sources:
            return {"success": False, "error": "PDF sources list cannot be empty"}

        logger.info(f"Batch processing {len(pdf_sources)} PDFs with method: {method}")

        # Process PDFs concurrently
        tasks = [
            self.process_pdf(
                pdf_source=source,
                method=method,
                include_metadata=include_metadata,
                page_range=page_range,
                output_format=output_format,
                extract_images=extract_images,
                extract_tables=extract_tables,
                extract_formulas=extract_formulas,
                embed_images=embed_images,
                enhanced_options=enhanced_options,
            )
            for source in pdf_sources
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results and handle exceptions
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(
                    {"success": False, "error": str(result), "source": pdf_sources[i]}
                )
            else:
                processed_results.append(result)

        # Calculate summary statistics
        successful_results = [r for r in processed_results if r.get("success")]
        failed_results = [r for r in processed_results if not r.get("success")]

        total_pages = sum(r.get("pages_processed", 0) for r in successful_results)
        total_words = sum(r.get("word_count", 0) for r in successful_results)

        return {
            "success": True,
            "results": processed_results,
            "summary": {
                "total_pdfs": len(pdf_sources),
                "successful": len(successful_results),
                "failed": len(failed_results),
                "total_pages_processed": total_pages,
                "total_words_extracted": total_words,
                "method_used": method,
                "output_format": output_format,
            },
        }

    def _is_url(self, source: str) -> bool:
        """Check if source is a URL."""
        try:
            parsed = urlparse(source)
            return parsed.scheme in ["http", "https"]
        except Exception:
            return False

    async def _download_pdf(self, url: str) -> Optional[Path]:
        """Download PDF from URL to temporary file."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        # Create temporary file
                        temp_file = tempfile.NamedTemporaryFile(
                            suffix=".pdf", dir=self.temp_dir, delete=False
                        )

                        # Write PDF content
                        content = await response.read()
                        temp_file.write(content)
                        temp_file.close()

                        return Path(temp_file.name)
                    return None
        except Exception as e:
            logger.error(f"Error downloading PDF from {url}: {str(e)}")
            return None

    async def _auto_extract(
        self,
        pdf_path: Path,
        page_range: Optional[tuple] = None,
        include_metadata: bool = True,
    ) -> Dict[str, Any]:
        """Auto-select best method for PDF extraction."""
        # Try PyMuPDF first (generally more reliable)
        try:
            result = await self._extract_with_pymupdf(
                pdf_path, page_range, include_metadata
            )
            if result.get("success"):
                result["method_used"] = "pymupdf"
                return result
        except Exception as e:
            logger.warning(f"PyMuPDF failed for {pdf_path}, trying pypdf: {str(e)}")

        # Fall back to pypdf
        try:
            result = await self._extract_with_pypdf(
                pdf_path, page_range, include_metadata
            )
            if result.get("success"):
                result["method_used"] = "pypdf"
                return result
        except Exception as e:
            logger.error(f"Both methods failed for {pdf_path}: {str(e)}")

        return {
            "success": False,
            "error": "Both PyMuPDF and pypdf extraction methods failed",
        }

    async def _extract_with_pymupdf(
        self,
        pdf_path: Path,
        page_range: Optional[tuple] = None,
        include_metadata: bool = True,
    ) -> Dict[str, Any]:
        """Extract text using PyMuPDF (fitz)."""
        try:
            fitz = _import_fitz()
            doc = fitz.open(str(pdf_path))

            # Determine page range
            total_pages = doc.page_count
            start_page = 0
            end_page = total_pages

            if page_range:
                start_page = max(0, page_range[0])
                end_page = min(total_pages, page_range[1])

            # Extract text from pages
            text_content = []
            for page_num in range(start_page, end_page):
                page = doc.load_page(page_num)
                text = page.get_text()
                if text.strip():  # Only add non-empty pages
                    text_content.append(f"<!-- Page {page_num + 1} -->\n{text}")

            full_text = "\n\n".join(text_content)

            result = {
                "success": True,
                "text": full_text,
                "pages_processed": end_page - start_page,
                "total_pages": total_pages,
            }

            # Add metadata if requested
            if include_metadata:
                metadata = doc.metadata
                result["metadata"] = {
                    "title": metadata.get("title", ""),
                    "author": metadata.get("author", ""),
                    "subject": metadata.get("subject", ""),
                    "creator": metadata.get("creator", ""),
                    "producer": metadata.get("producer", ""),
                    "creation_date": metadata.get("creationDate", ""),
                    "modification_date": metadata.get("modDate", ""),
                    "total_pages": total_pages,
                    "file_size_bytes": pdf_path.stat().st_size,
                }

            doc.close()
            return result

        except Exception as e:
            return {"success": False, "error": f"PyMuPDF extraction failed: {str(e)}"}

    async def _extract_with_pypdf(
        self,
        pdf_path: Path,
        page_range: Optional[tuple] = None,
        include_metadata: bool = True,
    ) -> Dict[str, Any]:
        """Extract text using pypdf library."""
        try:
            with open(pdf_path, "rb") as file:
                pypdf = _import_pypdf()
                reader = pypdf.PdfReader(file)
                total_pages = len(reader.pages)

                # Determine page range
                start_page = 0
                end_page = total_pages

                if page_range:
                    start_page = max(0, page_range[0])
                    end_page = min(total_pages, page_range[1])

                # Extract text from pages
                text_content = []
                for page_num in range(start_page, end_page):
                    page = reader.pages[page_num]
                    text = page.extract_text()
                    if text.strip():  # Only add non-empty pages
                        text_content.append(f"<!-- Page {page_num + 1} -->\n{text}")

                full_text = "\n\n".join(text_content)

                result = {
                    "success": True,
                    "text": full_text,
                    "pages_processed": end_page - start_page,
                    "total_pages": total_pages,
                }

                # Add metadata if requested
                if include_metadata:
                    metadata = reader.metadata or {}
                    result["metadata"] = {
                        "title": str(metadata.get("/Title", "")),
                        "author": str(metadata.get("/Author", "")),
                        "subject": str(metadata.get("/Subject", "")),
                        "creator": str(metadata.get("/Creator", "")),
                        "producer": str(metadata.get("/Producer", "")),
                        "creation_date": str(metadata.get("/CreationDate", "")),
                        "modification_date": str(metadata.get("/ModDate", "")),
                        "total_pages": total_pages,
                        "file_size_bytes": pdf_path.stat().st_size,
                    }

                return result

        except Exception as e:
            return {"success": False, "error": f"pypdf extraction failed: {str(e)}"}

    def _convert_to_markdown(self, text: str) -> str:
        """Convert extracted text to Markdown format using MarkItDown."""
        try:
            # Try to use the new MarkdownConverter for better formatting
            from .markdown_converter import MarkdownConverter

            converter = MarkdownConverter()

            # Create a simple HTML structure from the text for better conversion
            html_content = f"<html><body><div>{text}</div></body></html>"

            # Use MarkItDown through the converter
            result = converter.html_to_markdown(html_content)

            # Check if the result has proper markdown formatting (headers, structure)
            # If not, fall back to our simple conversion which is better for PDFs
            if not self._has_markdown_structure(result):
                logger.info(
                    "MarkdownConverter didn't add structure, using simple conversion"
                )
                return self._simple_markdown_conversion(text)

            return result

        except Exception as e:
            logger.warning(
                f"Failed to use MarkdownConverter, falling back to simple conversion: {str(e)}"
            )
            # Fallback to the simple conversion method
            return self._simple_markdown_conversion(text)

    def _simple_markdown_conversion(self, text: str) -> str:
        """Simple fallback markdown conversion."""
        # Clean up the text
        lines = text.split("\n")
        cleaned_lines = []

        for line in lines:
            line = line.strip()
            if line:
                # Convert common patterns to Markdown
                if line.isupper() and len(line.split()) <= 5:
                    # Potential heading
                    cleaned_lines.append(f"# {line}")
                elif line.endswith(":") and len(line.split()) <= 8:
                    # Potential subheading
                    cleaned_lines.append(f"## {line}")
                elif self._looks_like_title(line):
                    # Check if it looks like a title (capitalized, short)
                    cleaned_lines.append(f"# {line}")
                else:
                    cleaned_lines.append(line)
            else:
                cleaned_lines.append("")

        return "\n".join(cleaned_lines)

    def _looks_like_title(self, line: str) -> bool:
        """Check if a line looks like a title."""
        # Title heuristics
        words = line.split()
        if len(words) > 8:  # Too long to be a title
            return False

        # Check if most words are capitalized
        capitalized_count = sum(1 for word in words if word and word[0].isupper())

        # If more than half the words are capitalized, it might be a title
        return capitalized_count > len(words) * 0.6

    def _has_markdown_structure(self, text: str) -> bool:
        """Check if text has proper markdown structure (headers, formatting, etc.)."""
        # Check for common markdown structures
        has_headers = bool(re.search(r"^#{1,6}\s+", text, re.MULTILINE))
        has_lists = bool(re.search(r"^[\s]*[-*+]\s+", text, re.MULTILINE))
        has_bold = "**" in text or "__" in text
        has_italic = "*" in text or "_" in text
        has_links = "[" in text and "](" in text
        has_code = "`" in text

        # If it has any meaningful markdown structure, consider it good
        structure_count = sum(
            [has_headers, has_lists, has_bold, has_italic, has_links, has_code]
        )

        # We especially want headers for PDF content
        return has_headers or structure_count >= 2

    async def _extract_enhanced_assets(
        self,
        pdf_path: Path,
        page_range: Optional[tuple],
        extract_images: bool,
        extract_tables: bool,
        extract_formulas: bool,
    ) -> Dict[str, Any]:
        """
        Extract enhanced assets (images, tables, formulas) from PDF.

        Args:
            pdf_path: Path to PDF file
            page_range: Optional page range tuple
            extract_images: Whether to extract images
            extract_tables: Whether to extract tables
            extract_formulas: Whether to extract formulas

        Returns:
            Dict with extraction results
        """
        if not self.enhanced_processor:
            return {}

        try:
            # Open PDF document
            fitz = _import_fitz()
            doc = fitz.open(str(pdf_path))

            # Determine page range
            start_page = 0
            end_page = len(doc)

            if page_range:
                start_page = max(0, page_range[0])
                end_page = min(len(doc), page_range[1])

            extracted_assets = {
                "success": True,
                "pages_processed": end_page - start_page,
            }

            # Extract images
            if extract_images:
                for page_num in range(start_page, end_page):
                    try:
                        images = (
                            await self.enhanced_processor.extract_images_from_pdf_page(
                                doc, page_num
                            )
                        )
                        self.enhanced_processor.images.extend(images)
                    except Exception as e:
                        logger.warning(
                            f"Failed to extract images from page {page_num}: {str(e)}"
                        )

                extracted_assets["images_extracted"] = len(
                    self.enhanced_processor.images
                )

            # Extract text for table and formula processing
            for page_num in range(start_page, end_page):
                try:
                    page = doc[page_num]
                    text = page.get_text()

                    # Extract tables
                    if extract_tables:
                        tables = self.enhanced_processor.extract_tables_from_text(
                            text, page_num
                        )
                        self.enhanced_processor.tables.extend(tables)

                    # Extract formulas
                    if extract_formulas:
                        formulas = self.enhanced_processor.extract_formulas_from_text(
                            text, page_num
                        )
                        self.enhanced_processor.formulas.extend(formulas)

                except Exception as e:
                    logger.warning(
                        f"Failed to process page {page_num} for tables/formulas: {str(e)}"
                    )

            # Add extraction summaries
            if extract_tables:
                extracted_assets["tables_extracted"] = len(
                    self.enhanced_processor.tables
                )
            if extract_formulas:
                extracted_assets["formulas_extracted"] = len(
                    self.enhanced_processor.formulas
                )

            doc.close()
            return extracted_assets

        except Exception as e:
            logger.error(f"Error in enhanced asset extraction: {str(e)}")
            return {"success": False, "error": str(e)}

    def cleanup(self):
        """Clean up temporary files and directories."""
        try:
            import shutil

            # Clean up enhanced processor
            if self.enhanced_processor:
                self.enhanced_processor.cleanup()

            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
        except Exception as e:
            logger.warning(f"Failed to cleanup temp directory: {str(e)}")
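
Usage sketch (not part of the file above): a minimal async driver showing how the two public entry points, process_pdf and batch_process_pdfs, might be called. The package import path and the PDF sources are placeholders, and the snippet assumes the module is importable as part of its package, since it relies on a relative import of EnhancedPDFProcessor.

import asyncio

from scrapy_mcp.pdf_processor import PDFProcessor  # import path is an assumption


async def main():
    processor = PDFProcessor(enable_enhanced_features=True, output_dir="./assets")
    try:
        # Single PDF: pages 0-4 only, rendered as Markdown
        result = await processor.process_pdf(
            "https://example.com/sample.pdf",  # placeholder URL
            method="auto",
            page_range=(0, 5),
            output_format="markdown",
        )
        if result.get("success"):
            print(result["markdown"][:500])
            print(result["word_count"], "words from", result["pages_processed"], "pages")
        else:
            print("Extraction failed:", result["error"])

        # Batch: per-source results plus an aggregate summary come back together
        batch = await processor.batch_process_pdfs(
            ["a.pdf", "b.pdf"],  # placeholder local paths
            output_format="text",
        )
        print(batch["summary"])
    finally:
        processor.cleanup()  # removes the temp download directory and enhanced assets


if __name__ == "__main__":
    asyncio.run(main())

Because batch_process_pdfs uses asyncio.gather with return_exceptions=True, a failure on one source surfaces as a per-source error entry rather than aborting the whole batch.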
