Skip to main content
Glama

MCP Unified Server

by getfounded
document_management.py (26.2 kB)
#!/usr/bin/env python3
"""PDF document management tools for the MCP unified server.

The module exposes a ``PDFService`` class that wraps pypdf / pdf2image /
pytesseract operations, plus a set of ``pdf_*`` coroutines that call the
service and return JSON strings. ``get_pdf_tools()`` returns those
coroutines keyed by tool name for registration with MCP.
"""
import os
import json
import logging
import io
import base64
import tempfile
from typing import List, Dict, Any, Optional, Union

# PDF processing libraries
import pypdf
from pdf2image import convert_from_path
from PIL import Image as PILImage
import pytesseract

# Ensure compatibility with mcp server
from mcp.server.fastmcp import FastMCP, Context, Image
from mcp.types import Tool, TextContent, ImageContent

# External MCP reference for tool registration
external_mcp = None


def set_external_mcp(mcp):
    """Set the external MCP reference for tool registration"""
    global external_mcp
    external_mcp = mcp
    logging.info("PDF Document Management tools MCP reference set")


class PDFService:
    """Service to handle PDF document operations.

    Every public coroutine wraps any failure in a ``ValueError`` whose
    message starts with an operation-specific prefix; the original exception
    is chained as ``__cause__``.
    """

    def __init__(self):
        # Create temp directory for processing (rendered pages, watermark
        # scratch files). It is not removed by this class.
        self.temp_dir = tempfile.mkdtemp(prefix="pdf_service_")

    async def get_pdf_info(self, file_path):
        """Get information about a PDF file.

        Returns a dict with the page count, file size, the document metadata
        fields (``None`` when the PDF has no metadata) and, when the PDF has
        at least one page, the first page's media box width and height.

        Raises:
            ValueError: if the file cannot be opened or parsed.
        """
        try:
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)
                meta = pdf.metadata
                page_count = len(pdf.pages)

                # Extract basic information
                info = {
                    "pages": page_count,
                    "file_size": os.path.getsize(file_path),
                    "creator": meta.creator if meta else None,
                    "author": meta.author if meta else None,
                    "title": meta.title if meta else None,
                    "subject": meta.subject if meta else None,
                    "producer": meta.producer if meta else None,
                    "creation_date": meta.creation_date_raw if meta else None,
                    "modification_date": meta.modification_date_raw if meta else None,
                }

                # Get page dimensions for first page
                if page_count > 0:
                    page_box = pdf.pages[0].mediabox
                    # Coerce to plain floats so the result is always
                    # JSON-serializable regardless of the numeric type the
                    # installed pypdf version returns.
                    info["page_width"] = float(page_box.width)
                    info["page_height"] = float(page_box.height)

            return info
        except Exception as e:
            raise ValueError(f"Error getting PDF info: {e}") from e

    async def extract_text(self, file_path, pages=None, ocr=False):
        """Extract text from a PDF file.

        Parameters:
        - file_path: Path to the PDF file
        - pages: 1-indexed page numbers to extract; ``None`` means all pages.
          Numbers outside the document are ignored.
        - ocr: When true, pages that yield no text are re-read with OCR.

        Returns a dict with a ``pages`` list (one entry per page holding
        ``page``, ``text``, ``text_length`` and ``source``) and
        ``total_text_length``.

        Raises:
            ValueError: if extraction fails.
        """
        try:
            results = {
                "pages": [],
                "total_text_length": 0
            }

            # Method 1: Use pypdf for text extraction
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)
                page_count = len(pdf.pages)

                # If pages not specified, extract all pages
                if pages is None:
                    page_indexes = list(range(page_count))
                else:
                    # Convert 1-indexed pages to 0-indexed
                    page_indexes = [p - 1 for p in pages if 0 < p <= page_count]

                # Extract text from each page
                for page_index in page_indexes:
                    # Guard against a missing result so len()/strip() are safe.
                    text = pdf.pages[page_index].extract_text() or ""

                    page_result = {
                        "page": page_index + 1,  # Return 1-indexed page numbers
                        "text": text,
                        "text_length": len(text)
                    }

                    # If OCR is requested and no text was extracted, try OCR
                    if ocr and not text.strip():
                        ocr_text = await self._ocr_page(file_path, page_index)
                        page_result["text"] = ocr_text
                        page_result["text_length"] = len(ocr_text)
                        page_result["source"] = "ocr"
                    else:
                        page_result["source"] = "pdf"

                    results["pages"].append(page_result)
                    results["total_text_length"] += page_result["text_length"]

            return results
        except Exception as e:
            raise ValueError(f"Error extracting text: {e}") from e

    async def _ocr_page(self, file_path, page_num):
        """Extract text from a PDF page using OCR.

        ``page_num`` is 0-indexed. This is best-effort: any failure is logged
        as a warning and an empty string is returned.
        """
        try:
            # Convert page to image (pdf2image page numbers are 1-indexed)
            images = convert_from_path(
                file_path, first_page=page_num + 1, last_page=page_num + 1)

            if not images:
                return ""

            # Run OCR on the image
            return pytesseract.image_to_string(images[0])
        except Exception as e:
            logging.warning("OCR failed: %s", e)
            return ""

    async def extract_images(self, file_path, pages=None, min_size=100):
        """Render PDF pages to PNG images.

        Parameters:
        - file_path: Path to the PDF file
        - pages: 1-indexed page numbers to render; ``None`` means all pages.
        - min_size: Rendered pages narrower or shorter than this many pixels
          are skipped.

        Returns a dict with an ``images`` list (``page``, ``width``,
        ``height``, ``path`` and base64 ``data`` per entry, in ascending page
        order) and ``total_images``.

        Raises:
            ValueError: if rendering fails.
        """
        try:
            results = {
                "images": [],
                "total_images": 0
            }

            # Convert PDF pages to images
            if pages is None:
                # Get total pages
                with open(file_path, 'rb') as file:
                    total_pages = len(pypdf.PdfReader(file).pages)
                pages = list(range(1, total_pages + 1))

            # Only positive page numbers can exist; an empty request (or an
            # empty document) yields an empty result instead of a crash.
            wanted = {p for p in pages if p >= 1}
            if not wanted:
                return results

            first_page = min(wanted)
            last_page = max(wanted)

            # Convert the contiguous range covering the requested pages
            page_images = convert_from_path(
                file_path,
                first_page=first_page,
                last_page=last_page
            )

            # The converter returns every page in first_page..last_page, so
            # number the images by their real page and keep only the
            # requested ones (the request may be sparse or unsorted).
            for page_num, image in enumerate(page_images, start=first_page):
                if page_num not in wanted:
                    continue

                # Get image info
                width, height = image.size

                # Skip if image is too small (before writing anything to disk)
                if width < min_size or height < min_size:
                    continue

                # Save image to temp file
                img_path = os.path.join(self.temp_dir, f"page_{page_num}.png")
                image.save(img_path, "PNG")

                # Encode image as base64
                with open(img_path, "rb") as img_file:
                    img_data = base64.b64encode(img_file.read()).decode()

                # Add to results
                results["images"].append({
                    "page": page_num,
                    "width": width,
                    "height": height,
                    "path": img_path,
                    "data": img_data
                })

            results["total_images"] = len(results["images"])
            return results
        except Exception as e:
            raise ValueError(f"Error extracting images: {e}") from e

    async def split_pdf(self, file_path, output_dir, pages_per_file=1):
        """Split a PDF into multiple files.

        Output files are written to ``output_dir`` (created if missing) and
        named ``<source base name>_part_<n>.pdf``.

        Returns a dict with a ``files`` list (``path``, 1-indexed
        ``start_page`` and ``end_page``, and ``pages`` per entry) and
        ``total_files``.

        Raises:
            ValueError: if ``pages_per_file`` is less than 1 or the split
                fails.
        """
        try:
            if pages_per_file < 1:
                raise ValueError("pages_per_file must be at least 1")

            results = {
                "files": [],
                "total_files": 0
            }

            # Ensure output directory exists
            os.makedirs(output_dir, exist_ok=True)

            # Get base filename
            base_name = os.path.splitext(os.path.basename(file_path))[0]

            # Open the source PDF
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)
                total_pages = len(pdf.pages)

                # Create each file
                for part, start_page in enumerate(
                        range(0, total_pages, pages_per_file), start=1):
                    end_page = min(start_page + pages_per_file, total_pages)

                    # Create new PDF writer
                    pdf_writer = pypdf.PdfWriter()

                    # Add pages to the writer
                    for page_num in range(start_page, end_page):
                        pdf_writer.add_page(pdf.pages[page_num])

                    # Save to file
                    output_path = os.path.join(
                        output_dir, f"{base_name}_part_{part}.pdf")
                    with open(output_path, 'wb') as output_file:
                        pdf_writer.write(output_file)

                    # Add to results
                    results["files"].append({
                        "path": output_path,
                        "start_page": start_page + 1,  # 1-indexed
                        "end_page": end_page,
                        "pages": end_page - start_page
                    })

            results["total_files"] = len(results["files"])
            return results
        except Exception as e:
            raise ValueError(f"Error splitting PDF: {e}") from e

    async def merge_pdfs(self, file_paths, output_path):
        """Merge multiple PDFs into a single file.

        Pages are appended in the order of ``file_paths``.

        Returns a dict with ``output_path``, ``total_pages`` and
        ``source_files`` (``path`` and ``pages`` per input).

        Raises:
            ValueError: if any input cannot be read or the output cannot be
                written.
        """
        try:
            # Create PDF writer
            pdf_writer = pypdf.PdfWriter()

            # Track pages from each source
            source_info = []

            # Process each input file
            for file_path in file_paths:
                with open(file_path, 'rb') as file:
                    pdf = pypdf.PdfReader(file)

                    # Record source info
                    source_info.append({
                        "path": file_path,
                        "pages": len(pdf.pages)
                    })

                    # Add all pages to writer
                    for page in pdf.pages:
                        pdf_writer.add_page(page)

            # Write output file
            with open(output_path, 'wb') as output_file:
                pdf_writer.write(output_file)

            return {
                "output_path": output_path,
                "total_pages": sum(source["pages"] for source in source_info),
                "source_files": source_info
            }
        except Exception as e:
            raise ValueError(f"Error merging PDFs: {e}") from e

    async def add_watermark(self, file_path, output_path, text=None, image_path=None, opacity=0.3):
        """Add a text or image watermark to each page of a PDF.

        A one-page letter-size watermark PDF is built in the service temp
        directory and merged onto every page of the source. ``text`` takes
        precedence when both ``text`` and ``image_path`` are given.

        Returns a dict with ``output_path``, ``pages_processed`` and
        ``watermark_type`` (``"text"`` or ``"image"``).

        Raises:
            ValueError: if neither ``text`` nor ``image_path`` is provided,
                or if any step fails.
        """
        try:
            if not text and not image_path:
                raise ValueError("Either text or image_path must be provided")

            # reportlab is only needed for watermarking, so import it lazily.
            from reportlab.pdfgen import canvas
            from reportlab.lib.pagesizes import letter

            # Create a watermark PDF
            watermark_path = os.path.join(self.temp_dir, "watermark.pdf")
            width, height = letter

            if text:
                # Create a text watermark
                c = canvas.Canvas(watermark_path, pagesize=letter)

                # Set transparency
                c.setFillAlpha(opacity)

                # Add rotated text
                c.saveState()
                c.translate(width / 2, height / 2)
                c.rotate(45)
                c.setFont("Helvetica", 60)
                c.setFillColorRGB(0, 0, 0)  # Black
                c.drawCentredString(0, 0, text)
                c.restoreState()

                c.save()
            else:
                # Create an image watermark: open and resize image
                img = PILImage.open(image_path)
                img_width, img_height = img.size

                # Resize to fit on page with some margin
                scale = min((width * 0.8) / img_width,
                            (height * 0.8) / img_height)
                new_width = int(img_width * scale)
                new_height = int(img_height * scale)

                img = img.resize((new_width, new_height), PILImage.LANCZOS)

                # Save resized image
                img_resized_path = os.path.join(
                    self.temp_dir, "watermark_img.png")
                img.save(img_resized_path)

                # Create PDF with image
                c = canvas.Canvas(watermark_path, pagesize=letter)

                # Set transparency
                c.setFillAlpha(opacity)

                # Draw image in center
                c.drawImage(
                    img_resized_path,
                    (width - new_width) / 2,
                    (height - new_height) / 2,
                    width=new_width,
                    height=new_height
                )

                c.save()

            # Keep both files open until the output is written: pypdf loads
            # objects lazily from the underlying stream.
            with open(watermark_path, 'rb') as watermark_file, \
                    open(file_path, 'rb') as file:
                watermark_page = pypdf.PdfReader(watermark_file).pages[0]
                pdf = pypdf.PdfReader(file)
                pdf_writer = pypdf.PdfWriter()
                pages_processed = len(pdf.pages)

                # Apply watermark to each page
                for page in pdf.pages:
                    page.merge_page(watermark_page)
                    pdf_writer.add_page(page)

                # Write output file
                with open(output_path, 'wb') as output_file:
                    pdf_writer.write(output_file)

            return {
                "output_path": output_path,
                "pages_processed": pages_processed,
                "watermark_type": "text" if text else "image"
            }
        except Exception as e:
            raise ValueError(f"Error adding watermark: {e}") from e

    async def encrypt_pdf(self, file_path, output_path, user_password, owner_password=None):
        """Encrypt a PDF file with password protection.

        When ``owner_password`` is empty or ``None`` the user password is
        used for both roles.

        Returns a dict with ``output_path``, ``pages`` and ``encrypted``.

        Raises:
            ValueError: if encryption fails.
        """
        try:
            # If owner password not provided, use the same as user password
            if not owner_password:
                owner_password = user_password

            # Open the source PDF
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)
                pdf_writer = pypdf.PdfWriter()
                page_count = len(pdf.pages)

                # Add all pages to writer
                for page in pdf.pages:
                    pdf_writer.add_page(page)

                # Encrypt the PDF
                pdf_writer.encrypt(user_password, owner_password)

                # Write output file
                with open(output_path, 'wb') as output_file:
                    pdf_writer.write(output_file)

            return {
                "output_path": output_path,
                "pages": page_count,
                "encrypted": True
            }
        except Exception as e:
            raise ValueError(f"Error encrypting PDF: {e}") from e

    async def decrypt_pdf(self, file_path, output_path, password):
        """Decrypt an encrypted PDF file.

        If the source is not encrypted nothing is written and the returned
        ``output_path`` is the source path, with ``encrypted`` set to False.
        Otherwise an unencrypted copy is written to ``output_path``.

        Raises:
            ValueError: if the password is rejected or decryption fails.
        """
        try:
            # Open the source PDF
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)

                # Check if PDF is encrypted
                if not pdf.is_encrypted:
                    return {
                        "output_path": file_path,
                        "pages": len(pdf.pages),
                        "encrypted": False,
                        "message": "PDF is not encrypted"
                    }

                # Try to decrypt; a falsy result means the password failed
                success = pdf.decrypt(password)
                if not success:
                    raise ValueError("Invalid password")

                # Create new PDF without encryption
                pdf_writer = pypdf.PdfWriter()
                page_count = len(pdf.pages)

                # Add all pages to writer
                for page in pdf.pages:
                    pdf_writer.add_page(page)

                # Write output file
                with open(output_path, 'wb') as output_file:
                    pdf_writer.write(output_file)

            return {
                "output_path": output_path,
                "pages": page_count,
                "decrypted": True
            }
        except Exception as e:
            raise ValueError(f"Error decrypting PDF: {e}") from e

    async def get_form_fields(self, file_path):
        """Get all text form fields in a PDF file.

        Returns a dict with ``form_fields`` (field name to current value, as
        reported by ``get_form_text_fields``) and ``count``. A PDF without
        form fields yields an empty mapping.

        Raises:
            ValueError: if the file cannot be opened or parsed.
        """
        try:
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)

                # Get form fields and their values
                try:
                    form_fields = pdf.get_form_text_fields() or {}
                except Exception:
                    # If there are no form fields or an error occurs
                    form_fields = {}

            return {
                "form_fields": form_fields,
                "count": len(form_fields)
            }
        except Exception as e:
            raise ValueError(f"Error getting form fields: {e}") from e

    async def fill_form(self, file_path, output_path, form_data):
        """Fill out form fields in a PDF file.

        Every key of ``form_data`` must name an existing text form field.

        Returns a dict with ``output_path``, ``filled_fields`` and ``pages``.

        Raises:
            ValueError: if the PDF has no form fields, a field name is
                unknown, or writing fails.
        """
        try:
            # Open the source PDF
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)

                # Get existing form fields to check if the provided fields exist
                try:
                    existing_fields = pdf.get_form_text_fields() or {}
                except Exception:
                    existing_fields = {}

                if not existing_fields:
                    raise ValueError(
                        "The PDF does not contain any form fields")

                # Validate form data
                for field_name in form_data:
                    if field_name not in existing_fields:
                        raise ValueError(
                            f"Field '{field_name}' does not exist in the PDF form")

                # Create a PDF writer
                pdf_writer = pypdf.PdfWriter()
                page_count = len(pdf.pages)

                # Prefer append(): it carries the document-level form
                # dictionary into the writer, which the field-update call
                # needs. Fall back to page-by-page copying when the installed
                # pypdf has no append().
                if hasattr(pdf_writer, "append"):
                    pdf_writer.append(pdf)
                else:
                    for page in pdf.pages:
                        pdf_writer.add_page(page)

                # Use the appropriate method based on the pypdf version
                try:
                    # Try the newer method first (pypdf 3.0+)
                    pdf_writer.update_form_fields(form_data)
                except AttributeError:
                    # Fall back to updating pages individually
                    for page in pdf_writer.pages:
                        pdf_writer.update_page_form_field_values(
                            page, form_data)

                # Write output file
                with open(output_path, 'wb') as output_file:
                    pdf_writer.write(output_file)

            return {
                "output_path": output_path,
                "filled_fields": list(form_data.keys()),
                "pages": page_count
            }
        except Exception as e:
            raise ValueError(f"Error filling form: {e}") from e


