Auto-Snap MCP

server.py (22 kB)
""" MCP Server for automated screenshot capture and PDF conversion. """ import asyncio import logging import json import time from typing import Any, Dict, List, Optional from pathlib import Path from mcp.server.fastmcp import FastMCP from mcp.server.models import InitializationOptions from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource from capture import CrossPlatformWindowManager, WindowCapture, check_dependencies from processing import ImageProcessor, check_tesseract from pdf_utils import PDFConverter from config import get_config, get_output_directory, get_temp_directory, generate_page_filename # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) # Initialize MCP server logger.info("Initializing MCP server...") mcp = FastMCP("Auto-Snap MCP") logger.info("MCP server initialized") # Initialize components lazily to avoid blocking MCP initialization window_manager = None image_processor = None pdf_converter = None def get_window_manager(): """Lazy initialization of window manager.""" global window_manager if window_manager is None: logger.info("Initializing CrossPlatformWindowManager...") try: window_manager = CrossPlatformWindowManager() logger.info("CrossPlatformWindowManager initialized successfully") except Exception as e: logger.error(f"Failed to initialize CrossPlatformWindowManager: {e}") raise return window_manager def get_image_processor(): """Lazy initialization of image processor.""" global image_processor if image_processor is None: logger.info("Initializing ImageProcessor...") image_processor = ImageProcessor() logger.info("ImageProcessor initialized successfully") return image_processor def get_pdf_converter(): """Lazy initialization of PDF converter.""" global pdf_converter if pdf_converter is None: logger.info("Initializing PDFConverter...") pdf_converter = PDFConverter() logger.info("PDFConverter initialized successfully") return pdf_converter @mcp.tool() async def list_windows() -> str: """ List all available windows for screenshot capture. Returns: JSON string containing list of windows with their IDs, titles, and properties. """ try: wm = get_window_manager() windows = wm.list_windows() env_info = wm.get_environment_info() result = { "status": "success", "windows": windows, "count": len(windows), "environment": env_info } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to list windows: {e}") return json.dumps({ "status": "error", "error": str(e), "windows": [], "count": 0, "environment": {"error": "Could not determine environment"} }) @mcp.tool() async def capture_window(window_id: str, output_path: Optional[str] = None) -> str: """ Capture screenshot of a specific window. Args: window_id: Window ID to capture (from list_windows) output_path: Optional path to save the screenshot Returns: JSON string with capture results and file path. 
""" try: wm = get_window_manager() captured_path = wm.capture_window(window_id, output_path) result = { "status": "success", "window_id": window_id, "output_path": captured_path, "file_exists": Path(captured_path).exists(), "file_size_mb": round(Path(captured_path).stat().st_size / (1024 * 1024), 2) } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to capture window {window_id}: {e}") return json.dumps({ "status": "error", "error": str(e), "window_id": window_id, "output_path": output_path }) @mcp.tool() async def capture_full_screen(output_path: Optional[str] = None) -> str: """ Capture screenshot of the entire screen. Args: output_path: Optional path to save the screenshot Returns: JSON string with capture results and file path. """ try: wm = get_window_manager() captured_path = wm.capture_full_screen(output_path) result = { "status": "success", "output_path": captured_path, "file_exists": Path(captured_path).exists(), "file_size_mb": round(Path(captured_path).stat().st_size / (1024 * 1024), 2) } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to capture full screen: {e}") return json.dumps({ "status": "error", "error": str(e), "output_path": output_path }) @mcp.tool() async def capture_document_pages( window_id: str, page_count: int, output_dir: Optional[str] = None, navigation_key: str = "Page_Down", delay_seconds: float = 1.0 ) -> str: """ Capture multiple pages from a document window with automatic navigation. Args: window_id: Window ID containing the document page_count: Number of pages to capture output_dir: Directory to save captured pages navigation_key: Key to press for navigation (Page_Down, Right, space) delay_seconds: Delay between navigation and capture Returns: JSON string with capture results and list of captured files. 
""" try: # Get configured output directory (with backward compatibility) config = get_config() if output_dir is None: if config.should_use_legacy_mode(): output_dir = "captures" # Legacy default else: actual_output_dir = get_output_directory() else: actual_output_dir = get_output_directory(output_dir) # Convert to string for compatibility with existing manager interface output_dir_str = str(actual_output_dir) # For multi-page capture, use the underlying manager if it supports it wm = get_window_manager() if hasattr(wm.manager, 'capture_multiple_pages'): captured_files = wm.manager.capture_multiple_pages( window_id=window_id, page_count=page_count, output_dir=output_dir_str, navigation_key=navigation_key, delay_seconds=delay_seconds ) else: # Fallback: capture individual pages manually actual_output_dir.mkdir(parents=True, exist_ok=True) captured_files = [] for page_num in range(1, page_count + 1): filename = generate_page_filename(page_num) output_path = actual_output_dir / filename captured_path = wm.capture_window(window_id, str(output_path)) captured_files.append(captured_path) # Simple delay between captures (no navigation for Windows apps yet) if page_num < page_count: time.sleep(delay_seconds) import os total_size = sum(os.path.getsize(f) for f in captured_files if os.path.exists(f)) result = { "status": "success", "window_id": window_id, "pages_captured": len(captured_files), "output_directory": output_dir_str, "captured_files": captured_files, "total_size_mb": round(total_size / (1024 * 1024), 2) } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to capture document pages: {e}") return json.dumps({ "status": "error", "error": str(e), "window_id": window_id, "page_count": page_count, "captured_files": [] }) @mcp.tool() async def process_images( image_dir: str, operations: List[str] = ["enhance"], ocr_language: str = "eng" ) -> str: """ Process images in a directory with various operations. Args: image_dir: Directory containing images to process operations: List of operations (enhance, ocr, resize, deduplicate) ocr_language: Language for OCR processing (default: eng) Returns: JSON string with processing results. """ try: ip = get_image_processor() results = ip.process_batch(image_dir, operations) # Add OCR language info to results if OCR was performed if "ocr" in operations: for ocr_result in results.get("ocr_results", []): ocr_result["language"] = ocr_language result = { "status": "success", "image_directory": image_dir, "operations": operations, "results": results } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to process images: {e}") return json.dumps({ "status": "error", "error": str(e), "image_directory": image_dir, "operations": operations }) @mcp.tool() async def convert_to_pdf( image_paths: List[str], output_path: str, title: Optional[str] = None, sort_files: bool = True ) -> str: """ Convert a list of images to a PDF document. Args: image_paths: List of image file paths to convert output_path: Path for the output PDF file title: Optional title for the PDF document sort_files: Whether to sort files by name before conversion Returns: JSON string with conversion results. 
""" try: # Validate images first pc = get_pdf_converter() validation = pc.validate_images_for_pdf(image_paths) if not validation["valid_images"]: return json.dumps({ "status": "error", "error": "No valid images found for conversion", "validation": validation }) # Convert to PDF pdf_path = pc.images_to_pdf( validation["valid_images"], output_path, sort_files=sort_files, title=title ) pdf_info = pc.get_pdf_info(pdf_path) result = { "status": "success", "input_images": len(image_paths), "valid_images": len(validation["valid_images"]), "output_pdf": pdf_path, "pdf_info": pdf_info, "validation": validation } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to convert to PDF: {e}") return json.dumps({ "status": "error", "error": str(e), "input_images": len(image_paths) if image_paths else 0, "output_pdf": output_path }) @mcp.tool() async def directory_to_pdf( image_dir: str, output_path: str, title: Optional[str] = None, pattern: str = "*" ) -> str: """ Convert all images in a directory to a PDF document. Args: image_dir: Directory containing images output_path: Path for the output PDF file title: Optional title for the PDF document pattern: File pattern to match (default: all files) Returns: JSON string with conversion results. """ try: pc = get_pdf_converter() pdf_path = pc.directory_to_pdf( image_dir=image_dir, output_path=output_path, pattern=pattern, title=title ) pdf_info = pc.get_pdf_info(pdf_path) result = { "status": "success", "input_directory": image_dir, "pattern": pattern, "output_pdf": pdf_path, "pdf_info": pdf_info } return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to convert directory to PDF: {e}") return json.dumps({ "status": "error", "error": str(e), "input_directory": image_dir, "output_pdf": output_path }) @mcp.tool() async def full_document_workflow( window_id: str, page_count: int, output_pdf: str, capture_dir: Optional[str] = None, title: Optional[str] = None, navigation_key: str = "Page_Down", delay_seconds: float = 1.0, process_images_flag: bool = True ) -> str: """ Complete workflow: capture document pages, optionally process them, and convert to PDF. Args: window_id: Window ID containing the document page_count: Number of pages to capture output_pdf: Path for the final PDF file capture_dir: Temporary directory for captures title: Optional PDF title navigation_key: Key for page navigation delay_seconds: Delay between navigation and capture process_images_flag: Whether to enhance images before PDF conversion Returns: JSON string with complete workflow results. 
""" try: # Get configured temp directory (with backward compatibility) config = get_config() if capture_dir is None: if config.should_use_legacy_mode(): capture_dir = "temp_captures" # Legacy default else: actual_capture_dir = get_temp_directory() else: actual_capture_dir = get_temp_directory(capture_dir) # Convert to string for compatibility with existing manager interface capture_dir_str = str(actual_capture_dir) workflow_results = { "status": "success", "steps": [] } # Step 1: Capture pages logger.info("Step 1: Capturing document pages") wm = get_window_manager() if hasattr(wm.manager, 'capture_multiple_pages'): captured_files = wm.manager.capture_multiple_pages( window_id=window_id, page_count=page_count, output_dir=capture_dir_str, navigation_key=navigation_key, delay_seconds=delay_seconds ) else: # Fallback for Windows applications actual_capture_dir.mkdir(parents=True, exist_ok=True) captured_files = [] for page_num in range(1, page_count + 1): filename = generate_page_filename(page_num) output_path = actual_capture_dir / filename captured_path = wm.capture_window(window_id, str(output_path)) captured_files.append(captured_path) if page_num < page_count: time.sleep(delay_seconds) workflow_results["steps"].append({ "step": "capture", "status": "success", "files_captured": len(captured_files), "output_directory": capture_dir_str }) # Step 2: Process images (if requested) processed_files = captured_files if process_images_flag: logger.info("Step 2: Processing captured images") ip = get_image_processor() processing_results = ip.process_batch( capture_dir_str, ["enhance"] ) if processing_results["enhanced_files"]: processed_files = processing_results["enhanced_files"] workflow_results["steps"].append({ "step": "processing", "status": "success", "enhanced_files": len(processing_results["enhanced_files"]) }) # Step 3: Convert to PDF logger.info("Step 3: Converting to PDF") pc = get_pdf_converter() pdf_path = pc.images_to_pdf( processed_files, output_pdf, sort_files=True, title=title or f"Document captured from window {window_id}" ) pdf_info = pc.get_pdf_info(pdf_path) workflow_results["steps"].append({ "step": "pdf_conversion", "status": "success", "output_pdf": pdf_path, "pdf_info": pdf_info }) # Step 4: Cleanup temporary files (optional) import shutil try: config = get_config() if config.get_config_summary().get("auto_cleanup_temp", True): # Only cleanup if it's a temp directory or legacy temp pattern if (capture_dir is None and not config.should_use_legacy_mode()) or \ (capture_dir_str.startswith("temp_") or "temp" in str(actual_capture_dir).lower()): shutil.rmtree(capture_dir_str) workflow_results["steps"].append({ "step": "cleanup", "status": "success", "cleaned_directory": capture_dir_str }) except Exception as cleanup_error: logger.warning(f"Failed to cleanup {capture_dir_str}: {cleanup_error}") workflow_results.update({ "window_id": window_id, "pages_captured": len(captured_files), "final_pdf": pdf_path, "pdf_info": pdf_info }) return json.dumps(workflow_results, indent=2) except Exception as e: logger.error(f"Workflow failed: {e}") return json.dumps({ "status": "error", "error": str(e), "window_id": window_id, "page_count": page_count, "output_pdf": output_pdf }) @mcp.tool() async def check_system_dependencies() -> str: """ Check if all required system dependencies are installed. Returns: JSON string with dependency check results. 
""" try: missing_deps = check_dependencies() tesseract_available = check_tesseract() result = { "status": "success", "dependencies": { "wmctrl": "wmctrl" not in missing_deps, "xdotool": "xdotool" not in missing_deps, "tesseract": tesseract_available }, "missing_dependencies": missing_deps, "install_commands": { "wmctrl": "sudo apt-get install wmctrl", "xdotool": "sudo apt-get install xdotool", "tesseract": "sudo apt-get install tesseract-ocr" } } if missing_deps or not tesseract_available: result["status"] = "warning" result["message"] = "Some dependencies are missing" return json.dumps(result, indent=2) except Exception as e: logger.error(f"Failed to check dependencies: {e}") return json.dumps({ "status": "error", "error": str(e) }) @mcp.tool() async def debug_window_detection() -> str: """ Comprehensive debugging information for window detection issues. Returns: JSON string with detailed diagnostics about PowerShell environment, process enumeration, and window detection capabilities. """ try: wm = get_window_manager() # Get basic environment info env_info = wm.get_environment_info() debug_result = { "status": "success", "environment": env_info, "window_detection_debug": {} } # If using Windows manager, get detailed debug info if hasattr(wm.manager, 'debug_window_detection'): logger.info("Running comprehensive window detection debug...") debug_info = wm.manager.debug_window_detection() debug_result["window_detection_debug"] = debug_info else: debug_result["window_detection_debug"] = { "message": "Debug functionality only available for Windows Window Manager" } # Also include current window list for comparison current_windows = wm.list_windows() debug_result["current_window_list"] = current_windows debug_result["current_window_count"] = len(current_windows) logger.info(f"Debug complete: found {len(current_windows)} windows") return json.dumps(debug_result, indent=2) except Exception as e: logger.error(f"Failed to run debug window detection: {e}") return json.dumps({ "status": "error", "error": str(e), "environment": {"error": "Could not determine environment"} }) def main(): """Run the MCP server.""" logger.info("Starting Auto-Snap MCP Server") # Check dependencies on startup missing_deps = check_dependencies() if missing_deps: logger.warning(f"Missing system dependencies: {missing_deps}") logger.warning("Some functionality may not work properly") if not check_tesseract(): logger.warning("Tesseract OCR not available - OCR functionality disabled") # Use default FastMCP run (which should handle STDIO automatically) mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PovedaAqui/auto-snap-mcp'
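
The same lookup can be scripted. Below is a minimal Python equivalent of the curl command above, using only the standard library; no response fields are assumed, so the payload is simply pretty-printed.

import json
from urllib.request import urlopen

# Same endpoint as the curl command above.
URL = "https://glama.ai/api/mcp/v1/servers/PovedaAqui/auto-snap-mcp"

with urlopen(URL) as response:
    server_record = json.load(response)

print(json.dumps(server_record, indent=2))
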
