MCP Operator
by willer
Verified
#!/usr/bin/env python3
"""
Agent implementation for the OpenAI Computer Use Agent (CUA)
"""
import os
import json
import base64
import io
import asyncio
import aiohttp
import imageio.v2 as imageio
from typing import List, Dict, Any
from pathlib import Path
from urllib.parse import urlparse
from .computer import AsyncComputer
# Pretty print JSON objects
def pp(obj):
"""Pretty print JSON objects"""
print(json.dumps(obj, indent=4))
# Create OpenAI Responses API request
async def create_response(**kwargs):
"""Create a response from the OpenAI API using aiohttp with retry logic"""
api_key = os.getenv('OPENAI_API_KEY')
url = "https://api.openai.com/v1/responses"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Openai-beta": "responses=v1",
}
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=kwargs) as response:
if response.status != 200:
error_text = await response.text()
print(f"Error: {response.status} {error_text}")
# Check for rate limit errors
if response.status == 429 or "rate limit" in error_text.lower():
wait_time = retry_delay * (2 ** attempt) # Exponential backoff
print(f"Rate limit hit. Waiting {wait_time}s before retry {attempt+1}/{max_retries}...")
await asyncio.sleep(wait_time)
continue
return {"error": error_text}
response_json = await response.json()
# Verify response has expected structure
if "error" in response_json:
print(f"API returned error: {response_json['error']}")
# Check if it's a rate limit error
if "rate limit" in str(response_json['error']).lower():
wait_time = retry_delay * (2 ** attempt)
print(f"Rate limit hit. Waiting {wait_time}s before retry {attempt+1}/{max_retries}...")
await asyncio.sleep(wait_time)
continue
return response_json
except Exception as e:
print(f"Network error on attempt {attempt+1}/{max_retries}: {str(e)}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
print(f"Waiting {wait_time}s before retry...")
await asyncio.sleep(wait_time)
else:
print("Max retries reached, giving up.")
return {"error": f"Max retries reached: {str(e)}"}
# If we exhausted all retries
return {"error": "Max retries reached"}
# Check if a URL is allowed by domain rules
def check_allowed_url(url: str, allowed_domains: List[str]) -> bool:
"""Check if URL is in allowed domains list"""
hostname = urlparse(url).hostname or ""
return any(hostname.endswith(domain) for domain in allowed_domains)
class Agent:
"""Agent to manage the CUA loop and interaction with the Computer"""
def __init__(
self,
model="computer-use-preview",
computer: Any = None,
allowed_domains: List[str] = None,
):
self.model = model
self.computer = computer
self.print_steps = True
self.debug = False
self.conversation_history = []
self.screen_captures = []
self.allowed_domains = allowed_domains or ['about:blank']
self.last_reasoning = None # Store the last reasoning message
# Set up tools to include computer-preview
self.tools = []
if computer:
self.tools.append({
"type": "computer-preview",
"display_width": computer.dimensions[0],
"display_height": computer.dimensions[1],
"environment": computer.environment,
})
def debug_print(self, *args):
"""Print debug information if debug is enabled"""
if self.debug:
pp(*args)
# Define a function to generate contextual reasoning for actions
def generate_action_reasoning(self, action_type, action_args):
"""Generate contextual reasoning for different action types"""
action_reasoning = {
"click": "Clicking on an element to interact with the page interface. This helps navigate through the content to find the requested information.",
"double_click": "Double-clicking on an element to open or expand content that may contain relevant information.",
"type": "Typing text to provide input needed for this search. This text will help narrow down the results to find the specific information requested.",
"keypress": "Submitting the search query to find information about the requested topic. This will execute the search and retrieve relevant results.",
"scroll": "Scrolling the page to view additional content that might contain the requested information. Scrolling allows examining more search results or content.",
"goto": "Navigating to a website to find information about the requested topic. This website likely contains relevant data or search capabilities needed.",
"wait": "Waiting for page to respond while the page loads the requested information. This ensures all content is properly displayed before proceeding.",
"move": "Moving the cursor to prepare for the next interaction. Positioning the cursor is necessary before clicking or selecting content.",
"drag": "Adjusting the view or interacting with content by dragging. This helps reveal or organize information in a more useful way.",
"screenshot": "Capturing a screenshot to record the visual information displayed. This preserves the current state of the information for reference."
}
# Get default reasoning for this action type
base_reasoning = action_reasoning.get(action_type, f"Performing {action_type} action to find the requested information.")
# Add specific details based on action type and args
if action_type == "click":
x = action_args.get("x", 0)
y = action_args.get("y", 0)
return f"Clicking at position ({x}, {y}) - {base_reasoning}"
elif action_type == "type":
text = action_args.get("text", "")
if len(text) > 30:
text = text[:30] + "..."
return f"Typing '{text}' - {base_reasoning}"
elif action_type == "keypress":
keys = action_args.get("keys", [])
if isinstance(keys, list):
keys = ", ".join(keys)
return f"Pressing keys: {keys} - {base_reasoning}"
elif action_type == "scroll":
x = action_args.get("scroll_x", 0)
y = action_args.get("scroll_y", 0)
direction = "down" if y > 0 else "up"
return f"Scrolling {direction} - {base_reasoning}"
elif action_type == "wait":
return f"Waiting - {base_reasoning}"
# Return default reasoning with action type
return base_reasoning
async def handle_item(self, item):
"""Handle response items from the model"""
# Handle reasoning items - new format in the API
if item["type"] == "reasoning":
if "summary" in item:
combined_text = ""
for summary in item.get("summary", []):
if isinstance(summary, dict) and "text" in summary:
combined_text += summary["text"] + " "
elif isinstance(summary, str):
combined_text += summary + " "
if combined_text.strip():
reasoning_text = combined_text.strip()
self.last_reasoning = reasoning_text
if self.print_steps:
print(f"Reasoning: {reasoning_text}")
# Add reasoning to conversation history
self.conversation_history.append({
"role": "assistant",
"content": reasoning_text,
"type": "reasoning" # Add type to distinguish from actions
})
return [] # No output items for reasoning
# Parse messages for reasoning sections indicated by [REASONING] tags
elif item["type"] == "message":
if "content" in item and len(item["content"]) > 0:
content = item["content"][0]
if isinstance(content, dict) and "text" in content:
message_text = content["text"]
# Look for [REASONING] tags in the message
import re
reasoning_match = re.search(r'\[REASONING\](.*?)(?:\[ACTION\]|$)', message_text, re.DOTALL)
if reasoning_match:
reasoning_text = reasoning_match.group(1).strip()
# Store reasoning text to reference with the next action
self.last_reasoning = reasoning_text
if self.print_steps:
print(f"Reasoning: {reasoning_text}")
# Add reasoning to conversation history
self.conversation_history.append({
"role": "assistant",
"content": reasoning_text,
"type": "reasoning" # Add type to distinguish from actions
})
else:
# No explicit [REASONING] tag, treat the whole message as reasoning
self.last_reasoning = message_text
# Add to conversation history
self.conversation_history.append({
"role": "assistant",
"content": message_text,
"type": "message"
})
return [] # No output items for message
elif item["type"] == "computer_call":
action = item["action"]
action_type = action["type"]
action_args = {k: v for k, v in action.items() if k != "type"}
# Print action first
if self.print_steps:
print(f"{action_type}({action_args})")
# If no reasoning is available, generate one based on action type
if not hasattr(self, 'last_reasoning') or not self.last_reasoning:
self.last_reasoning = self.generate_action_reasoning(action_type, action_args)
# Add generated reasoning to conversation history
self.conversation_history.append({
"role": "assistant",
"content": self.last_reasoning,
"type": "reasoning"
})
# Print reasoning after action for better clarity
if self.print_steps and self.last_reasoning:
print(f"Reasoning: {self.last_reasoning}")
# Add action to conversation history
self.conversation_history.append({
"role": "assistant",
"content": f"{action_type}({action_args})",
"type": "action"
})
# Clear reasoning after using it
reasoning_used = self.last_reasoning
self.last_reasoning = None
# Execute the action on the computer
method = getattr(self.computer, action_type)
await method(**action_args)
# Add the action to conversation history
self.conversation_history.append({
"role": "assistant",
"content": f"{action_type}({action_args})",
"type": "action" # Identify as an action
})
# Capture the screenshot
screenshot_base64 = await self.computer.screenshot()
self.screen_captures.append(imageio.imread(io.BytesIO(base64.b64decode(screenshot_base64))))
# Prepare response
call_output = {
"type": "computer_call_output",
"call_id": item["call_id"],
"acknowledged_safety_checks": [],
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}",
},
}
# Add URL for browser environments
if self.computer.environment == "browser":
current_url = await self.computer.get_current_url()
if not check_allowed_url(current_url, self.allowed_domains):
print(f"Error: URL not in allowed domains: {current_url}")
# Force navigation to allowed domain
await self.computer.goto(f"https://{self.allowed_domains[0]}")
current_url = await self.computer.get_current_url()
call_output["output"]["current_url"] = current_url
return [call_output]
return []
async def run_full_turn(self, input_items, print_steps=True, debug=False):
"""Run a full turn of the conversation with the model"""
self.print_steps = print_steps
self.debug = debug
new_items = []
# Keep looping until we get a final response
while new_items[-1].get("role") != "assistant" if new_items else True:
self.debug_print(input_items + new_items)
# Only print new user messages, not the initial instructions that get repeated
if print_steps and len(new_items) > 0: # Only print for follow-up turns, not the first turn
for item in input_items:
if item.get("role") == "user" and "content" in item:
for content in item.get("content", []):
if isinstance(content, dict) and content.get("type") == "input_text":
# Extract just the evaluation instructions, which are the new part
text = content.get("text", "")
if "Looking at the current screen" in text and "Test requirements:" in text:
print(f"\n--- SENDING EVALUATION REQUEST TO MODEL ---\n")
response = await create_response(
model=self.model,
input=input_items + new_items,
tools=self.tools,
truncation="auto",
temperature=0.2, # Small amount of temperature to avoid deterministic errors
# Note: timeout parameter removed as it's not supported by the API
)
# Print the response structure for debugging
if self.debug: # Only show in debug mode
print("DEBUG - FULL RESPONSE STRUCTURE:")
try:
import json
print(json.dumps(response, indent=2)[:1000]) # Truncate to avoid massive output
except Exception as e:
print(f"Error printing response: {e}")
# Try to extract reasoning directly from the response
try:
if "reasoning" in response and response["reasoning"]:
# This field contains reasoning for the next actions
reasoning_obj = response["reasoning"]
# Parse out reasoning text if available
reasoning_text = ""
if isinstance(reasoning_obj, dict):
# Try extract from various fields based on the API structure
if "description" in reasoning_obj:
reasoning_text = reasoning_obj["description"]
elif "explanation" in reasoning_obj:
reasoning_text = reasoning_obj["explanation"]
elif "effort" in reasoning_obj and reasoning_obj["effort"] != "medium":
reasoning_text = f"Reasoning effort: {reasoning_obj['effort']}"
if reasoning_text:
print(f"Reasoning from response object: {reasoning_text}")
self.last_reasoning = reasoning_text
except Exception as e:
if self.debug:
print(f"Error extracting reasoning from response: {e}")
self.debug_print(response)
if "output" not in response:
if self.debug:
print(response)
# Check for specific error messages we might handle
if "error" in response:
error_message = response["error"]
print(f"API Error: {error_message}")
# Handle rate limit issues
if isinstance(error_message, str) and "rate limit" in error_message.lower():
print("Rate limit hit. Consider reducing the number of concurrent tests or increasing delays.")
# Handle auth issues
if isinstance(error_message, str) and any(term in error_message.lower() for term in ["authentication", "unauthorized", "auth", "key"]):
print("Authentication error. Check your OpenAI API key.")
# Handle quota issues
if isinstance(error_message, str) and "quota" in error_message.lower():
print("API quota exceeded. Check your OpenAI account billing and limits.")
# Try to recover with a partial response
if "item" in response or "items" in response:
items_key = "items" if "items" in response else "item"
print(f"Attempting to recover with partial response ({items_key})...")
return response.get(items_key, [])
# Fall back to returning an error message that will be shown to the user
return [{
"role": "assistant",
"content": [{
"type": "output_text",
"text": "Test FAILED. An API error occurred: the model did not provide any output. This might be due to rate limits, quotas, or API availability. Please try again later."
}]
}]
else:
# Process the output and extract reasoning if present
reasoning_text = ""
# Look for reasoning BEFORE computer_call items
computer_call_items = []
reasoning_items = []
other_items = []
# First pass - segregate items by type
for item in response["output"]:
if item.get("type") == "reasoning":
reasoning_items.append(item)
elif item.get("type") == "computer_call":
computer_call_items.append(item)
else:
other_items.append(item)
# Now handle the items in order: reasoning first, then computer calls
# This ensures reasoning is captured before actions
for item in reasoning_items:
# Try to extract reasoning from the item
if "summary" in item:
combined_text = ""
for summary in item.get("summary", []):
if isinstance(summary, dict) and "text" in summary:
combined_text += summary["text"] + " "
elif isinstance(summary, str):
combined_text += summary + " "
if combined_text.strip():
reasoning_text = combined_text.strip()
print(f"Reasoning: {reasoning_text}")
self.last_reasoning = reasoning_text
# Add reasoning to conversation history
self.conversation_history.append({
"role": "assistant",
"content": reasoning_text,
"type": "reasoning"
})
# Check if the model is asking a question
model_asking_question = False
for item in other_items:
if item.get("role") == "assistant" and "content" in item:
for content in item.get("content", []):
if isinstance(content, dict) and content.get("type") == "output_text":
text = content.get("text", "")
if "?" in text and len(text) < 250: # Likely a question
model_asking_question = True
print(f"\n--- MODEL QUESTION: {text} ---\n")
# Add a response that tells it to continue with the task
new_items.append({
"role": "user",
"content": [{
"type": "input_text",
"text": "Yes, please continue with the task. Close any popups or dialogs, and proceed with the test instructions."
}]
})
print("--- AUTOMATIC RESPONSE: Yes, please continue with the task ---\n")
# Extract reasoning directly from the response if not found in items
if not reasoning_text and "reasoning" in response:
if isinstance(response["reasoning"], dict) and "summary" in response["reasoning"]:
reasoning_text = response["reasoning"]["summary"]
print(f"Reasoning from response: {reasoning_text}")
self.last_reasoning = reasoning_text
# Add the model's output to our items
new_items += response["output"]
# If model wasn't asking a question, handle the items
if not model_asking_question:
# Handle computer calls last
for item in computer_call_items:
new_items += await self.handle_item(item)
# Handle other items
for item in other_items:
if item.get("type") != "reasoning": # Skip reasoning items (already handled)
new_items += await self.handle_item(item)
return new_items
async def run(self, task, max_steps=60, auth_state=None):
"""Run the agent to complete a task"""
# Initialize screen captures
self.screen_captures = []
# Set up computer with auth state
async with self.computer as computer:
# Navigate to a blank page first to establish the browser context
await computer.goto("about:blank")
# Extract URL from task - we'll navigate to it after setting cookies
url = self.extract_url_from_task(task)
# Apply auth state using the direct Playwright approach
if auth_state:
try:
# Get the current context
context = computer._browser.contexts[0]
# Apply the auth state directly to the browser context
# This uses the exact same format that was saved by auth_setup.py
print(f"Applying auth state with {len(auth_state.get('cookies', []))} cookies")
await context.add_cookies(auth_state.get('cookies', []))
# Apply origins if available (for localStorage and sessionStorage)
if 'origins' in auth_state:
print(f"Applying storage from {len(auth_state.get('origins', []))} origins")
# No need to iterate through the origins - Playwright handles this automatically
# when we set the entire storage state at once
storage_state_json = json.dumps(auth_state)
await context.add_init_script(f"""
() => {{
const storageState = {storage_state_json};
if (storageState.origins) {{
for (const origin of storageState.origins) {{
const originURL = new URL(origin.origin);
if (originURL.origin === window.location.origin) {{
// Apply localStorage
if (origin.localStorage) {{
for (const entry of origin.localStorage) {{
try {{
window.localStorage.setItem(entry.name, entry.value);
}} catch (e) {{
console.error('Error setting localStorage:', e);
}}
}}
}}
// Apply sessionStorage
if (origin.sessionStorage) {{
for (const entry of origin.sessionStorage) {{
try {{
window.sessionStorage.setItem(entry.name, entry.value);
}} catch (e) {{
console.error('Error setting sessionStorage:', e);
}}
}}
}}
}}
}}
}}
}}
""")
# Verify the auth state was applied
current_state = await context.storage_state()
print(f"Browser now has {len(current_state.get('cookies', []))} cookies")
except Exception as e:
print(f"Error applying auth state: {e}")
# Navigate to the target URL
if url:
print(f"Navigating to URL: {url}")
try:
await computer._page.goto(url, wait_until="domcontentloaded", timeout=30000)
print("Navigation complete, waiting for page to fully load...")
except Exception as e:
print(f"Navigation error: {e}")
# Wait for page to fully load and stabilize
await computer.wait(5000) # Wait time for complex pages
# Capture initial screenshot
screenshot_base64 = await computer.screenshot()
self.screen_captures.append(imageio.imread(io.BytesIO(base64.b64decode(screenshot_base64))))
# Store conversation in history
self.conversation_history = []
# Initialize the conversation
user_message = {
"role": "user",
"content": [
{
"type": "input_text",
"text": task
},
{
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}"
}
]
}
# Store the message in history
self.conversation_history.append({
"role": "user",
"content": task
})
# Run the initial turn
items = await self.run_full_turn([user_message], print_steps=True, debug=self.debug)
# Process the response and store in history
for item in items:
if item.get("role") == "assistant":
content_text = ""
# Skip if this is already going to be handled as a reasoning or action entry
# by the handle_item method
if item.get("type") in ["computer_call", "computer_call_output"]:
continue
if isinstance(item.get("content", []), list) and len(item.get("content", [])) > 0:
content_item = item.get("content", [])[0]
if isinstance(content_item, dict) and "text" in content_item:
content_text = content_item.get("text", "")
else:
content_text = str(content_item)
else:
content_text = "No response"
self.conversation_history.append({
"role": "assistant",
"content": content_text,
"type": "message" # Explicitly mark as a regular message
})
# If the assistant is talking about login, try to handle the login automatically
login_phrases = ["login", "sign in", "sign-in", "google account", "authenticate", "credentials"]
if any(phrase in content_text.lower() for phrase in login_phrases):
print("Assistant mentioned login - will try to auto-login if we see login form")
# Wait a bit and check for login buttons
await computer.wait(2000)
try:
current_url = await computer.get_current_url()
if "accounts.google.com" in current_url or "login" in current_url:
print("Detected login page - trying to find account selector")
# Take a screenshot to analyze the login page
screenshot = await computer.screenshot()
# Try to bypass Google login using auth tokens and local storage
bypass_login_script = """
() => {
// Save the current URL to redirect back later
const targetUrl = localStorage.getItem('redirect_after_auth') || window.location.href;
// Set auth tokens and bypasses
localStorage.setItem('gapi_auth', 'true');
localStorage.setItem('google_oauth_token', 'bypass_token');
localStorage.setItem('google_auth_bypass', 'true');
localStorage.setItem('auth_override', 'true');
localStorage.setItem('genome_auth_bypass', 'true');
sessionStorage.setItem('genome_auth_bypass', 'true');
// Try to set cookie values via document.cookie
try {
document.cookie = 'auth_bypass=true; path=/; domain=.klick.com';
document.cookie = 'google_auth_complete=true; path=/; domain=.klick.com';
} catch (e) {
console.error('Error setting cookies:', e);
}
return {
status: 'Attempted auth bypass',
targetUrl: targetUrl
};
}
"""
# Try the bypass first
try:
bypass_result = await computer._page.evaluate(bypass_login_script)
print(f"Auth bypass attempt: {bypass_result}")
except Exception as e:
print(f"Auth bypass error: {e}")
# Try a more robust approach to find and click on account elements
# First use JavaScript to identify potential login elements
find_accounts_script = """
() => {
// Look for common elements in Google login screens
const elements = [];
// Profile images are often in divs with role="link"
const profiles = Array.from(document.querySelectorAll('div[role="link"] img'));
if (profiles.length > 0) {
profiles.forEach((img, i) => {
const rect = img.getBoundingClientRect();
elements.push({
type: 'profile',
index: i,
x: Math.round(rect.left + rect.width / 2),
y: Math.round(rect.top + rect.height / 2)
});
});
}
// Look for common button text
['Next', 'Continue', 'Sign in', 'Log in', 'Yes', 'Confirm'].forEach(text => {
const buttons = Array.from(document.querySelectorAll('button, div[role="button"], a[role="button"]'))
.filter(el => el.innerText.includes(text));
buttons.forEach((btn, i) => {
const rect = btn.getBoundingClientRect();
if (rect.width > 0 && rect.height > 0) {
elements.push({
type: 'button',
text: text,
index: i,
x: Math.round(rect.left + rect.width / 2),
y: Math.round(rect.top + rect.height / 2)
});
}
});
});
// Look for the first visible account or profile card
const accountCards = Array.from(document.querySelectorAll('div[data-identifier], div[data-email]'));
accountCards.forEach((card, i) => {
const rect = card.getBoundingClientRect();
if (rect.width > 0 && rect.height > 0) {
elements.push({
type: 'account',
index: i,
x: Math.round(rect.left + rect.width / 2),
y: Math.round(rect.top + rect.height / 2)
});
}
});
return elements;
}
"""
try:
# Execute the script to find clickable elements
elements = await computer._page.evaluate(find_accounts_script)
if elements and len(elements) > 0:
print(f"Found {len(elements)} potential login elements: {elements}")
# First try profile images
profiles = [e for e in elements if e['type'] == 'profile']
if profiles:
print(f"Clicking profile image at ({profiles[0]['x']}, {profiles[0]['y']})")
await computer.click(profiles[0]['x'], profiles[0]['y'])
# Then try account cards
elif any(e['type'] == 'account' for e in elements):
account = next(e for e in elements if e['type'] == 'account')
print(f"Clicking account card at ({account['x']}, {account['y']})")
await computer.click(account['x'], account['y'])
# Then try buttons
elif any(e['type'] == 'button' for e in elements):
button = next(e for e in elements if e['type'] == 'button')
print(f"Clicking button '{button['text']}' at ({button['x']}, {button['y']})")
await computer.click(button['x'], button['y'])
else:
# Fallback to center of screen
print("Using fallback clicks")
await computer.click(640, 400)
else:
print("No login elements found, trying fixed positions...")
# First try center of screen where first account usually is
await computer.click(640, 400)
except Exception as e:
print(f"Error finding login elements: {e}")
# Fallback to fixed positions
await computer.click(640, 400)
# Wait a bit to see if anything happens
await computer.wait(3000)
# Check if we're still on a login page
current_url_after = await computer.get_current_url()
if "accounts.google.com" in current_url_after:
# Take another screenshot to see what changed
screenshot = await computer.screenshot()
# Try buttons that might appear in next page
try:
next_button_script = """
() => {
// Look for 'Next' or 'Continue' buttons that might appear in the flow
const buttonTexts = ['Next', 'Continue', 'Sign in', 'Yes', 'Confirm'];
for (const text of buttonTexts) {
const buttons = Array.from(document.querySelectorAll('button, div[role="button"]'))
.filter(el => el.innerText.includes(text));
if (buttons.length > 0) {
const rect = buttons[0].getBoundingClientRect();
return {
text: text,
x: Math.round(rect.left + rect.width / 2),
y: Math.round(rect.top + rect.height / 2)
};
}
}
return null;
}
"""
next_button = await computer._page.evaluate(next_button_script)
if next_button:
print(f"Clicking '{next_button['text']}' button at ({next_button['x']}, {next_button['y']})")
await computer.click(next_button['x'], next_button['y'])
await computer.wait(2000)
else:
# Try clicking in common positions as fallback
print("First click didn't work, trying another position")
await computer.click(640, 300)
await computer.wait(2000)
except Exception as e:
print(f"Error finding next buttons: {e}")
# Fallback to fixed positions
print("First click didn't work, trying another position")
await computer.click(640, 300)
await computer.wait(2000)
# Check again
current_url_after = await computer.get_current_url()
if "accounts.google.com" in current_url_after:
# Try one more position
print("Second click didn't work, trying top-left position")
await computer.click(400, 300)
await computer.wait(2000)
# Wait longer for login to complete
await computer.wait(5000)
# Check if we're still on login page
final_url = await computer.get_current_url()
if "accounts.google.com" not in final_url:
print("Successfully navigated past login page!")
else:
print("Still on login page after auto-login attempts")
except Exception as e:
print(f"Auto-login attempt failed: {e}")
# Check if we're done
current_step = 1
while current_step < max_steps:
# Check if we're done
if self.is_done(items):
break
current_step += 1
print(f"\033[93m==== Running step {current_step}/{max_steps} ====\033[0m")
# Construct follow-up message
follow_up = f"""
Looking at the current screen, please evaluate the test status.
Test requirements:
{task}
Please write your thought process for determining if this is a PASS or a FAIL, considering:
1. Which requirements have been completed successfully?
2. Which requirements (if any) have not been completed successfully?
3. Are there any blocking issues that prevent completion?
IMPORTANT: For each action you take, please always provide your reasoning. Format your actions like this:
[REASONING] I'm clicking this button because it appears to be the login button that will take me to the dashboard.
[ACTION] *click on login button*
After your analysis, end your response with a single paragraph starting with exactly "Test PASSED." or "Test FAILED." followed by a brief explanation of the key results.
"""
# Get latest screenshot
screenshot_base64 = await computer.screenshot()
# Create message with screenshot
user_message = {
"role": "user",
"content": [
{
"type": "input_text",
"text": follow_up
},
{
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}"
}
]
}
# Store message in history
self.conversation_history.append({
"role": "user",
"content": follow_up
})
# Run the turn
new_items = await self.run_full_turn([user_message], print_steps=True, debug=self.debug)
items = new_items
# Process the response and store in history
for item in items:
if item.get("role") == "assistant":
content_text = ""
# Skip if this is already going to be handled as a reasoning or action entry
# by the handle_item method
if item.get("type") in ["computer_call", "computer_call_output"]:
continue
if isinstance(item.get("content", []), list) and len(item.get("content", [])) > 0:
content_item = item.get("content", [])[0]
if isinstance(content_item, dict) and "text" in content_item:
content_text = content_item.get("text", "")
else:
content_text = str(content_item)
else:
content_text = "No response"
self.conversation_history.append({
"role": "assistant",
"content": content_text,
"type": "message" # Explicitly mark as a regular message
})
# If the assistant is talking about login, try to handle the login automatically
login_phrases = ["login", "sign in", "sign-in", "google account", "authenticate", "credentials"]
if any(phrase in content_text.lower() for phrase in login_phrases):
print("Assistant mentioned login - will try to auto-login if we see login form")
# Wait a bit and check for login buttons
await computer.wait(2000)
try:
current_url = await computer.get_current_url()
if "accounts.google.com" in current_url or "login" in current_url:
print("Detected login page - trying to find account selector")
# Check for Google login selectors - these are common patterns
# Take a screenshot to debug
screenshot = await computer.screenshot()
# Try to find account selector elements by clicking in common locations
# First try center of screen where first account usually is
await computer.click(640, 400)
# Wait a bit to see if anything happens
await computer.wait(2000)
# Check if we're still on a login page
current_url_after = await computer.get_current_url()
if "accounts.google.com" in current_url_after:
# Try clicking in other common places
print("First click didn't work, trying another position")
# Try the top account position
await computer.click(640, 300)
await computer.wait(2000)
# Check again
current_url_after = await computer.get_current_url()
if "accounts.google.com" in current_url_after:
# Try one more position
print("Second click didn't work, trying top-left position")
await computer.click(400, 300)
await computer.wait(2000)
# Wait longer for login to complete
await computer.wait(5000)
# Check if we're still on login page
final_url = await computer.get_current_url()
if "accounts.google.com" not in final_url:
print("Successfully navigated past login page!")
else:
print("Still on login page after auto-login attempts")
except Exception as e:
print(f"Auto-login attempt failed: {e}")
# Create result object
result = self._create_result_object(items)
# Add conversation history
result.conversation_history = self.conversation_history
return result
def is_done(self, items):
"""Check if the task is complete"""
# Look at the last message from the assistant
for item in reversed(items):
if item.get("role") == "assistant":
content = item.get("content", [])
# Extract text content
message_content = ""
if isinstance(content, list):
for content_item in content:
if content_item.get("type") == "output_text":
message_content += content_item.get("text", "").lower() + " "
# Check if response contains a final determination
final_lines = message_content.split("\n")
for line in final_lines:
line = line.strip()
# Look for standalone "Test PASSED" or "Test FAILED" indicators
if line.startswith("test passed") or line.startswith("test failed"):
return True
# Also check for specific patterns that indicate completion
if " test passed" in message_content or " test failed" in message_content:
return True
# Check for explicit pass/fail words near the end
last_section = message_content[-100:] if len(message_content) > 100 else message_content
if "passed" in last_section or "failed" in last_section:
return True
# If no computer call items and we have some messages, we're probably done
has_computer_call = False
for item in items:
if item.get("type") == "computer_call":
has_computer_call = True
break
if not has_computer_call and len(items) > 2:
# If no computer calls and we have some exchanges, we're probably done
return True
return False
def _create_result_object(self, items):
"""Create a result object with success/failure determination"""
# Default values
success = False
result_message = ""
# Look at the last message from the assistant
for item in reversed(items):
if item.get("role") == "assistant":
content = item.get("content", [])
# Extract text content
full_content = ""
if isinstance(content, list):
for content_item in content:
if content_item.get("type") == "output_text":
full_content += content_item.get("text", "") + " "
# Make lowercase for checking
message_content = full_content.lower()
# First, look for explicit "Test PASSED" or "Test FAILED" statements
final_decision = None
final_lines = full_content.split("\n")
# Look for lines containing our explicit pass/fail markers
for line in final_lines:
line_lower = line.lower().strip()
if line_lower.startswith("test passed"):
final_decision = "PASS"
break
elif line_lower.startswith("test failed"):
final_decision = "FAIL"
break
# If we didn't find an explicit marker, check the last paragraph
if not final_decision:
# Get the last few sentences (likely to contain the conclusion)
last_section = full_content[-200:] if len(full_content) > 200 else full_content
# Look for pass/fail indicators in the last section
if "passed" in last_section.lower() and not any(x in last_section.lower() for x in ["not passed", "failed"]):
final_decision = "PASS"
elif "failed" in last_section.lower():
final_decision = "FAIL"
# Make final determination
if final_decision == "PASS":
success = True
# Ensure it has the proper format for consistent logging
if not message_content.startswith("test passed"):
result_message = "Test PASSED. " + full_content
else:
result_message = full_content
elif final_decision == "FAIL":
success = False
# Ensure it has the proper format for consistent logging
if not message_content.startswith("test failed"):
result_message = "Test FAILED. " + full_content
else:
result_message = full_content
else:
# Extract the last word of the message to check for a final PASS/FAIL
last_words = message_content.strip().split()
if last_words and last_words[-1].lower() in ["pass", "passed"]:
success = True
result_message = "Test PASSED. " + full_content
elif last_words and last_words[-1].lower() in ["fail", "failed"]:
success = False
result_message = "Test FAILED. " + full_content
else:
# Unable to determine - explicitly mark as inconclusive
success = False
# Use UNCERTAIN prefix to ensure consistent classification
result_message = f"UNCERTAIN: Test FAILED. Could not determine a clear pass/fail status. Full output: {full_content}"
break
# Return an object with the results
return type('AgentResult', (), {
"success": success,
"message": result_message,
"screen_captures": self.screen_captures
})
def create_gif(self, gif_path):
"""Create a GIF from captured screenshots"""
if not self.screen_captures:
print(f"\033[93mWarning: No screenshots captured for GIF creation\033[0m")
return False
try:
# Make sure the directory exists
Path(gif_path).parent.mkdir(parents=True, exist_ok=True)
# Write GIF file
imageio.mimsave(gif_path, self.screen_captures, fps=1)
print(f"\033[94mCreated GIF with {len(self.screen_captures)} frames at {gif_path}\033[0m")
return True
except Exception as e:
print(f"\033[93mWarning: Failed to create GIF: {str(e)}\033[0m")
return False
def extract_url_from_task(self, task):
"""Extract the URL to navigate to from the task description"""
import re
# Special case: If we find a URL: line in the task with a complete URL, use that
if "URL:" in task:
url_line_match = re.search(r"URL:\s*(https?://[^\s\n]+)", task)
if url_line_match:
url = url_line_match.group(1)
# Strip any punctuation that might have been included
url = url.rstrip('.,;:)')
print(f"Found URL in task: {url}")
return url
# Look for common URL patterns in the task
url_patterns = [
r"Navigate to (https?://[^\s]+)",
r"Go to (https?://[^\s]+)",
r"Visit (https?://[^\s]+)",
r"Open (https?://[^\s]+)",
r"Access (https?://[^\s]+)",
r"URL: (https?://[^\s]+)",
r"Navigate to the URL ([^\s]+)"
]
for pattern in url_patterns:
match = re.search(pattern, task)
if match:
url = match.group(1)
# Strip any punctuation that might have been included
url = url.rstrip('.,;:)')
print(f"Found URL from pattern match: {url}")
return url
# If no URL found, extract from the base_url that was added to the task
base_url_match = re.search(r"base_url:\s*([^\s\n]+)", task, re.IGNORECASE)
if base_url_match:
base_url = base_url_match.group(1).strip()
# Assume base_url is a path and convert to full URL
if not urlparse(base_url).scheme:
# Strip leading slashes to avoid double slashes
path = base_url.lstrip('/')
# Use the first allowed domain as the host
host = f"https://{self.allowed_domains[0]}" if self.allowed_domains else None
if host:
full_url = f"{host}/{path}"
print(f"Found base_url in task: {base_url} -> {full_url}")
return full_url
else:
print(f"Found base_url in task: {base_url}")
return base_url
# Look for any HTTP URLs in the task
url_regex = re.compile(r'https?://[^\s\'"]+')
matches = url_regex.findall(task)
if matches:
# Clean up the URL
url = matches[0].rstrip('.,;:)')
print(f"Found URL via general regex: {url}")
return url
# If no URL found, check for domain references that might indicate a URL
for domain in self.allowed_domains:
if domain in task and domain != 'about:blank' and not domain.startswith('.'):
full_url = f"https://{domain}"
print(f"Found domain reference in task: {full_url}")
return full_url
print("No URL found in task")
return None