mcp-pdf-tools

The Unlicense
19
"""MCP server exposing PDF manipulation tools.

Tools: merge PDFs, extract pages, search for PDFs by pattern, merge in a
pattern-defined order, and find PDFs related to a target PDF's content.
"""

from typing import Any, Optional

import asyncio
import fnmatch
import os
import re
from collections import Counter, defaultdict
from difflib import get_close_matches

import PyPDF2
from PyPDF2 import PdfReader

import mcp.types as types
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
import mcp.server.stdio

# Initialize server
server = Server("pdf-tools")

# Matches part-number-like tokens: a 2-3 letter prefix, 3-7 digits, and an
# optional -/_ suffix (e.g. "AB12345", "ABC1234-REV2").  Hoisted here because
# three different code paths previously re-created the identical pattern.
_PART_PATTERN = re.compile(r'([A-Z]{2,3}\d{3,7}(?:[-_][A-Z0-9]+)?)', re.IGNORECASE)


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List available PDF manipulation tools."""
    return [
        types.Tool(
            name="merge-pdfs",
            description="Merge multiple PDF files into a single PDF",
            inputSchema={
                "type": "object",
                "properties": {
                    "input_paths": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of input PDF file paths"
                    },
                    "output_path": {
                        "type": "string",
                        "description": "Output path for merged PDF"
                    }
                },
                "required": ["input_paths", "output_path"]
            }
        ),
        types.Tool(
            name="extract-pages",
            description="Extract specific pages from a PDF file",
            inputSchema={
                "type": "object",
                "properties": {
                    "input_path": {
                        "type": "string",
                        "description": "Input PDF file path"
                    },
                    "output_path": {
                        "type": "string",
                        "description": "Output path for new PDF"
                    },
                    "pages": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "List of page numbers to extract (1-based indexing)"
                    }
                },
                "required": ["input_path", "output_path", "pages"]
            }
        ),
        types.Tool(
            name="search-pdfs",
            description="Search for PDF files in a directory with optional pattern matching",
            inputSchema={
                "type": "object",
                "properties": {
                    "base_path": {
                        "type": "string",
                        "description": "Base directory to search in"
                    },
                    "pattern": {
                        "type": "string",
                        "description": "Pattern to match against filenames (e.g., 'report*.pdf')"
                    },
                    "recursive": {
                        "type": "boolean",
                        "description": "Whether to search in subdirectories",
                        "default": True
                    }
                },
                "required": ["base_path"]
            }
        ),
        types.Tool(
            name="merge-pdfs-ordered",
            description="Merge PDFs in a specific order based on patterns or exact names",
            inputSchema={
                "type": "object",
                "properties": {
                    "base_path": {
                        "type": "string",
                        "description": "Base directory containing PDFs"
                    },
                    "patterns": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of patterns or names in desired order"
                    },
                    "output_path": {
                        "type": "string",
                        "description": "Output path for merged PDF"
                    },
                    "fuzzy_matching": {
                        "type": "boolean",
                        "description": "Use fuzzy matching for filenames",
                        "default": True
                    }
                },
                "required": ["base_path", "patterns", "output_path"]
            }
        ),
        types.Tool(
            name="find-related-pdfs",
            description="Find a PDF and then search for related PDFs based on its content, including common substring patterns",
            inputSchema={
                "type": "object",
                "properties": {
                    "base_path": {
                        "type": "string",
                        "description": "Base directory to search in"
                    },
                    "target_filename": {
                        "type": "string",
                        "description": "Name of the initial PDF to analyze"
                    },
                    "pattern_matching_only": {
                        "type": "boolean",
                        "description": "Only search for repeating substring patterns",
                        "default": False
                    },
                    "min_pattern_occurrences": {
                        "type": "integer",
                        "description": "Minimum times a pattern must appear to be considered significant",
                        "default": 2
                    }
                },
                "required": ["base_path", "target_filename"]
            }
        )
    ]


def _merge_pdfs(arguments: dict) -> list[types.TextContent]:
    """Merge the PDFs in ``input_paths`` (in order) into ``output_path``."""
    input_paths = arguments.get("input_paths", [])
    output_path = arguments.get("output_path")
    if not input_paths or not output_path:
        raise ValueError("Missing required arguments")

    merger = PyPDF2.PdfMerger()
    try:
        # Add each PDF to the merger
        for path in input_paths:
            with open(path, 'rb') as pdf_file:
                merger.append(pdf_file)
        # Write the merged PDF
        with open(output_path, 'wb') as output_file:
            merger.write(output_file)
        return [types.TextContent(
            type="text",
            text=f"Successfully merged {len(input_paths)} PDFs into {output_path}"
        )]
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=f"Error merging PDFs: {str(e)}"
        )]
    finally:
        merger.close()


def _extract_pages(arguments: dict) -> list[types.TextContent]:
    """Copy the 1-based ``pages`` of ``input_path`` into ``output_path``."""
    input_path = arguments.get("input_path")
    output_path = arguments.get("output_path")
    pages = arguments.get("pages", [])
    if not input_path or not output_path or not pages:
        raise ValueError("Missing required arguments")

    try:
        reader = PyPDF2.PdfReader(input_path)
        writer = PyPDF2.PdfWriter()
        # Convert 1-based page numbers to 0-based indices
        for page_num in pages:
            if not 1 <= page_num <= len(reader.pages):
                return [types.TextContent(
                    type="text",
                    text=f"Error: Page number {page_num} is out of range"
                )]
            writer.add_page(reader.pages[page_num - 1])
        # Write the extracted pages to the output file
        with open(output_path, 'wb') as output_file:
            writer.write(output_file)
        return [types.TextContent(
            type="text",
            text=f"Successfully extracted {len(pages)} pages to {output_path}"
        )]
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=f"Error extracting pages: {str(e)}"
        )]


def _filename_matches(filename: str, pattern: str) -> bool:
    """Case-insensitive filename match for the search-pdfs tool.

    FIX: the original stripped '.pdf' from the pattern and did substring
    matching, so the documented glob form (and the default '*.pdf') became
    the literal substring '*' and matched nothing.  Glob patterns now go
    through fnmatch (which was imported but unused); plain literal patterns
    keep the original loose substring behavior.
    """
    name = filename.lower()
    pat = pattern.lower()
    if any(ch in pat for ch in '*?['):
        return fnmatch.fnmatch(name, pat)
    return pat.replace('.pdf', '') in name


def _search_pdfs(arguments: dict) -> list[types.TextContent]:
    """List PDF files under ``base_path`` whose names match ``pattern``."""
    base_path = arguments.get("base_path")
    pattern = arguments.get("pattern", "*.pdf")
    recursive = arguments.get("recursive", True)
    if not base_path:
        raise ValueError("Missing base_path argument")

    # Normalize the base path to handle Windows paths
    base_path = os.path.normpath(base_path)
    found_files = []
    try:
        if recursive:
            for root, _, files in os.walk(base_path):
                for filename in files:
                    if filename.lower().endswith('.pdf') and _filename_matches(filename, pattern):
                        found_files.append(os.path.join(root, filename))
        else:
            for filename in os.listdir(base_path):
                if filename.lower().endswith('.pdf') and _filename_matches(filename, pattern):
                    found_files.append(os.path.join(base_path, filename))
        return [types.TextContent(
            type="text",
            text=f"Found {len(found_files)} PDF files:\n" +
                 "\n".join(f"- {f}" for f in found_files)
        )]
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=f"Error searching for PDFs: {str(e)}\nBase path: {base_path}"
        )]


def _merge_pdfs_ordered(arguments: dict) -> list[types.TextContent]:
    """Merge PDFs under ``base_path`` in the order given by ``patterns``."""
    base_path = arguments.get("base_path")
    patterns = arguments.get("patterns", [])
    output_path = arguments.get("output_path")
    fuzzy_matching = arguments.get("fuzzy_matching", True)
    if not all([base_path, patterns, output_path]):
        raise ValueError("Missing required arguments")

    try:
        # Get all PDF files in the directory
        all_pdfs = []
        for root, _, files in os.walk(base_path):
            for file in files:
                if file.lower().endswith('.pdf'):
                    all_pdfs.append(os.path.join(root, file))

        # Match files to patterns, preserving pattern order.
        # FIX: track already-selected files so overlapping patterns cannot
        # append the same PDF to the merge twice.
        selected_files: list[str] = []
        seen: set[str] = set()
        for pattern in patterns:
            # Try exact (substring-of-basename) matches first
            matched = [f for f in all_pdfs if pattern in os.path.basename(f)]
            # Try fuzzy matching if enabled and no exact matches
            if not matched and fuzzy_matching:
                filenames = [os.path.basename(f) for f in all_pdfs]
                for close_name in get_close_matches(pattern, filenames, n=3, cutoff=0.6):
                    matched.extend(f for f in all_pdfs
                                   if os.path.basename(f) == close_name)
            if not matched:
                return [types.TextContent(
                    type="text",
                    text=f"Warning: No matches found for pattern '{pattern}'"
                )]
            for f in matched:
                if f not in seen:
                    seen.add(f)
                    selected_files.append(f)

        # Merge the matched files.
        # FIX: the original never closed this merger (resource leak on error,
        # and inconsistent with the merge-pdfs tool, which does close it).
        merger = PyPDF2.PdfMerger()
        try:
            for pdf_path in selected_files:
                with open(pdf_path, 'rb') as pdf_file:
                    merger.append(pdf_file)
            with open(output_path, 'wb') as output_file:
                merger.write(output_file)
        finally:
            merger.close()

        return [types.TextContent(
            type="text",
            text=f"Successfully merged {len(selected_files)} PDFs into {output_path}\n" +
                 "Files merged in this order:\n" +
                 "\n".join(f"- {os.path.basename(f)}" for f in selected_files)
        )]
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=f"Error merging PDFs: {str(e)}"
        )]


def _find_related_pdfs(arguments: dict) -> list[types.TextContent]:
    """Locate a target PDF, extract search terms from its text, and list
    other PDFs under ``base_path`` whose filenames match those terms."""
    base_path = arguments.get("base_path")
    target_filename = arguments.get("target_filename")
    if not base_path or not target_filename:
        raise ValueError("Missing required arguments")

    try:
        # First, find the target PDF (first filename containing the target
        # name, case-insensitively)
        target_pdf_path = None
        for root, _, files in os.walk(base_path):
            for filename in files:
                if (target_filename.lower() in filename.lower()
                        and filename.lower().endswith('.pdf')):
                    target_pdf_path = os.path.join(root, filename)
                    break
            if target_pdf_path:
                break
        if not target_pdf_path:
            return [types.TextContent(
                type="text",
                text=f"Could not find target PDF: {target_filename}"
            )]

        # Extract text from the target PDF
        reader = PdfReader(target_pdf_path)
        extracted_text = "".join(page.extract_text() + "\n" for page in reader.pages)

        # Process the extracted text into search terms
        search_terms: set[str] = set()
        if arguments.get("pattern_matching_only", False):
            # Find part-number-like tokens and keep only those whose letter
            # prefix repeats often enough to look significant.
            potential_parts = _PART_PATTERN.findall(extracted_text)
            prefix_counts: Counter = Counter()
            prefix_patterns: defaultdict = defaultdict(set)
            for part in potential_parts:
                prefix_match = re.match(r'([A-Z]{2,3})', part, re.IGNORECASE)
                if prefix_match:
                    prefix = prefix_match.group(1).upper()
                    prefix_counts[prefix] += 1
                    prefix_patterns[prefix].add(part)
            # Only keep prefixes that appear frequently enough
            min_occurrences = arguments.get("min_pattern_occurrences", 2)
            for prefix, count in prefix_counts.items():
                if count >= min_occurrences:
                    search_terms.update(prefix_patterns[prefix])
                    # Also add the prefix itself to catch related files
                    search_terms.add(prefix)
        else:
            # Original word-based logic
            search_terms.update(re.findall(r'\b\w+\b', extracted_text))
            # Add pattern matching on top of word-based search
            search_terms.update(_PART_PATTERN.findall(extracted_text))

        # Remove common words and very short terms
        common_words = {'THE', 'AND', 'OR', 'IN', 'ON', 'AT', 'TO', 'FOR', 'WITH', 'BY'}
        search_terms = {term for term in search_terms
                        if len(term) > 2 and term.upper() not in common_words}

        # Search for PDFs matching any of the search terms (each file is
        # recorded once, with the first term that matched it)
        found_files: set[tuple[str, str]] = set()
        for root, _, files in os.walk(base_path):
            for filename in files:
                if not filename.lower().endswith('.pdf'):
                    continue
                file_lower = filename.lower()
                # Extract potential part-number matches from the filename
                file_parts = _PART_PATTERN.findall(filename)
                for term in search_terms:
                    term_lower = term.lower()
                    if (term_lower in file_lower
                            or any(part.lower() == term_lower for part in file_parts)):
                        found_files.add((os.path.join(root, filename), term))
                        break

        # Format the results
        if not found_files:
            return [types.TextContent(
                type="text",
                text=f"No related PDFs found for terms extracted from {os.path.basename(target_pdf_path)}"
            )]
        result_text = f"Extracted {len(search_terms)} search terms from {os.path.basename(target_pdf_path)}.\n"
        result_text += "Search terms found: " + ", ".join(sorted(search_terms)) + "\n\n"
        result_text += "Found related PDFs:\n"
        # Group files by matching term
        term_groups: defaultdict = defaultdict(list)
        for file_path, term in found_files:
            term_groups[term].append(file_path)
        # Output grouped results
        for term in sorted(term_groups.keys()):
            result_text += f"\nFiles matching '{term}':\n"
            for file_path in sorted(term_groups[term]):
                result_text += f"- {file_path}\n"
        return [types.TextContent(type="text", text=result_text)]
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=f"Error processing PDFs: {str(e)}"
        )]


# Dispatch table: tool name -> handler.  Each handler takes the raw
# arguments dict and returns the tool's content list.
_TOOL_HANDLERS = {
    "merge-pdfs": _merge_pdfs,
    "extract-pages": _extract_pages,
    "search-pdfs": _search_pdfs,
    "merge-pdfs-ordered": _merge_pdfs_ordered,
    "find-related-pdfs": _find_related_pdfs,
}


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Handle PDF tool execution requests.

    Raises:
        ValueError: if ``arguments`` is missing/empty or ``name`` is unknown.
    """
    if not arguments:
        raise ValueError("Missing arguments")
    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    return handler(arguments)


async def main():
    """Run the server using stdin/stdout streams."""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="pdf-tools",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())