
Computer Control MCP

by AB498
core.py (25.6 kB)
#!/usr/bin/env python3
"""
Computer Control MCP - Core Implementation

A compact Model Context Protocol server that provides computer control
capabilities using PyAutoGUI for mouse/keyboard control.
"""
import json
import shutil
import sys
import os
from typing import Dict, Any, List, Optional, Tuple
from io import BytesIO
import re
import asyncio
import uuid
import datetime
from pathlib import Path
import tempfile
from typing import Union

# --- Auto-install dependencies if needed ---
import pyautogui
from mcp.server.fastmcp import FastMCP, Image
import mss
from PIL import Image as PILImage

try:
    import pywinctl as gw
except (NotImplementedError, ImportError):
    import pygetwindow as gw

from fuzzywuzzy import fuzz, process
import cv2
from rapidocr import RapidOCR
from pydantic import BaseModel

BaseModel.model_config = {"arbitrary_types_allowed": True}

engine = RapidOCR()

DEBUG = True  # Set to False in production
RELOAD_ENABLED = True  # Set to False to disable auto-reload

# Create FastMCP server instance at module level
mcp = FastMCP("ComputerControlMCP")

# Determine mode automatically
IS_DEVELOPMENT = os.getenv("ENV") == "development"


def log(message: str) -> None:
    """Log to stderr in dev, to stdout or a file in production."""
    if IS_DEVELOPMENT:
        # In dev, write to stderr
        print(f"[DEV] {message}", file=sys.stderr)
    else:
        # In production, write to stdout or a file
        print(f"[PROD] {message}", file=sys.stdout)
        # or append to a file: open("app.log", "a").write(message + "\n")


def get_downloads_dir() -> Path:
    """Get the OS downloads directory."""
    if os.name == "nt":  # Windows
        import winreg

        sub_key = r"SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
        downloads_guid = "{374DE290-123F-4565-9164-39C4925E467B}"
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, sub_key) as key:
            downloads_dir = winreg.QueryValueEx(key, downloads_guid)[0]
        return Path(downloads_dir)
    else:  # macOS, Linux, etc.
        return Path.home() / "Downloads"


def _mss_screenshot(region=None):
    """Take a screenshot using mss and return a PIL Image.

    Args:
        region: Optional tuple (left, top, width, height) for region capture

    Returns:
        PIL Image object
    """
    with mss.mss() as sct:
        if region is None:
            # Full screen screenshot
            monitor = sct.monitors[0]  # All monitors combined
        else:
            # Region screenshot
            left, top, width, height = region
            monitor = {
                "left": left,
                "top": top,
                "width": width,
                "height": height,
            }
        screenshot = sct.grab(monitor)
        # Convert to PIL Image
        return PILImage.frombytes(
            "RGB", screenshot.size, screenshot.bgra, "raw", "BGRX"
        )


def save_image_to_downloads(
    image, prefix: str = "screenshot", directory: Path = None
) -> Tuple[str, bytes]:
    """Save an image to the downloads directory and return its absolute path.

    Args:
        image: Either a PIL Image object or an MCP Image object
        prefix: Prefix for the filename (default: 'screenshot')
        directory: Optional directory to save the image to

    Returns:
        Tuple of (absolute_path, image_data_bytes)
    """
    # Create a unique filename with timestamp
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    unique_id = str(uuid.uuid4())[:8]
    filename = f"{prefix}_{timestamp}_{unique_id}.png"

    # Get downloads directory
    downloads_dir = directory or get_downloads_dir()
    filepath = downloads_dir / filename

    # Handle different image types
    if hasattr(image, "save"):  # PIL Image
        image.save(filepath)
        # Also get the bytes for returning
        img_byte_arr = BytesIO()
        image.save(img_byte_arr, format="PNG")
        img_bytes = img_byte_arr.getvalue()
    elif hasattr(image, "data"):  # MCP Image
        img_bytes = image.data
        with open(filepath, "wb") as f:
            f.write(img_bytes)
    else:
        raise TypeError("Unsupported image type")

    log(f"Saved image to {filepath}")
    return str(filepath.absolute()), img_bytes


def _find_matching_window(
    windows: List[Dict[str, Any]],
    title_pattern: str = None,
    use_regex: bool = False,
    threshold: int = 10,
) -> Optional[Dict[str, Any]]:
    """Helper function to find a matching window based on a title pattern.

    Args:
        windows: List of window dictionaries
        title_pattern: Pattern to match window title
        use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching
        threshold: Minimum score (0-100) required for a fuzzy match

    Returns:
        The best matching window, or None if no match is found
    """
    if not title_pattern:
        log("No title pattern provided, returning None")
        return None

    # For regex matching
    if use_regex:
        for window in windows:
            if re.search(title_pattern, window["title"], re.IGNORECASE):
                log(f"Regex match found: {window['title']}")
                return window
        return None

    # For fuzzy matching using fuzzywuzzy
    # Extract all window titles
    window_titles = [window["title"] for window in windows]

    # Use process.extractOne to find the best match
    best_match_title, score = process.extractOne(
        title_pattern, window_titles, scorer=fuzz.partial_ratio
    )

    log(f"Best fuzzy match: '{best_match_title}' with score {score}")

    # Only return if the score is above the threshold
    if score >= threshold:
        # Find the window with the matching title
        for window in windows:
            if window["title"] == best_match_title:
                return window

    return None


# --- MCP Function Handlers ---


@mcp.tool()
def click_screen(x: int, y: int) -> str:
    """Click at the specified screen coordinates."""
    try:
        pyautogui.click(x=x, y=y)
        return f"Successfully clicked at coordinates ({x}, {y})"
    except Exception as e:
        return f"Error clicking at coordinates ({x}, {y}): {str(e)}"


@mcp.tool()
def get_screen_size() -> Dict[str, Any]:
    """Get the current screen resolution."""
    try:
        width, height = pyautogui.size()
        return {
            "width": width,
            "height": height,
            "message": f"Screen size: {width}x{height}",
        }
    except Exception as e:
        return {"error": str(e), "message": f"Error getting screen size: {str(e)}"}


@mcp.tool()
def type_text(text: str) -> str:
    """Type the specified text at the current cursor position."""
    try:
        pyautogui.typewrite(text)
        return f"Successfully typed text: {text}"
    except Exception as e:
        return f"Error typing text: {str(e)}"


@mcp.tool()
def take_screenshot(
    title_pattern: str = None,
    use_regex: bool = False,
    threshold: int = 10,
    scale_percent_for_ocr: int = None,
    save_to_downloads: bool = False,
) -> Image:
    """
    Get a screenshot as an MCP Image object. If no title pattern is provided,
    take a screenshot of the entire screen.

    Args:
        title_pattern: Pattern to match window title; if None, take a screenshot of the entire screen
        use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching for the best match
        threshold: Minimum score (0-100) required for a fuzzy match
        scale_percent_for_ocr: Percentage to scale the image down before processing; you won't need this
            most of the time unless your PC is extremely old or slow
        save_to_downloads: If True, save the screenshot to the downloads directory and return the absolute path

    Returns:
        A single screenshot as an MCP Image object. A client message of "content type image not supported"
        means the preview isn't supported, but the Image object is there and returned successfully.
    """
    try:
        all_windows = gw.getAllWindows()

        # Convert to list of dictionaries for _find_matching_window
        windows = []
        for window in all_windows:
            if window.title:  # Only include windows with titles
                windows.append(
                    {
                        "title": window.title,
                        "window_obj": window,  # Store the actual window object
                    }
                )

        log(f"Found {len(windows)} windows")

        window = _find_matching_window(windows, title_pattern, use_regex, threshold)
        window = window["window_obj"] if window else None

        import ctypes
        import time

        def force_activate(window):
            """Force a window to the foreground on Windows."""
            try:
                hwnd = window._hWnd  # pywinctl window handle

                # Restore if minimized
                if window.isMinimized:
                    window.restore()
                    time.sleep(0.1)

                # Bring to top and set foreground
                ctypes.windll.user32.SetForegroundWindow(hwnd)
                ctypes.windll.user32.BringWindowToTop(hwnd)
                window.activate()  # fallback
                time.sleep(0.3)  # wait for OS to update
            except Exception as e:
                print(f"Warning: Could not force window: {e}", file=sys.stderr)

        # Take the screenshot
        if not window:
            log("No matching window found, taking screenshot of entire screen")
            screenshot = _mss_screenshot()
        else:
            try:
                # Re-fetch window handle to ensure it's valid
                window = gw.getWindowsWithTitle(window.title)[0]
                current_active_window = gw.getActiveWindow()
                log(f"Taking screenshot of window: {window.title}")

                if sys.platform == "win32":
                    force_activate(window)
                else:
                    window.activate()
                pyautogui.sleep(0.5)  # Give Windows time to focus

                screen_width, screen_height = pyautogui.size()
                screenshot = _mss_screenshot(
                    region=(
                        max(window.left, 0),
                        max(window.top, 0),
                        min(window.width, screen_width),
                        min(window.height, screen_height),
                    )
                )

                # Restore previously active window
                if current_active_window and current_active_window != window:
                    try:
                        if sys.platform == "win32":
                            force_activate(current_active_window)
                        else:
                            current_active_window.activate()
                        pyautogui.sleep(0.2)
                    except Exception as e:
                        log(f"Error restoring previous window: {str(e)}")
            except Exception as e:
                log(f"Error taking screenshot of window: {str(e)}")
                screenshot = _mss_screenshot()  # fallback to full screen

        # Create temp directory
        temp_dir = Path(tempfile.mkdtemp())

        # Save screenshot and get filepath
        filepath, _ = save_image_to_downloads(
            screenshot, prefix="screenshot", directory=temp_dir
        )

        # Create Image object from filepath
        image = Image(filepath)

        if save_to_downloads:
            log("Copying screenshot from temp to downloads")
            shutil.copy(filepath, get_downloads_dir())

        return image  # MCP Image object
    except Exception as e:
        log(f"Error in screenshot or getting UI elements: {str(e)}")
        import traceback

        stack_trace = traceback.format_exc()
        log(f"Stack trace:\n{stack_trace}")
        return f"Error in screenshot or getting UI elements: {str(e)}\nStack trace:\n{stack_trace}"


def is_low_spec_pc() -> bool:
    """Heuristically detect a low-spec machine (fewer than 4 physical cores or under 8 GB RAM)."""
    try:
        import psutil

        cpu_low = psutil.cpu_count(logical=False) < 4
        ram_low = psutil.virtual_memory().total < 8 * 1024**3
        return cpu_low or ram_low
    except Exception:
        # Fallback if psutil is not available or the info is unavailable
        return False


@mcp.tool()
def take_screenshot_with_ocr(
    title_pattern: str = None,
    use_regex: bool = False,
    threshold: int = 10,
    scale_percent_for_ocr: int = None,
    save_to_downloads: bool = False,
) -> str:
    """
    Get OCR text from a screenshot of the window with the specified title pattern, with absolute
    coordinates, as a string of List[Tuple[List[List[int]], str, float]] entries (the window offset
    from the true (0, 0) of the screen is added to the OCR coordinates, so clicking is on-point; it
    is recommended to click in the middle of an OCR box). If no title pattern is provided, take a
    screenshot of the entire screen and return all text on the screen. Note that OCR takes around
    20 seconds on a mid-spec PC at 1080p resolution.

    Args:
        title_pattern: Pattern to match window title; if None, take a screenshot of the entire screen
        use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching for the best match
        threshold: Minimum score (0-100) required for a fuzzy match
        scale_percent_for_ocr: Percentage to scale the image down before processing; you won't need this
            most of the time unless your PC is extremely old or slow
        save_to_downloads: If True, save the screenshot to the downloads directory and return the absolute path

    Returns:
        A list of UI elements as List[Tuple[List[List[int]], str, float]] where each tuple is
        [[4 corners of box], text, confidence]. A client message of "content type image not supported"
        means the preview isn't supported, but the Image object is there.
    """
    try:
        all_windows = gw.getAllWindows()

        # Convert to list of dictionaries for _find_matching_window
        windows = []
        for window in all_windows:
            if window.title:  # Only include windows with titles
                windows.append(
                    {
                        "title": window.title,
                        "window_obj": window,  # Store the actual window object
                    }
                )

        log(f"Found {len(windows)} windows")

        window = _find_matching_window(windows, title_pattern, use_regex, threshold)
        window = window["window_obj"] if window else None

        # Store the currently active window
        # Take the screenshot
        if not window:
            log("No matching window found, taking screenshot of entire screen")
            screenshot = _mss_screenshot()
        else:
            current_active_window = gw.getActiveWindow()
            log(f"Taking screenshot of window: {window.title}")

            # Activate the window and wait for it to be fully in focus
            try:
                window.activate()
                pyautogui.sleep(0.5)  # Wait for 0.5 seconds to ensure window is active

                screenshot = _mss_screenshot(
                    region=(window.left, window.top, window.width, window.height)
                )

                # Restore the previously active window
                if current_active_window:
                    try:
                        current_active_window.activate()
                        pyautogui.sleep(
                            0.2
                        )  # Wait a bit to ensure previous window is restored
                    except Exception as e:
                        log(f"Error restoring previous window: {str(e)}")
            except Exception as e:
                log(f"Error taking screenshot of window: {str(e)}")
                return f"Error taking screenshot of window: {str(e)}"

        # Create temp directory
        temp_dir = Path(tempfile.mkdtemp())

        # Save screenshot and get filepath
        filepath, _ = save_image_to_downloads(
            screenshot, prefix="screenshot", directory=temp_dir
        )

        # Create Image object from filepath
        image = Image(filepath)

        # Copy from temp to downloads
        if save_to_downloads:
            log("Copying screenshot from temp to downloads")
            shutil.copy(filepath, get_downloads_dir())

        image_path = image.path
        img = cv2.imread(str(image_path))

        if scale_percent_for_ocr is None:
            # Calculate percent to scale height to 360 pixels
            scale_percent_for_ocr = 100  # 360 / img.shape[0] * 100

        # Lower the resolution before processing
        width = int(img.shape[1] * scale_percent_for_ocr / 100)
        height = int(img.shape[0] * scale_percent_for_ocr / 100)
        dim = (width, height)
        resized_img = cv2.resize(img, dim, interpolation=cv2.INTER_AREA)

        # save resized image to pwd
        # cv2.imwrite("resized_img.png", resized_img)

        output = engine(resized_img)
        boxes = output.boxes
        txts = output.txts
        scores = output.scores

        zipped_results = list(zip(boxes, txts, scores))
        zipped_results = [
            (
                box.tolist(),
                text,
                float(score),
            )  # convert np.array -> list, ensure score is float
            for box, text, score in zipped_results
        ]

        log(f"Found {len(zipped_results)} text items in OCR result.")
        log(f"First 5 items: {zipped_results[:5]}")

        return (
            ",\n".join([str(item) for item in zipped_results])
            if zipped_results
            else "No text found"
        )
    except Exception as e:
        log(f"Error in screenshot or getting UI elements: {str(e)}")
        import traceback

        stack_trace = traceback.format_exc()
        log(f"Stack trace:\n{stack_trace}")
        return f"Error in screenshot or getting UI elements: {str(e)}\nStack trace:\n{stack_trace}"


@mcp.tool()
def move_mouse(x: int, y: int) -> str:
    """Move the mouse to the specified screen coordinates."""
    try:
        pyautogui.moveTo(x=x, y=y)
        return f"Successfully moved mouse to coordinates ({x}, {y})"
    except Exception as e:
        return f"Error moving mouse to coordinates ({x}, {y}): {str(e)}"


@mcp.tool()
def mouse_down(button: str = "left") -> str:
    """Hold down a mouse button ('left', 'right', 'middle')."""
    try:
        pyautogui.mouseDown(button=button)
        return f"Held down {button} mouse button"
    except Exception as e:
        return f"Error holding {button} mouse button: {str(e)}"


@mcp.tool()
def mouse_up(button: str = "left") -> str:
    """Release a mouse button ('left', 'right', 'middle')."""
    try:
        pyautogui.mouseUp(button=button)
        return f"Released {button} mouse button"
    except Exception as e:
        return f"Error releasing {button} mouse button: {str(e)}"


@mcp.tool()
async def drag_mouse(
    from_x: int, from_y: int, to_x: int, to_y: int, duration: float = 0.5
) -> str:
    """
    Drag the mouse from one position to another.

    Args:
        from_x: Starting X coordinate
        from_y: Starting Y coordinate
        to_x: Ending X coordinate
        to_y: Ending Y coordinate
        duration: Duration of the drag in seconds (default: 0.5)

    Returns:
        Success or error message
    """
    try:
        # First move to the starting position
        pyautogui.moveTo(x=from_x, y=from_y)

        # Then drag to the destination
        log("starting drag")
        await asyncio.to_thread(pyautogui.dragTo, x=to_x, y=to_y, duration=duration)
        log("done drag")
        return f"Successfully dragged from ({from_x}, {from_y}) to ({to_x}, {to_y})"
    except Exception as e:
        return f"Error dragging from ({from_x}, {from_y}) to ({to_x}, {to_y}): {str(e)}"


@mcp.tool()
def key_down(key: str) -> str:
    """Hold down a specific keyboard key until released."""
    try:
        pyautogui.keyDown(key)
        return f"Held down key: {key}"
    except Exception as e:
        return f"Error holding key {key}: {str(e)}"


@mcp.tool()
def key_up(key: str) -> str:
    """Release a specific keyboard key."""
    try:
        pyautogui.keyUp(key)
        return f"Released key: {key}"
    except Exception as e:
        return f"Error releasing key {key}: {str(e)}"


@mcp.tool()
def press_keys(keys: Union[str, List[Union[str, List[str]]]]) -> str:
    """
    Press keyboard keys.

    Args:
        keys:
            - Single key as a string (e.g., "enter")
            - Sequence of keys as a list (e.g., ["a", "b", "c"])
            - Key combinations as a nested list (e.g., [["ctrl", "c"], ["alt", "tab"]])

    Examples:
        press_keys("enter")
        press_keys(["a", "b", "c"])
        press_keys([["ctrl", "c"], ["alt", "tab"]])
    """
    try:
        if isinstance(keys, str):
            # Single key
            pyautogui.press(keys)
            return f"Pressed single key: {keys}"
        elif isinstance(keys, list):
            for item in keys:
                if isinstance(item, str):
                    # Sequential key press
                    pyautogui.press(item)
                elif isinstance(item, list):
                    # Key combination (e.g., ctrl+c)
                    pyautogui.hotkey(*item)
                else:
                    return f"Invalid key format: {item}"
            return f"Successfully pressed keys sequence: {keys}"
        else:
            return "Invalid input: must be str or list"
    except Exception as e:
        return f"Error pressing keys {keys}: {str(e)}"


@mcp.tool()
def list_windows() -> List[Dict[str, Any]]:
    """List all open windows on the system."""
    try:
        windows = gw.getAllWindows()
        result = []
        for window in windows:
            if window.title:  # Only include windows with titles
                result.append(
                    {
                        "title": window.title,
                        "left": window.left,
                        "top": window.top,
                        "width": window.width,
                        "height": window.height,
                        "is_active": window.isActive,
                        "is_visible": window.visible,
                        "is_minimized": window.isMinimized,
                        "is_maximized": window.isMaximized,
                        # "screenshot": pyautogui.screenshot(
                        #     region=(
                        #         window.left,
                        #         window.top,
                        #         window.width,
                        #         window.height,
                        #     )
                        # ),
                    }
                )
        return result
    except Exception as e:
        log(f"Error listing windows: {str(e)}")
        return [{"error": str(e)}]


@mcp.tool()
def wait_milliseconds(milliseconds: int) -> str:
    """
    Wait for a specified number of milliseconds.

    Args:
        milliseconds: Number of milliseconds to wait

    Returns:
        Success message after waiting
    """
    try:
        import time

        seconds = milliseconds / 1000.0
        time.sleep(seconds)
        return f"Successfully waited for {milliseconds} milliseconds"
    except Exception as e:
        return f"Error waiting for {milliseconds} milliseconds: {str(e)}"


@mcp.tool()
def activate_window(
    title_pattern: str, use_regex: bool = False, threshold: int = 60
) -> str:
    """
    Activate a window (bring it to the foreground) by matching its title.

    Args:
        title_pattern: Pattern to match window title
        use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching
        threshold: Minimum score (0-100) required for a fuzzy match

    Returns:
        Success or error message
    """
    try:
        # Get all windows
        all_windows = gw.getAllWindows()

        # Convert to list of dictionaries for _find_matching_window
        windows = []
        for window in all_windows:
            if window.title:  # Only include windows with titles
                windows.append(
                    {
                        "title": window.title,
                        "window_obj": window,  # Store the actual window object
                    }
                )

        # Find matching window using our improved function
        matched_window_dict = _find_matching_window(
            windows, title_pattern, use_regex, threshold
        )

        if not matched_window_dict:
            log(f"No window found matching pattern: {title_pattern}")
            return f"Error: No window found matching pattern: {title_pattern}"

        # Get the actual window object
        matched_window = matched_window_dict["window_obj"]

        # Activate the window
        matched_window.activate()
        return f"Successfully activated window: '{matched_window.title}'"
    except Exception as e:
        log(f"Error activating window: {str(e)}")
        return f"Error activating window: {str(e)}"


def main():
    """Main entry point for the MCP server."""
    pyautogui.FAILSAFE = True
    try:
        # Run the server
        log("Computer Control MCP Server Started...")
        mcp.run()
    except KeyboardInterrupt:
        log("Server shutting down...")
    except Exception as e:
        log(f"Error: {str(e)}")


if __name__ == "__main__":
    main()
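Because every function above is registered with @mcp.tool(), any MCP client can launch core.py as a stdio server and call the tools by name. The following is a minimal client-side sketch, not part of core.py: it assumes the official mcp Python SDK client (ClientSession, StdioServerParameters, stdio_client) and that the server can be started with "python core.py"; the box_center helper is a hypothetical convenience for clicking the middle of an OCR box, as the take_screenshot_with_ocr docstring recommends.

# Minimal client-side sketch (illustrative, not part of core.py).
# Assumes the official `mcp` Python SDK and that the server is launched as `python core.py`.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


def box_center(box):
    """Hypothetical helper: center of a 4-corner OCR box [[x, y], [x, y], [x, y], [x, y]]."""
    xs = [point[0] for point in box]
    ys = [point[1] for point in box]
    return sum(xs) / len(xs), sum(ys) / len(ys)


async def main():
    server_params = StdioServerParameters(command="python", args=["core.py"])
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Discover the tools registered with @mcp.tool()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

            # Call a simple tool
            size = await session.call_tool("get_screen_size", arguments={})
            print(size.content)

            # Click at given coordinates, e.g. the center of an OCR box
            x, y = box_center([[100, 200], [180, 200], [180, 220], [100, 220]])
            await session.call_tool("click_screen", arguments={"x": int(x), "y": int(y)})


if __name__ == "__main__":
    asyncio.run(main())

Tool names and argument keys match the function names and parameters defined in core.py above.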

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AB498/computer-control-mcp'
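The same endpoint can be queried from Python. A minimal sketch using only the standard library (the response schema is not documented here, so the payload is simply pretty-printed):

# Minimal sketch: fetch this server's directory entry and pretty-print the JSON response.
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/AB498/computer-control-mcp"

with urllib.request.urlopen(URL) as response:
    server_info = json.load(response)

print(json.dumps(server_info, indent=2))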

If you have feedback or need assistance with the MCP directory API, please join our Discord server.