Skip to main content
Glama
service.py (18 kB)
from src.desktop.config import BROWSER_NAMES, PROCESS_PER_MONITOR_DPI_AWARE from src.desktop.views import DesktopState, App, Size, Status from locale import getpreferredencoding from contextlib import contextmanager from typing import Optional,Literal from markdownify import markdownify from src.tree.service import Tree from fuzzywuzzy import process from psutil import Process from time import sleep from PIL import Image import win32process import subprocess import win32gui import win32con import requests import logging import base64 import ctypes import csv import re import os import io logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = logging.StreamHandler() formatter = logging.Formatter('[%(levelname)s] %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) try: ctypes.windll.shcore.SetProcessDpiAwareness(PROCESS_PER_MONITOR_DPI_AWARE) except Exception: ctypes.windll.user32.SetProcessDPIAware() import uiautomation as uia import pyautogui as pg pg.FAILSAFE=False pg.PAUSE=1.0 class Desktop: def __init__(self): self.encoding=getpreferredencoding() self.tree=Tree(self) self.desktop_state=None def get_resolution(self)->tuple[int,int]: return pg.size() def get_state(self,use_vision:bool=False,as_bytes:bool=False,scale:float=1.0)->DesktopState: sleep(0.1) apps=self.get_apps() active_app=self.get_active_app() if active_app is not None: apps.remove(active_app) logger.debug(f"Active app: {active_app}") logger.debug(f"Apps: {apps}") tree_state=self.tree.get_state(active_app,apps) if use_vision: screenshot=self.tree.get_annotated_screenshot(tree_state.interactive_nodes,scale=scale) if as_bytes: bytes_io=io.BytesIO() screenshot.save(bytes_io,format='PNG') screenshot=bytes_io.getvalue() else: screenshot=None self.desktop_state=DesktopState(apps= apps,active_app=active_app,screenshot=screenshot,tree_state=tree_state) return self.desktop_state def get_window_element_from_element(self,element:uia.Control)->uia.Control|None: while 
element is not None: if uia.IsTopLevelWindow(element.NativeWindowHandle): return element element = element.GetParentControl() return None def get_active_app(self)->App|None: try: handle=uia.GetForegroundWindow() for app in self.get_apps(): if app.handle!=handle: continue return app except Exception as ex: logger.error(f"Error in get_active_app: {ex}") return None def get_app_status(self,control:uia.Control)->Status: if uia.IsIconic(control.NativeWindowHandle): return Status.MINIMIZED elif uia.IsZoomed(control.NativeWindowHandle): return Status.MAXIMIZED elif uia.IsWindowVisible(control.NativeWindowHandle): return Status.NORMAL else: return Status.HIDDEN def get_cursor_location(self)->tuple[int,int]: position=pg.position() return (position.x,position.y) def get_element_under_cursor(self)->uia.Control: return uia.ControlFromCursor() def get_apps_from_start_menu(self)->dict[str,str]: command='Get-StartApps | ConvertTo-Csv -NoTypeInformation' apps_info,_=self.execute_command(command) reader=csv.DictReader(io.StringIO(apps_info)) return {row.get('Name').lower():row.get('AppID') for row in reader} def execute_command(self,command:str)->tuple[str,int]: try: encoded = base64.b64encode(command.encode("utf-16le")).decode("ascii") result = subprocess.run( ['powershell', '-NoProfile', '-EncodedCommand', encoded], capture_output=True, errors='ignore', timeout=25, cwd=os.path.expanduser(path='~') ) stdout=result.stdout stderr=result.stderr return (stdout or stderr,result.returncode) except subprocess.TimeoutExpired: return ('Command execution timed out', 1) except Exception as e: return ('Command execution failed', 1) def is_app_browser(self,node:uia.Control): process=Process(node.ProcessId) return process.name() in BROWSER_NAMES def get_default_language(self)->str: command="Get-Culture | Select-Object Name,DisplayName | ConvertTo-Csv -NoTypeInformation" response,_=self.execute_command(command) reader=csv.DictReader(io.StringIO(response)) return "".join([row.get('DisplayName') 
for row in reader]) def resize_app(self,size:tuple[int,int]=None,loc:tuple[int,int]=None)->tuple[str,int]: active_app=self.desktop_state.active_app if active_app is None: return "No active app found",1 if active_app.status==Status.MINIMIZED: return f"{active_app.name} is minimized",1 elif active_app.status==Status.MAXIMIZED: return f"{active_app.name} is maximized",1 else: app_control=uia.ControlFromHandle(active_app.handle) if loc is None: x=app_control.BoundingRectangle.left y=app_control.BoundingRectangle.top loc=(x,y) if size is None: width=app_control.BoundingRectangle.width() height=app_control.BoundingRectangle.height() size=(width,height) x,y=loc width,height=size app_control.MoveWindow(x,y,width,height) return (f'{active_app.name} resized to {width}x{height} at {x},{y}.',0) def is_app_running(self,name:str)->bool: apps={app.name:app for app in self.get_apps()} return process.extractOne(name,list(apps.keys()),score_cutoff=60) is not None def app(self,mode:Literal['launch','switch','resize'],name:Optional[str]=None,loc:Optional[tuple[int,int]]=None,size:Optional[tuple[int,int]]=None): match mode: case 'launch': response,status=self.launch_app(name) sleep(1.25) if status!=0: return response consecutive_waits=3 for _ in range(consecutive_waits): if not self.is_app_running(name): sleep(1.25) else: return f'{name.title()} launched.' return f'Launching {name.title()} wait for it to come load.' 
case 'resize': response,status=self.resize_app(size=size,loc=loc) if status!=0: return response else: return response case 'switch': response,status=self.switch_app(name) if status!=0: return response else: return response def launch_app(self,name:str)->tuple[str,int]: apps_map=self.get_apps_from_start_menu() matched_app=process.extractOne(name,apps_map.keys(),score_cutoff=70) if matched_app is None: return (f'{name.title()} not found in start menu.',1) app_name,_=matched_app appid=apps_map.get(app_name) if appid is None: return (name,f'{name.title()} not found in start menu.',1) if name.endswith('.exe'): response,status=self.execute_command(f'Start-Process {appid}') else: response,status=self.execute_command(f'Start-Process shell:AppsFolder\\{appid}') return response,status def switch_app(self,name:str): apps={app.name:app for app in [self.desktop_state.active_app]+self.desktop_state.apps if app is not None} matched_app:Optional[tuple[str,float]]=process.extractOne(name,list(apps.keys()),score_cutoff=70) if matched_app is None: return (f'Application {name.title()} not found.',1) app_name,_=matched_app app=apps.get(app_name) target_handle=app.handle if uia.IsIconic(target_handle): uia.ShowWindow(target_handle, win32con.SW_RESTORE) content=f'{app_name.title()} restored from Minimized state.' else: self.bring_window_to_top(target_handle) content=f'Switched to {app_name.title()} window.' 
return content,0 def bring_window_to_top(self,target_handle:int): foreground_handle=win32gui.GetForegroundWindow() foreground_thread,_=win32process.GetWindowThreadProcessId(foreground_handle) target_thread,_=win32process.GetWindowThreadProcessId(target_handle) try: ctypes.windll.user32.AllowSetForegroundWindow(-1) win32process.AttachThreadInput(foreground_thread,target_thread,True) win32gui.SetForegroundWindow(target_handle) win32gui.BringWindowToTop(target_handle) except Exception as e: logger.error(f'Failed to bring window to top: {e}') finally: win32process.AttachThreadInput(foreground_thread,target_thread,False) def get_element_handle_from_label(self,label:int)->uia.Control: tree_state=self.desktop_state.tree_state element_node=tree_state.interactive_nodes[label] xpath=element_node.xpath element_handle=self.get_element_from_xpath(xpath) return element_handle def get_coordinates_from_label(self,label:int)->tuple[int,int]: element_handle=self.get_element_handle_from_label(label) bounding_rectangle=element_handle.BoundingRectangle return bounding_rectangle.xcenter(),bounding_rectangle.ycenter() def click(self,loc:tuple[int,int],button:str='left',clicks:int=2): x,y=loc pg.click(x,y,button=button,clicks=clicks,duration=0.1) def type(self,loc:tuple[int,int],text:str,caret_position:Literal['start','end','none']='none',clear:Literal['true','false']='false',press_enter:Literal['true','false']='false'): x,y=loc pg.leftClick(x,y) if caret_position == 'start': pg.press('home') elif caret_position == 'end': pg.press('end') else: pass if clear=='true': pg.sleep(0.5) pg.hotkey('ctrl','a') pg.press('backspace') pg.typewrite(text,interval=0.02) if press_enter=='true': pg.press('enter') def scroll(self,loc:tuple[int,int]=None,type:Literal['horizontal','vertical']='vertical',direction:Literal['up','down','left','right']='down',wheel_times:int=1)->str|None: if loc: self.move(loc) match type: case 'vertical': match direction: case 'up': uia.WheelUp(wheel_times) case 'down': 
uia.WheelDown(wheel_times) case _: return 'Invalid direction. Use "up" or "down".' case 'horizontal': match direction: case 'left': pg.keyDown('Shift') pg.sleep(0.05) uia.WheelUp(wheel_times) pg.sleep(0.05) pg.keyUp('Shift') case 'right': pg.keyDown('Shift') pg.sleep(0.05) uia.WheelDown(wheel_times) pg.sleep(0.05) pg.keyUp('Shift') case _: return 'Invalid direction. Use "left" or "right".' case _: return 'Invalid type. Use "horizontal" or "vertical".' return None def drag(self,loc:tuple[int,int]): x,y=loc pg.sleep(0.5) pg.dragTo(x,y,duration=0.6) def move(self,loc:tuple[int,int]): x,y=loc pg.moveTo(x,y,duration=0.1) def shortcut(self,shortcut:str): shortcut=shortcut.split('+') if len(shortcut)>1: pg.hotkey(*shortcut) else: pg.press(''.join(shortcut)) def multi_select(self,press_ctrl:Literal['true','false']='false',elements:list[tuple[int,int]|int]=[]): if press_ctrl=='true': pg.keyDown('ctrl') for element in elements: x,y=element pg.click(x,y,duration=0.2) pg.sleep(0.5) pg.keyUp('ctrl') def multi_edit(self,elements:list[tuple[int,int,str]|tuple[int,str]]): for element in elements: x,y,text=element self.type((x,y),text=text,clear='true') def scrape(self,url:str)->str: response=requests.get(url,timeout=10) html=response.text content=markdownify(html=html) return content def get_app_size(self,control:uia.Control): window=control.BoundingRectangle if window.isempty(): return Size(width=0,height=0) return Size(width=window.width(),height=window.height()) def is_app_visible(self,app)->bool: is_minimized=self.get_app_status(app)!=Status.MINIMIZED size=self.get_app_size(app) area=size.width*size.height is_overlay=self.is_overlay_app(app) return not is_overlay and is_minimized and area>10 def is_overlay_app(self,element:uia.Control) -> bool: no_children = len(element.GetChildren()) == 0 is_name = "Overlay" in element.Name.strip() return no_children or is_name def get_apps(self) -> list[App]: try: desktop = uia.GetRootControl() # Get the desktop control children = 
desktop.GetChildren() apps = [] for depth, child in enumerate(children): if isinstance(child,(uia.WindowControl,uia.PaneControl)): window_pattern=child.GetPattern(uia.PatternId.WindowPattern) if (window_pattern is None): continue if window_pattern.CanMinimize and window_pattern.CanMaximize: status = self.get_app_status(child) size=self.get_app_size(child) apps.append(App(**{ "name":child.Name, "depth":depth, "status":status, "size":size, "handle":child.NativeWindowHandle, "process_id":child.ProcessId })) except Exception as ex: logger.error(f"Error in get_apps: {ex}") apps = [] return apps def get_xpath_from_element(self,element:uia.Control): current=element if current is None: return "" path_parts=[] while current is not None: parent=current.GetParentControl() if parent is None: # we are at the root node path_parts.append(f'{current.ControlTypeName}') break children=parent.GetChildren() same_type_children=["-".join(map(lambda x:str(x),child.GetRuntimeId())) for child in children if child.ControlType==current.ControlType] index=same_type_children.index("-".join(map(lambda x:str(x),current.GetRuntimeId()))) if same_type_children: path_parts.append(f'{current.ControlTypeName}[{index+1}]') else: path_parts.append(f'{current.ControlTypeName}') current=parent path_parts.reverse() xpath="/".join(path_parts) return xpath def get_element_from_xpath(self,xpath:str)->uia.Control: pattern = re.compile(r'(\w+)(?:\[(\d+)\])?') parts=xpath.split("/") root=uia.GetRootControl() element=root for part in parts[1:]: match=pattern.fullmatch(part) if match is None: continue control_type, index=match.groups() index=int(index) if index else None children=element.GetChildren() same_type_children=list(filter(lambda x:x.ControlTypeName==control_type,children)) if index: element=same_type_children[index-1] else: element=same_type_children[0] return element def get_windows_version(self)->str: response,status=self.execute_command("(Get-CimInstance Win32_OperatingSystem).Caption") if status==0: 
return response.strip() return "Windows" def get_user_account_type(self)->str: response,status=self.execute_command("(Get-LocalUser -Name $env:USERNAME).PrincipalSource") return "Local Account" if response.strip()=='Local' else "Microsoft Account" if status==0 else "Local Account" def get_dpi_scaling(self): user32 = ctypes.windll.user32 dpi = user32.GetDpiForSystem() return dpi / 96.0 def get_screen_size(self)->Size: width, height = uia.GetScreenSize() return Size(width=width,height=height) def get_screenshot(self)->Image.Image: return pg.screenshot() @contextmanager def auto_minimize(self): try: handle = uia.GetForegroundWindow() uia.ShowWindow(handle, win32con.SW_MINIMIZE) yield finally: uia.ShowWindow(handle, win32con.SW_RESTORE)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CursorTouch/Windows-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.