Skip to main content
Glama
Dazlarus
by Dazlarus
driver_robust.py (14.2 kB)
#!/usr/bin/env python3
"""
Robust ChatGPT Desktop Automation Driver for Windows

This driver wraps robust_flow.py to provide a JSON-based interface
for the MCP server. All the heavy lifting is done by RobustChatGPTFlow.

Protocol:
- Receives JSON commands on stdin
- Returns JSON responses on stdout
- Debug messages go to stderr

Commands:
- check_chatgpt: Check if ChatGPT can be found/started
- focus_chatgpt: Focus the ChatGPT window
- find_conversation: Navigate to project + conversation
- send_message: Send a message
- wait_for_response: Wait for response to complete
- get_last_response: Copy and return the response
- escalate: Run the full flow (navigate, send, wait, copy) in one call
"""

import sys
import json
import os
import time
from typing import Optional

# Add driver directory to path
_driver_dir = os.path.dirname(os.path.abspath(__file__))
if _driver_dir not in sys.path:
    sys.path.insert(0, _driver_dir)

from robust_flow import RobustChatGPTFlow, log_debug

# Default time to wait for a response, in milliseconds.
DEFAULT_TIMEOUT_MS = 600000


class RobustDriver:
    """
    MCP-compatible driver using RobustChatGPTFlow.

    Maps MCP commands to robust_flow methods. Every public method returns a
    dict with a boolean "success" key, plus "data" on success or "error" on
    failure.
    """

    def __init__(self):
        self.flow = RobustChatGPTFlow()
        self._last_prompt = None

    def check_chatgpt(self) -> dict:
        """Check if ChatGPT is available (running or can be started).

        Returns:
            Always a success response; data["found"] says whether a titled
            window owned by a "chatgpt.exe" process was seen.
        """
        # Windows-only dependencies are imported lazily so the module can be
        # imported without them being loaded up front.
        import win32gui
        import win32process
        import psutil

        def find_chatgpt_window():
            matches = []

            def callback(hwnd, _):
                if win32gui.IsWindow(hwnd):
                    try:
                        _, pid = win32process.GetWindowThreadProcessId(hwnd)
                        proc = psutil.Process(pid)
                        if proc.name().lower() == "chatgpt.exe":
                            title = win32gui.GetWindowText(hwnd)
                            if title:
                                matches.append((hwnd, title))
                    except Exception:
                        # Best effort: skip windows whose owning process
                        # cannot be inspected. Narrowed from a bare
                        # ``except`` so KeyboardInterrupt/SystemExit are
                        # not swallowed.
                        pass
                # Returning True keeps the enumeration going.
                return True

            win32gui.EnumWindows(callback, None)
            return matches

        windows = find_chatgpt_window()
        if windows:
            _, title = windows[0]
            return {
                "success": True,
                "data": {
                    "found": True,
                    "title": title,
                    "message": "ChatGPT Desktop is running"
                }
            }
        return {
            "success": True,
            "data": {
                "found": False,
                "message": "ChatGPT Desktop not running (will be started automatically)"
            }
        }

    def _restart_and_focus(self) -> Optional[dict]:
        """Run the kill -> start -> focus steps in order.

        Returns:
            None when all three steps succeed, otherwise the failure
            response for the first step that returned a falsy value.
        """
        steps = (
            (self.flow.step1_kill_chatgpt, "Failed to kill existing ChatGPT"),
            (self.flow.step2_start_chatgpt, "Failed to start ChatGPT"),
            (self.flow.step3_focus_chatgpt, "Failed to focus ChatGPT"),
        )
        for run_step, error in steps:
            if not run_step():
                return {"success": False, "error": error}
        return None

    def focus_chatgpt(self) -> dict:
        """Focus or start ChatGPT window."""
        # Use robust flow's step sequence
        # Kill -> Start -> Focus ensures clean state
        failure = self._restart_and_focus()
        if failure is not None:
            return failure
        return {"success": True}

    def find_conversation(self, title: str, project_name: Optional[str] = None) -> dict:
        """
        Navigate to a conversation.

        Args:
            title: Conversation name
            project_name: Project/folder name (optional but recommended)

        Returns:
            A success response echoing the conversation and project, or a
            failure response naming the step that failed.
        """
        # Ensure we have a focused window
        if self.flow.hwnd is None:
            # Run initialization steps
            failure = self._restart_and_focus()
            if failure is not None:
                return failure

        # Open sidebar
        if not self.flow.step4_open_sidebar():
            return {"success": False, "error": "Failed to open sidebar"}

        # Navigate to project (if specified)
        if project_name:
            if not self.flow.step5_click_project(project_name):
                return {"success": False, "error": f"Failed to find project: {project_name}"}

        # Navigate to conversation
        if not self.flow.step6_click_conversation(title):
            return {"success": False, "error": f"Failed to find conversation: {title}"}

        return {
            "success": True,
            "data": {
                "conversation": title,
                "project": project_name,
                "method": "robust_flow"
            }
        }

    def send_message(self, message: str) -> dict:
        """Send a message to the current conversation (combined focus+send)."""
        log_debug(f"[driver] send_message called, len={len(message)}")
        ok = self.flow.step7_send_prompt(message)
        log_debug(f"[driver] step7_send_prompt returned: {ok}")
        if not ok:
            return {"success": False, "error": "Failed to focus/send message"}
        self._last_prompt = message
        return {"success": True}

    def wait_for_response(self, timeout_ms: int = DEFAULT_TIMEOUT_MS) -> dict:
        """Wait for ChatGPT to finish generating.

        Args:
            timeout_ms: Maximum wait in milliseconds. None (e.g. a JSON
                null) falls back to the default.
        """
        if timeout_ms is None:
            timeout_ms = DEFAULT_TIMEOUT_MS
        timeout_sec = timeout_ms / 1000.0
        if not self.flow.step9_wait_for_response(timeout=timeout_sec):
            return {"success": False, "error": f"Timeout waiting for response ({timeout_ms}ms)"}
        return {"success": True}

    def get_last_response(self) -> dict:
        """Copy and return the last response."""
        response = self.flow.step10_copy_response()
        if not response:
            return {"success": False, "error": "Failed to copy response"}
        return {
            "success": True,
            "data": {
                "response": response
            }
        }

    # Convenience method: full escalation in one call
    def escalate(self, project_name: str, conversation: str, message: str,
                 timeout_ms: int = DEFAULT_TIMEOUT_MS, run_id: str = None) -> dict:
        """
        Full escalation flow in one call using execute_full_flow.

        Uses robust_flow's execute_full_flow which has:
        - Proper phase logging
        - Navigation retry logic
        - Response validation
        - failed_step tracking

        Includes aggressive retry logic - will retry the ENTIRE flow multiple
        times since many failures are transient (chaos moved window, focus
        lost, etc).

        If all retries fail, returns a clear error message instructing the
        user not to interfere while the agent is working.

        Args:
            project_name: Project/folder name; a falsy value is passed on
                as "".
            conversation: Conversation name.
            message: Prompt to send.
            timeout_ms: Accepted for interface compatibility; this method
                does not forward it to execute_full_flow.
            run_id: Optional identifier used to tag debug log lines and
                echoed back in the result.

        Returns:
            On success: {"success": True, "data": {...}} including the
            response text and the number of attempts used. On failure:
            {"success": False, ...} with "failed_step", "error_reason" and
            the number of attempts actually made.
        """
        if run_id:
            log_debug(f"[{run_id}] Starting escalation: project={project_name}, conversation={conversation}")
        else:
            log_debug(f"Starting escalation: project={project_name}, conversation={conversation}")

        # Almost all failures are recoverable - the whole point is to retry the entire flow
        # Only truly fatal errors (like invalid config) should not be retried
        NON_RECOVERABLE_REASONS = {
            "invalid_config",    # Bad project/conversation names
            "invalid_response",  # Response validation failed (not a retry issue)
        }

        max_attempts = 4  # Try up to 4 times total (initial + 3 retries)
        log_tag = run_id or 'no-run-id'
        attempts_made = 0
        last_error = None
        last_step = None
        last_reason = None

        for attempt in range(max_attempts):
            attempts_made = attempt + 1
            if attempt > 0:
                log_debug(f"[{log_tag}] RETRY attempt {attempt + 1}/{max_attempts}: restarting entire flow")
                # Fresh flow instance to reset all state
                self.flow = RobustChatGPTFlow()
                # Brief pause between retries - not too long or chaos will time out
                time.sleep(1.5)

            # Use the full flow with all its bells and whistles
            result = self.flow.execute_full_flow(
                project_name=project_name or "",
                conversation_name=conversation,
                prompt=message
            )

            if result["success"]:
                response = result.get("response", "")
                # Check if we actually got a response
                if response and len(response.strip()) > 10:
                    return {
                        "success": True,
                        "data": {
                            "response": response,
                            "project": project_name,
                            "conversation": conversation,
                            "run_id": run_id,
                            "attempts": attempts_made
                        }
                    }
                # "Success" but empty/invalid response - treat as failure and retry
                log_debug(f"[{log_tag}] Flow succeeded but response is empty/invalid, retrying...")
                last_error = "Response was empty or too short"
                last_step = 10
                last_reason = "empty_response"
                continue

            # Failure - capture details
            failed_step = result.get("failed_step")
            # ``or`` also covers an explicit None stored under "error".
            error = result.get("error") or "Unknown error"
            error_reason = self._derive_error_reason(failed_step, error)

            last_error = error
            last_step = failed_step
            last_reason = error_reason

            log_debug(f"[{log_tag}] Attempt {attempt + 1} failed: step={failed_step}, reason={error_reason}, error={error}")

            # Check if this is a non-recoverable error
            if error_reason in NON_RECOVERABLE_REASONS:
                log_debug(f"[{log_tag}] Non-recoverable error ({error_reason}), stopping retries")
                break
            # Otherwise, we'll retry on the next iteration

        # All attempts failed - return a clear error message for the agent to relay to the user.
        # Report the attempts actually made: the loop may have stopped early
        # on a non-recoverable error, so max_attempts would overstate it.
        attempt_word = "attempt" if attempts_made == 1 else "attempts"
        user_message = (
            f"ChatGPT escalation failed after {attempts_made} {attempt_word}. "
            f"Last failure was at step {last_step} ({last_reason}): {last_error}. "
            "\n\n⚠️ IMPORTANT: Please keep your hands off the keyboard and mouse while the agent is working with ChatGPT. "
            "Window movements, clicks, or keyboard input can interfere with the automation. "
            "If the problem persists, try closing other applications and running the escalation again."
        )

        return {
            "success": False,
            "error": user_message,
            "failed_step": last_step,
            "error_reason": last_reason,
            "run_id": run_id,
            "attempts": attempts_made
        }

    def _derive_error_reason(self, step: Optional[int], error: Optional[str]) -> str:
        """Derive a structured error reason code from step and error message.

        Args:
            step: Number of the failed step, or None when unknown.
            error: Error message text; None is treated as an empty message.

        Returns:
            "unknown" when step is None. Otherwise a reason refined from
            keywords in the message, falling back to the per-step default
            (or "step<N>_failed" for unmapped steps).
        """
        if step is None:
            return "unknown"

        error_lower = (error or "").lower()

        # Step-specific reason codes
        step_reasons = {
            1: "kill_failed",
            2: "start_failed",
            3: "focus_failed",
            4: "sidebar_failed",
            5: "project_not_found",
            6: "conversation_not_found",
            7: "send_failed",
            9: "response_timeout",
            10: "copy_failed",
        }
        base_reason = step_reasons.get(step, f"step{step}_failed")

        # Refine based on error message. The first matching keyword wins;
        # a "not found" message for a step other than 5 or 6 falls through
        # to the per-step default.
        if "empty" in error_lower or "too short" in error_lower:
            return "empty_response"
        elif "not found" in error_lower or "failed to find" in error_lower:
            if step == 5:
                return "project_not_found"
            elif step == 6:
                return "conversation_not_found"
        elif "timeout" in error_lower:
            return "timeout"
        elif "focus" in error_lower:
            return "focus_lost"
        elif "validation failed" in error_lower:
            return "invalid_response"

        return base_reason


