"""
Document Analysis Mixin - PDF metadata extraction and structure analysis
"""
import time
from pathlib import Path
from typing import Dict, Any, List
import logging
# PDF processing libraries
import fitz # PyMuPDF
from .base import MCPMixin, mcp_tool
from ..security import validate_pdf_path, sanitize_error_message
logger = logging.getLogger(__name__)
class DocumentAnalysisMixin(MCPMixin):
    """
    Handles all PDF document analysis and metadata operations.

    Tools provided:
    - extract_metadata: Comprehensive metadata extraction
    - get_document_structure: Document structure and outline analysis
    - analyze_pdf_health: PDF health and quality analysis

    Every tool returns a dictionary with a boolean ``success`` key. Failures
    are reported as ``{"success": False, "error": <sanitized message>}``
    rather than raised.
    """

    # Number of leading pages sampled by get_document_structure.
    _STRUCTURE_SAMPLE_PAGES = 5

    def get_mixin_name(self) -> str:
        """Return the name identifying this mixin."""
        return "DocumentAnalysis"

    def get_required_permissions(self) -> List[str]:
        """Return the permission names this mixin declares."""
        return ["read_files", "metadata_access"]

    def _setup(self):
        """Initialize document analysis specific configuration"""
        # NOTE(review): none of the tools in this class read this limit yet.
        self.max_pages_analyze = 100  # Limit for detailed analysis

    @staticmethod
    def _common_metadata(doc, include_trapped: bool = False) -> Dict[str, Any]:
        """
        Build the descriptive metadata dictionary shared by the tools.

        Args:
            doc: Open PyMuPDF document.
            include_trapped: Also report the "trapped" entry.

        Returns:
            Dictionary of metadata fields; missing entries default to "".
        """
        # PyMuPDF documents `metadata` as possibly None (password-protected
        # documents), so fall back to an empty mapping instead of crashing.
        meta = doc.metadata or {}
        result = {
            "title": meta.get("title", ""),
            "author": meta.get("author", ""),
            "subject": meta.get("subject", ""),
            "keywords": meta.get("keywords", ""),
            "creator": meta.get("creator", ""),
            "producer": meta.get("producer", ""),
            # `or ""` keeps a None value from being rendered as "None".
            "creation_date": str(meta.get("creationDate") or ""),
            "modification_date": str(meta.get("modDate") or ""),
        }
        if include_trapped:
            result["trapped"] = meta.get("trapped", "")
        return result

    @staticmethod
    def _has_annotations(doc) -> bool:
        """Return True if any page carries an annotation (best effort)."""
        try:
            for page in doc:
                annots = page.annots()
                if annots is not None and next(iter(annots), None) is not None:
                    return True
        except Exception as exc:
            # Best effort: a failing scan is reported as "no annotations".
            logger.debug(
                "Annotation scan skipped: %s", sanitize_error_message(str(exc))
            )
        return False

    @staticmethod
    def _has_links(doc) -> bool:
        """Return True if any page carries a link (best effort)."""
        try:
            return any(page.get_links() for page in doc)
        except Exception as exc:
            # Best effort: a failing scan is reported as "no links".
            logger.debug(
                "Link scan skipped: %s", sanitize_error_message(str(exc))
            )
            return False

    @mcp_tool(
        name="extract_metadata",
        description="Extract comprehensive PDF metadata"
    )
    async def extract_metadata(self, pdf_path: str) -> Dict[str, Any]:
        """
        Extract comprehensive metadata from PDF.

        Args:
            pdf_path: Path to PDF file or URL (as accepted by
                validate_pdf_path)

        Returns:
            On success: ``success``, ``metadata`` (document info fields),
            ``document_stats`` (page count, size, annotation/link/encryption
            flags, PDF version) and ``file_info`` (path, name, extension,
            timestamps, size). On failure: ``success`` False and ``error``.
        """
        try:
            # Validate inputs using centralized security functions
            path = await validate_pdf_path(pdf_path)
            file_stats = path.stat()
            size_bytes = file_stats.st_size

            # The context manager closes the document even if a step raises.
            with fitz.open(str(path)) as doc:
                fitz_metadata = self._common_metadata(doc, include_trapped=True)
                document_stats = {
                    "page_count": len(doc),
                    "file_size_bytes": size_bytes,
                    "file_size_mb": round(size_bytes / 1024 / 1024, 2),
                    "has_annotations": self._has_annotations(doc),
                    "has_links": self._has_links(doc),
                    "is_encrypted": doc.is_encrypted,
                    "needs_password": doc.needs_pass,
                    "pdf_version": getattr(doc, 'pdf_version', 'unknown'),
                }

            return {
                "success": True,
                "metadata": fitz_metadata,
                "document_stats": document_stats,
                "file_info": {
                    "path": str(path),
                    "name": path.name,
                    "extension": path.suffix,
                    "created": file_stats.st_ctime,
                    "modified": file_stats.st_mtime,
                    "size_bytes": size_bytes
                }
            }
        except Exception as e:
            error_msg = sanitize_error_message(str(e))
            logger.error("Metadata extraction failed: %s", error_msg)
            return {
                "success": False,
                "error": error_msg
            }

    @mcp_tool(
        name="get_document_structure",
        description="Extract document structure including headers, sections, and metadata"
    )
    async def get_document_structure(self, pdf_path: str) -> Dict[str, Any]:
        """
        Extract document structure including headers, sections, and metadata.

        Args:
            pdf_path: Path to PDF file or URL (as accepted by
                validate_pdf_path)

        Returns:
            On success: ``success`` and ``structure`` holding ``metadata``,
            ``pages`` (page count), ``outline`` (table-of-contents entries),
            ``page_samples`` (geometry and content counts for the first few
            pages) and ``total_pages_analyzed``. On failure: ``success``
            False and ``error``.
        """
        try:
            # Validate inputs using centralized security functions
            path = await validate_pdf_path(pdf_path)

            # The context manager closes the document even if a step raises.
            with fitz.open(str(path)) as doc:
                total_pages = len(doc)

                # Table of contents / bookmarks
                outline = [
                    {"level": level, "title": title, "page": page}
                    for level, title, page in doc.get_toc()
                ]

                # Page-level information for a sample of the leading pages
                sample_pages = min(self._STRUCTURE_SAMPLE_PAGES, total_pages)
                page_info = []
                for index in range(sample_pages):
                    page = doc[index]
                    page_info.append({
                        "page_number": index + 1,
                        "width": page.rect.width,
                        "height": page.rect.height,
                        "rotation": page.rotation,
                        "text_length": len(page.get_text()),
                        "image_count": len(page.get_images()),
                        "link_count": len(page.get_links())
                    })

                structure = {
                    "metadata": self._common_metadata(doc),
                    "pages": total_pages,
                    "outline": outline,
                    "page_samples": page_info,
                    "total_pages_analyzed": sample_pages,
                }

            return {
                "success": True,
                "structure": structure
            }
        except Exception as e:
            error_msg = sanitize_error_message(str(e))
            logger.error("Document structure extraction failed: %s", error_msg)
            return {
                "success": False,
                "error": error_msg
            }

    @mcp_tool(
        name="analyze_pdf_health",
        description="Comprehensive PDF health and quality analysis"
    )
    async def analyze_pdf_health(self, pdf_path: str) -> Dict[str, Any]:
        """
        Analyze PDF health, quality, and potential issues.

        Args:
            pdf_path: Path to PDF file or URL (as accepted by
                validate_pdf_path)

        Returns:
            On success: ``success`` plus ``file_info``, ``document_health``,
            ``quality_metrics``, ``optimization_suggestions``, ``warnings``,
            ``errors``, ``overall_health_score`` (0-100) and
            ``processing_time`` in seconds. On failure: ``success`` False,
            ``error`` and ``processing_time``.
        """
        start_time = time.perf_counter()
        try:
            # Validate inputs using centralized security functions
            path = await validate_pdf_path(pdf_path)
            size_bytes = path.stat().st_size
            file_size_mb = round(size_bytes / 1024 / 1024, 2)

            corrupted_pages = []
            total_text_length = 0
            total_images = 0
            fonts_used = set()
            # Tracked by unique name so it is comparable with fonts_used;
            # counting per page occurrence would inflate the figure.
            embedded_font_names = set()

            # The context manager closes the document even if a step raises.
            with fitz.open(str(path)) as doc:
                page_count = len(doc)

                for index, page in enumerate(doc):
                    # Corruption check: a page whose content cannot be read
                    # is recorded instead of aborting the analysis.
                    try:
                        total_text_length += len(page.get_text())
                        total_images += len(page.get_images())
                    except Exception as e:
                        corrupted_pages.append({
                            "page": index + 1,
                            "error": sanitize_error_message(str(e))
                        })

                    # Font analysis (best effort per page)
                    try:
                        for font_info in page.get_fonts():
                            font_name = font_info[3]
                            fonts_used.add(font_name)
                            if font_info[1] != "n/a":  # Embedded font
                                embedded_font_names.add(font_name)
                    except Exception as exc:
                        logger.debug(
                            "Font scan skipped for page %d: %s",
                            index + 1,
                            sanitize_error_message(str(exc)),
                        )

                # Security and protection
                is_encrypted = doc.is_encrypted
                needs_password = doc.needs_pass

            embedded_fonts = len(embedded_font_names)
            is_valid = page_count > 0
            mostly_not_embedded = embedded_fonts < len(fonts_used) / 2

            optimization_suggestions = []
            warnings = []
            if file_size_mb > 10:
                optimization_suggestions.append(
                    "Large file size detected. Consider optimizing images or using compression."
                )
            if total_images > page_count * 5:
                optimization_suggestions.append(
                    "High image density detected. Consider image compression or resolution reduction."
                )
            if len(fonts_used) > 20:
                optimization_suggestions.append(
                    f"Many fonts in use ({len(fonts_used)}). Consider font subset embedding to reduce file size."
                )
            if mostly_not_embedded:
                warnings.append(
                    "Many non-embedded fonts detected. Document may not display correctly on other systems."
                )

            # Overall health score: start at 100 and subtract penalties.
            health_score = 100
            if corrupted_pages:
                health_score -= 30
            if file_size_mb > 20:
                health_score -= 10
            if not is_valid:
                health_score -= 50
            if mostly_not_embedded:
                health_score -= 5

            return {
                "success": True,
                "file_info": {
                    "path": str(path),
                    "size_bytes": size_bytes,
                    "size_mb": file_size_mb
                },
                "document_health": {
                    "page_count": page_count,
                    "is_valid": is_valid,
                    "corrupted_pages": corrupted_pages,
                    "corruption_detected": len(corrupted_pages) > 0,
                    "is_encrypted": is_encrypted,
                    "needs_password": needs_password,
                },
                "quality_metrics": {
                    "average_text_per_page": total_text_length / page_count if page_count > 0 else 0,
                    "total_images": total_images,
                    "images_per_page": total_images / page_count if page_count > 0 else 0,
                    "fonts_used": len(fonts_used),
                    # Sorted so the output is deterministic across runs.
                    "fonts_list": sorted(fonts_used),
                    "embedded_fonts": embedded_fonts,
                },
                "optimization_suggestions": optimization_suggestions,
                "warnings": warnings,
                "errors": [],
                "overall_health_score": max(0, health_score),
                "processing_time": round(time.perf_counter() - start_time, 2),
            }
        except Exception as e:
            error_msg = sanitize_error_message(str(e))
            logger.error("PDF health analysis failed: %s", error_msg)
            return {
                "success": False,
                "error": error_msg,
                "processing_time": round(time.perf_counter() - start_time, 2)
            }