#!/usr/bin/env python3
"""
Robust ChatGPT automation flow with verification gates.
Each step has:
1. Action
2. Verification (with retries)
3. Only proceed when verified
This prevents race conditions and ensures reliable automation.
"""
import time
import sys
import os
import ctypes
from contextlib import contextmanager
from datetime import datetime
# Add driver directory to path
_driver_dir = os.path.dirname(os.path.abspath(__file__))
if _driver_dir not in sys.path:
sys.path.insert(0, _driver_dir)
# Start OCR model preloading immediately (background thread)
# This runs during steps 1-4 so OCR is ready by step 5
from ocr_extraction import start_ocr_preload
start_ocr_preload()
# Flow start time for elapsed time calculation
_flow_start_time = None
def log_debug(msg: str):
"""Log debug message to stderr."""
print(f"[FLOW] {msg}", file=sys.stderr)
def log_phase(step: int, phase: str, status: str = ""):
"""
Log a phase marker with timestamp and elapsed time.
Args:
step: Step number (1-10)
phase: Phase name (e.g., "open_sidebar", "click_project")
status: Status indicator (e.g., "START", "OK", "FAIL", "RETRY")
"""
global _flow_start_time
now = datetime.now()
timestamp = now.strftime("%H:%M:%S.%f")[:-3] # HH:MM:SS.mmm
if _flow_start_time is None:
elapsed = "0.000"
else:
elapsed = f"{(now.timestamp() - _flow_start_time):.3f}"
status_str = f" [{status}]" if status else ""
print(f"[PHASE] {timestamp} +{elapsed}s | Step {step} | {phase}{status_str}", file=sys.stderr)
def reset_flow_timer():
"""Reset the flow timer (call at start of execute_full_flow)."""
global _flow_start_time
_flow_start_time = datetime.now().timestamp()
# =============================================================================
# INPUT BLOCKING: Prevent external interference during automation
# =============================================================================
_user32 = ctypes.windll.user32
_input_blocked = False
def block_input(block: bool = True) -> bool:
"""
Block or unblock all mouse and keyboard input.
IMPORTANT: Requires admin/elevated privileges to work.
If not elevated, this will silently fail (returns False).
Args:
block: True to block input, False to unblock
Returns:
True if successful, False if failed (e.g., not admin)
"""
global _input_blocked
try:
result = _user32.BlockInput(block)
if result:
_input_blocked = block
log_debug(f"[input] {'BLOCKED' if block else 'UNBLOCKED'} user input")
else:
# BlockInput requires admin privileges
log_debug(f"[input] BlockInput failed (need admin privileges?)")
return bool(result)
except Exception as e:
log_debug(f"[input] BlockInput error: {e}")
return False
def unblock_input():
"""Ensure input is unblocked."""
global _input_blocked
if _input_blocked:
block_input(False)
@contextmanager
def input_blocked():
"""
Context manager to block input during critical operations.
Usage:
with input_blocked():
# Do automation that shouldn't be interrupted
pass
# Input automatically unblocked when exiting
If BlockInput fails (not admin), operations proceed normally.
"""
blocked = block_input(True)
try:
yield blocked
finally:
if blocked:
block_input(False)
# Register cleanup on exit to ensure input is never left blocked
import atexit
atexit.register(unblock_input)
class RobustChatGPTFlow:
"""
Robust ChatGPT automation with verification gates between each step.
"""
def __init__(self):
self.hwnd = None
self.window_rect = None
# Allow test harness to disable keyboard fallbacks to avoid global Tab usage in CI
self._keyboard_fallback_enabled = True
# Enable input blocking during critical operations (requires admin)
self._block_input_enabled = True
# =========================================================================
# SAFETY GUARDRAILS: Window State & Focus Management
# =========================================================================
def _ensure_foreground(self, max_attempts: int = 3) -> bool:
"""
Ensure ChatGPT window is foreground before UI actions.
If window has lost focus (e.g., user alt-tabbed), restore it.
Uses AttachThreadInput technique to bypass Windows focus restrictions.
Returns: True if window is foreground, False if failed.
"""
import win32gui
import win32con
import win32process
import win32api
for attempt in range(1, max_attempts + 1):
try:
# Check current foreground
fg_hwnd = win32gui.GetForegroundWindow()
if fg_hwnd == self.hwnd:
return True # Already foreground
# Lost focus - restore it
log_debug(f" [safety] Window lost focus (attempt {attempt}/{max_attempts}), restoring...")
# Restore if minimized
if win32gui.IsIconic(self.hwnd):
win32gui.ShowWindow(self.hwnd, win32con.SW_RESTORE)
time.sleep(0.3)
# Use AttachThreadInput technique to bypass Windows foreground restrictions
try:
# Get thread IDs
fg_thread, _ = win32process.GetWindowThreadProcessId(fg_hwnd)
target_thread, _ = win32process.GetWindowThreadProcessId(self.hwnd)
# Attach to foreground thread temporarily
if fg_thread != target_thread:
win32process.AttachThreadInput(target_thread, fg_thread, True)
try:
win32gui.SetForegroundWindow(self.hwnd)
win32gui.BringWindowToTop(self.hwnd)
win32gui.SetFocus(self.hwnd)
finally:
win32process.AttachThreadInput(target_thread, fg_thread, False)
else:
win32gui.SetForegroundWindow(self.hwnd)
except Exception as e:
# Fallback to simple approach
log_debug(f" [safety] AttachThreadInput failed: {e}, trying simple approach")
win32gui.SetForegroundWindow(self.hwnd)
time.sleep(0.2)
# Verify
if win32gui.GetForegroundWindow() == self.hwnd:
log_debug(f" [safety] ✓ Focus restored (attempt {attempt})")
self.window_rect = win32gui.GetWindowRect(self.hwnd)
return True
# Try ShowWindow with SW_SHOW as another fallback
win32gui.ShowWindow(self.hwnd, win32con.SW_SHOW)
time.sleep(0.2)
if win32gui.GetForegroundWindow() == self.hwnd:
log_debug(f" [safety] ✓ Focus restored via SW_SHOW (attempt {attempt})")
self.window_rect = win32gui.GetWindowRect(self.hwnd)
return True
except Exception as e:
log_debug(f" [safety] Focus restore attempt {attempt} failed: {e}")
time.sleep(0.3)
log_debug(f" [safety] ✗ Could not restore focus after {max_attempts} attempts")
return False
def _is_window_ready(self) -> bool:
"""
Check if window is in a valid state for automation.
Returns False if:
- Window is minimized
- Window is not visible
- Window handle is invalid
"""
import win32gui
try:
if not self.hwnd:
log_debug(" [safety] ✗ No window handle")
return False
# Check if window still exists
if not win32gui.IsWindow(self.hwnd):
log_debug(" [safety] ✗ Window handle is invalid")
return False
# Check if minimized
if win32gui.IsIconic(self.hwnd):
log_debug(" [safety] ✗ Window is minimized")
return False
# Check if visible
if not win32gui.IsWindowVisible(self.hwnd):
log_debug(" [safety] ✗ Window is not visible")
return False
return True
except Exception as e:
log_debug(f" [safety] ✗ Window state check failed: {e}")
return False
def _refresh_hwnd(self) -> bool:
"""
Refresh window handle if it became invalid.
Returns: True if hwnd successfully refreshed, False otherwise.
"""
log_debug(" [safety] Refreshing window handle...")
hwnd = self._find_chatgpt_hwnd()
if hwnd:
self.hwnd = hwnd
import win32gui
self.window_rect = win32gui.GetWindowRect(hwnd)
log_debug(f" [safety] ✓ Window handle refreshed (hwnd={hwnd})")
return True
else:
log_debug(" [safety] ✗ Could not find ChatGPT window")
return False
def _ensure_window_viable(self, step_name: str = "") -> bool:
"""
Comprehensive pre-step validation - ensures window is in a viable state.
Call this before EVERY critical step to guard against chaos:
1. Verify hwnd is still valid
2. Restore from minimized if needed
3. Ensure foreground focus
4. Update window_rect to handle moves/resizes
Args:
step_name: Name of the step (for logging)
Returns: True if window is viable and ready, False if unrecoverable
"""
import win32gui
import win32con
prefix = f"[viable:{step_name}]" if step_name else "[viable]"
log_debug(f" {prefix} Pre-step validation...")
# 1. Check hwnd validity
if not self.hwnd or not win32gui.IsWindow(self.hwnd):
log_debug(f" {prefix} hwnd invalid, refreshing...")
if not self._refresh_hwnd():
log_debug(f" {prefix} ✗ Could not refresh hwnd")
return False
# 2. Restore from minimized
if win32gui.IsIconic(self.hwnd):
log_debug(f" {prefix} Window minimized, restoring...")
try:
win32gui.ShowWindow(self.hwnd, win32con.SW_RESTORE)
time.sleep(0.3)
except Exception as e:
log_debug(f" {prefix} ✗ Could not restore: {e}")
return False
# 3. Ensure visible
if not win32gui.IsWindowVisible(self.hwnd):
log_debug(f" {prefix} Window not visible, showing...")
try:
win32gui.ShowWindow(self.hwnd, win32con.SW_SHOW)
time.sleep(0.2)
except Exception as e:
log_debug(f" {prefix} ✗ Could not show: {e}")
return False
# 4. Update window rect (handles chaos moving/resizing the window)
try:
new_rect = win32gui.GetWindowRect(self.hwnd)
if new_rect != self.window_rect:
log_debug(f" {prefix} Window rect changed: {self.window_rect} -> {new_rect}")
self.window_rect = new_rect
except Exception as e:
log_debug(f" {prefix} ⚠ Could not get window rect: {e}")
# 5. Ensure foreground
if not self._ensure_foreground():
log_debug(f" {prefix} ✗ Could not ensure foreground")
return False
log_debug(f" {prefix} ✓ Window viable")
return True
def _retry_with_recovery(self, operation, operation_name: str, max_attempts: int = 3):
"""
Retry an operation with window state recovery.
Args:
operation: Callable that returns True on success
operation_name: Name for logging
max_attempts: Max retry attempts
Returns: True if operation succeeded, False if all attempts failed
"""
for attempt in range(1, max_attempts + 1):
try:
# Pre-check: ensure window is ready
if not self._is_window_ready():
log_debug(f" [retry] {operation_name}: Window not ready (attempt {attempt}/{max_attempts})")
# Try to refresh handle if invalid
if not self.hwnd or not self._is_window_ready():
if not self._refresh_hwnd():
time.sleep(0.5 * attempt) # Exponential backoff
continue
# Try to restore window state
import win32gui
import win32con
if win32gui.IsIconic(self.hwnd):
win32gui.ShowWindow(self.hwnd, win32con.SW_RESTORE)
time.sleep(0.3)
# Ensure foreground before operation
if not self._ensure_foreground():
log_debug(f" [retry] {operation_name}: Could not restore focus (attempt {attempt}/{max_attempts})")
time.sleep(0.5 * attempt)
continue
# Execute operation
result = operation()
if result:
if attempt > 1:
log_debug(f" [retry] {operation_name}: ✓ Succeeded on attempt {attempt}")
return True
else:
log_debug(f" [retry] {operation_name}: Operation returned False (attempt {attempt}/{max_attempts})")
time.sleep(0.5 * attempt)
except Exception as e:
log_debug(f" [retry] {operation_name}: Exception on attempt {attempt}/{max_attempts}: {e}")
time.sleep(0.5 * attempt)
log_debug(f" [retry] {operation_name}: ✗ Failed after {max_attempts} attempts")
return False
def _safe_click(self, x: int, y: int, description: str = "") -> bool:
"""
Safely click at coordinates with foreground verification.
Args:
x, y: Screen coordinates to click
description: Description for logging
Returns: True if click succeeded, False if focus lost and couldn't recover
"""
from pywinauto.mouse import click
import win32gui
# Ensure window is foreground before click
if not self._ensure_foreground():
log_debug(f" [safe_click] ✗ Could not ensure foreground for {description}")
return False
try:
click(coords=(x, y))
log_debug(f" [safe_click] ✓ Clicked at ({x}, {y}) - {description}")
return True
except Exception as e:
log_debug(f" [safe_click] ✗ Click failed for {description}: {e}")
return False
def _verify_prompt_entry(self, expected_prompt: str, max_attempts: int = 3) -> bool:
"""
Verify that the prompt was correctly entered into the input field.
Uses Ctrl+A, Ctrl+C to copy current input and compare with expected.
Returns True if prompt matches, False otherwise.
"""
import pyperclip
from pywinauto.keyboard import send_keys
for attempt in range(1, max_attempts + 1):
try:
# Ensure foreground for clipboard operations
if not self._ensure_foreground():
log_debug(f" [verify] Lost foreground during verification (attempt {attempt})")
time.sleep(0.2)
continue
# Select all and copy
send_keys('^a')
time.sleep(0.1)
send_keys('^c')
time.sleep(0.15)
# Get clipboard content
current_text = pyperclip.paste()
# Compare (strip whitespace for robustness)
expected_clean = expected_prompt.strip()
current_clean = current_text.strip() if current_text else ""
if current_clean == expected_clean:
log_debug(f" [verify] ✓ Prompt verified ({len(current_clean)} chars)")
return True
else:
# Log mismatch details
expected_len = len(expected_clean)
current_len = len(current_clean)
log_debug(f" [verify] ✗ Mismatch (attempt {attempt}): expected {expected_len} chars, got {current_len}")
if current_len < expected_len * 0.5:
log_debug(f" [verify] Prompt appears truncated (less than 50%)")
elif current_len > expected_len:
log_debug(f" [verify] Prompt has extra content")
except Exception as e:
log_debug(f" [verify] Exception during verification: {e}")
time.sleep(0.2)
return False
def _is_template_response(self, response: dict) -> bool:
"""
Detect if ChatGPT returned the template/format instructions instead of a real answer.
Common template patterns:
- guidance contains placeholder like "one-sentence summary"
- priority is "low | medium | high" (the template text)
- action_plan contains generic steps like "step 1", "step 2"
"""
if not isinstance(response, dict):
return False
# Check for template priority (should be one of: low, medium, high - not all three)
priority = response.get("priority", "")
if "|" in str(priority):
log_debug(" [template-check] Detected template priority: contains '|'")
return True
# Check for template guidance phrases
guidance = response.get("guidance", "")
template_guidance_phrases = [
"one-sentence summary",
"your main guidance",
"explanation here",
"what the agent should do",
]
guidance_lower = guidance.lower()
for phrase in template_guidance_phrases:
if phrase in guidance_lower:
log_debug(f" [template-check] Detected template guidance: '{phrase}'")
return True
# Check for template action_plan
action_plan = response.get("action_plan", [])
if isinstance(action_plan, list) and len(action_plan) > 0:
template_plan_phrases = ["step 1", "step 2", "step 3", "action 1", "action 2"]
for step in action_plan:
step_lower = str(step).lower().strip()
if step_lower in template_plan_phrases:
log_debug(f" [template-check] Detected template action_plan step: '{step}'")
return True
# Check for notes_for_darien (wrong field name from template)
if "notes_for_darien" in response:
log_debug(" [template-check] Detected wrong field name: notes_for_darien")
return True
return False
# =========================================================================
# STEP 1: Kill ChatGPT
# =========================================================================
def step1_kill_chatgpt(self, timeout: float = 5.0) -> bool:
"""
Kill ChatGPT if running.
Verification: Process no longer exists.
"""
import psutil
log_debug("STEP 1: Killing ChatGPT if running...")
# Find and kill
killed = False
for proc in psutil.process_iter(['name', 'pid']):
try:
if proc.info['name'] and proc.info['name'].lower() == "chatgpt.exe":
log_debug(f" Terminating PID {proc.info['pid']}")
proc.terminate()
killed = True
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
if not killed:
log_debug(" ChatGPT was not running")
return True
# VERIFY: Wait for process to actually exit
start = time.time()
while (time.time() - start) < timeout:
still_running = False
for proc in psutil.process_iter(['name']):
try:
if proc.info['name'] and proc.info['name'].lower() == "chatgpt.exe":
still_running = True
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
if not still_running:
log_debug(" ✓ VERIFIED: ChatGPT process terminated")
return True
time.sleep(0.3)
log_debug(" ✗ FAILED: ChatGPT still running after timeout")
return False
# =========================================================================
# STEP 2: Start ChatGPT
# =========================================================================
def step2_start_chatgpt(self, timeout: float = 15.0) -> bool:
"""
Start ChatGPT Desktop.
Verification: Window handle found AND window is visible.
"""
import subprocess
import win32gui
import win32process
import psutil
log_debug("STEP 2: Starting ChatGPT...")
# Launch
subprocess.Popen(
'start "" "ChatGPT"',
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# VERIFY: Wait for window to appear
start = time.time()
while (time.time() - start) < timeout:
hwnd = self._find_chatgpt_hwnd()
if hwnd:
# Additional check: window must be visible
if win32gui.IsWindowVisible(hwnd):
self.hwnd = hwnd
self.window_rect = win32gui.GetWindowRect(hwnd)
title = win32gui.GetWindowText(hwnd)
log_debug(f" ✓ VERIFIED: Window found (hwnd={hwnd}, title='{title}')")
# Extra wait for UI to fully initialize
time.sleep(1.5)
return True
time.sleep(0.5)
log_debug(" ✗ FAILED: Window not found after timeout")
return False
def _find_chatgpt_hwnd(self):
"""Find ChatGPT window handle."""
import win32gui
import win32process
import psutil
result = [None]
def callback(hwnd, _):
if win32gui.IsWindow(hwnd) and win32gui.IsWindowVisible(hwnd):
try:
_, pid = win32process.GetWindowThreadProcessId(hwnd)
proc = psutil.Process(pid)
if proc.name().lower() == "chatgpt.exe":
title = win32gui.GetWindowText(hwnd)
if title and "IME" not in title and "Default" not in title:
result[0] = hwnd
return False # Stop enumeration
except:
pass
return True
try:
win32gui.EnumWindows(callback, None)
except Exception as e:
# EnumWindows failed (permission or runtime issue); try fallback method
log_debug(f" EnumWindows failed: {e} - trying psutil + pywinauto fallback")
# Call a smaller fallback routine to avoid nested try/except confusion
try:
return self._find_chatgpt_hwnd_fallback(psutil)
except Exception as e2:
log_debug(f" Fallback via pywinauto failed: {e2}")
# If fallback failed, return whatever we have (likely None)
return result[0]
return result[0]
def _find_chatgpt_hwnd_fallback(self, psutil_module):
"""Fallback: use psutil + pywinauto to find a top window for chatgpt.exe"""
from pywinauto import Application
for proc in psutil_module.process_iter(['name', 'pid']):
try:
if proc.info['name'] and proc.info['name'].lower() == 'chatgpt.exe':
pid = proc.info['pid']
try:
app = Application(backend='uia').connect(process=pid)
top = app.top_window()
hwnd = top.handle
if hwnd:
return hwnd
except Exception:
continue
except Exception:
continue
return None
# =========================================================================
# STEP 3: Focus ChatGPT
# =========================================================================
def step3_focus_chatgpt(self, timeout: float = 5.0) -> bool:
"""
Bring ChatGPT to foreground.
Verification: ChatGPT is the foreground window.
"""
import win32gui
import win32con
log_debug("STEP 3: Focusing ChatGPT window...")
if not self.hwnd:
log_debug(" ✗ FAILED: No window handle")
return False
# Try multiple methods to bring to front
start = time.time()
attempt = 0
while (time.time() - start) < timeout:
attempt += 1
try:
# Restore if minimized
if win32gui.IsIconic(self.hwnd):
win32gui.ShowWindow(self.hwnd, win32con.SW_RESTORE)
time.sleep(0.3)
# Method 1: SetForegroundWindow
win32gui.SetForegroundWindow(self.hwnd)
time.sleep(0.3)
# VERIFY: Check if we're now foreground
fg_hwnd = win32gui.GetForegroundWindow()
if fg_hwnd == self.hwnd:
log_debug(f" ✓ VERIFIED: ChatGPT is foreground (attempt {attempt})")
# Update rect in case window moved
self.window_rect = win32gui.GetWindowRect(self.hwnd)
return True
# Method 2: Try BringWindowToTop
win32gui.BringWindowToTop(self.hwnd)
time.sleep(0.3)
fg_hwnd = win32gui.GetForegroundWindow()
if fg_hwnd == self.hwnd:
log_debug(f" ✓ VERIFIED: ChatGPT is foreground (attempt {attempt}, method 2)")
self.window_rect = win32gui.GetWindowRect(self.hwnd)
return True
except Exception as e:
log_debug(f" Focus attempt {attempt} failed: {e}")
# If we're still not foreground, attempt UIA set_focus or title bar click as a last-resort
try:
from pywinauto import Application
app = Application(backend='uia').connect(handle=self.hwnd)
top = app.top_window()
try:
top.set_focus()
time.sleep(0.25)
if win32gui.GetForegroundWindow() == self.hwnd:
self.window_rect = win32gui.GetWindowRect(self.hwnd)
log_debug(f" ✓ VERIFIED: ChatGPT is foreground after UIA set_focus (attempt {attempt})")
return True
except Exception:
# Fallthrough to click title bar
pass
except Exception:
pass
# Click title-bar center as a fallback to force focus
try:
rect = win32gui.GetWindowRect(self.hwnd)
title_click_x = rect[0] + 40
title_click_y = rect[1] + 10
from pywinauto.mouse import click
click(coords=(title_click_x, title_click_y))
time.sleep(0.25)
if win32gui.GetForegroundWindow() == self.hwnd:
self.window_rect = win32gui.GetWindowRect(self.hwnd)
log_debug(f" ✓ VERIFIED: ChatGPT is foreground after title click (attempt {attempt})")
return True
except Exception:
pass
time.sleep(0.3)
log_debug(" ✗ FAILED: Could not focus window after timeout")
return False
# =========================================================================
# STEP 4: Open Sidebar
# =========================================================================
def step4_open_sidebar(self, timeout: float = 3.0) -> bool:
"""
Open the sidebar by clicking hamburger menu.
Verification: Sidebar region has light gray background (~249,249,249).
"""
import win32gui
from pywinauto.mouse import click
from PIL import ImageGrab
import numpy as np
log_phase(4, "open_sidebar", "START")
log_debug("STEP 4: Opening sidebar...")
# Pre-step validation
if not self._ensure_window_viable("step4"):
log_phase(4, "open_sidebar", "FAIL:not_viable")
log_debug(" ✗ FAILED: Window not viable")
return False
rect = self.window_rect
# First check if sidebar is already open
if self._is_sidebar_open():
log_phase(4, "open_sidebar", "OK:already_open")
log_debug(" ✓ Sidebar already open")
return True
# Click hamburger menu (top-left)
menu_x = rect[0] + 30
menu_y = rect[1] + 70
log_debug(f" Clicking hamburger at ({menu_x}, {menu_y})")
# Safe click with foreground verification
if not self._safe_click(menu_x, menu_y, "hamburger menu"):
log_phase(4, "open_sidebar", "FAIL:click_failed")
log_debug(" ✗ FAILED: Could not click hamburger menu")
return False
# VERIFY: Wait for sidebar to appear
start = time.time()
while (time.time() - start) < timeout:
time.sleep(0.3)
if self._is_sidebar_open():
log_phase(4, "open_sidebar", "OK")
log_debug(" ✓ VERIFIED: Sidebar is open")
return True
log_phase(4, "open_sidebar", "FAIL:timeout")
log_debug(" ✗ FAILED: Sidebar not detected after click")
return False
def _is_sidebar_open(self) -> bool:
"""Fast check if sidebar is open by looking for X close button."""
from PIL import ImageGrab
import numpy as np
if not self.window_rect:
return False
rect = self.window_rect
# X button is at approximately x+275, y+58 (center of X)
# Capture a 20x20 area around it
x_center = rect[0] + 275
y_center = rect[1] + 58
try:
area = (x_center - 10, y_center - 10, x_center + 10, y_center + 10)
img = ImageGrab.grab(bbox=area)
pixels = np.array(img)
# The X icon has dark pixels (gray lines on light background)
# Count pixels darker than 180
dark_pixels = np.sum(pixels < 180)
# If we see dark pixels (>30), the X button is visible = sidebar open
is_open = dark_pixels > 30
log_debug(f" Sidebar check: {dark_pixels} dark pixels -> {'OPEN' if is_open else 'CLOSED'}")
return is_open
except Exception as e:
log_debug(f" Sidebar check failed: {e}")
return False
# =========================================================================
# STEP 5: Click Project
# =========================================================================
def step5_click_project(self, project_name: str, timeout: float = 10.0) -> bool:
"""
Find and click on a project in the sidebar.
Verification: Window title contains project name OR we detect hover on target.
"""
import win32gui
log_phase(5, f"click_project:{project_name}", "START")
log_debug(f"STEP 5: Clicking project '{project_name}'...")
# Pre-step validation
if not self._ensure_window_viable("step5"):
log_phase(5, f"click_project:{project_name}", "FAIL:not_viable")
log_debug(" ✗ FAILED: Window not viable")
return False
# Retry logic for chaos resilience
max_attempts = 3
for attempt in range(max_attempts):
# Ensure foreground before every attempt (including first)
if not self._ensure_foreground():
log_debug(f" ⚠ Could not ensure foreground for attempt {attempt + 1}")
time.sleep(0.5)
continue
if attempt > 0:
log_phase(5, f"click_project:{project_name}", f"RETRY:{attempt+1}")
log_debug(f" Retry attempt {attempt + 1}/{max_attempts}...")
time.sleep(0.5)
result = self._find_and_click_sidebar_item(project_name, timeout / max_attempts)
if result:
# Verify by checking window title (best-effort)
time.sleep(0.5)
title = win32gui.GetWindowText(self.hwnd)
if project_name.lower() in title.lower():
log_phase(5, f"click_project:{project_name}", "OK:title_match")
log_debug(f" ✓ VERIFIED: Window title is '{title}'")
return True
# Title may not change; validate via hover detection + OCR nearest text
if self._verify_sidebar_selection(project_name):
log_phase(5, f"click_project:{project_name}", "OK:hover_match")
log_debug(" ✓ VERIFIED: Hover/OCR matches intended project")
return True
log_debug(" ⚠ Post-click validation did not match - will retry")
# Continue to next attempt instead of returning False immediately
continue
log_phase(5, f"click_project:{project_name}", "FAIL:not_found")
log_debug(f" ✗ FAILED: Could not find/click '{project_name}' after {max_attempts} attempts")
return False
def _verify_sidebar_selection(self, target: str) -> bool:
"""Verify the currently highlighted sidebar item text matches target using OCR + hover detection."""
from hover_detection import detect_highlighted_item
from ocr_extraction import get_ocr
from fuzzy_match import similarity_ratio
from PIL import ImageGrab
try:
hi = detect_highlighted_item(self.hwnd)
except Exception:
hi = None
if not hi:
return False
rect = self.window_rect
if not rect:
return False
window_width = rect[2] - rect[0]
sidebar_width = int(window_width * 0.28)
sidebar_left = rect[0]
sidebar_top = rect[1] + 80
sidebar_bottom = rect[3] - 50
sidebar_right = rect[0] + sidebar_width
try:
screenshot = ImageGrab.grab(bbox=(sidebar_left, sidebar_top, sidebar_right, sidebar_bottom))
except Exception:
return False
import tempfile, os
ocr = get_ocr()
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
temp_path = f.name
screenshot.save(temp_path)
try:
results = ocr.predict(temp_path)
finally:
try:
os.unlink(temp_path)
except Exception:
pass
# Pick text nearest to highlighted y
highlight_y = hi['screen_coords'][1]
best_text = None
best_dist = 1e9
for res in results:
if 'rec_texts' not in res or 'rec_boxes' not in res:
continue
texts = res['rec_texts']
boxes = res['rec_boxes']
for text, box in zip(texts, boxes):
box_center_y = sidebar_top + (box[1] + box[3]) / 2
dist = abs(box_center_y - highlight_y)
if dist < best_dist:
best_dist = dist
best_text = text
if not best_text:
log_debug(f" [verify] No OCR text found near highlight")
return False
match_score = similarity_ratio(target.lower(), best_text.lower())
matched = match_score >= 0.7 or (target.lower() in best_text.lower())
log_debug(f" [verify] Best text='{best_text[:40]}', match_score={match_score:.2f}, matched={matched}")
return matched
# =========================================================================
# STEP 6: Click Conversation
# =========================================================================
def step6_click_conversation(self, conversation_name: str, timeout: float = 10.0) -> bool:
"""
Find and click on a conversation in the PROJECT VIEW (main content area).
After clicking a project, sidebar closes and conversations appear in center.
Strategy (in order):
1. Scan project view with scroll (primary method)
2. Fallback: Ctrl+K search (if scan fails)
Verification: Window title contains conversation name.
"""
import win32gui
log_phase(6, f"click_conversation:{conversation_name}", "START")
log_debug(f"STEP 6: Clicking conversation '{conversation_name}'...")
# Pre-step validation
if not self._ensure_window_viable("step6"):
log_phase(6, f"click_conversation:{conversation_name}", "FAIL:not_viable")
log_debug(" ✗ FAILED: Window not viable")
return False
# Retry logic for chaos resilience
max_attempts = 3
for attempt in range(max_attempts):
# Ensure foreground before every attempt (including first)
if not self._ensure_foreground():
log_debug(f" ⚠ Could not ensure foreground for attempt {attempt + 1}")
time.sleep(0.5)
continue
if attempt > 0:
log_phase(6, f"click_conversation:{conversation_name}", f"RETRY:{attempt+1}")
log_debug(f" Retry attempt {attempt + 1}/{max_attempts}...")
time.sleep(0.5)
# After project click, conversations are in MAIN content area, not sidebar
result = self._find_and_click_project_item(conversation_name, timeout / max_attempts)
if result:
# Verify by checking window title
time.sleep(0.8)
title = win32gui.GetWindowText(self.hwnd)
# Fuzzy check
if self._fuzzy_match(conversation_name, title):
log_phase(6, f"click_conversation:{conversation_name}", "OK:title_match")
log_debug(f" ✓ VERIFIED: Window title is '{title}'")
return True
else:
log_phase(6, f"click_conversation:{conversation_name}", "OK:no_title_verify")
log_debug(f" ⚠ Click done but title='{title}' doesn't match '{conversation_name}'")
# Still return true - might be OK
return True
# Primary scan failed - try Ctrl+K search as fallback
log_phase(6, f"click_conversation:{conversation_name}", "FALLBACK:ctrl_k_search")
log_debug(f" Primary scan failed, trying Ctrl+K search fallback...")
if self._search_and_open_conversation(conversation_name):
# Verify by checking window title
time.sleep(0.8)
title = win32gui.GetWindowText(self.hwnd)
if self._fuzzy_match(conversation_name, title):
log_phase(6, f"click_conversation:{conversation_name}", "OK:ctrl_k_match")
log_debug(f" ✓ VERIFIED via Ctrl+K: Window title is '{title}'")
return True
else:
log_phase(6, f"click_conversation:{conversation_name}", "OK:ctrl_k_no_verify")
log_debug(f" ⚠ Ctrl+K done but title='{title}' doesn't match")
return True
log_phase(6, f"click_conversation:{conversation_name}", "FAIL:not_found_after_fallback")
log_debug(f" ✗ FAILED: Could not find/click '{conversation_name}' after {max_attempts} attempts + Ctrl+K fallback")
return False
def _search_and_open_conversation(self, conversation_name: str) -> bool:
"""
Use Ctrl+K search to find and open a conversation directly.
This is a fallback when project view scanning fails.
"""
from pywinauto.keyboard import send_keys
import win32gui
log_debug(f" [ctrl+k] Searching for '{conversation_name}'...")
try:
# Ensure foreground
if not self._ensure_foreground():
log_debug(" [ctrl+k] Could not ensure foreground")
return False
# Press Ctrl+K to open search
send_keys("^k", pause=0.05)
time.sleep(0.5)
# Type the conversation name
# Use a shorter search term for better matching
search_term = conversation_name[:30] if len(conversation_name) > 30 else conversation_name
send_keys(search_term, pause=0.02, with_spaces=True)
time.sleep(0.8)
# Press Enter to select first result
send_keys("{ENTER}", pause=0.05)
time.sleep(0.5)
# Check if we landed somewhere
if self._ensure_foreground():
title = win32gui.GetWindowText(self.hwnd)
log_debug(f" [ctrl+k] After search, title is '{title}'")
return True
return False
except Exception as e:
log_debug(f" [ctrl+k] Error: {e}")
return False
def _fuzzy_match(self, target: str, text: str, threshold: float = 0.6) -> bool:
"""Simple fuzzy match - check if words overlap."""
target_words = set(target.lower().split())
text_words = set(text.lower().split())
if not target_words:
return False
overlap = len(target_words & text_words)
ratio = overlap / len(target_words)
return ratio >= threshold or target.lower() in text.lower()
def _find_and_click_sidebar_item(self, target: str, timeout: float = 10.0) -> bool:
"""
Find and click target item in sidebar using OCR (fast method).
OCRs the entire sidebar at once and clicks directly on the target.
"""
import win32api
import win32gui
from pywinauto.mouse import click
from PIL import ImageGrab
import tempfile
import os
from ocr_extraction import get_ocr
from fuzzy_match import similarity_ratio
from hover_detection import detect_highlighted_item
ocr = get_ocr()
rect = self.window_rect
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
# Sidebar region (left 28%, skip title bar area)
sidebar_width = int(window_width * 0.28)
sidebar_left = rect[0]
sidebar_right = rect[0] + sidebar_width
sidebar_top = rect[1] + 80 # Below title bar
sidebar_bottom = rect[3] - 50 # Above bottom
log_debug(f" Scanning sidebar for '{target}'...")
start_time = time.time()
scroll_count = 0
max_scrolls = 5
while (time.time() - start_time) < timeout:
# Ensure foreground before screenshot - critical for chaos resilience
if not self._ensure_foreground():
log_debug(f" ⚠ Lost foreground before sidebar screenshot, retrying...")
time.sleep(0.3)
continue
# Capture entire sidebar
try:
screenshot = ImageGrab.grab(bbox=(sidebar_left, sidebar_top, sidebar_right, sidebar_bottom))
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
temp_path = f.name
screenshot.save(temp_path)
results = ocr.predict(temp_path)
os.unlink(temp_path)
# Find target in OCR results
for res in results:
if 'rec_texts' not in res or 'rec_boxes' not in res:
continue
texts = res['rec_texts']
boxes = res['rec_boxes']
scores = res.get('rec_scores', [1.0] * len(texts))
for text, box, score in zip(texts, boxes, scores):
match_score = similarity_ratio(target.lower(), text.lower())
log_debug(f" At y={int(box[1])}: '{text[:30]}...' (score={match_score:.2f})")
if match_score >= 0.7 or target.lower() in text.lower():
log_debug(f" FOUND '{target}' - clicking!")
# Calculate click position from box
box_center_x = (box[0] + box[2]) / 2
box_center_y = (box[1] + box[3]) / 2
click_x = int(sidebar_left + box_center_x)
click_y = int(sidebar_top + box_center_y)
# Safe click with foreground verification
if not self._safe_click(click_x, click_y, f"sidebar item '{target}'"):
log_debug(f" ✗ Safe click failed for '{target}'")
continue # Try next match
time.sleep(0.35)
# Post-click validation: ensure the highlighted row matches intended target.
try:
hi = detect_highlighted_item(self.hwnd)
except Exception as _e:
hi = None
if hi and 'screen_coords' in hi:
highlighted_y = hi['screen_coords'][1]
delta_y = highlighted_y - click_y
# If highlight is more than ~18px away from the clicked center, we likely selected a neighbor
if abs(delta_y) > 18:
log_debug(f" ⚠ CORRECTION NEEDED: Highlight at y={highlighted_y}, click was y={click_y}, delta={delta_y}px")
# Try clicking one row up or down depending on where highlight landed
# Sidebar row height is ~35px; use 28px as conservative step
step = 28
corrective_y = click_y - step if delta_y > 0 else click_y + step
direction = "above" if delta_y > 0 else "below"
log_debug(f" Applying corrective click {step}px {direction} original (y={corrective_y})")
if self._safe_click(click_x, corrective_y, f"corrective click for '{target}' {direction}"):
time.sleep(0.3)
log_debug(f" ✓ Corrective click completed for target '{target}'")
else:
log_debug(f" ✗ Corrective click failed for '{target}'")
else:
log_debug(f" ✓ Highlight aligned (delta={delta_y}px, within threshold)")
return True
except Exception as e:
log_debug(f" Error scanning sidebar: {e}")
# Not found - scroll down
scroll_count += 1
if scroll_count > max_scrolls:
break
log_debug(f" Scrolling down (attempt {scroll_count})...")
self._scroll_sidebar("down")
time.sleep(0.4)
return False
def _find_and_click_project_item(self, target: str, timeout: float = 10.0) -> bool:
"""
Scan PROJECT VIEW (main content area) looking for conversation.
After clicking a project, conversations appear in center, not sidebar.
Includes scroll-and-retry logic for when target is below visible area.
"""
import win32api
from pywinauto.mouse import click, scroll
from PIL import ImageGrab
import tempfile
import os
# Import OCR
from ocr_extraction import get_ocr
from fuzzy_match import fuzzy_contains, similarity_ratio
ocr = get_ocr()
rect = self.window_rect
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
# Project view: conversations are in the CENTER of the window
# Expanded scan area to catch items lower in the list
# x=10% to 90%, y=20% to 85% (was 30% to 75%)
content_left = rect[0] + int(window_width * 0.10)
content_right = rect[0] + int(window_width * 0.90)
content_top = rect[1] + int(window_height * 0.20)
content_bottom = rect[1] + int(window_height * 0.85)
# Center point for scrolling
scroll_x = rect[0] + int(window_width * 0.5)
scroll_y = rect[1] + int(window_height * 0.5)
max_scroll_attempts = 5 # Try scrolling down up to 5 times (chaos can create many conversations)
for scroll_attempt in range(max_scroll_attempts + 1):
if scroll_attempt == 1:
# First scroll: scroll to TOP to start fresh, then search from there
log_debug(f" Scrolling to TOP first to find target...")
if self._ensure_foreground():
win32api.SetCursorPos((scroll_x, scroll_y))
time.sleep(0.1)
# Scroll up a lot to get to top
for _ in range(3):
scroll(coords=(scroll_x, scroll_y), wheel_dist=5) # Scroll up
time.sleep(0.2)
time.sleep(0.3)
elif scroll_attempt > 1:
log_debug(f" Scrolling down (attempt {scroll_attempt - 1}/{max_scroll_attempts - 1})...")
# Scroll down to reveal more items
if not self._ensure_foreground():
log_debug(f" ✗ Could not ensure foreground for scroll")
continue
win32api.SetCursorPos((scroll_x, scroll_y))
time.sleep(0.1)
scroll(coords=(scroll_x, scroll_y), wheel_dist=-5) # Scroll down more aggressively
time.sleep(0.5) # Wait for UI to update
log_debug(f" Scanning project view for '{target}'...")
log_debug(f" Content area: ({content_left}, {content_top}) to ({content_right}, {content_bottom})")
# Ensure foreground before screenshot - critical for chaos resilience
if not self._ensure_foreground():
log_debug(f" ✗ Could not ensure foreground for project view screenshot")
continue
# Capture the content area
try:
screenshot = ImageGrab.grab(bbox=(content_left, content_top, content_right, content_bottom))
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
temp_path = f.name
screenshot.save(temp_path)
results = ocr.predict(temp_path)
os.unlink(temp_path)
# Find all text items
for res in results:
if 'rec_texts' not in res or 'rec_boxes' not in res:
continue
texts = res['rec_texts']
boxes = res['rec_boxes']
scores = res.get('rec_scores', [1.0] * len(texts))
for text, box, score in zip(texts, boxes, scores):
# Check if this matches our target
match_score = similarity_ratio(target.lower(), text.lower())
if match_score >= 0.7 or target.lower() in text.lower():
log_debug(f" FOUND '{text}' (match={match_score:.2f})")
# Calculate click position from box
# Box is in screenshot coordinates, need to add content_left/top
box_center_x = (box[0] + box[2]) / 2
box_center_y = (box[1] + box[3]) / 2
click_x = int(content_left + box_center_x)
click_y = int(content_top + box_center_y)
log_debug(f" Clicking at ({click_x}, {click_y})")
# Safe click with foreground verification
return self._safe_click(click_x, click_y, f"project item '{target}'")
else:
log_debug(f" '{text}' (match={match_score:.2f}) - no match")
log_debug(f" Target '{target}' not found in current view")
except Exception as e:
log_debug(f" Error scanning project view: {e}")
log_debug(f" Target '{target}' not found after {max_scroll_attempts} scroll attempts")
return False
def _capture_sidebar(self):
"""Capture sidebar region."""
from PIL import ImageGrab
if not self.window_rect:
return None
rect = self.window_rect
window_width = rect[2] - rect[0]
sidebar_width = int(window_width * 0.28)
sidebar_rect = (rect[0], rect[1], rect[0] + sidebar_width, rect[3])
try:
return ImageGrab.grab(bbox=sidebar_rect)
except:
return None
def _scroll_sidebar(self, direction: str = "down"):
"""Scroll sidebar."""
import win32api
from pywinauto.mouse import scroll
rect = self.window_rect
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
sidebar_width = int(window_width * 0.28)
sidebar_x = rect[0] + (sidebar_width // 2)
sidebar_y = rect[1] + (window_height // 2)
win32api.SetCursorPos((sidebar_x, sidebar_y))
time.sleep(0.1)
wheel_dist = -3 if direction == "down" else 3
scroll(coords=(sidebar_x, sidebar_y), wheel_dist=wheel_dist)
# =========================================================================
# STEP 7: Focus Text Input (now split into focus + send helpers)
# =========================================================================
def _is_edit_focused(self) -> bool:
"""Return True if current UIA focused control is an Edit/Document/Text element."""
try:
# Use Windows UIA directly via comtypes to get the focused element
import comtypes.client
UIAutomationCore = comtypes.client.GetModule("UIAutomationCore.dll")
from comtypes.gen.UIAutomationClient import CUIAutomation, IUIAutomation
uia = comtypes.client.CreateObject(CUIAutomation)
focused = uia.GetFocusedElement()
if not focused:
log_debug("[focus] _is_edit_focused: no focused element")
return False
ctype_id = focused.CurrentControlType
name = focused.CurrentName or ''
class_name = focused.CurrentClassName or ''
# Control type IDs: Edit=50004, Document=50030, Text=50020, Custom=50025, Group=50026
type_names = {50004: 'Edit', 50030: 'Document', 50020: 'Text', 50025: 'Custom', 50026: 'Group'}
ctype_name = type_names.get(ctype_id, f'Unknown({ctype_id})')
log_debug(f"[focus] focused element: type={ctype_name}, name={name[:50]!r}, class={class_name!r}")
# Accept Edit, Document, or Group (Chrome renders input as Group sometimes)
# Also accept if class is Chrome_RenderWidgetHostHWND (the Electron/Chrome input surface)
if ctype_id in (50004, 50030):
return True
if ctype_id == 50026 and class_name == 'Chrome_RenderWidgetHostHWND':
return True
return False
except Exception as e:
log_debug(f"[focus] _is_edit_focused error: {e}")
return False
def ensure_input_focus(self) -> bool:
"""
Focus the text input without typing.
Strategy (in order): direct click grid, UIA Edit search, calibration variance,
guarded minimal Tab traversal.
Returns True once an editable control is focused.
"""
from pywinauto.mouse import click
import win32gui
log_debug("[focus] ensure_input_focus start")
if not self.hwnd:
log_debug("[focus] ✗ FAILED: No hwnd")
return False
rect = self.window_rect
# Refresh rect if missing
if rect is None and self.hwnd:
try:
rect = win32gui.GetWindowRect(self.hwnd)
self.window_rect = rect
except Exception as e:
log_debug(f"[focus] Failed to refresh window rect: {e}")
rect = None
if rect is None:
# Try to rediscover window
new_hwnd = self._find_chatgpt_hwnd()
if new_hwnd:
self.hwnd = new_hwnd
try:
rect = win32gui.GetWindowRect(new_hwnd)
self.window_rect = rect
except Exception:
rect = None
if rect is None:
log_debug("[focus] ✗ FAILED: No window rect")
return False
# Check if already focused
if self._is_edit_focused():
log_debug("[focus] already focused on edit/document — skipping strategies")
return True
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
input_x = rect[0] + int(window_width * 0.5)
input_y = rect[1] + int(window_height * 0.83)
log_debug(f"[focus] window rect: {rect}, input target: ({input_x}, {input_y})")
# Strategy 1: Grid clicks
log_debug("[focus] trying strategy: grid_click")
offsets = [0, -10, 10, -20, 20]
for ox in offsets:
for oy in offsets:
try:
win32gui.SetForegroundWindow(self.hwnd)
except Exception:
pass
try:
click(coords=(input_x + ox, input_y + oy))
except Exception as e:
log_debug(f"[focus] grid click error at offset ({ox},{oy}): {e}")
time.sleep(0.15)
if self._is_edit_focused():
log_debug("[focus] ✓ success via grid_click")
return True
log_debug("[focus] grid_click did not yield edit focus")
# Strategy 2: UIA Edit fallback
log_debug("[focus] trying strategy: uia_edit")
try:
from pywinauto import Application
app = Application(backend='uia').connect(handle=self.hwnd)
main_win = app.window(handle=self.hwnd)
edits = main_win.descendants(control_type='Edit')
log_debug(f"[focus] UIA found {len(edits)} Edit controls")
if edits:
edits[0].set_focus()
time.sleep(0.25)
if self._is_edit_focused():
log_debug("[focus] ✓ success via uia_edit")
return True
else:
log_debug("[focus] uia_edit set_focus did not yield edit focus")
else:
log_debug("[focus] no Edit controls found via UIA")
except Exception as e:
log_debug(f"[focus] uia_edit failed: {e}")
# Strategy 3: Calibration variance heuristic
log_debug("[focus] trying strategy: variance_click")
try:
from PIL import ImageGrab
import numpy as np
width = rect[2] - rect[0]
height = rect[3] - rect[1]
band_top = rect[1] + int(height * 0.80)
band_bottom = rect[1] + int(height * 0.86)
band_left = rect[0] + int(width * 0.15)
band_right = rect[0] + int(width * 0.85)
img = ImageGrab.grab(bbox=(band_left, band_top, band_right, band_bottom))
arr = np.array(img)
col_var = arr.var(axis=(0, 2))
if len(col_var) > 0:
peak_x_offset = int(np.argmax(col_var))
focus_x = band_left + peak_x_offset
focus_y = band_top + (band_bottom - band_top) // 2
log_debug(f"[focus] variance peak at x_offset={peak_x_offset}, clicking ({focus_x}, {focus_y})")
try:
win32gui.SetForegroundWindow(self.hwnd)
except Exception:
pass
click(coords=(focus_x, focus_y))
time.sleep(0.25)
if self._is_edit_focused():
log_debug("[focus] ✓ success via variance_click")
return True
else:
log_debug("[focus] variance_click did not yield edit focus")
else:
log_debug("[focus] variance array empty")
except Exception as e:
log_debug(f"[focus] variance_click failed: {e}")
# Strategy 4: Guarded minimal Tab traversal
log_debug("[focus] trying strategy: tab_traversal")
try:
if not self._is_edit_focused() and self._keyboard_fallback_enabled:
from pywinauto.keyboard import send_keys
for i in range(3):
try:
win32gui.SetForegroundWindow(self.hwnd)
except Exception:
pass
send_keys('{TAB}')
time.sleep(0.12)
if self._is_edit_focused():
log_debug(f"[focus] ✓ success via tab_traversal (Tab #{i+1})")
return True
# Final direct click after tabs
log_debug("[focus] tab_traversal: 3 tabs done, trying final click")
try:
win32gui.SetForegroundWindow(self.hwnd)
except Exception:
pass
click(coords=(input_x, input_y))
time.sleep(0.2)
if self._is_edit_focused():
log_debug("[focus] ✓ success via tab_traversal + final click")
return True
else:
log_debug("[focus] tab_traversal + final click did not yield edit focus")
else:
log_debug("[focus] tab_traversal skipped (already focused or disabled)")
except Exception as e:
log_debug(f"[focus] tab_traversal failed: {e}")
# Final check
if self._is_edit_focused():
log_debug("[focus] ✓ focused after all strategies (late detection)")
return True
log_debug("[focus] ✗ FAILED: could not focus input after all strategies")
return False
def step7_send_prompt(self, prompt: str, verify_entry: bool = True) -> bool:
"""Focus input (ensure_input_focus) then readiness probe and send prompt.
Args:
prompt: The prompt text to send
verify_entry: If True, verify prompt was entered correctly before sending
"""
log_phase(7, "send_prompt", "START")
log_debug("STEP 7+8: Focus & Send Prompt...")
from pywinauto.mouse import click
import win32gui
# Pre-step validation
if not self._ensure_window_viable("step7"):
log_phase(7, "send_prompt", "FAIL:not_viable")
log_debug(" ✗ FAILED: Window not viable")
return False
if not self.ensure_input_focus():
log_phase(7, "send_prompt", "FAIL:no_focus")
return False
# Determine input center for re-clicks
rect = self.window_rect
if not rect:
return False
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
input_x = rect[0] + int(window_width * 0.5)
input_y = rect[1] + int(window_height * 0.83)
import pyperclip
from pywinauto.keyboard import send_keys
try:
previous_clipboard = pyperclip.paste()
except Exception:
previous_clipboard = None
max_attempts = 8
for attempt in range(max_attempts):
log_debug(f" Probe attempt {attempt+1}/{max_attempts} (Ctrl+C)")
# Ensure foreground before each probe attempt
if not self._ensure_foreground():
log_debug(f" Lost foreground at attempt {attempt+1}, retrying...")
time.sleep(0.3)
continue
try:
send_keys('^c')
except Exception as e:
log_debug(f" Ctrl+C failed: {e}")
time.sleep(0.12)
try:
if not self._is_generating():
log_debug(" Idle (arrow) detected — entering prompt")
# === CRITICAL SECTION: Prompt Entry with Verification ===
# This is the timing-critical part - maintain focus throughout
# Step 1: Clear any existing text
if not self._ensure_foreground():
log_debug(" Lost foreground before clear, retrying...")
continue
send_keys('^a') # Select all
time.sleep(0.05)
send_keys('{DELETE}') # Clear
time.sleep(0.1)
# Step 2: Paste prompt
if not self._ensure_foreground():
log_debug(" Lost foreground before paste, retrying...")
continue
pyperclip.copy(prompt)
time.sleep(0.06)
send_keys('^v')
time.sleep(0.15)
# Step 3: Verify prompt entry (if enabled)
if verify_entry:
if not self._verify_prompt_entry(prompt):
log_debug(" ✗ Prompt verification failed, clearing and retrying...")
# Clear and retry
send_keys('^a')
time.sleep(0.05)
send_keys('{DELETE}')
time.sleep(0.2)
continue
# Step 4: Send the prompt
if not self._ensure_foreground():
log_debug(" Lost foreground before Enter, retrying...")
continue
send_keys('{ENTER}')
time.sleep(0.4)
# === END CRITICAL SECTION ===
if previous_clipboard is not None:
try:
pyperclip.copy(previous_clipboard)
except Exception:
pass
log_phase(7, "send_prompt", "OK")
log_debug(" ✓ Prompt sent (verified)" if verify_entry else " ✓ Prompt sent")
return True
except Exception:
pass
# Re-click input before retry (use _safe_click)
if not self._safe_click(input_x, input_y, "re-click input"):
log_debug(" Safe re-click failed, continuing...")
time.sleep(0.2)
if previous_clipboard is not None:
try:
pyperclip.copy(previous_clipboard)
except Exception:
pass
log_phase(7, "send_prompt", "FAIL:timeout")
log_debug(" ✗ FAILED: Prompt not sent after readiness attempts")
return False
# =========================================================================
# STEP 8: Send Prompt
# =========================================================================
# =========================================================================
# STEP 9: Wait for Response
# =========================================================================
def step9_wait_for_response(self, timeout: float = 120.0) -> bool:
"""
Wait for ChatGPT to finish generating.
Detection:
- 'generating' (stop button) = still generating
- 'idle' (waveform) = complete, input empty
- 'ready' (arrow) = chaos typed text, need to clear input
"""
from PIL import ImageGrab
import numpy as np
log_phase(9, "wait_for_response", "START")
log_debug("STEP 9: Waiting for response...")
# Pre-step validation
if not self._ensure_window_viable("step9"):
log_phase(9, "wait_for_response", "FAIL:not_viable")
log_debug(" ✗ FAILED: Window not viable")
return False
start_time = time.time()
# Initial wait for generation to start
time.sleep(1.0)
generation_started = False
consecutive_idle = 0
foreground_failures = 0
max_foreground_failures = 20 # Increased tolerance for chaos
while (time.time() - start_time) < timeout:
# Ensure window is visible/foreground for screenshot
if not self._ensure_foreground():
foreground_failures += 1
log_debug(f" ⚠ Lost foreground ({foreground_failures}/{max_foreground_failures})")
if foreground_failures >= max_foreground_failures:
log_phase(9, "wait_for_response", "FAIL:foreground_lost")
log_debug(f" ✗ Too many foreground failures, aborting wait")
return False
time.sleep(0.5)
continue
# Get detailed button state
state = self._get_input_button_state()
elapsed = time.time() - start_time
if state == 'generating':
generation_started = True
consecutive_idle = 0
foreground_failures = 0 # Reset on success
log_debug(f" [{elapsed:.0f}s] GENERATING (stop button visible)")
elif state == 'ready':
# Arrow visible - chaos typed text into input
# This could happen during or after generation
log_debug(f" [{elapsed:.0f}s] ARROW detected - chaos text in input")
foreground_failures = 0 # Reset on success
# Don't count this as idle - clear the input first
self._clear_input_if_needed()
# Don't increment consecutive_idle - we need to recheck after clearing
time.sleep(0.3)
continue
elif state == 'idle':
consecutive_idle += 1
foreground_failures = 0 # Reset on success
log_debug(f" [{elapsed:.0f}s] IDLE (waveform visible) - consecutive: {consecutive_idle}")
# Need several consecutive idle readings after generation started
if generation_started and consecutive_idle >= 3:
log_phase(9, "wait_for_response", f"OK:{elapsed:.0f}s")
log_debug(f" ✓ VERIFIED: Response complete after {elapsed:.0f}s")
return True
# If we never saw generation start but see idle for a while,
# maybe response was very fast or already done
if not generation_started and consecutive_idle >= 5:
log_phase(9, "wait_for_response", f"OK:no_gen:{elapsed:.0f}s")
log_debug(f" ✓ Response appears complete (no generation detected)")
return True
else:
# Unknown state
log_debug(f" [{elapsed:.0f}s] UNKNOWN state: {state}")
time.sleep(0.5)
log_phase(9, "wait_for_response", f"FAIL:timeout:{timeout}s")
log_debug(f" ✗ TIMEOUT after {timeout}s")
return False
def _get_input_button_state(self) -> str:
"""
Detect the state of the input button (stop/waveform/arrow).
Returns:
'generating' - Black stop button visible (ChatGPT is generating)
'idle' - Waveform icon visible (input empty, ready for input)
'ready' - Arrow icon visible (text in input, ready to send)
'unknown' - Could not determine state
"""
from PIL import ImageGrab
import numpy as np
if not self.window_rect:
return 'unknown'
rect = self.window_rect
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
# Stop/waveform/arrow button at bottom-right of input box
center_x = rect[0] + int(window_width * 0.83)
center_y = rect[1] + int(window_height * 0.87)
area = (center_x - 15, center_y - 15, center_x + 15, center_y + 15)
try:
img = ImageGrab.grab(bbox=area)
pixels = np.array(img)
# Count very dark pixels (black elements)
dark_pixels = np.sum(np.all(pixels < 50, axis=2))
# Stop button (generating): 60-400 dark pixels (black square on white)
# Waveform (idle, empty input): <60 dark pixels (thin wavy lines)
# Arrow (ready, text in input): >400 dark pixels (filled black arrow)
if 60 < dark_pixels < 400:
return 'generating'
elif dark_pixels >= 400:
return 'ready' # Arrow - text in input
else:
return 'idle' # Waveform - empty input
except Exception as e:
log_debug(f" [button-state] Error: {e}")
return 'unknown'
def _clear_input_if_needed(self) -> bool:
"""
Clear the input field if chaos has typed text into it.
Returns True if input is now clear (or was already clear).
"""
from pywinauto.keyboard import send_keys
state = self._get_input_button_state()
if state == 'ready':
# Text in input (arrow visible) - clear it
log_debug(" [clear-input] Chaos text detected in input, clearing...")
if not self._ensure_foreground():
return False
# Click input area first
rect = self.window_rect
if rect:
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
input_x = rect[0] + int(window_width * 0.5)
input_y = rect[1] + int(window_height * 0.83)
self._safe_click(input_x, input_y, "clear input click")
time.sleep(0.1)
# Select all and delete
send_keys('^a')
time.sleep(0.05)
send_keys('{DELETE}')
time.sleep(0.2)
# Verify it's cleared
new_state = self._get_input_button_state()
if new_state == 'idle':
log_debug(" [clear-input] ✓ Input cleared")
return True
else:
log_debug(f" [clear-input] ✗ Input still not clear: {new_state}")
return False
return True # Already clear or generating
def _is_generating(self) -> bool:
"""Check if ChatGPT is generating by looking for black stop button."""
return self._get_input_button_state() == 'generating'
# =========================================================================
# STEP 10: Copy Response
# =========================================================================
def step10_copy_response(self, max_outer_attempts: int = 3) -> str:
"""
Copy the last response to clipboard.
Strategy:
1. Click input area as anchor point
2. Shift+Tab × 5 to reach Copy button (toolbar order from input: +attach, mic, voice, ..., refresh, thumbs-down, thumbs-up, copy)
3. Validate focused element is Copy button before pressing Enter
4. If not on Copy, continue Shift+Tab with validation
5. Fallback to UIA direct invoke
Uses outer retry loop to handle chaos interruptions.
"""
import pyperclip
from pywinauto.keyboard import send_keys
from pywinauto.mouse import click
import win32gui
log_debug("STEP 10: Copying response...")
# Pre-step validation
if not self._ensure_window_viable("step10"):
log_debug(" ✗ FAILED: Window not viable")
return ""
# Helper to get focused element info via UIA
def get_focused_button_name() -> str:
"""Return name of focused button, or empty string if not a button."""
try:
import comtypes.client
UIAutomationCore = comtypes.client.GetModule("UIAutomationCore.dll")
from comtypes.gen.UIAutomationClient import CUIAutomation
uia = comtypes.client.CreateObject(CUIAutomation)
focused = uia.GetFocusedElement()
if not focused:
return ""
ctype_id = focused.CurrentControlType
name = focused.CurrentName or ''
# Button = 50000
if ctype_id == 50000:
log_debug(f"[copy] Focused button: {name!r}")
return name.lower()
else:
log_debug(f"[copy] Focused element type={ctype_id}, name={name[:30]!r}")
return ""
except Exception as e:
log_debug(f"[copy] get_focused_button_name error: {e}")
return ""
def inner_copy_attempt() -> str:
"""Single attempt at copying the response."""
rect = self.window_rect
if not rect:
return ""
window_width = rect[2] - rect[0]
window_height = rect[3] - rect[1]
# CRITICAL: Scroll to bottom of conversation first!
# Chaos may have scrolled up or clicked on old messages
log_debug("[copy] Scrolling to bottom of conversation...")
if self._ensure_foreground():
# Click in the conversation area (not input) to focus it
conv_x = rect[0] + int(window_width * 0.5)
conv_y = rect[1] + int(window_height * 0.5) # Middle of window = conversation area
self._safe_click(conv_x, conv_y, "conversation area")
time.sleep(0.2)
# Send Ctrl+End to jump to absolute bottom
send_keys('^{END}')
time.sleep(0.3)
# Also send End key a few times for good measure
for _ in range(3):
send_keys('{END}')
time.sleep(0.1)
log_debug("[copy] ✓ Scrolled to bottom")
# Now click on input area as anchor point
input_x = rect[0] + int(window_width * 0.5)
input_y = rect[1] + int(window_height * 0.83)
if self._safe_click(input_x, input_y, "copy anchor input"):
time.sleep(0.3)
log_debug(f"[copy] Clicked anchor at ({input_x}, {input_y})")
else:
log_debug(f"[copy] ✗ Anchor click failed")
# Strategy 1: Shift+Tab × 5 to reach Copy, then validate and Enter
log_debug("[copy] Navigating with Shift+Tab × 5...")
# Ensure foreground before keyboard navigation
if not self._ensure_foreground():
log_debug("[copy] ✗ Could not ensure foreground for Shift+Tab navigation")
return ""
# Do 5 Shift+Tabs to reach Copy button area
for i in range(5):
send_keys("+{TAB}")
time.sleep(0.08)
time.sleep(0.15) # Let UI settle
# Check what we landed on
btn_name = get_focused_button_name()
if 'copy' in btn_name:
log_debug(f"[copy] ✓ On Copy button after 5 Shift+Tabs")
# Ensure still foreground before Enter
if self._ensure_foreground():
pyperclip.copy("")
send_keys("{ENTER}")
time.sleep(0.3)
response = pyperclip.paste()
if response and len(response) > 5:
log_debug(f"[copy] ✓ Got {len(response)} chars")
return response
# Not on Copy — maybe need more tabs. Continue with validation.
# Dangerous buttons to avoid: thumbs up, thumbs down, refresh, ...
dangerous = ['thumb', 'dislike', 'like', 'refresh', 'regenerate', 'more']
# Ensure still foreground before additional tabs
if not self._ensure_foreground():
log_debug("[copy] ✗ Lost foreground before additional tab navigation")
return ""
max_extra_tabs = 6
for i in range(max_extra_tabs):
send_keys("+{TAB}")
time.sleep(0.1)
btn_name = get_focused_button_name()
if 'copy' in btn_name:
log_debug(f"[copy] ✓ Found Copy button after {5 + i + 1} Shift+Tabs")
# Ensure foreground before Enter
if not self._ensure_foreground():
log_debug("[copy] ✗ Lost foreground before pressing Enter")
break
pyperclip.copy("")
send_keys("{ENTER}")
time.sleep(0.3)
response = pyperclip.paste()
if response and len(response) > 5:
log_debug(f"[copy] ✓ Got {len(response)} chars")
return response
elif any(d in btn_name for d in dangerous):
log_debug(f"[copy] Skipping dangerous button: {btn_name}")
continue
elif btn_name == '':
# Not on a button, might have overshot
log_debug(f"[copy] Lost button focus, stopping tab navigation")
break
# Fallback: Try UIA direct invoke
log_debug("[copy] Tab navigation failed, trying UIA direct invoke...")
try:
from pywinauto import Application
app = Application(backend='uia').connect(handle=self.hwnd)
main_win = app.window(handle=self.hwnd)
buttons = main_win.descendants(control_type='Button')
for b in buttons:
try:
name = (b.element_info.name or "").lower()
if 'copy' in name:
log_debug(f"[copy] UIA found Copy button: {name} - invoking")
pyperclip.copy("")
try:
b.invoke()
except Exception:
b.click_input()
time.sleep(0.4)
resp = pyperclip.paste()
if resp and len(resp) > 5:
log_debug(f"[copy] ✓ Got {len(resp)} chars via UIA")
return resp
except Exception:
continue
except Exception as e:
log_debug(f"[copy] UIA fallback failed: {e}")
# Last resort: Ctrl+Shift+C
log_debug("[copy] Trying Ctrl+Shift+C...")
if self._ensure_foreground():
pyperclip.copy("")
send_keys("^+c")
time.sleep(0.5)
response = pyperclip.paste()
if response and len(response) > 5:
log_debug(f"[copy] ✓ Got {len(response)} chars via Ctrl+Shift+C")
return response
return ""
# Outer retry loop for chaos resilience
for attempt in range(1, max_outer_attempts + 1):
log_debug(f"[copy] Outer attempt {attempt}/{max_outer_attempts}")
# Ensure window is ready before attempt
if not self._ensure_foreground():
log_debug(f"[copy] Could not ensure foreground for attempt {attempt}")
time.sleep(0.5)
continue
result = inner_copy_attempt()
if result:
log_phase(10, "copy_response", f"OK:{len(result)}chars")
return result
log_debug(f"[copy] Attempt {attempt} failed, waiting before retry...")
time.sleep(0.5 * attempt) # Exponential backoff
log_phase(10, "copy_response", "FAIL:no_content")
log_debug("[copy] ✗ FAILED: Could not copy response after all attempts")
return ""
# =========================================================================
# FULL FLOW
# =========================================================================
def _is_valid_json_response(self, response: str) -> bool:
"""
Check if response looks like a valid JSON response from ChatGPT.
This catches cases where chaos sent a new prompt and we copied
the wrong response (e.g., "Still here. Still stable.")
Returns True if response appears to be valid JSON with expected fields.
"""
import json
if not response or len(response) < 20:
log_debug("[validate] Response too short")
return False
# Try to find JSON in the response (might have markdown wrapping)
json_text = response.strip()
# Remove markdown code blocks if present
if json_text.startswith("```"):
lines = json_text.split("\n")
# Remove first and last lines (```json and ```)
json_lines = [l for l in lines if not l.strip().startswith("```")]
json_text = "\n".join(json_lines)
try:
parsed = json.loads(json_text)
# Check for expected fields
if isinstance(parsed, dict):
# Must have at least 'guidance' or 'action_plan' or 'priority'
expected_fields = ['guidance', 'action_plan', 'priority']
has_expected = any(field in parsed for field in expected_fields)
if has_expected:
log_debug(f"[validate] ✓ Valid JSON with expected fields")
return True
else:
log_debug(f"[validate] ✗ JSON but missing expected fields: {list(parsed.keys())}")
return False
else:
log_debug(f"[validate] ✗ JSON but not a dict: {type(parsed)}")
return False
except json.JSONDecodeError as e:
log_debug(f"[validate] ✗ Not valid JSON: {e}")
log_debug(f"[validate] First 100 chars: {response[:100]}")
return False
def execute_full_flow(
self,
project_name: str,
conversation_name: str,
prompt: str,
max_response_retries: int = 2
) -> dict:
"""
Execute the complete flow with verification at each step.
Includes retry logic if the copied response looks like "trash"
(e.g., chaos sent a new prompt and we copied that response).
Returns:
{
"success": bool,
"response": str (if successful),
"error": str (if failed),
"failed_step": int (if failed)
}
"""
# Reset flow timer for elapsed time tracking
reset_flow_timer()
# Block all mouse/keyboard input during the automation flow
# This prevents chaos from interfering with clicks and typing
with input_blocked():
log_phase(0, "execute_full_flow", "START")
log_debug("=" * 60)
log_debug("EXECUTING FULL FLOW")
log_debug(f" Project: {project_name}")
log_debug(f" Conversation: {conversation_name}")
log_debug(f" Prompt: {prompt[:50]}...")
log_debug("=" * 60)
# Step 1: Kill ChatGPT
log_phase(1, "kill_chatgpt", "START")
if not self.step1_kill_chatgpt():
log_phase(1, "kill_chatgpt", "FAIL")
return {"success": False, "error": "Failed to kill ChatGPT", "failed_step": 1}
log_phase(1, "kill_chatgpt", "OK")
time.sleep(1.0) # Extra wait after kill
# Step 2: Start ChatGPT
log_phase(2, "start_chatgpt", "START")
if not self.step2_start_chatgpt():
log_phase(2, "start_chatgpt", "FAIL")
return {"success": False, "error": "Failed to start ChatGPT", "failed_step": 2}
log_phase(2, "start_chatgpt", "OK")
# Step 3: Focus ChatGPT
log_phase(3, "focus_chatgpt", "START")
if not self.step3_focus_chatgpt():
log_phase(3, "focus_chatgpt", "FAIL")
return {"success": False, "error": "Failed to focus ChatGPT", "failed_step": 3}
log_phase(3, "focus_chatgpt", "OK")
# Step 4: Open Sidebar
if not self.step4_open_sidebar():
return {"success": False, "error": "Failed to open sidebar", "failed_step": 4}
# Step 5: Click Project
if not self.step5_click_project(project_name):
return {"success": False, "error": f"Failed to find project '{project_name}'", "failed_step": 5}
time.sleep(0.5) # Wait for project to expand
# Step 6: Click Conversation - with retry from step 4 if needed
# (chaos might have closed sidebar or navigated away)
max_nav_retries = 2
for nav_attempt in range(max_nav_retries + 1):
if nav_attempt > 0:
log_phase(6, "click_conversation", f"NAV_RETRY:{nav_attempt+1}")
log_debug(f"[flow] Navigation failed, retrying from sidebar (attempt {nav_attempt + 1}/{max_nav_retries + 1})...")
time.sleep(0.5)
# Re-ensure foreground
if not self._ensure_foreground():
log_debug(f"[flow] ⚠ Could not ensure foreground for retry")
# Re-open sidebar
if not self.step4_open_sidebar():
log_debug(f"[flow] ⚠ Could not re-open sidebar")
continue
# Re-click project
if not self.step5_click_project(project_name):
log_debug(f"[flow] ⚠ Could not re-click project")
continue
time.sleep(0.5)
if self.step6_click_conversation(conversation_name):
break # Success!
else:
return {"success": False, "error": f"Failed to find conversation '{conversation_name}'", "failed_step": 6}
# Response retry loop - handles case where chaos sent a new prompt
for response_attempt in range(max_response_retries + 1):
if response_attempt > 0:
log_phase(7, "send_prompt", f"RESPONSE_RETRY:{response_attempt+1}")
log_debug(f"[flow] Response invalid, retrying prompt (attempt {response_attempt + 1})")
# Add a clarification to the prompt
retry_prompt = f"Please ignore any messages above that don't match this format. Here is my actual question:\n\n{prompt}"
else:
retry_prompt = prompt
# Step 7+8: Focus & Send Prompt
if not self.step7_send_prompt(retry_prompt):
return {"success": False, "error": "Failed to focus/send prompt", "failed_step": 7}
# Step 9: Wait for Response
if not self.step9_wait_for_response():
return {"success": False, "error": "Timeout waiting for response", "failed_step": 9}
# Step 10: Copy Response
response = self.step10_copy_response()
if not response:
return {"success": False, "error": "Failed to copy response", "failed_step": 10}
# Validate response looks like proper JSON
if self._is_valid_json_response(response):
log_phase(0, "execute_full_flow", "COMPLETE")
log_debug("=" * 60)
log_debug("FLOW COMPLETE - SUCCESS")
log_debug("=" * 60)
return {
"success": True,
"response": response
}
else:
log_phase(10, "copy_response", "INVALID_JSON")
log_debug(f"[flow] Response looks like trash, will retry...")
log_debug(f"[flow] Got: {response[:100]}...")
# All retries exhausted
log_phase(0, "execute_full_flow", "FAIL:response_validation")
return {
"success": False,
"error": f"Response validation failed after {max_response_retries + 1} attempts. Last response: {response[:100]}",
"failed_step": 10
}
# Test
if __name__ == "__main__":
flow = RobustChatGPTFlow()
# Test individual steps
print("Testing robust flow...")
# Test step 1
if flow.step1_kill_chatgpt():
print("Step 1 passed")
time.sleep(1)
# Test step 2
if flow.step2_start_chatgpt():
print("Step 2 passed")
# Test step 3
if flow.step3_focus_chatgpt():
print("Step 3 passed")
# Test step 4
if flow.step4_open_sidebar():
print("Step 4 passed")
# Test step 5 - Click project
if flow.step5_click_project("Agent Expert Help"):
print("Step 5 passed")
else:
print("Step 5 FAILED")
exit(1)
time.sleep(0.5)
# Test step 6 - Click conversation
if flow.step6_click_conversation("o3 test"):
print("Step 6 passed")
else:
print("Step 6 FAILED")
exit(1)
time.sleep(0.5)
# Test step 7+8 - Focus & Send prompt
test_prompt = "What are 5 examples of water being used in biology"
if flow.step7_send_prompt(test_prompt):
print("Step 7+8 passed")
else:
print("Step 7+8 FAILED")
exit(1)
# Test step 9 - Wait for response
if flow.step9_wait_for_response():
print("Step 9 passed")
else:
print("Step 9 FAILED")
exit(1)
# Test step 10 - Copy response
response = flow.step10_copy_response()
if response:
print("Step 10 passed")
print(f"\n{'='*60}")
print("RESPONSE:")
print(f"{'='*60}")
print(response[:500] + "..." if len(response) > 500 else response)
else:
print("Step 10 FAILED")
exit(1)
print("\n✓ FULL FLOW COMPLETE!")