"""PDF processing module for extracting text and converting to Markdown."""
import logging
import tempfile
import os
import re
from typing import Dict, List, Any, Optional
from pathlib import Path
from urllib.parse import urlparse
import asyncio
import aiohttp
from .enhanced_pdf_processor import EnhancedPDFProcessor
# Lazily import the PDF processing libraries to avoid SWIG warnings at startup
def _import_fitz():
"""延迟导入 PyMuPDF (fitz) 以避免启动时警告"""
try:
import fitz
return fitz
except ImportError as e:
raise ImportError(f"PyMuPDF (fitz) is required for PDF processing: {e}")
def _import_pypdf():
"""延迟导入 pypdf"""
try:
import pypdf
return pypdf
except ImportError as e:
raise ImportError(f"pypdf is required for PDF processing: {e}")
logger = logging.getLogger(__name__)
class PDFProcessor:
"""PDF processor for extracting text and converting to Markdown."""
def __init__(
self, enable_enhanced_features: bool = True, output_dir: Optional[str] = None
):
"""
Initialize the PDF processor.
Args:
enable_enhanced_features: Whether to enable enhanced extraction features
output_dir: Directory to save extracted images and assets
"""
self.supported_methods = ["pymupdf", "pypdf", "auto"]
self.temp_dir = tempfile.mkdtemp(prefix="pdf_extractor_")
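# Downloaded PDFs are written to this temporary directory; cleanup() removes it.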
self.enable_enhanced_features = enable_enhanced_features
# Initialize enhanced processor for images, tables, and formulas
if self.enable_enhanced_features:
self.enhanced_processor = EnhancedPDFProcessor(output_dir)
else:
self.enhanced_processor = None
async def process_pdf(
self,
pdf_source: str,
method: str = "auto",
include_metadata: bool = True,
page_range: Optional[tuple] = None,
output_format: str = "markdown",
*,
extract_images: bool = True,
extract_tables: bool = True,
extract_formulas: bool = True,
embed_images: bool = False,
enhanced_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Process a PDF file from URL or local path.
Args:
pdf_source: URL or local file path to PDF
method: Extraction method: auto, pymupdf, pypdf (default: auto)
include_metadata: Include PDF metadata in result (default: True)
page_range: Tuple of (start_page, end_page) for partial extraction; start_page is 0-indexed and end_page is exclusive (optional)
output_format: Output format: markdown, text (default: markdown)
extract_images: Whether to extract images (default: True)
extract_tables: Whether to extract tables (default: True)
extract_formulas: Whether to extract mathematical formulas (default: True)
embed_images: Whether to embed images as base64 in markdown (default: False)
enhanced_options: Additional options for enhanced processing (optional)
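Example (illustrative sketch; assumes an already-running event loop and a
readable local file named "example.pdf"):
    processor = PDFProcessor(output_dir="./pdf_assets")
    result = await processor.process_pdf("example.pdf", page_range=(0, 5))
    if result["success"]:
        print(result["markdown"])
    processor.cleanup()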
Returns:
Dict containing extracted text/markdown and metadata, including enhanced assets
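On success the dict typically includes "success", "text", "markdown" (for
markdown output), "metadata" (when include_metadata is True),
"enhanced_assets" (when enhanced features are enabled), "source",
"method_used", "output_format", "pages_processed", "total_pages",
"word_count" and "character_count"; on failure it contains "success",
"error" and "source".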
"""
pdf_path = None
try:
# Validate method
if method not in self.supported_methods:
return {
"success": False,
"error": f"Method must be one of: {', '.join(self.supported_methods)}",
"source": pdf_source,
}
# Check if source is URL or local path
if self._is_url(pdf_source):
pdf_path = await self._download_pdf(pdf_source)
if not pdf_path:
return {
"success": False,
"error": "Failed to download PDF from URL",
"source": pdf_source,
}
else:
pdf_path = Path(pdf_source)
if not pdf_path.exists():
return {
"success": False,
"error": "PDF file does not exist",
"source": pdf_source,
}
# Extract text using selected method
extraction_result = None
if method == "auto":
extraction_result = await self._auto_extract(
pdf_path, page_range, include_metadata
)
elif method == "pymupdf":
extraction_result = await self._extract_with_pymupdf(
pdf_path, page_range, include_metadata
)
elif method == "pypdf":
extraction_result = await self._extract_with_pypdf(
pdf_path, page_range, include_metadata
)
if not extraction_result or not extraction_result.get("success"):
return extraction_result or {
"success": False,
"error": "Unknown extraction error",
"source": pdf_source,
}
# Enhanced processing for images, tables, and formulas
enhanced_assets = None
if self.enable_enhanced_features and self.enhanced_processor:
enhanced_assets = await self._extract_enhanced_assets(
pdf_path,
page_range,
extract_images,
extract_tables,
extract_formulas,
)
# Convert to markdown if requested
if output_format == "markdown":
markdown_content = self._convert_to_markdown(extraction_result["text"])
# Enhance markdown with extracted assets
if enhanced_assets:
enhanced_options = enhanced_options or {}
embed_images_setting = enhanced_options.get(
"embed_images", embed_images
)
image_size = enhanced_options.get("image_size")
markdown_content = (
self.enhanced_processor.enhance_markdown_with_assets(
markdown_content,
embed_images=embed_images_setting,
image_size=image_size,
)
)
# Add enhanced assets summary to result
extraction_result["enhanced_assets"] = (
self.enhanced_processor.get_extraction_summary()
)
extraction_result["markdown"] = markdown_content
# Add processing info
extraction_result.update(
{
"source": pdf_source,
"method_used": extraction_result.get("method_used", method),
"output_format": output_format,
"pages_processed": extraction_result.get("pages_processed", 0),
"word_count": len(extraction_result["text"].split()),
"character_count": len(extraction_result["text"]),
}
)
return extraction_result
except Exception as e:
logger.error(f"Error processing PDF {pdf_source}: {str(e)}")
return {"success": False, "error": str(e), "source": pdf_source}
finally:
# Clean up downloaded files if they're in temp directory
if pdf_path and str(pdf_path).startswith(self.temp_dir):
try:
os.unlink(pdf_path)
except OSError:
# Ignore cleanup errors - the file may already be deleted or inaccessible
pass
async def batch_process_pdfs(
self,
pdf_sources: List[str],
method: str = "auto",
include_metadata: bool = True,
page_range: Optional[tuple] = None,
output_format: str = "markdown",
extract_images: bool = True,
extract_tables: bool = True,
extract_formulas: bool = True,
embed_images: bool = False,
enhanced_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Process multiple PDF files concurrently.
Args:
pdf_sources: List of URLs or local file paths
method: Extraction method for all PDFs
include_metadata: Include metadata for all PDFs
page_range: Page range for all PDFs (if applicable)
output_format: Output format for all PDFs
extract_images: Extract images from all PDFs
extract_tables: Extract tables from all PDFs
extract_formulas: Extract formulas from all PDFs
embed_images: Embed images as base64 instead of saving as files
enhanced_options: Enhanced processing options for all PDFs
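Example (illustrative sketch; assumes an existing PDFProcessor instance named
"processor" and placeholder URLs):
    batch = await processor.batch_process_pdfs(
        ["https://example.com/a.pdf", "https://example.com/b.pdf"]
    )
    print(batch["summary"]["successful"], "of", batch["summary"]["total_pdfs"])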
Returns:
Dict containing batch processing results and summary
"""
if not pdf_sources:
return {"success": False, "error": "PDF sources list cannot be empty"}
logger.info(f"Batch processing {len(pdf_sources)} PDFs with method: {method}")
# Process PDFs concurrently
tasks = [
self.process_pdf(
pdf_source=source,
method=method,
include_metadata=include_metadata,
page_range=page_range,
output_format=output_format,
extract_images=extract_images,
extract_tables=extract_tables,
extract_formulas=extract_formulas,
embed_images=embed_images,
enhanced_options=enhanced_options,
)
for source in pdf_sources
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results and handle exceptions
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append(
{"success": False, "error": str(result), "source": pdf_sources[i]}
)
else:
processed_results.append(result)
# Calculate summary statistics
successful_results = [r for r in processed_results if r.get("success")]
failed_results = [r for r in processed_results if not r.get("success")]
total_pages = sum(r.get("pages_processed", 0) for r in successful_results)
total_words = sum(r.get("word_count", 0) for r in successful_results)
return {
"success": True,
"results": processed_results,
"summary": {
"total_pdfs": len(pdf_sources),
"successful": len(successful_results),
"failed": len(failed_results),
"total_pages_processed": total_pages,
"total_words_extracted": total_words,
"method_used": method,
"output_format": output_format,
},
}
def _is_url(self, source: str) -> bool:
"""Check if source is a URL."""
try:
parsed = urlparse(source)
return parsed.scheme in ["http", "https"]
except Exception:
return False
async def _download_pdf(self, url: str) -> Optional[Path]:
"""Download PDF from URL to temporary file."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
# Create temporary file
temp_file = tempfile.NamedTemporaryFile(
suffix=".pdf", dir=self.temp_dir, delete=False
)
# Write PDF content
content = await response.read()
temp_file.write(content)
temp_file.close()
return Path(temp_file.name)
return None
except Exception as e:
logger.error(f"Error downloading PDF from {url}: {str(e)}")
return None
async def _auto_extract(
self,
pdf_path: Path,
page_range: Optional[tuple] = None,
include_metadata: bool = True,
) -> Dict[str, Any]:
"""Auto-select best method for PDF extraction."""
# Try PyMuPDF first (generally more reliable)
try:
result = await self._extract_with_pymupdf(
pdf_path, page_range, include_metadata
)
if result.get("success"):
result["method_used"] = "pymupdf"
return result
except Exception as e:
logger.warning(f"PyMuPDF failed for {pdf_path}, trying pypdf: {str(e)}")
# Fall back to pypdf
try:
result = await self._extract_with_pypdf(
pdf_path, page_range, include_metadata
)
if result.get("success"):
result["method_used"] = "pypdf"
return result
except Exception as e:
logger.error(f"Both methods failed for {pdf_path}: {str(e)}")
return {
"success": False,
"error": "Both PyMuPDF and pypdf extraction methods failed",
}
async def _extract_with_pymupdf(
self,
pdf_path: Path,
page_range: Optional[tuple] = None,
include_metadata: bool = True,
) -> Dict[str, Any]:
"""Extract text using PyMuPDF (fitz)."""
try:
fitz = _import_fitz()
doc = fitz.open(str(pdf_path))
# Determine page range
total_pages = doc.page_count
start_page = 0
end_page = total_pages
if page_range:
start_page = max(0, page_range[0])
end_page = min(total_pages, page_range[1])
# Extract text from pages
text_content = []
for page_num in range(start_page, end_page):
page = doc.load_page(page_num)
text = page.get_text()
if text.strip(): # Only add non-empty pages
text_content.append(f"<!-- Page {page_num + 1} -->\n{text}")
full_text = "\n\n".join(text_content)
result = {
"success": True,
"text": full_text,
"pages_processed": end_page - start_page,
"total_pages": total_pages,
}
# Add metadata if requested
if include_metadata:
metadata = doc.metadata
result["metadata"] = {
"title": metadata.get("title", ""),
"author": metadata.get("author", ""),
"subject": metadata.get("subject", ""),
"creator": metadata.get("creator", ""),
"producer": metadata.get("producer", ""),
"creation_date": metadata.get("creationDate", ""),
"modification_date": metadata.get("modDate", ""),
"total_pages": total_pages,
"file_size_bytes": pdf_path.stat().st_size,
}
doc.close()
return result
except Exception as e:
return {"success": False, "error": f"PyMuPDF extraction failed: {str(e)}"}
async def _extract_with_pypdf(
self,
pdf_path: Path,
page_range: Optional[tuple] = None,
include_metadata: bool = True,
) -> Dict[str, Any]:
"""Extract text using pypdf library."""
try:
with open(pdf_path, "rb") as file:
pypdf = _import_pypdf()
reader = pypdf.PdfReader(file)
total_pages = len(reader.pages)
# Determine page range
start_page = 0
end_page = total_pages
if page_range:
start_page = max(0, page_range[0])
end_page = min(total_pages, page_range[1])
# Extract text from pages
text_content = []
for page_num in range(start_page, end_page):
page = reader.pages[page_num]
text = page.extract_text()
if text.strip(): # Only add non-empty pages
text_content.append(f"<!-- Page {page_num + 1} -->\n{text}")
full_text = "\n\n".join(text_content)
result = {
"success": True,
"text": full_text,
"pages_processed": end_page - start_page,
"total_pages": total_pages,
}
# Add metadata if requested
if include_metadata:
metadata = reader.metadata or {}
result["metadata"] = {
"title": str(metadata.get("/Title", "")),
"author": str(metadata.get("/Author", "")),
"subject": str(metadata.get("/Subject", "")),
"creator": str(metadata.get("/Creator", "")),
"producer": str(metadata.get("/Producer", "")),
"creation_date": str(metadata.get("/CreationDate", "")),
"modification_date": str(metadata.get("/ModDate", "")),
"total_pages": total_pages,
"file_size_bytes": pdf_path.stat().st_size,
}
return result
except Exception as e:
return {"success": False, "error": f"pypdf extraction failed: {str(e)}"}
def _convert_to_markdown(self, text: str) -> str:
"""Convert extracted text to Markdown format using MarkItDown."""
try:
# Try to use the new MarkdownConverter for better formatting
from .markdown_converter import MarkdownConverter
converter = MarkdownConverter()
# Create a simple HTML structure from the text for better conversion
html_content = f"<html><body><div>{text}</div></body></html>"
# Use MarkItDown through the converter
result = converter.html_to_markdown(html_content)
# Check if the result has proper markdown formatting (headers, structure)
# If not, fall back to our simple conversion which is better for PDFs
if not self._has_markdown_structure(result):
logger.info(
"MarkdownConverter didn't add structure, using simple conversion"
)
return self._simple_markdown_conversion(text)
return result
except Exception as e:
logger.warning(
f"Failed to use MarkdownConverter, falling back to simple conversion: {str(e)}"
)
# Fallback to the simple conversion method
return self._simple_markdown_conversion(text)
def _simple_markdown_conversion(self, text: str) -> str:
"""Simple fallback markdown conversion."""
# Clean up the text
lines = text.split("\n")
cleaned_lines = []
for line in lines:
line = line.strip()
if line:
# Convert common patterns to Markdown
if line.isupper() and len(line.split()) <= 5:
# Potential heading
cleaned_lines.append(f"# {line}")
elif line.endswith(":") and len(line.split()) <= 8:
# Potential subheading
cleaned_lines.append(f"## {line}")
elif self._looks_like_title(line):
# Check if it looks like a title (capitalized, short)
cleaned_lines.append(f"# {line}")
else:
cleaned_lines.append(line)
else:
cleaned_lines.append("")
return "\n".join(cleaned_lines)
def _looks_like_title(self, line: str) -> bool:
"""Check if a line looks like a title."""
# Title heuristics
words = line.split()
if len(words) > 8: # Too long to be a title
return False
# Check if most words are capitalized
capitalized_count = sum(1 for word in words if word and word[0].isupper())
# If more than 60% of the words are capitalized, it might be a title
return capitalized_count > len(words) * 0.6
def _has_markdown_structure(self, text: str) -> bool:
"""Check if text has proper markdown structure (headers, formatting, etc.)."""
# Check for common markdown structures
has_headers = bool(re.search(r"^#{1,6}\s+", text, re.MULTILINE))
has_lists = bool(re.search(r"^[\s]*[-*+]\s+", text, re.MULTILINE))
has_bold = "**" in text or "__" in text
has_italic = "*" in text or "_" in text
has_links = "[" in text and "](" in text
has_code = "`" in text
# If it has any meaningful markdown structure, consider it good
structure_count = sum(
[has_headers, has_lists, has_bold, has_italic, has_links, has_code]
)
# We especially want headers for PDF content
return has_headers or structure_count >= 2
async def _extract_enhanced_assets(
self,
pdf_path: Path,
page_range: Optional[tuple],
extract_images: bool,
extract_tables: bool,
extract_formulas: bool,
) -> Dict[str, Any]:
"""
Extract enhanced assets (images, tables, formulas) from PDF.
Args:
pdf_path: Path to PDF file
page_range: Optional page range tuple
extract_images: Whether to extract images
extract_tables: Whether to extract tables
extract_formulas: Whether to extract formulas
Returns:
Dict with extraction results
"""
if not self.enhanced_processor:
return {}
try:
# Open PDF document
fitz = _import_fitz()
doc = fitz.open(str(pdf_path))
# Determine page range
start_page = 0
end_page = len(doc)
if page_range:
start_page = max(0, page_range[0])
end_page = min(len(doc), page_range[1])
extracted_assets = {
"success": True,
"pages_processed": end_page - start_page,
}
# Extract images
if extract_images:
for page_num in range(start_page, end_page):
try:
images = (
await self.enhanced_processor.extract_images_from_pdf_page(
doc, page_num
)
)
self.enhanced_processor.images.extend(images)
except Exception as e:
logger.warning(
f"Failed to extract images from page {page_num}: {str(e)}"
)
extracted_assets["images_extracted"] = len(
self.enhanced_processor.images
)
# Extract text for table and formula processing
for page_num in range(start_page, end_page):
try:
page = doc[page_num]
text = page.get_text()
# Extract tables
if extract_tables:
tables = self.enhanced_processor.extract_tables_from_text(
text, page_num
)
self.enhanced_processor.tables.extend(tables)
# Extract formulas
if extract_formulas:
formulas = self.enhanced_processor.extract_formulas_from_text(
text, page_num
)
self.enhanced_processor.formulas.extend(formulas)
except Exception as e:
logger.warning(
f"Failed to process page {page_num} for tables/formulas: {str(e)}"
)
# Add extraction summaries
if extract_tables:
extracted_assets["tables_extracted"] = len(
self.enhanced_processor.tables
)
if extract_formulas:
extracted_assets["formulas_extracted"] = len(
self.enhanced_processor.formulas
)
doc.close()
return extracted_assets
except Exception as e:
logger.error(f"Error in enhanced asset extraction: {str(e)}")
return {"success": False, "error": str(e)}
def cleanup(self):
"""Clean up temporary files and directories."""
try:
import shutil
# Clean up enhanced processor
if self.enhanced_processor:
self.enhanced_processor.cleanup()
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory: {str(e)}")