def _dispatch(driver: RobustDriver, action, params: dict) -> dict:
    """Route one parsed command to the matching RobustDriver method."""
    if action == "check_chatgpt":
        return driver.check_chatgpt()
    if action == "focus_chatgpt":
        return driver.focus_chatgpt()
    if action == "find_conversation":
        return driver.find_conversation(
            title=params.get("title", ""),
            project_name=params.get("project_name")
        )
    if action == "send_message":
        return driver.send_message(params.get("message", ""))
    if action == "wait_for_response":
        return driver.wait_for_response(params.get("timeout_ms", DEFAULT_TIMEOUT_MS))
    if action == "get_last_response":
        return driver.get_last_response()
    if action == "escalate":
        # Full flow in one call
        return driver.escalate(
            project_name=params.get("project_name"),
            conversation=params.get("conversation"),
            message=params.get("message", ""),
            timeout_ms=params.get("timeout_ms", DEFAULT_TIMEOUT_MS),
            run_id=params.get("run_id")
        )
    return {"success": False, "error": f"Unknown action: {action}"}


def main():
    """Main entry point - process one command from stdin.

    Reads a single JSON object of the form {"action": ..., "params": {...}}
    from stdin and prints exactly one JSON response to stdout, including
    when the input is malformed or a handler raises.
    """
    # Read command from stdin
    try:
        input_data = sys.stdin.read()
        command = json.loads(input_data)
    except json.JSONDecodeError as e:
        print(json.dumps({"success": False, "error": f"Invalid JSON input: {e}"}))
        return

    # Valid JSON is not necessarily an object; reject anything else instead
    # of failing on ``.get``.
    if not isinstance(command, dict):
        print(json.dumps({"success": False, "error": "Invalid JSON input: expected an object"}))
        return

    action = command.get("action")
    params = command.get("params") or {}
    if not isinstance(params, dict):
        print(json.dumps({"success": False, "error": "Invalid params: expected an object"}))
        return

    log_debug(f"Executing action: {action}")

    # Top-level boundary: turn any unexpected failure into a JSON error so
    # the caller always gets a parseable response on stdout.
    try:
        driver = RobustDriver()
        result = _dispatch(driver, action, params)
    except Exception as e:
        log_debug(f"Unhandled error in action {action}: {e!r}")
        result = {"success": False, "error": f"Unhandled error in action {action}: {e}"}

    print(json.dumps(result))


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dazlarus/chatgpt-escalation-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.