# Tool function definitions that will be registered with MCP.
# Each tool returns a JSON string; failures are reported as {"error": "..."}
# rather than raised.

async def pdf_info(file_path: str, ctx: Context = None) -> str:
    """Get information about a PDF document.

    Parameters:
    - file_path: Path to the PDF file
    """
    try:
        pdf_service = _get_pdf_service()
        info = await pdf_service.get_pdf_info(file_path)
        return json.dumps(info, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_extract_text(file_path: str, pages: List[int] = None, ocr: bool = False, ctx: Context = None) -> str:
    """Extract text from a PDF document.

    Parameters:
    - file_path: Path to the PDF file
    - pages: List of page numbers to extract (1-indexed)
    - ocr: Whether to use OCR for pages with no text
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.extract_text(file_path, pages, ocr)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_extract_images(file_path: str, pages: List[int] = None, min_size: int = 100, ctx: Context = None) -> str:
    """Extract images from a PDF document.

    Parameters:
    - file_path: Path to the PDF file
    - pages: List of page numbers to extract (1-indexed)
    - min_size: Minimum image dimension in pixels
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.extract_images(file_path, pages, min_size)

        # If images were extracted and we have MCP context, create resources.
        # NOTE(review): set_resource may not exist on the installed Context
        # class; when it is missing the base64 data is returned inline
        # instead of failing the whole call.
        set_resource = getattr(ctx, "set_resource", None) if ctx else None
        if set_resource is not None:
            for img_info in results.get("images") or []:
                if "data" not in img_info:
                    continue
                img = Image(data=img_info["data"], format="png")
                img_resource_id = f"pdf_img_{os.path.basename(file_path)}_{img_info['page']}"
                set_resource(img_resource_id, img)

                # Replace base64 data with resource ID
                img_info["resource_id"] = img_resource_id
                # Remove base64 data to keep response smaller
                del img_info["data"]

        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_split(file_path: str, output_dir: str, pages_per_file: int = 1, ctx: Context = None) -> str:
    """Split a PDF into multiple files.

    Parameters:
    - file_path: Path to the PDF file
    - output_dir: Directory to save the split files
    - pages_per_file: Number of pages per output file
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.split_pdf(file_path, output_dir, pages_per_file)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_merge(file_paths: List[str], output_path: str, ctx: Context = None) -> str:
    """Merge multiple PDF files into one.

    Parameters:
    - file_paths: List of paths to the PDF files to merge
    - output_path: Path to save the merged file
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.merge_pdfs(file_paths, output_path)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_add_watermark(file_path: str, output_path: str, text: str = None, image_path: str = None, opacity: float = 0.3, ctx: Context = None) -> str:
    """Add a watermark to a PDF document.

    Parameters:
    - file_path: Path to the PDF file
    - output_path: Path to save the watermarked file
    - text: Text to use as watermark
    - image_path: Path to image to use as watermark
    - opacity: Opacity of the watermark (0-1)
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.add_watermark(file_path, output_path, text, image_path, opacity)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_encrypt(file_path: str, output_path: str, user_password: str, owner_password: str = None, ctx: Context = None) -> str:
    """Encrypt a PDF document with password protection.

    Parameters:
    - file_path: Path to the PDF file
    - output_path: Path to save the encrypted file
    - user_password: Password required to open the PDF
    - owner_password: Password for full access (optional, defaults to user_password)
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.encrypt_pdf(file_path, output_path, user_password, owner_password)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_decrypt(file_path: str, output_path: str, password: str, ctx: Context = None) -> str:
    """Decrypt an encrypted PDF document.

    Parameters:
    - file_path: Path to the encrypted PDF file
    - output_path: Path to save the decrypted file
    - password: Password to decrypt the PDF
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.decrypt_pdf(file_path, output_path, password)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_get_form_fields(file_path: str, ctx: Context = None) -> str:
    """Get all form fields in a PDF document.

    Parameters:
    - file_path: Path to the PDF file
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.get_form_fields(file_path)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


async def pdf_fill_form(file_path: str, output_path: str, form_data: Dict[str, str], ctx: Context = None) -> str:
    """Fill out form fields in a PDF document.

    Parameters:
    - file_path: Path to the PDF file
    - output_path: Path to save the filled form
    - form_data: Dictionary with field names as keys and field values as values
    """
    try:
        pdf_service = _get_pdf_service()
        results = await pdf_service.fill_form(file_path, output_path, form_data)
        return json.dumps(results, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


# Tool registration and initialization
_pdf_service = None


def initialize_pdf_service():
    """Initialize the PDF service, replacing any existing instance"""
    global _pdf_service
    _pdf_service = PDFService()
    return _pdf_service


def _get_pdf_service():
    """Get or initialize the PDF service"""
    global _pdf_service
    if _pdf_service is None:
        _pdf_service = initialize_pdf_service()
    return _pdf_service


def get_pdf_tools():
    """Get a dictionary of all PDF tools for registration with MCP"""
    return {
        "pdf_info": pdf_info,
        "pdf_extract_text": pdf_extract_text,
        "pdf_extract_images": pdf_extract_images,
        "pdf_split": pdf_split,
        "pdf_merge": pdf_merge,
        "pdf_add_watermark": pdf_add_watermark,
        "pdf_encrypt": pdf_encrypt,
        "pdf_decrypt": pdf_decrypt,
        "pdf_get_form_fields": pdf_get_form_fields,
        "pdf_fill_form": pdf_fill_form
    }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/getfounded/mcp-tool-kit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.