from windows_mcp.vdm.core import get_all_desktops, get_current_desktop, is_window_on_current_desktop
from windows_mcp.desktop.views import DesktopState, Window, Browser, Status, Size
from windows_mcp.desktop.config import PROCESS_PER_MONITOR_DPI_AWARE
from windows_mcp.tree.views import BoundingBox, TreeElementNode
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import ImageGrab, ImageFont, ImageDraw, Image
from windows_mcp.tree.service import Tree
from locale import getpreferredencoding
from contextlib import contextmanager
from typing import Optional,Literal
from markdownify import markdownify
from fuzzywuzzy import process
from time import sleep,time
from psutil import Process
import win32process
import subprocess
import win32gui
import win32con
import requests
import logging
import base64
import ctypes
import csv
import re
import os
import io
import random
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
try:
ctypes.windll.shcore.SetProcessDpiAwareness(PROCESS_PER_MONITOR_DPI_AWARE)
except Exception:
ctypes.windll.user32.SetProcessDPIAware()
import windows_mcp.uia as uia
import pyautogui as pg
pg.FAILSAFE=False
pg.PAUSE=1.0
class Desktop:
def __init__(self):
self.encoding=getpreferredencoding()
self.tree=Tree(self)
self.desktop_state=None
def get_state(self,use_annotation:bool|str=True,use_vision:bool|str=False,use_dom:bool|str=False,as_bytes:bool|str=False,scale:float=1.0)->DesktopState:
use_annotation = use_annotation is True or (isinstance(use_annotation, str) and use_annotation.lower() == 'true')
use_vision = use_vision is True or (isinstance(use_vision, str) and use_vision.lower() == 'true')
use_dom = use_dom is True or (isinstance(use_dom, str) and use_dom.lower() == 'true')
as_bytes = as_bytes is True or (isinstance(as_bytes, str) and as_bytes.lower() == 'true')
start_time = time()
controls_handles=self.get_controls_handles() # Taskbar,Program Manager,Apps, Dialogs
windows,windows_handles=self.get_windows(controls_handles=controls_handles) # Apps
active_window=self.get_active_window(windows=windows) #Active Window
active_window_handle=active_window.handle if active_window else None
try:
active_desktop=get_current_desktop()
all_desktops=get_all_desktops()
except RuntimeError:
active_desktop = {'id': '00000000-0000-0000-0000-000000000000', 'name': 'Default Desktop'}
all_desktops = [active_desktop]
if active_window is not None and active_window in windows:
windows.remove(active_window)
logger.debug(f"Active window: {active_window or 'No Active Window Found'}")
logger.debug(f"Windows: {windows}")
#Preparing handles for Tree
other_windows_handles=list(controls_handles-windows_handles)
tree_state=self.tree.get_state(active_window_handle,other_windows_handles,use_dom=use_dom)
if use_vision:
if use_annotation:
nodes=tree_state.interactive_nodes
screenshot=self.get_annotated_screenshot(nodes=nodes)
else:
screenshot=self.get_screenshot()
if scale != 1.0:
screenshot = screenshot.resize((int(screenshot.width * scale), int(screenshot.height * scale)), Image.LANCZOS)
if as_bytes:
buffered = io.BytesIO()
screenshot.save(buffered, format="PNG")
screenshot = buffered.getvalue()
buffered.close()
else:
screenshot=None
self.desktop_state=DesktopState(
active_window=active_window,
windows=windows,
active_desktop=active_desktop,
all_desktops=all_desktops,
screenshot=screenshot,
tree_state=tree_state
)
# Log the time taken to capture the state
end_time = time()
logger.info(f"Desktop State capture took {end_time - start_time:.2f} seconds")
return self.desktop_state
def get_window_status(self,control:uia.Control)->Status:
if uia.IsIconic(control.NativeWindowHandle):
return Status.MINIMIZED
elif uia.IsZoomed(control.NativeWindowHandle):
return Status.MAXIMIZED
elif uia.IsWindowVisible(control.NativeWindowHandle):
return Status.NORMAL
else:
return Status.HIDDEN
def get_cursor_location(self)->tuple[int,int]:
position=pg.position()
return (position.x,position.y)
def get_element_under_cursor(self)->uia.Control:
return uia.ControlFromCursor()
def get_apps_from_start_menu(self)->dict[str,str]:
command='Get-StartApps | ConvertTo-Csv -NoTypeInformation'
apps_info, status = self.execute_command(command)
if status != 0 or not apps_info:
logger.error(f"Failed to get apps from start menu: {apps_info}")
return {}
try:
reader = csv.DictReader(io.StringIO(apps_info.strip()))
return {
row.get('Name', '').lower(): row.get('AppID', '')
for row in reader
if row.get('Name') and row.get('AppID')
}
except Exception as e:
logger.error(f"Error parsing start menu apps: {e}")
return {}
def execute_command(self, command: str,timeout:int=10) -> tuple[str, int]:
try:
encoded = base64.b64encode(command.encode("utf-16le")).decode("ascii")
result = subprocess.run(
['powershell', '-NoProfile', '-OutputFormat', 'Text', '-EncodedCommand', encoded],
capture_output=True, # No errors='ignore' - let subprocess return bytes
timeout=timeout,
cwd=os.path.expanduser(path='~'),
env=os.environ.copy() # Inherit environment variables including PATH
)
# Handle both bytes and str output (subprocess behavior varies by environment)
stdout = result.stdout
stderr = result.stderr
if isinstance(stdout, bytes):
stdout = stdout.decode(self.encoding, errors='ignore')
if isinstance(stderr, bytes):
stderr = stderr.decode(self.encoding, errors='ignore')
return (stdout or stderr, result.returncode)
except subprocess.TimeoutExpired:
return ('Command execution timed out', 1)
except Exception as e:
return (f'Command execution failed: {type(e).__name__}: {e}', 1)
def is_window_browser(self,node:uia.Control):
'''Give any node of the app and it will return True if the app is a browser, False otherwise.'''
try:
process=Process(node.ProcessId)
return Browser.has_process(process.name())
except:
return False
def get_default_language(self)->str:
command="Get-Culture | Select-Object Name,DisplayName | ConvertTo-Csv -NoTypeInformation"
response,_=self.execute_command(command)
reader=csv.DictReader(io.StringIO(response))
return "".join([row.get('DisplayName') for row in reader])
def resize_app(self,size:tuple[int,int]=None,loc:tuple[int,int]=None)->tuple[str,int]:
active_window=self.desktop_state.active_window
if active_window is None:
return "No active window found",1
if active_window.status==Status.MINIMIZED:
return f"{active_window.name} is minimized",1
elif active_window.status==Status.MAXIMIZED:
return f"{active_window.name} is maximized",1
else:
window_control=uia.ControlFromHandle(active_window.handle)
if loc is None:
x=window_control.BoundingRectangle.left
y=window_control.BoundingRectangle.top
loc=(x,y)
if size is None:
width=window_control.BoundingRectangle.width()
height=window_control.BoundingRectangle.height()
size=(width,height)
x,y=loc
width,height=size
window_control.MoveWindow(x,y,width,height)
return (f'{active_window.name} resized to {width}x{height} at {x},{y}.',0)
def is_app_running(self,name:str)->bool:
windows, _ = self.get_windows()
windows_dict = {window.name: window for window in windows}
return process.extractOne(name,list(windows_dict.keys()),score_cutoff=60) is not None
def app(self,mode:Literal['launch','switch','resize'],name:Optional[str]=None,loc:Optional[tuple[int,int]]=None,size:Optional[tuple[int,int]]=None):
match mode:
case 'launch':
response,status,pid=self.launch_app(name)
if status!=0:
return response
# Smart wait using UIA Exists (avoids manual Python loops)
launched = False
if pid > 0:
if uia.WindowControl(ProcessId=pid).Exists(maxSearchSeconds=10):
launched = True
if not launched:
# Fallback: Regex search for the window title
safe_name = re.escape(name)
if uia.WindowControl(RegexName=f'(?i).*{safe_name}.*').Exists(maxSearchSeconds=10):
launched = True
if launched:
return f'{name.title()} launched.'
return f'Launching {name.title()} sent, but window not detected yet.'
case 'resize':
response,status=self.resize_app(size=size,loc=loc)
if status!=0:
return response
else:
return response
case 'switch':
response,status=self.switch_app(name)
if status!=0:
return response
else:
return response
def launch_app(self,name:str)->tuple[str,int,int]:
apps_map=self.get_apps_from_start_menu()
matched_app=process.extractOne(name,apps_map.keys(),score_cutoff=70)
if matched_app is None:
return (f'{name.title()} not found in start menu.',1,0)
app_name,_=matched_app
appid=apps_map.get(app_name)
if appid is None:
return (f'{name.title()} not found in start menu.',1,0)
pid = 0
if os.path.exists(appid) or "\\" in appid:
# It's a file path, we can try to get the PID using PassThru
# Escape any single quotes and wrap in single quotes for PowerShell safety
safe_appid = appid.replace("'", "''")
command = f"Start-Process '{safe_appid}' -PassThru | Select-Object -ExpandProperty Id"
response, status = self.execute_command(command)
if status == 0 and response.strip().isdigit():
pid = int(response.strip())
else:
# It's an AUMID (Store App) - validate it only contains expected characters
if not appid.replace('\\', '').replace('_', '').replace('.', '').replace('-', '').isalnum():
return (f'Invalid app identifier: {appid}', 1, 0)
command = f'Start-Process "shell:AppsFolder\\{appid}"'
response, status = self.execute_command(command)
return response, status, pid
def switch_app(self,name:str):
windows={window.name:window for window in [self.desktop_state.active_window]+self.desktop_state.windows if window is not None}
matched_window:Optional[tuple[str,float]]=process.extractOne(name,list(windows.keys()),score_cutoff=70)
if matched_window is None:
return (f'Application {name.title()} not found.',1)
window_name,_=matched_window
window=windows.get(window_name)
target_handle=window.handle
if uia.IsIconic(target_handle):
uia.ShowWindow(target_handle, win32con.SW_RESTORE)
content=f'{window_name.title()} restored from Minimized state.'
else:
self.bring_window_to_top(target_handle)
content=f'Switched to {window_name.title()} window.'
return content,0
def bring_window_to_top(self, target_handle: int):
if not win32gui.IsWindow(target_handle):
raise ValueError("Invalid window handle")
try:
if win32gui.IsIconic(target_handle):
win32gui.ShowWindow(target_handle, win32con.SW_RESTORE)
foreground_handle = win32gui.GetForegroundWindow()
# Validate both handles before proceeding
if not win32gui.IsWindow(foreground_handle):
# No valid foreground window, just try to set target as foreground
win32gui.SetForegroundWindow(target_handle)
win32gui.BringWindowToTop(target_handle)
return
foreground_thread, _ = win32process.GetWindowThreadProcessId(foreground_handle)
target_thread, _ = win32process.GetWindowThreadProcessId(target_handle)
if not foreground_thread or not target_thread or foreground_thread == target_thread:
win32gui.SetForegroundWindow(target_handle)
win32gui.BringWindowToTop(target_handle)
return
ctypes.windll.user32.AllowSetForegroundWindow(-1)
attached = False
try:
win32process.AttachThreadInput(foreground_thread, target_thread, True)
attached = True
win32gui.SetForegroundWindow(target_handle)
win32gui.BringWindowToTop(target_handle)
win32gui.SetWindowPos(
target_handle,
win32con.HWND_TOP,
0, 0, 0, 0,
win32con.SWP_NOMOVE | win32con.SWP_NOSIZE | win32con.SWP_SHOWWINDOW
)
finally:
if attached:
win32process.AttachThreadInput(foreground_thread, target_thread, False)
except Exception as e:
logger.exception(f"Failed to bring window to top: {e}")
def get_element_handle_from_label(self,label:int)->uia.Control:
tree_state=self.desktop_state.tree_state
element_node=tree_state.interactive_nodes[label]
xpath=element_node.xpath
element_handle=self.get_element_from_xpath(xpath)
return element_handle
def get_coordinates_from_label(self,label:int)->tuple[int,int]:
element_handle=self.get_element_handle_from_label(label)
bounding_rectangle=element_handle.BoundingRectangle
return bounding_rectangle.xcenter(),bounding_rectangle.ycenter()
def click(self,loc:tuple[int,int],button:str='left',clicks:int=2):
x,y=loc
pg.click(x,y,button=button,clicks=clicks,duration=0.1)
def type(self,loc:tuple[int,int],text:str,caret_position:Literal['start', 'idle', 'end']='idle',clear:bool|str=False,press_enter:bool|str=False):
x,y=loc
pg.leftClick(x,y)
if caret_position == 'start':
pg.press('home')
elif caret_position == 'end':
pg.press('end')
else:
pass
# Handle both boolean and string 'true'/'false'
if clear is True or (isinstance(clear, str) and clear.lower() == 'true'):
pg.sleep(0.5)
pg.hotkey('ctrl','a')
pg.press('backspace')
pg.typewrite(text,interval=0.02)
if press_enter is True or (isinstance(press_enter, str) and press_enter.lower() == 'true'):
pg.press('enter')
def scroll(self,loc:tuple[int,int]=None,type:Literal['horizontal','vertical']='vertical',direction:Literal['up','down','left','right']='down',wheel_times:int=1)->str|None:
if loc:
self.move(loc)
match type:
case 'vertical':
match direction:
case 'up':
uia.WheelUp(wheel_times)
case 'down':
uia.WheelDown(wheel_times)
case _:
return 'Invalid direction. Use "up" or "down".'
case 'horizontal':
match direction:
case 'left':
pg.keyDown('Shift')
pg.sleep(0.05)
uia.WheelUp(wheel_times)
pg.sleep(0.05)
pg.keyUp('Shift')
case 'right':
pg.keyDown('Shift')
pg.sleep(0.05)
uia.WheelDown(wheel_times)
pg.sleep(0.05)
pg.keyUp('Shift')
case _:
return 'Invalid direction. Use "left" or "right".'
case _:
return 'Invalid type. Use "horizontal" or "vertical".'
return None
def drag(self,loc:tuple[int,int]):
x,y=loc
pg.sleep(0.5)
pg.dragTo(x,y,duration=0.6)
def move(self,loc:tuple[int,int]):
x,y=loc
pg.moveTo(x,y,duration=0.1)
def shortcut(self,shortcut:str):
shortcut=shortcut.split('+')
if len(shortcut)>1:
pg.hotkey(*shortcut)
else:
pg.press(''.join(shortcut))
def multi_select(self,press_ctrl:bool|str=False,locs:list[tuple[int,int]]=[]):
press_ctrl = press_ctrl is True or (isinstance(press_ctrl, str) and press_ctrl.lower() == 'true')
if press_ctrl:
pg.keyDown('ctrl')
for loc in locs:
x,y=loc
pg.click(x,y,duration=0.2)
pg.sleep(0.5)
pg.keyUp('ctrl')
def multi_edit(self,locs:list[tuple[int,int,str]]):
for loc in locs:
x,y,text=loc
self.type((x,y),text=text,clear=True)
def scrape(self,url:str)->str:
response=requests.get(url,timeout=10)
html=response.text
content=markdownify(html=html)
return content
def get_window_from_element(self,element:uia.Control)->Window|None:
if element is None:
return None
top_window=element.GetTopLevelControl()
if top_window is None:
return None
handle=top_window.NativeWindowHandle
windows,_=self.get_windows()
for window in windows:
if window.handle==handle:
return window
return None
def is_window_visible(self,window:uia.Control)->bool:
is_minimized=self.get_window_status(window)!=Status.MINIMIZED
size=window.BoundingRectangle
area=size.width()*size.height()
is_overlay=self.is_overlay_window(window)
return not is_overlay and is_minimized and area>10
def is_overlay_window(self,element:uia.Control) -> bool:
no_children = len(element.GetChildren()) == 0
is_name = "Overlay" in element.Name.strip()
return no_children or is_name
def get_controls_handles(self,optimized:bool=False):
handles = set()
# For even more faster results (still under development)
def callback(hwnd, _):
try:
# Validate handle before checking properties
if win32gui.IsWindow(hwnd) and win32gui.IsWindowVisible(hwnd) and is_window_on_current_desktop(hwnd):
handles.add(hwnd)
except Exception:
# Skip invalid handles without logging (common during window enumeration)
pass
win32gui.EnumWindows(callback, None)
if desktop_hwnd:= win32gui.FindWindow('Progman',None):
handles.add(desktop_hwnd)
if taskbar_hwnd:= win32gui.FindWindow('Shell_TrayWnd',None):
handles.add(taskbar_hwnd)
if secondary_taskbar_hwnd:= win32gui.FindWindow('Shell_SecondaryTrayWnd',None):
handles.add(secondary_taskbar_hwnd)
return handles
def get_active_window(self,windows:list[Window]|None=None)->Window|None:
try:
if windows is None:
windows,_=self.get_windows()
active_window=self.get_foreground_window()
if active_window.ClassName=="Progman":
return None
active_window_handle=active_window.NativeWindowHandle
for window in windows:
if window.handle!=active_window_handle:
continue
return window
# In case active window is not present in the windows list
return Window(**{
"name":active_window.Name,
"is_browser":self.is_window_browser(active_window),
"depth":0,
"bounding_box":BoundingBox(
left=active_window.BoundingRectangle.left,
top=active_window.BoundingRectangle.top,
right=active_window.BoundingRectangle.right,
bottom=active_window.BoundingRectangle.bottom,
width=active_window.BoundingRectangle.width(),
height=active_window.BoundingRectangle.height()
),
"status":self.get_window_status(active_window),
"handle":active_window_handle,
"process_id":active_window.ProcessId,
})
except Exception as ex:
logger.error(f"Error in get_active_window: {ex}")
return None
def get_foreground_window(self)->uia.Control:
handle=uia.GetForegroundWindow()
active_window=self.get_window_from_element_handle(handle)
return active_window
def get_window_from_element_handle(self, element_handle: int) -> uia.Control:
current = uia.ControlFromHandle(element_handle)
root_handle = uia.GetRootControl().NativeWindowHandle
while True:
parent = current.GetParentControl()
if parent is None or parent.NativeWindowHandle == root_handle:
return current
current = parent
def get_windows(self,controls_handles:set[int]|None=None) -> tuple[list[Window],set[int]]:
try:
windows = []
window_handles = set()
controls_handles=controls_handles or self.get_controls_handles()
for depth, hwnd in enumerate(controls_handles):
try:
child = uia.ControlFromHandle(hwnd)
except Exception:
continue
# Filter out Overlays (e.g. NVIDIA, Steam)
if self.is_overlay_window(child):
continue
if isinstance(child,(uia.WindowControl,uia.PaneControl)):
window_pattern=child.GetPattern(uia.PatternId.WindowPattern)
if (window_pattern is None):
continue
if window_pattern.CanMinimize and window_pattern.CanMaximize:
status = self.get_window_status(child)
bounding_rect=child.BoundingRectangle
if bounding_rect.isempty() and status!=Status.MINIMIZED:
continue
windows.append(Window(**{
"name":child.Name,
"depth":depth,
"status":status,
"bounding_box":BoundingBox(
left=bounding_rect.left,
top=bounding_rect.top,
right=bounding_rect.right,
bottom=bounding_rect.bottom,
width=bounding_rect.width(),
height=bounding_rect.height()
),
"handle":child.NativeWindowHandle,
"process_id":child.ProcessId,
"is_browser":self.is_window_browser(child)
}))
window_handles.add(child.NativeWindowHandle)
except Exception as ex:
logger.error(f"Error in get_windows: {ex}")
windows = []
return windows,window_handles
def get_xpath_from_element(self,element:uia.Control):
current=element
if current is None:
return ""
path_parts=[]
while current is not None:
parent=current.GetParentControl()
if parent is None:
# we are at the root node
path_parts.append(f'{current.ControlTypeName}')
break
children=parent.GetChildren()
same_type_children=["-".join(map(lambda x:str(x),child.GetRuntimeId())) for child in children if child.ControlType==current.ControlType]
index=same_type_children.index("-".join(map(lambda x:str(x),current.GetRuntimeId())))
if same_type_children:
path_parts.append(f'{current.ControlTypeName}[{index+1}]')
else:
path_parts.append(f'{current.ControlTypeName}')
current=parent
path_parts.reverse()
xpath="/".join(path_parts)
return xpath
def get_element_from_xpath(self,xpath:str)->uia.Control:
pattern = re.compile(r'(\w+)(?:\[(\d+)\])?')
parts=xpath.split("/")
root=uia.GetRootControl()
element=root
for part in parts[1:]:
match=pattern.fullmatch(part)
if match is None:
continue
control_type, index=match.groups()
index=int(index) if index else None
children=element.GetChildren()
same_type_children=list(filter(lambda x:x.ControlTypeName==control_type,children))
if index:
element=same_type_children[index-1]
else:
element=same_type_children[0]
return element
def get_windows_version(self)->str:
response,status=self.execute_command("(Get-CimInstance Win32_OperatingSystem).Caption")
if status==0:
return response.strip()
return "Windows"
def get_user_account_type(self)->str:
response,status=self.execute_command("(Get-LocalUser -Name $env:USERNAME).PrincipalSource")
return "Local Account" if response.strip()=='Local' else "Microsoft Account" if status==0 else "Local Account"
def get_dpi_scaling(self):
try:
user32 = ctypes.windll.user32
dpi = user32.GetDpiForSystem()
return dpi / 96.0 if dpi > 0 else 1.0
except Exception:
# Fallback to standard DPI if system call fails
return 1.0
def get_screen_size(self)->Size:
width, height = uia.GetVirtualScreenSize()
return Size(width=width,height=height)
def get_screenshot(self)->Image.Image:
try:
return ImageGrab.grab(all_screens=True)
except Exception as e:
logger.warning(f"Failed to capture virtual screen, using primary screen")
return pg.screenshot()
def get_annotated_screenshot(self, nodes: list[TreeElementNode]) -> Image.Image:
screenshot = self.get_screenshot()
# Add padding
padding = 5
width = int(screenshot.width + (1.5 * padding))
height = int(screenshot.height + (1.5 * padding))
padded_screenshot = Image.new("RGB", (width, height), color=(255, 255, 255))
padded_screenshot.paste(screenshot, (padding, padding))
draw = ImageDraw.Draw(padded_screenshot)
font_size = 12
try:
font = ImageFont.truetype('arial.ttf', font_size)
except IOError:
font = ImageFont.load_default()
def get_random_color():
return "#{:06x}".format(random.randint(0, 0xFFFFFF))
left_offset, top_offset, _, _ = uia.GetVirtualScreenRect()
def draw_annotation(label, node: TreeElementNode):
box = node.bounding_box
color = get_random_color()
# Scale and pad the bounding box also clip the bounding box
# Adjust for virtual screen offset so coordinates map to the screenshot image
adjusted_box = (
int(box.left - left_offset) + padding,
int(box.top - top_offset) + padding,
int(box.right - left_offset) + padding,
int(box.bottom - top_offset) + padding
)
# Draw bounding box
draw.rectangle(adjusted_box, outline=color, width=2)
# Label dimensions
label_width = draw.textlength(str(label), font=font)
label_height = font_size
left, top, right, bottom = adjusted_box
# Label position above bounding box
label_x1 = right - label_width
label_y1 = top - label_height - 4
label_x2 = label_x1 + label_width
label_y2 = label_y1 + label_height + 4
# Draw label background and text
draw.rectangle([(label_x1, label_y1), (label_x2, label_y2)], fill=color)
draw.text((label_x1 + 2, label_y1 + 2), str(label), fill=(255, 255, 255), font=font)
# Draw annotations in parallel
with ThreadPoolExecutor() as executor:
executor.map(draw_annotation, range(len(nodes)), nodes)
return padded_screenshot
@contextmanager
def auto_minimize(self):
try:
handle = uia.GetForegroundWindow()
uia.ShowWindow(handle, win32con.SW_MINIMIZE)
yield
finally:
uia.ShowWindow(handle, win32con.SW_RESTORE)