"""
Windows MCP Server - Comprehensive Windows PC automation and control.
This MCP server provides AI with tools to:
- Capture and view the screen
- Control mouse and keyboard
- Manage windows and applications
- Control system operations (restart, shutdown, logout)
- Automate PC tasks
"""
import asyncio
import base64
import io
import os
import subprocess
import sys
import time
import logging
import contextlib
from typing import Any, Optional
import mss
import psutil
import pyautogui
from PIL import Image
# Optional Windows API bindings (pywin32). Handlers that need them check
# WINDOWS_AVAILABLE before touching any win32* module.
try:
    import win32api
    import win32con
    import win32gui
    import win32process
    WINDOWS_AVAILABLE = True
except ImportError:
    WINDOWS_AVAILABLE = False
    # Warnings go to stderr, the same stream the logging handler below uses.
    # NOTE(review): if this server is run over the MCP stdio transport, stdout
    # carries protocol traffic and must stay clean -- confirm.
    print("Warning: pywin32 not available. Some features will be limited.", file=sys.stderr)

# Optional UI-tree services used by the desktop-state tools.
try:
    from windows_mcp.desktop.service import Desktop
    from windows_mcp.tree.service import Tree
    DESKTOP_SERVICE_AVAILABLE = True
except ImportError:
    DESKTOP_SERVICE_AVAILABLE = False
    print("Warning: Desktop service not available. State tool will be limited.", file=sys.stderr)

# Optional validation / response helpers. When they are missing, only the two
# response builders get fallbacks; every other helper is guarded by
# UTILS_AVAILABLE at its call site.
try:
    from windows_mcp.utils import (
        retry_on_failure,
        validate_coordinates,
        validate_label,
        validate_string,
        validate_number,
        create_error_response,
        create_success_response,
        PerformanceTimer,
        sanitize_file_path
    )
    UTILS_AVAILABLE = True
except ImportError:
    UTILS_AVAILABLE = False
    print("Warning: Utils not available. Error handling will be basic.", file=sys.stderr)

    # Fallbacks. TextContent is imported further down the module; it is only
    # looked up when these functions are called, not when they are defined.
    def create_error_response(msg, tool=""):
        """Return a one-item TextContent list reading "Error: <msg>" (``tool`` is ignored)."""
        return [TextContent(type="text", text=f"Error: {msg}")]

    def create_success_response(msg, extra=None):
        """Return a one-item TextContent list containing ``msg`` (``extra`` is ignored)."""
        return [TextContent(type="text", text=msg)]
from mcp.server import Server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
# Logging: INFO and above, written to stderr.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stderr)],
)
logger = logging.getLogger('windows-mcp.server')

# PyAutoGUI safety settings.
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.1

# The MCP server instance that the tool decorators below register against.
app = Server("windows-mcp-server")

# Desktop service (only when its import succeeded) and the UI-tree cache that
# get_desktop_state fills and click_element / type_into_element read.
if DESKTOP_SERVICE_AVAILABLE:
    desktop_service = Desktop()
else:
    desktop_service = None
cached_tree_state = None
cached_tree_timestamp = 0

# Startup banner.
for _banner_line in (
    "=" * 60,
    "Windows MCP Server v0.4.0 - ULTRA-FAST Edition Starting...",
    f"Windows API available: {WINDOWS_AVAILABLE}",
    f"Desktop Service available: {DESKTOP_SERVICE_AVAILABLE}",
    f"Utils available: {UTILS_AVAILABLE}",
    "=" * 60,
):
    logger.info(_banner_line)
del _banner_line
# ============================================================================
# TOOL REGISTRATION AND DISPATCH
# ============================================================================
def _prop(json_type: str, description: str, **extra: Any) -> dict:
    """Build one JSON-schema property.

    Keys are emitted as ``type``, ``description``, then ``extra`` in call order
    (e.g. ``enum``, ``default``, ``items``).
    """
    return {"type": json_type, "description": description, **extra}


def _tool(name: str, description: str, properties: Optional[dict] = None,
          required: Optional[list] = None) -> Tool:
    """Build a Tool whose input schema is an object with the given properties.

    ``required`` is only added to the schema when it is non-empty.
    """
    schema: dict = {"type": "object", "properties": properties or {}}
    if required:
        schema["required"] = list(required)
    return Tool(name=name, description=description, inputSchema=schema)


def _label_prop() -> dict:
    """Schema for the element label shared by click_element and type_into_element."""
    return _prop("integer", "Element label number from get_desktop_state output")


def _button_prop() -> dict:
    """Schema for the mouse-button selector shared by the click tools."""
    return _prop("string", "Mouse button to click",
                 enum=["left", "right", "middle"], default="left")


def _window_target_props() -> dict:
    """Schema for the title/handle pair that identifies a window."""
    return {
        "title": _prop("string", "Window title (partial match supported)"),
        "handle": _prop("integer", "Window handle (HWND)"),
    }


@app.list_tools()
async def list_tools() -> list[Tool]:
    """List all available Windows automation tools.

    Returns:
        One Tool per name handled by ``call_tool``, each with its JSON input
        schema.
    """
    return [
        # Desktop State Tool (MOST IMPORTANT - USE THIS FIRST!)
        _tool(
            "get_desktop_state",
            # Fixed: the text used to reference "type_element", which is not a
            # registered tool; the real name is type_into_element.
            "[CRITICAL - OPTIMIZED] Capture comprehensive desktop state with ALL interactive elements (buttons, links, text fields). Returns text-only by default (FAST!). Set use_vision=true to get annotated screenshot saved to file (JPEG compressed, 10x faster than base64). Each element has numbered label for click_element/type_into_element. USE THIS TOOL FIRST before any actions!",
            {
                "use_vision": _prop(
                    "boolean",
                    "Save annotated screenshot to temp file (JPEG format, optimized for speed). Returns file path instead of embedding image.",
                    default=False,
                ),
                "include_informative": _prop(
                    "boolean",
                    "Include informative text elements (labels, status text)",
                    default=True,
                ),
                "include_scrollable": _prop(
                    "boolean",
                    "Include scrollable elements with scroll state",
                    default=True,
                ),
            },
        ),
        # Enhanced Click Tool
        _tool(
            "click_element",
            "Click on a UI element using its label from get_desktop_state. More reliable than mouse_click for UI automation. Use the label number from the desktop state.",
            {
                "label": _label_prop(),
                "button": _button_prop(),
                "clicks": _prop("integer", "Number of clicks (1=single, 2=double)", default=1),
            },
            required=["label"],
        ),
        # Enhanced Type Tool
        _tool(
            "type_into_element",
            "Type text into a UI element using its label from get_desktop_state. Automatically clicks the element first. More reliable than keyboard_type for filling forms.",
            {
                "label": _label_prop(),
                "text": _prop("string", "Text to type into the element"),
                "clear_first": _prop(
                    "boolean",
                    "Clear existing text before typing (Ctrl+A, Delete)",
                    default=False,
                ),
                "press_enter": _prop("boolean", "Press Enter after typing", default=False),
            },
            required=["label", "text"],
        ),
        # Screen Capture Tools
        _tool(
            "screenshot",
            "[OPTIMIZED] Capture screenshot - MUCH FASTER now! Saves to temp file by default (JPEG compressed). Optionally returns base64 or saves to custom path. Use save_to_file=true for 10x speed improvement.",
            {
                "monitor": _prop("integer", "Monitor number (0=all, 1=primary)", default=1),
                "save_path": _prop(
                    "string",
                    "Custom file path to save (optional). If not provided, saves to temp folder.",
                ),
                "save_to_file": _prop(
                    "boolean",
                    "Save to temp file instead of base64 (10x faster, recommended!)",
                    default=True,
                ),
                "format": _prop(
                    "string",
                    "Image format: jpeg (fast, small) or png (slower, larger)",
                    enum=["jpeg", "png"],
                    default="jpeg",
                ),
                "quality": _prop(
                    "integer",
                    "JPEG quality 1-100 (85 recommended for speed/quality balance)",
                    default=85,
                ),
            },
        ),
        _tool("get_screen_size", "Get the dimensions of the screen(s)"),
        _tool(
            "locate_on_screen",
            "Find an image on the screen and return its coordinates",
            {
                "image_path": _prop("string", "Path to the image file to locate on screen"),
                "confidence": _prop("number", "Confidence threshold (0.0 to 1.0)", default=0.9),
            },
            required=["image_path"],
        ),
        # Mouse Control Tools
        _tool(
            "mouse_move",
            "Move the mouse cursor to specific coordinates",
            {
                "x": _prop("integer", "X coordinate"),
                "y": _prop("integer", "Y coordinate"),
                "duration": _prop("number", "Duration of movement in seconds", default=0.25),
            },
            required=["x", "y"],
        ),
        _tool(
            "mouse_click",
            "Click the mouse at current position or specified coordinates",
            {
                "x": _prop("integer", "X coordinate (optional, uses current position if not provided)"),
                "y": _prop("integer", "Y coordinate (optional, uses current position if not provided)"),
                "button": _button_prop(),
                "clicks": _prop("integer", "Number of clicks", default=1),
            },
        ),
        _tool(
            "mouse_scroll",
            "Scroll the mouse wheel",
            {
                "clicks": _prop(
                    "integer",
                    "Number of scroll clicks (positive for up, negative for down)",
                ),
            },
            required=["clicks"],
        ),
        _tool("get_mouse_position", "Get the current mouse cursor position"),
        # Keyboard Control Tools
        _tool(
            "keyboard_type",
            "Type text using the keyboard",
            {
                "text": _prop("string", "Text to type"),
                "interval": _prop("number", "Interval between key presses in seconds", default=0.01),
            },
            required=["text"],
        ),
        _tool(
            "keyboard_press",
            "Press a specific key or key combination",
            {
                "keys": _prop(
                    "array",
                    "Key(s) to press (e.g., ['ctrl', 'c'] for copy)",
                    items={"type": "string"},
                ),
            },
            required=["keys"],
        ),
        # Window Management Tools
        _tool(
            "list_windows",
            "List all open windows with their titles and handles",
            {"visible_only": _prop("boolean", "Only list visible windows", default=True)},
        ),
        _tool("get_active_window", "Get information about the currently active window"),
        _tool(
            "activate_window",
            "Activate (bring to front) a window by title or handle",
            _window_target_props(),
        ),
        _tool(
            "close_window",
            "Close a window by title or handle",
            _window_target_props(),
        ),
        _tool(
            "resize_window",
            "Resize and/or move a window",
            {
                **_window_target_props(),
                "x": _prop("integer", "X position"),
                "y": _prop("integer", "Y position"),
                "width": _prop("integer", "Width"),
                "height": _prop("integer", "Height"),
            },
        ),
        # Application Control Tools
        _tool(
            "launch_application",
            "Launch an application by path or command",
            {
                "path": _prop("string", "Path to executable or command to run"),
                "args": _prop("array", "Command line arguments", items={"type": "string"}),
                "working_dir": _prop("string", "Working directory for the application"),
            },
            required=["path"],
        ),
        _tool(
            "kill_process",
            "Kill a process by name or PID",
            {
                "name": _prop("string", "Process name (e.g., 'notepad.exe')"),
                "pid": _prop("integer", "Process ID"),
            },
        ),
        _tool(
            "list_processes",
            "List all running processes",
            {"name_filter": _prop("string", "Filter processes by name (partial match)")},
        ),
        # System Control Tools
        _tool(
            "shutdown",
            "Shutdown the computer",
            {
                "force": _prop(
                    "boolean",
                    "Force shutdown without waiting for applications",
                    default=False,
                ),
                "delay": _prop("integer", "Delay in seconds before shutdown", default=0),
            },
        ),
        _tool(
            "restart",
            "Restart the computer",
            {
                "force": _prop(
                    "boolean",
                    "Force restart without waiting for applications",
                    default=False,
                ),
                "delay": _prop("integer", "Delay in seconds before restart", default=0),
            },
        ),
        _tool(
            "logout",
            "Log out the current user",
            {
                "force": _prop(
                    "boolean",
                    "Force logout without waiting for applications",
                    default=False,
                ),
            },
        ),
        _tool("lock_screen", "Lock the workstation"),
        _tool("get_system_info", "Get system information (CPU, memory, disk usage, etc.)"),
    ]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent | ImageContent]:
"""Handle tool execution."""
# Desktop State Tool
if name == "get_desktop_state":
return await tool_get_desktop_state(arguments)
elif name == "click_element":
return await tool_click_element(arguments)
elif name == "type_into_element":
return await tool_type_into_element(arguments)
# Screen Capture Tools
elif name == "screenshot":
return await tool_screenshot(arguments)
elif name == "get_screen_size":
return await tool_get_screen_size(arguments)
elif name == "locate_on_screen":
return await tool_locate_on_screen(arguments)
# Mouse Control Tools
elif name == "mouse_move":
return await tool_mouse_move(arguments)
elif name == "mouse_click":
return await tool_mouse_click(arguments)
elif name == "mouse_scroll":
return await tool_mouse_scroll(arguments)
elif name == "get_mouse_position":
return await tool_get_mouse_position(arguments)
# Keyboard Control Tools
elif name == "keyboard_type":
return await tool_keyboard_type(arguments)
elif name == "keyboard_press":
return await tool_keyboard_press(arguments)
# Window Management Tools
elif name == "list_windows":
return await tool_list_windows(arguments)
elif name == "get_active_window":
return await tool_get_active_window(arguments)
elif name == "activate_window":
return await tool_activate_window(arguments)
elif name == "close_window":
return await tool_close_window(arguments)
elif name == "resize_window":
return await tool_resize_window(arguments)
# Application Control Tools
elif name == "launch_application":
return await tool_launch_application(arguments)
elif name == "kill_process":
return await tool_kill_process(arguments)
elif name == "list_processes":
return await tool_list_processes(arguments)
# System Control Tools
elif name == "shutdown":
return await tool_shutdown(arguments)
elif name == "restart":
return await tool_restart(arguments)
elif name == "logout":
return await tool_logout(arguments)
elif name == "lock_screen":
return await tool_lock_screen(arguments)
elif name == "get_system_info":
return await tool_get_system_info(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
# ============================================================================
# DESKTOP STATE TOOL IMPLEMENTATIONS
# ============================================================================
@retry_on_failure(max_retries=2, delay=0.5) if UTILS_AVAILABLE else (lambda f: f)
async def tool_get_desktop_state(args: dict) -> list[TextContent | ImageContent]:
    """Scan the desktop UI tree and describe it as text, optionally with a screenshot.

    The scanned tree is stored in the module-level cache (with a timestamp) so
    that click_element / type_into_element can resolve labels against it.

    Args:
        args: Optional boolean keys ``use_vision`` (default False),
            ``include_informative`` (default True) and ``include_scrollable``
            (default True).

    Returns:
        A text block with the element listings and a summary; when
        ``use_vision`` is set and interactive elements exist, also a
        screenshot file path (or a base64 image when no file was produced).
        Failures are returned as an error response rather than raised.
    """
    global cached_tree_state, cached_tree_timestamp
    logger.info("Getting desktop state...")

    if desktop_service is None or not DESKTOP_SERVICE_AVAILABLE:
        return create_error_response(
            "Desktop service not available. Install uiautomation library.",
            "get_desktop_state"
        )

    def _probe(getter, what):
        """Call ``getter``; on any failure log a warning and return "Unknown"."""
        try:
            return getter()
        except Exception as exc:
            logger.warning(f"Could not get {what}: {exc}")
            return "Unknown"

    try:
        timer = PerformanceTimer("get_desktop_state") if UTILS_AVAILABLE else contextlib.nullcontext()
        with timer:
            # Read all three flags, then validate them in declaration order.
            flags = {
                "use_vision": args.get("use_vision", False),
                "include_informative": args.get("include_informative", True),
                "include_scrollable": args.get("include_scrollable", True),
            }
            for flag_name, flag_value in flags.items():
                if not isinstance(flag_value, bool):
                    return create_error_response(f"{flag_name} must be a boolean", "get_desktop_state")
            use_vision = flags["use_vision"]
            include_informative = flags["include_informative"]
            include_scrollable = flags["include_scrollable"]

            # Scan the UI tree and refresh the module-level cache.
            tree = Tree(desktop_service)
            tree_state = tree.get_state(force_refresh=False)
            cached_tree_state = tree_state
            cached_tree_timestamp = time.time()
            logger.info(f"Found {len(tree_state.interactive_nodes)} interactive elements")

            # System details are best-effort; each falls back to "Unknown".
            windows_version = _probe(lambda: desktop_service.get_windows_version(), "Windows version")
            default_language = _probe(lambda: desktop_service.get_default_language(), "default language")

            # Assemble the text report piece by piece.
            parts = [
                "=== DESKTOP STATE ===\n"
                f"Windows Version: {windows_version}\n"
                f"Default Language: {default_language}\n"
                f"Encoding: {getattr(desktop_service, 'encoding', 'utf-8')}\n"
                f"Scan Time: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
            ]

            # Interactive elements (always included).
            interactive_text = tree_state.interactive_elements_to_string()
            parts.append("=== INTERACTIVE ELEMENTS ===\n")
            parts.append("(Use these labels with click_element and type_into_element tools)\n\n")
            parts.append(interactive_text + "\n\n")

            if include_informative:
                informative_text = tree_state.informative_elements_to_string()
                parts.append("=== INFORMATIVE ELEMENTS ===\n")
                parts.append(informative_text + "\n\n")

            if include_scrollable:
                scrollable_text = tree_state.scrollable_elements_to_string()
                parts.append("=== SCROLLABLE ELEMENTS ===\n")
                parts.append(scrollable_text + "\n\n")

            # Summary counts and a usage hint.
            parts.append("=== SUMMARY ===\n")
            parts.append(f"Interactive Elements: {len(tree_state.interactive_nodes)}\n")
            parts.append(f"Informative Elements: {len(tree_state.informative_nodes)}\n")
            parts.append(f"Scrollable Elements: {len(tree_state.scrollable_nodes)}\n")
            parts.append("\nTip: Use click_element(label=N) or type_into_element(label=N) to interact with elements.\n")

            result = [TextContent(type="text", text="".join(parts))]

            # Optional annotated screenshot; a failure here only adds a warning.
            if use_vision and tree_state.interactive_nodes:
                try:
                    logger.info("Generating annotated screenshot (FAST MODE)...")
                    screenshot_bytes, file_path = tree.create_annotated_screenshot(
                        tree_state.interactive_nodes,
                        scale=0.4,
                        save_to_file=True
                    )
                    if not file_path:
                        # No file was written: embed the image bytes as base64.
                        encoded = base64.b64encode(screenshot_bytes).decode()
                        result.append(ImageContent(
                            type="image",
                            data=encoded,
                            mimeType="image/jpeg"
                        ))
                        result.append(TextContent(
                            type="text",
                            text="📸 Annotated screenshot: Each interactive element is marked with its label number."
                        ))
                        logger.info("Screenshot generated (base64 mode)")
                    else:
                        # A file was written: report its path instead of embedding.
                        result.append(TextContent(
                            type="text",
                            text=f"📸 Annotated screenshot saved to: {file_path}\n\n"
                                 f"💡 Open this file to see all interactive elements labeled.\n"
                                 f" Each element is marked with its label number in a colored box.\n"
                                 f" Image format: JPEG (compressed for speed)"
                        ))
                        logger.info(f"Screenshot saved successfully: {file_path}")
                except Exception as e:
                    logger.error(f"Failed to generate screenshot: {e}", exc_info=True)
                    result.append(TextContent(
                        type="text",
                        text=f"⚠️ Warning: Could not generate annotated screenshot: {str(e)}"
                    ))

            logger.info("Desktop state retrieved successfully")
            return result
    except Exception as e:
        logger.error(f"Error in get_desktop_state: {e}", exc_info=True)
        return create_error_response(f"Failed to get desktop state: {str(e)}", "get_desktop_state")
@retry_on_failure(max_retries=2, delay=0.3) if UTILS_AVAILABLE else (lambda f: f)
async def tool_click_element(args: dict) -> list[TextContent]:
    """Click the cached UI element identified by ``label``.

    Labels are indexes into ``cached_tree_state.interactive_nodes``, which is
    filled by get_desktop_state.

    Args:
        args: ``label`` (required int), ``button`` ("left" / "right" /
            "middle", default "left") and ``clicks`` (1-3, default 1).

    Returns:
        The response built by create_success_response, or by
        create_error_response when validation or the click fails.
    """
    logger.info(f"Clicking element with args: {args}")

    if cached_tree_state is None:
        return create_error_response(
            "No cached desktop state. Please run get_desktop_state first.",
            "click_element"
        )

    # Warn (but continue) when the cache is older than 30 seconds. The check
    # needs nothing from utils, so it no longer depends on UTILS_AVAILABLE.
    if (time.time() - cached_tree_timestamp) > 30:
        logger.warning("Cached tree state is stale (>30s old). Consider refreshing with get_desktop_state.")

    try:
        if "label" not in args:
            return create_error_response("Missing required parameter: label", "click_element")
        label = args["label"]
        node_count = len(cached_tree_state.interactive_nodes)

        if UTILS_AVAILABLE:
            is_valid, error_msg = validate_label(label, node_count)
            if not is_valid:
                return create_error_response(error_msg, "click_element")
        elif node_count == 0:
            # Avoid the nonsensical "Valid range: 0--1" message.
            return create_error_response(
                f"Invalid label {label}. The cached desktop state has no interactive elements.",
                "click_element"
            )
        elif isinstance(label, bool) or not isinstance(label, int) or not 0 <= label < node_count:
            # bool is a subclass of int, so it is rejected explicitly.
            return create_error_response(
                f"Invalid label {label}. Valid range: 0-{node_count - 1}",
                "click_element"
            )

        button = args.get("button", "left")
        clicks = args.get("clicks", 1)

        if button not in ("left", "right", "middle"):
            return create_error_response(f"Invalid button: {button}. Must be 'left', 'right', or 'middle'", "click_element")

        if isinstance(clicks, bool) or not isinstance(clicks, int) or not 1 <= clicks <= 3:
            return create_error_response("clicks must be 1, 2, or 3", "click_element")

        element = cached_tree_state.interactive_nodes[label]
        x, y = element.center.x, element.center.y

        # Refuse to click coordinates that fall outside the screen.
        if UTILS_AVAILABLE:
            screen_size = pyautogui.size()
            is_valid, error_msg = validate_coordinates(x, y, screen_size.width, screen_size.height)
            if not is_valid:
                return create_error_response(f"Element coordinates invalid: {error_msg}", "click_element")

        logger.info(f"Clicking element {label} at ({x},{y}) with {button} button, {clicks} clicks")
        pyautogui.click(x=x, y=y, button=button, clicks=clicks, duration=0.2)

        click_type = "Triple-clicked" if clicks == 3 else ("Double-clicked" if clicks == 2 else "Clicked")
        success_msg = (
            f"✓ {click_type} {button} button on element {label}: '{element.name}' "
            f"({element.control_type}) at ({x},{y}) in '{element.app_name}'"
        )
        logger.info(f"Click successful: {success_msg}")
        return create_success_response(success_msg)
    except KeyError as e:
        return create_error_response(f"Missing required parameter: {str(e)}", "click_element")
    except Exception as e:
        logger.error(f"Error in click_element: {e}", exc_info=True)
        return create_error_response(f"Failed to click element: {str(e)}", "click_element")
@retry_on_failure(max_retries=2, delay=0.3) if UTILS_AVAILABLE else (lambda f: f)
async def tool_type_into_element(args: dict) -> list[TextContent]:
    """Click the cached UI element identified by ``label`` and type text into it.

    Labels are indexes into ``cached_tree_state.interactive_nodes``, which is
    filled by get_desktop_state.

    Args:
        args: ``label`` (required int), ``text`` (required str),
            ``clear_first`` (bool, default False: send Ctrl+A then Delete
            before typing) and ``press_enter`` (bool, default False).

    Returns:
        The response built by create_success_response, or by
        create_error_response when validation or the input fails.
    """
    logger.info(f"Typing into element with args: {args}")

    if cached_tree_state is None:
        return create_error_response(
            "No cached desktop state. Please run get_desktop_state first.",
            "type_into_element"
        )

    # Warn (but continue) when the cache is older than 30 seconds. The check
    # needs nothing from utils, so it no longer depends on UTILS_AVAILABLE.
    if (time.time() - cached_tree_timestamp) > 30:
        logger.warning("Cached tree state is stale (>30s old). Consider refreshing with get_desktop_state.")

    try:
        if "label" not in args:
            return create_error_response("Missing required parameter: label", "type_into_element")
        if "text" not in args:
            return create_error_response("Missing required parameter: text", "type_into_element")

        label = args["label"]
        text = args["text"]
        clear_first = args.get("clear_first", False)
        press_enter = args.get("press_enter", False)
        node_count = len(cached_tree_state.interactive_nodes)

        if UTILS_AVAILABLE:
            is_valid, error_msg = validate_label(label, node_count)
            if not is_valid:
                return create_error_response(error_msg, "type_into_element")
            is_valid, error_msg = validate_string(text, "text", min_length=0, max_length=10000)
            if not is_valid:
                return create_error_response(error_msg, "type_into_element")
        else:
            if node_count == 0:
                # Avoid the nonsensical "Valid range: 0--1" message.
                return create_error_response(
                    f"Invalid label {label}. The cached desktop state has no interactive elements.",
                    "type_into_element"
                )
            # bool is a subclass of int, so it is rejected explicitly.
            if isinstance(label, bool) or not isinstance(label, int) or not 0 <= label < node_count:
                return create_error_response(
                    f"Invalid label {label}. Valid range: 0-{node_count - 1}",
                    "type_into_element"
                )
            if not isinstance(text, str):
                return create_error_response("text must be a string", "type_into_element")

        if not isinstance(clear_first, bool):
            return create_error_response("clear_first must be a boolean", "type_into_element")
        if not isinstance(press_enter, bool):
            return create_error_response("press_enter must be a boolean", "type_into_element")

        element = cached_tree_state.interactive_nodes[label]
        x, y = element.center.x, element.center.y

        # Same on-screen check click_element performs: never click (and then
        # type into whatever has focus) at coordinates outside the screen.
        if UTILS_AVAILABLE:
            screen_size = pyautogui.size()
            is_valid, error_msg = validate_coordinates(x, y, screen_size.width, screen_size.height)
            if not is_valid:
                return create_error_response(f"Element coordinates invalid: {error_msg}", "type_into_element")

        # Click the element first to give it focus.
        logger.info(f"Clicking element {label} at ({x},{y}) to focus")
        pyautogui.click(x=x, y=y, duration=0.2)
        time.sleep(0.15)  # Give time for focus

        if clear_first:
            logger.info("Clearing existing text")
            pyautogui.hotkey('ctrl', 'a')
            time.sleep(0.05)
            pyautogui.press('delete')
            time.sleep(0.05)

        # Only the first 50 characters of the text are logged.
        logger.info(f"Typing text: {text[:50]}{'...' if len(text) > 50 else ''}")
        pyautogui.write(text, interval=0.01)

        if press_enter:
            time.sleep(0.1)
            pyautogui.press('enter')
            logger.info("Pressed Enter")

        action = "✓ Typed (cleared first)" if clear_first else "✓ Typed"
        enter_msg = " and pressed Enter" if press_enter else ""
        success_msg = (
            f"{action} text into element {label}: '{element.name}' "
            f"({element.control_type}) in '{element.app_name}'{enter_msg}"
        )
        logger.info(f"Type successful: {success_msg}")
        return create_success_response(success_msg)
    except KeyError as e:
        return create_error_response(f"Missing required parameter: {str(e)}", "type_into_element")
    except Exception as e:
        logger.error(f"Error in type_into_element: {e}", exc_info=True)
        return create_error_response(f"Failed to type into element: {str(e)}", "type_into_element")
# ============================================================================
# SCREEN CAPTURE TOOL IMPLEMENTATIONS
# ============================================================================
async def tool_screenshot(args: dict) -> list[TextContent | ImageContent]:
    """Capture a monitor and return it as a saved file path or a base64 image.

    Args:
        args: Optional keys ``monitor`` (int, 0=all, 1=primary; default 1),
            ``save_path`` (str), ``save_to_file`` (bool, default True),
            ``format`` ("jpeg" or "png", default "jpeg"; "jpg" is accepted as
            an alias) and ``quality`` (int, clamped to 1-100, default 85).

    Returns:
        When ``save_to_file`` is true or ``save_path`` is given: a text block
        with the saved path, size and format. Otherwise an ImageContent with
        the base64 data followed by a short text block. Errors are returned as
        an "Error: ..." text block rather than raised.
    """
    try:
        monitor = args.get("monitor", 1)
        save_path = args.get("save_path")
        save_to_file = args.get("save_to_file", True)
        img_format = str(args.get("format", "jpeg")).upper()
        quality = args.get("quality", 85)

        # A negative index would silently select a monitor from the end of
        # the list, so reject anything that is not a non-negative int.
        if isinstance(monitor, bool) or not isinstance(monitor, int) or monitor < 0:
            return [TextContent(
                type="text",
                text=f"Error: monitor must be a non-negative integer, got {monitor!r}"
            )]

        # Unknown formats used to fall through to the PNG branch while the
        # reported format / MIME type still carried the unknown name.
        if img_format == "JPG":
            img_format = "JPEG"
        if img_format not in ("JPEG", "PNG"):
            return [TextContent(
                type="text",
                text=f"Error: Unsupported format '{args.get('format')}'. Use 'jpeg' or 'png'."
            )]

        if isinstance(quality, bool) or not isinstance(quality, int):
            return [TextContent(type="text", text="Error: quality must be an integer between 1 and 100")]
        quality = max(1, min(100, quality))

        with mss.mss() as sct:
            if monitor > len(sct.monitors) - 1:
                return [TextContent(
                    type="text",
                    text=f"Error: Monitor {monitor} not found. Available: {len(sct.monitors) - 1}"
                )]
            screenshot = sct.grab(sct.monitors[monitor])

        img = Image.frombytes("RGB", screenshot.size, screenshot.rgb)

        if save_to_file or save_path:
            import tempfile
            if not save_path:
                # Auto-generate a path in the temp folder, keyed by a
                # millisecond timestamp.
                ext = "jpg" if img_format == "JPEG" else "png"
                timestamp = int(time.time() * 1000)
                save_path = os.path.join(
                    tempfile.gettempdir(), f"windows_mcp_screen_{timestamp}.{ext}"
                )

            if img_format == "JPEG":
                img.save(save_path, format="JPEG", quality=quality, optimize=True)
            else:
                img.save(save_path, format="PNG", optimize=True)

            logger.info(f"Screenshot saved to: {save_path}")
            return [TextContent(
                type="text",
                text=f"✅ Screenshot captured (Monitor {monitor})\n"
                     f"📁 Saved to: {save_path}\n"
                     f"📐 Size: {screenshot.width}x{screenshot.height}\n"
                     f"🎨 Format: {img_format} " + (f"(Quality: {quality})" if img_format == "JPEG" else "")
            )]

        # base64 mode: encode in memory and embed the image in the response.
        buffer = io.BytesIO()
        if img_format == "JPEG":
            img.save(buffer, format="JPEG", quality=quality, optimize=True)
        else:
            img.save(buffer, format="PNG", optimize=True)
        img_base64 = base64.b64encode(buffer.getvalue()).decode()

        return [
            ImageContent(type="image", data=img_base64, mimeType=f"image/{img_format.lower()}"),
            TextContent(
                type="text",
                text=f"Screenshot (Monitor {monitor}): {screenshot.width}x{screenshot.height}"
            )
        ]
    except Exception as e:
        logger.error(f"Screenshot error: {e}", exc_info=True)
        return [TextContent(type="text", text=f"Error: {str(e)}")]
async def tool_get_screen_size(args: dict) -> list[TextContent]:
    """Report the primary screen size plus the geometry of every monitor.

    Args:
        args: Unused.

    Returns:
        One text block: the size reported by pyautogui followed by one line
        per mss monitor (index 0, the combined entry, is skipped). Errors are
        returned as text.
    """
    try:
        width, height = pyautogui.size()

        with mss.mss() as sct:
            monitor_lines = [
                f"Monitor {index}: {mon['width']}x{mon['height']} "
                f"at ({mon['left']}, {mon['top']})"
                for index, mon in enumerate(sct.monitors[1:], 1)
            ]

        header = f"Primary screen size: {width}x{height}\n"
        return [TextContent(type="text", text=header + "\n".join(monitor_lines))]
    except Exception as e:
        return [TextContent(type="text", text=f"Error getting screen size: {str(e)}")]
async def tool_locate_on_screen(args: dict) -> list[TextContent]:
    """Search the screen for a reference image and report where it was found.

    Args:
        args: ``image_path`` (required str) and ``confidence`` (number,
            default 0.9), passed through to ``pyautogui.locateOnScreen``.

    Returns:
        One text block with the match's top-left corner, size and center,
        "Image not found on screen" when there is no match, or an error
        message.
    """
    try:
        image_path = args.get("image_path")
        if not isinstance(image_path, str) or not image_path:
            # Previously a missing key surfaced as the cryptic "'image_path'".
            return [TextContent(type="text", text="Error: Missing required parameter: image_path")]
        confidence = args.get("confidence", 0.9)

        if not os.path.exists(image_path):
            return [TextContent(type="text", text=f"Error: Image file not found: {image_path}")]

        try:
            location = pyautogui.locateOnScreen(image_path, confidence=confidence)
        except Exception as e:
            # Newer PyAutoGUI / PyScreeze releases raise ImageNotFoundException
            # instead of returning None. Match by class name so that both
            # libraries' exception classes are treated as "no match".
            if type(e).__name__ != "ImageNotFoundException":
                raise
            location = None

        if not location:
            return [TextContent(type="text", text="Image not found on screen")]

        center = pyautogui.center(location)
        return [TextContent(
            type="text",
            text=f"Image found at: ({location.left}, {location.top})\n"
                 f"Size: {location.width}x{location.height}\n"
                 f"Center: ({center.x}, {center.y})"
        )]
    except Exception as e:
        return [TextContent(type="text", text=f"Error locating image: {str(e)}")]
# ============================================================================
# MOUSE CONTROL TOOL IMPLEMENTATIONS
# ============================================================================
async def tool_mouse_move(args: dict) -> list[TextContent]:
    """Move the mouse cursor to the given coordinates.

    Args:
        args: ``x`` and ``y`` (required) and ``duration`` in seconds
            (default 0.25).

    Returns:
        One text block confirming the move, or describing the error.
    """
    try:
        target_x, target_y = args["x"], args["y"]
        travel_time = args.get("duration", 0.25)
        pyautogui.moveTo(target_x, target_y, duration=travel_time)
        message = f"Mouse moved to ({target_x}, {target_y})"
    except Exception as e:
        message = f"Error moving mouse: {str(e)}"
    return [TextContent(type="text", text=message)]
async def tool_mouse_click(args: dict) -> list[TextContent]:
    """Click the mouse at the given coordinates or at the current position.

    Args:
        args: Optional ``x`` / ``y`` (each falls back to the cursor's current
            coordinate when omitted), ``button`` (default "left") and
            ``clicks`` (default 1).

    Returns:
        One text block describing the click, or the error.
    """
    try:
        x = args.get("x")
        y = args.get("y")
        button = args.get("button", "left")
        clicks = args.get("clicks", 1)

        if x is None and y is None:
            pyautogui.click(clicks=clicks, button=button)
            pos = pyautogui.position()
            location = f"at current position ({pos.x}, {pos.y})"
        else:
            # Previously a lone x or y was silently ignored and the click
            # landed at the current position; fill in only the missing axis.
            if x is None or y is None:
                pos = pyautogui.position()
                x = pos.x if x is None else x
                y = pos.y if y is None else y
            pyautogui.click(x, y, clicks=clicks, button=button)
            location = f"at ({x}, {y})"

        # Same wording click_element uses, including triple clicks.
        if clicks == 3:
            click_type = "Triple-clicked"
        elif clicks == 2:
            click_type = "Double-clicked"
        else:
            click_type = "Clicked"
        return [TextContent(type="text", text=f"{click_type} {button} button {location}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error clicking mouse: {str(e)}")]
async def tool_mouse_scroll(args: dict) -> list[TextContent]:
    """Scroll the mouse wheel by ``clicks``.

    Args:
        args: ``clicks`` (required); positive values are reported as "up",
            everything else as "down".

    Returns:
        One text block describing the scroll, or the error.
    """
    try:
        amount = args["clicks"]
        pyautogui.scroll(amount)
        if amount > 0:
            direction = "up"
        else:
            direction = "down"
        message = f"Scrolled {abs(amount)} clicks {direction}"
    except Exception as e:
        message = f"Error scrolling: {str(e)}"
    return [TextContent(type="text", text=message)]
async def tool_get_mouse_position(args: dict) -> list[TextContent]:
    """Report the current mouse cursor coordinates.

    Args:
        args: Unused.

    Returns:
        One text block with the position, or the error.
    """
    try:
        cursor_x, cursor_y = pyautogui.position()
        message = f"Mouse position: ({cursor_x}, {cursor_y})"
    except Exception as e:
        message = f"Error getting mouse position: {str(e)}"
    return [TextContent(type="text", text=message)]
# ============================================================================
# KEYBOARD CONTROL TOOL IMPLEMENTATIONS
# ============================================================================
async def tool_keyboard_type(args: dict) -> list[TextContent]:
    """Type ``text`` with the keyboard.

    Args:
        args: ``text`` (required) and ``interval`` between key presses in
            seconds (default 0.01).

    Returns:
        One text block echoing at most the first 50 characters typed, or the
        error.
    """
    try:
        text = args["text"]
        pyautogui.write(text, interval=args.get("interval", 0.01))
        # Echo only a short preview; add an ellipsis when it was truncated.
        suffix = '...' if len(text) > 50 else ''
        message = f"Typed text: {text[:50]}{suffix}"
    except Exception as e:
        message = f"Error typing: {str(e)}"
    return [TextContent(type="text", text=message)]
async def tool_keyboard_press(args: dict) -> list[TextContent]:
    """Press a single key or a key combination.

    Args:
        args: Tool arguments. ``args["keys"]`` holds the key names: one name
            is pressed on its own via ``pyautogui.press``; several names are
            sent together via ``pyautogui.hotkey``. A bare string is accepted
            and treated as one key name.

    Returns:
        A single text item describing what was pressed, or an error message
        when ``keys`` is empty, missing, or the key press raised.
    """
    try:
        keys = args["keys"]
        if isinstance(keys, str):
            # A lone string would otherwise be splatted into one "key" per
            # character (e.g. "enter" -> e+n+t+e+r).
            keys = [keys]
        else:
            keys = list(keys)

        if not keys:
            # hotkey() with no keys does nothing; do not report success.
            return [TextContent(
                type="text",
                text="Error pressing keys: 'keys' must contain at least one key"
            )]

        if len(keys) == 1:
            pyautogui.press(keys[0])
            return [TextContent(type="text", text=f"Pressed key: {keys[0]}")]

        pyautogui.hotkey(*keys)
        return [TextContent(type="text", text=f"Pressed key combination: {'+'.join(keys)}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error pressing keys: {str(e)}")]
# ============================================================================
# WINDOW MANAGEMENT TOOL IMPLEMENTATIONS
# ============================================================================
async def tool_list_windows(args: dict) -> list[TextContent]:
    """List top-level windows that have a non-empty title.

    Args:
        args: Tool arguments. ``visible_only`` (default ``True``) skips
            windows for which ``win32gui.IsWindowVisible`` is false.

    Returns:
        A single text item with one entry per window (handle, title and,
        when it could be determined, owning process name and PID),
        "No windows found" when nothing matched, or an error message.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        visible_only = args.get("visible_only", True)
        windows = []

        def collect(hwnd, _extra):
            # Always return True so the enumeration visits every window.
            if visible_only and not win32gui.IsWindowVisible(hwnd):
                return True
            title = win32gui.GetWindowText(hwnd)
            if not title:
                return True
            entry = {"handle": hwnd, "title": title}
            try:
                _, pid = win32process.GetWindowThreadProcessId(hwnd)
                entry["process"] = psutil.Process(pid).name()
                entry["pid"] = pid
            except Exception as exc:
                # Best effort: the owning process may have exited or be
                # inaccessible. Keep the window, just without process info.
                # (Was a bare ``except:``, which also swallowed
                # KeyboardInterrupt/SystemExit.)
                logger.debug("No process info for window %s: %s", hwnd, exc)
            windows.append(entry)
            return True

        win32gui.EnumWindows(collect, None)
        if not windows:
            return [TextContent(type="text", text="No windows found")]

        parts = [f"Found {len(windows)} window(s):\n\n"]
        for w in windows:
            parts.append(f"Handle: {w['handle']}\nTitle: {w['title']}\n")
            if "process" in w:
                parts.append(f"Process: {w['process']} (PID: {w['pid']})\n")
            parts.append("\n")
        return [TextContent(type="text", text="".join(parts))]
    except Exception as e:
        return [TextContent(type="text", text=f"Error listing windows: {str(e)}")]
async def tool_get_active_window(args: dict) -> list[TextContent]:
    """Describe the current foreground window.

    ``args`` is accepted for a uniform tool signature but is not read.

    Returns:
        A single text item with the window's handle, title, owning process
        (name and PID), position and size; a "no active window" message when
        there is no foreground window; or an error message.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        hwnd = win32gui.GetForegroundWindow()
        if not hwnd:
            # The foreground window can be NULL, e.g. while a window is
            # losing activation; there is nothing to describe then.
            return [TextContent(type="text", text="No active window found")]

        title = win32gui.GetWindowText(hwnd)
        left, top, right, bottom = win32gui.GetWindowRect(hwnd)
        _, pid = win32process.GetWindowThreadProcessId(hwnd)
        try:
            process_name = psutil.Process(pid).name()
        except psutil.Error:
            # Process gone or inaccessible: still report the window itself.
            process_name = "<unknown>"

        lines = [
            "Active Window:",
            f"Handle: {hwnd}",
            f"Title: {title}",
            f"Process: {process_name} (PID: {pid})",
            f"Position: ({left}, {top})",
            f"Size: {right - left}x{bottom - top}",
        ]
        return [TextContent(type="text", text="\n".join(lines))]
    except Exception as e:
        return [TextContent(type="text", text=f"Error getting active window: {str(e)}")]
def _resolve_window_handle(args: dict) -> tuple[Optional[int], Optional[list[TextContent]]]:
    """Resolve the window a window-management tool should act on.

    ``args["handle"]`` wins when truthy; otherwise the first enumerated
    top-level window whose title contains ``args["title"]``
    (case-insensitive) is used.

    Returns:
        ``(hwnd, None)`` on success, or ``(None, response)`` where
        ``response`` is the ready-to-return error for the caller.
        Exceptions from the Windows API propagate to the caller.
    """
    handle = args.get("handle")
    title = args.get("title")
    if handle:
        return handle, None
    if not title:
        return None, [TextContent(type="text", text="Error: Must provide either 'handle' or 'title'")]

    needle = title.lower()
    matches = []

    def remember_first_match(h, _extra):
        if not matches and needle in win32gui.GetWindowText(h).lower():
            matches.append(h)
        # Never return False: stopping early makes EnumWindows itself report
        # failure, which pywin32 can surface as an exception even though a
        # match was found.
        return True

    win32gui.EnumWindows(remember_first_match, None)
    if not matches:
        return None, [TextContent(type="text", text=f"Window not found with title: {title}")]
    return matches[0], None


async def tool_activate_window(args: dict) -> list[TextContent]:
    """Bring a window to the foreground.

    Args:
        args: Tool arguments; the target is given by ``handle`` or by a
            ``title`` substring.

    Returns:
        A single text item naming the activated window, or an error message.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        hwnd, error = _resolve_window_handle(args)
        if error is not None:
            return error

        # Only un-minimize when needed: SW_RESTORE on a maximized window
        # would shrink it back to its normal size.
        if win32gui.IsIconic(hwnd):
            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
        else:
            win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
        win32gui.SetForegroundWindow(hwnd)
        window_title = win32gui.GetWindowText(hwnd)
        return [TextContent(type="text", text=f"Activated window: {window_title}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error activating window: {str(e)}")]


async def tool_close_window(args: dict) -> list[TextContent]:
    """Ask a window to close by posting ``WM_CLOSE`` to it.

    Args:
        args: Tool arguments; the target is given by ``handle`` or by a
            ``title`` substring.

    Returns:
        A single text item naming the window, or an error message.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        hwnd, error = _resolve_window_handle(args)
        if error is not None:
            return error

        # Read the title first; it may no longer be available afterwards.
        window_title = win32gui.GetWindowText(hwnd)
        # PostMessage only queues the request and does not wait for the
        # window to process it.
        win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
        return [TextContent(type="text", text=f"Closed window: {window_title}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error closing window: {str(e)}")]


async def tool_resize_window(args: dict) -> list[TextContent]:
    """Move and/or resize a window.

    Args:
        args: Tool arguments; the target is given by ``handle`` or by a
            ``title`` substring. ``x``, ``y``, ``width`` and ``height`` are
            optional; any that is missing or ``None`` keeps the window's
            current value.

    Returns:
        A single text item with the applied geometry, or an error message.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        hwnd, error = _resolve_window_handle(args)
        if error is not None:
            return error

        left, top, right, bottom = win32gui.GetWindowRect(hwnd)

        def arg_or(key, fallback):
            # Treat an explicit None like an absent key.
            value = args.get(key)
            return fallback if value is None else value

        x = arg_or("x", left)
        y = arg_or("y", top)
        width = arg_or("width", right - left)
        height = arg_or("height", bottom - top)

        win32gui.MoveWindow(hwnd, x, y, width, height, True)
        window_title = win32gui.GetWindowText(hwnd)
        return [TextContent(
            type="text",
            text=f"Resized window '{window_title}' to ({x}, {y}) {width}x{height}"
        )]
    except Exception as e:
        return [TextContent(type="text", text=f"Error resizing window: {str(e)}")]
# ============================================================================
# APPLICATION CONTROL TOOL IMPLEMENTATIONS
# ============================================================================
async def tool_launch_application(args: dict) -> list[TextContent]:
    """Start an application as a child process without waiting for it.

    ``args["path"]`` is the executable, ``args["args"]`` an optional list of
    command-line arguments, and ``args["working_dir"]`` an optional working
    directory (ignored when empty). Returns a single text item with the
    launched path and the new process ID, or an error message.
    """
    try:
        path = args["path"]
        extra_args = args.get("args", [])
        command = [path] + extra_args
        # Popen's default cwd is None, so a falsy working_dir maps onto it.
        launch_dir = args.get("working_dir") or None
        process = subprocess.Popen(command, cwd=launch_dir)
        summary = f"Launched application: {path}\nPID: {process.pid}"
        return [TextContent(type="text", text=summary)]
    except Exception as e:
        return [TextContent(type="text", text=f"Error launching application: {str(e)}")]
async def tool_kill_process(args: dict) -> list[TextContent]:
    """Kill one process by PID, or every process whose name matches.

    Args:
        args: Tool arguments. ``pid`` (checked first) selects a single
            process. Otherwise ``name`` is matched case-insensitively as a
            substring of each running process's name.

    Returns:
        A single text item listing what was killed (and, for name matches,
        what could not be killed), a "no processes found" message, or an
        error message.
    """
    try:
        name = args.get("name")
        pid = args.get("pid")

        if pid:
            process = psutil.Process(pid)
            process_name = process.name()
            process.kill()
            return [TextContent(type="text", text=f"Killed process: {process_name} (PID: {pid})")]

        if not name:
            return [TextContent(type="text", text="Error: Must provide either 'name' or 'pid'")]

        needle = name.lower()
        killed = []
        failed = []
        for proc in psutil.process_iter(['pid', 'name']):
            # psutil stores None for attributes it could not read.
            proc_name = proc.info['name'] or ""
            if needle not in proc_name.lower():
                continue
            label = f"{proc_name} (PID: {proc.info['pid']})"
            # Handle failures per process so one protected or vanished
            # process does not abort the loop and hide earlier kills.
            try:
                proc.kill()
            except psutil.NoSuchProcess:
                continue  # exited on its own between listing and kill
            except psutil.Error as exc:
                failed.append(f"{label}: {exc}")
            else:
                killed.append(label)

        if not killed and not failed:
            return [TextContent(type="text", text=f"No processes found matching: {name}")]
        if not killed:
            return [TextContent(
                type="text",
                text=f"Error killing process: could not kill {len(failed)} matching process(es):\n"
                + "\n".join(failed)
            )]

        text = f"Killed {len(killed)} process(es):\n" + "\n".join(killed)
        if failed:
            text += f"\nFailed to kill {len(failed)} process(es):\n" + "\n".join(failed)
        return [TextContent(type="text", text=text)]
    except Exception as e:
        return [TextContent(type="text", text=f"Error killing process: {str(e)}")]
async def tool_list_processes(args: dict) -> list[TextContent]:
    """List running processes, busiest first.

    Args:
        args: Tool arguments. ``name_filter`` (optional) keeps only
            processes whose name contains it, case-insensitively.

    Returns:
        A single text item with a header and up to 50 rows (PID, name, CPU
        and memory percentage) sorted by CPU usage descending, or an error
        message.
    """
    try:
        limit = 50
        name_filter = (args.get("name_filter") or "").lower()

        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            info = proc.info
            # psutil stores None for attributes it could not read (e.g.
            # access denied). Normalise them so filtering, sorting and the
            # numeric format specs below cannot fail on None.
            proc_name = info.get('name') or ""
            if name_filter and name_filter not in proc_name.lower():
                continue
            processes.append({
                'pid': info['pid'],
                'name': proc_name or "<unknown>",
                'cpu_percent': info.get('cpu_percent') or 0.0,
                'memory_percent': info.get('memory_percent') or 0.0,
            })

        # NOTE: psutil documents that the first cpu_percent() reading for a
        # process is a meaningless 0.0, so the ordering is only informative
        # on later calls.
        processes.sort(key=lambda p: p['cpu_percent'], reverse=True)
        total = len(processes)
        shown = processes[:limit]

        header = f"Found {total} process(es)"
        if name_filter:
            header += f" matching '{name_filter}'"
        if total > limit:
            header += f" (showing top {limit} by CPU)"

        rows = [
            f"PID: {p['pid']:6d} | {p['name']:30s} | "
            f"CPU: {p['cpu_percent']:5.1f}% | "
            f"MEM: {p['memory_percent']:5.1f}%\n"
            for p in shown
        ]
        return [TextContent(type="text", text=header + ":\n\n" + "".join(rows))]
    except Exception as e:
        return [TextContent(type="text", text=f"Error listing processes: {str(e)}")]
# ============================================================================
# SYSTEM CONTROL TOOL IMPLEMENTATIONS
# ============================================================================
async def tool_shutdown(args: dict) -> list[TextContent]:
    """Shut the computer down via the Windows ``shutdown`` command.

    Args:
        args: Tool arguments. ``delay`` (default 0) is passed to ``/t``;
            ``force`` (default ``False``) adds ``/f``.

    Returns:
        A single text item confirming the request, or an error message when
        the platform is not Windows or the command exits with a non-zero
        status.
    """
    try:
        force = args.get("force", False)
        delay = args.get("delay", 0)
        if sys.platform != "win32":
            return [TextContent(type="text", text="Error: Shutdown only supported on Windows")]

        cmd = ["shutdown", "/s", "/t", str(delay)]
        if force:
            cmd.append("/f")

        # Capture the outcome: a non-zero exit status must not be reported
        # as success.
        completed = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
        if completed.returncode != 0:
            detail = (completed.stderr or completed.stdout or "").strip()
            message = f"Error initiating shutdown: shutdown exited with code {completed.returncode}"
            if detail:
                message += f": {detail}"
            return [TextContent(type="text", text=message)]

        return [TextContent(
            type="text",
            text=f"Shutdown initiated (delay: {delay}s, force: {force})"
        )]
    except Exception as e:
        return [TextContent(type="text", text=f"Error initiating shutdown: {str(e)}")]
async def tool_restart(args: dict) -> list[TextContent]:
    """Restart the computer via the Windows ``shutdown /r`` command.

    Args:
        args: Tool arguments. ``delay`` (default 0) is passed to ``/t``;
            ``force`` (default ``False``) adds ``/f``.

    Returns:
        A single text item confirming the request, or an error message when
        the platform is not Windows or the command exits with a non-zero
        status.
    """
    try:
        force = args.get("force", False)
        delay = args.get("delay", 0)
        if sys.platform != "win32":
            return [TextContent(type="text", text="Error: Restart only supported on Windows")]

        cmd = ["shutdown", "/r", "/t", str(delay)]
        if force:
            cmd.append("/f")

        # Capture the outcome: a non-zero exit status must not be reported
        # as success.
        completed = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
        if completed.returncode != 0:
            detail = (completed.stderr or completed.stdout or "").strip()
            message = f"Error initiating restart: shutdown exited with code {completed.returncode}"
            if detail:
                message += f": {detail}"
            return [TextContent(type="text", text=message)]

        return [TextContent(
            type="text",
            text=f"Restart initiated (delay: {delay}s, force: {force})"
        )]
    except Exception as e:
        return [TextContent(type="text", text=f"Error initiating restart: {str(e)}")]
async def tool_logout(args: dict) -> list[TextContent]:
    """Log the current user off through ``win32api.ExitWindowsEx``.

    ``args["force"]`` (default ``False``) adds ``EWX_FORCE`` to the
    ``EWX_LOGOFF`` flag. Returns a single text item confirming the request,
    or an error message.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        force = args.get("force", False)
        if force:
            exit_flags = win32con.EWX_LOGOFF | win32con.EWX_FORCE
        else:
            exit_flags = win32con.EWX_LOGOFF
        win32api.ExitWindowsEx(exit_flags, 0)
        confirmation = f"Logout initiated (force: {force})"
        return [TextContent(type="text", text=confirmation)]
    except Exception as e:
        return [TextContent(type="text", text=f"Error initiating logout: {str(e)}")]
async def tool_lock_screen(args: dict) -> list[TextContent]:
    """Lock the workstation via ``user32.LockWorkStation``.

    ``args`` is accepted for a uniform tool signature but is not read.

    Returns:
        A single text item confirming the lock request, or an error message
        when the Windows API is unavailable or the call reports failure.
    """
    if not WINDOWS_AVAILABLE:
        return [TextContent(type="text", text="Error: Windows API not available")]
    try:
        import ctypes

        # use_last_error lets ctypes capture the Win32 error code of the
        # call so a failure can be reported accurately.
        user32 = ctypes.WinDLL("user32", use_last_error=True)
        # LockWorkStation returns zero on failure; previously the result was
        # ignored and success was always reported.
        if not user32.LockWorkStation():
            raise ctypes.WinError(ctypes.get_last_error())
        return [TextContent(type="text", text="Workstation locked")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error locking workstation: {str(e)}")]
async def tool_get_system_info(args: dict) -> list[TextContent]:
    """Report platform, CPU, memory, disk and boot-time information.

    ``args`` is accepted for a uniform tool signature but is not read.

    Returns:
        A single text item with the formatted report, or an error message.
    """
    try:
        import platform

        # cpu_percent(interval=1) sleeps for a second while sampling; run it
        # in a worker thread so the event loop is not blocked meanwhile.
        cpu_percent = await asyncio.to_thread(psutil.cpu_percent, interval=1)
        cpu_count = psutil.cpu_count()
        cpu_freq = psutil.cpu_freq()
        mem = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        boot_time = psutil.boot_time()

        gib = 1024 ** 3
        lines = [
            "=== SYSTEM INFORMATION ===",
            "",
            f"Platform: {platform.system()} {platform.release()}",
            f"Machine: {platform.machine()}",
            f"Processor: {platform.processor()}",
            "",
            f"CPU Usage: {cpu_percent}%",
            f"CPU Cores: {cpu_count}",
        ]
        if cpu_freq:
            lines.append(f"CPU Frequency: {cpu_freq.current:.2f} MHz")
        # The blank separator is added unconditionally so the memory section
        # stays visually separated even when no CPU frequency is available.
        lines += [
            "",
            f"Memory Total: {mem.total / gib:.2f} GB",
            f"Memory Used: {mem.used / gib:.2f} GB ({mem.percent}%)",
            f"Memory Available: {mem.available / gib:.2f} GB",
            "",
            f"Disk Total: {disk.total / gib:.2f} GB",
            f"Disk Used: {disk.used / gib:.2f} GB ({disk.percent}%)",
            f"Disk Free: {disk.free / gib:.2f} GB",
            "",
            f"Boot Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(boot_time))}",
        ]
        return [TextContent(type="text", text="\n".join(lines) + "\n")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error getting system info: {str(e)}")]
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
def main():
    """Start the Windows MCP server on stdio and run it until it exits.

    Startup diagnostics go to stderr; a warning is added when pywin32 could
    not be imported.
    """
    import sys
    from mcp.server.stdio import stdio_server

    startup_banner = (
        "Starting Windows MCP Server...",
        f"Platform: {sys.platform}",
        f"Python: {sys.version}",
    )
    for banner_line in startup_banner:
        print(banner_line, file=sys.stderr)
    if not WINDOWS_AVAILABLE:
        print("WARNING: pywin32 not available, some features will be limited", file=sys.stderr)

    async def serve():
        # Hand the stdio stream pair to the MCP app for the session.
        async with stdio_server() as (incoming, outgoing):
            init_options = app.create_initialization_options()
            await app.run(incoming, outgoing, init_options)

    asyncio.run(serve())
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()