Browser Use Server
by ztobs
from browser_use import Agent, Browser
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ollama import ChatOllama
from pydantic import SecretStr
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContextConfig
from dotenv import load_dotenv
import json
import base64
import sys
import asyncio
import os
import time
# Load environment variables from .env file
load_dotenv()
SCREENSHOT_DIR = os.path.join('.', 'screenshots')
async def handle_command(command, args):
"""Handle different browser commands"""
# Ensure screenshot directory exists
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# Define LLM configurations
llm_configs = {
'OLLAMA_API_KEY': {
'class': ChatOllama,
'params': {
'base_url': 'http://localhost:11434',
'model': 'qwen2.5:32b-instruct-q4_K_M',
'num_ctx': 32000
}
},
'GLHF_API_KEY': {
'class': ChatOpenAI,
'params': {
'base_url': 'https://glhf.chat/api/openai/v1',
'model': 'deepseek-ai/DeepSeek-V3'
}
},
'GROQ_API_KEY': {
'class': ChatOpenAI,
'params': {
'base_url': 'https://api.groq.com/openai/v1',
'model': 'deepseek-r1-distill-llama-70b'
}
},
'OPENAI_API_KEY': {
'class': ChatOpenAI,
'params': {
'base_url': 'https://api.openai.com/v1',
'model': 'gpt-4o-mini'
}
},
'OPENROUTER_API_KEY': {
'class': ChatOpenAI,
'params': {
'base_url': 'https://openrouter.ai/api/v1',
'model': 'deepseek/deepseek-chat'
}
},
'GITHUB_API_KEY': {
'class': ChatOpenAI,
'params': {
'base_url': 'https://models.inference.ai.azure.com',
'model': 'gpt-4o-mini'
}
},
'DEEPSEEK_API_KEY': {
'class': ChatOpenAI,
'params': {
'base_url': 'https://api.deepseek.com/v1',
'model': 'deepseek-chat'
}
},
'GEMINI_API_KEY': {
'class': ChatGoogleGenerativeAI,
'params': {
'model': 'gemini-2.0-flash-exp'
}
}
}
# Check for available API keys and select the first one found
for env_key, config in llm_configs.items():
api_key = os.getenv(env_key)
if api_key:
print(f"[DEBUG] Using {env_key}")
llm_class = config['class']
params = config['params'].copy() # Create a copy to avoid modifying the original
# Check if MODEL env var is set and override the default model
custom_model = os.getenv('MODEL')
if custom_model:
print(f"[DEBUG] Using custom model: {custom_model}")
params['model'] = custom_model
# Check if BASE_URL env var is set and override the default base_url
custom_base_url = os.getenv('BASE_URL')
if custom_base_url and 'base_url' in params:
print(f"[DEBUG] Using custom base URL: {custom_base_url}")
params['base_url'] = custom_base_url
params['api_key'] = SecretStr(api_key)
llm = llm_class(**params)
break
else:
return {
'success': False,
'error': 'No API key found. Please set one of the following environment variables: ' + ', '.join(llm_configs.keys())
}
# Configure browser with longer timeouts
context_config = BrowserContextConfig(
save_recording_path="../generated/recordings/",
cookies_file="../generated/cookies.json",
wait_for_network_idle_page_load_time=3.0,
browser_window_size={'width': 1280, 'height': 1100},
locale='en-US',
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36',
highlight_elements=True,
viewport_expansion=500,
# allowed_domains=['google.com', 'wikipedia.org'],
)
# Check if running under xvfb-run
running_under_xvfb = os.getenv('RUNNING_UNDER_XVFB') == 'true'
config_args = {
'headless': True,
'disable_security': True,
'new_context_config': context_config
}
# Only include chrome_instance_path when running under xvfb
if running_under_xvfb:
config_args['chrome_instance_path'] = '/usr/bin/google-chrome'
config_args['headless'] = False
config = BrowserConfig(**config_args)
browser = Browser(config=config)
context = await browser.new_context()
try:
if command == 'screenshot':
if not args.get('url'):
return {
'success': False,
'error': 'URL is required for screenshot command'
}
task = f"1. Go to {args['url']}"
if args.get('steps'):
steps = args['steps'].split(',')
for i, step in enumerate(steps, 2):
task += f"\n{i}. {step.strip()}"
task += f"\n{len(steps) + 2}. Take a screenshot"
else:
task += "\n2. Take a screenshot"
if args.get('full_page'):
task += " of the full page"
print(f"[DEBUG] Creating agent for task: {task}")
use_vision = os.getenv('USE_VISION', 'false').lower() == 'true'
agent = Agent(task=task, llm=llm, use_vision=use_vision, browser_context=context)
print("[DEBUG] Running agent")
await agent.run()
print("[DEBUG] Agent run completed")
# Get the screenshot from the browser context
try:
# await context.navigate_to(args['url'])
screenshot_base64 = await context.take_screenshot(full_page=args.get('full_page', False))
filename = f"screenshot_{int(time.time())}.png"
filepath = os.path.join(SCREENSHOT_DIR, filename)
# Decode base64 and save image
screenshot_bytes = base64.b64decode(screenshot_base64)
with open(filepath, 'wb') as f:
f.write(screenshot_bytes)
return {
'success': True,
'screenshot': screenshot_base64, # Keep base64 for potential direct display
'filepath': os.path.abspath(filepath) # Include full file path in response
}
finally:
await context.close()
elif command == 'get_html':
if not args.get('url'):
return {
'success': False,
'error': 'URL is required for get_html command'
}
task = f"1. Go to {args['url']}"
if args.get('steps'):
steps = args['steps'].split(',')
for i, step in enumerate(steps, 2):
task += f"\n{i}. {step.strip()}"
task += f"\n{len(steps) + 2}. Get the page HTML"
else:
task += "\n2. Get the page HTML"
use_vision = os.getenv('USE_VISION', 'false').lower() == 'true'
agent = Agent(task=task, llm=llm, use_vision=use_vision, browser_context=context)
await agent.run()
try:
html = await context.get_page_html()
return {
'success': True,
'html': html
}
finally:
await context.close()
elif command == 'execute_js':
if not args.get('url') or not args.get('script'):
return {
'success': False,
'error': 'URL and script are required for execute_js command'
}
task = f"1. Go to {args['url']}"
if args.get('steps'):
steps = args['steps'].split(',')
for i, step in enumerate(steps, 2):
task += f"\n{i}. {step.strip()}"
task += f"\n{len(steps) + 2}. Execute JavaScript: {args['script']}"
else:
task += f"\n2. Execute JavaScript: {args['script']}"
use_vision = os.getenv('USE_VISION', 'false').lower() == 'true'
agent = Agent(task=task, llm=llm, use_vision=use_vision, browser_context=context)
await agent.run()
try:
result = await context.execute_javascript(args['script'])
return {
'success': True,
'result': result
}
finally:
await context.close()
elif command == 'get_console_logs':
if not args.get('url'):
return {
'success': False,
'error': 'URL is required for get_console_logs command'
}
console_messages = []
def on_console_message(msg):
console_messages.append(f"type: {msg.type}, text: {msg.text}, location: {msg.location}")
task = f"1. Go to {args['url']}"
if args.get('steps'):
steps = args['steps'].split(',')
for i, step in enumerate(steps, 2):
task += f"\n{i}. {step.strip()}"
task += f"\n{len(steps) + 2}. Get the console logs"
else:
task += f"\n2. Get the console logs"
use_vision = os.getenv('USE_VISION', 'false').lower() == 'true'
agent = Agent(task=task, llm=llm, use_vision=use_vision, browser_context=context)
await agent.run()
try:
# Execute JavaScript to get console logs
await context.execute_javascript("""
window._consoleLogs = [];
const originalConsole = window.console;
['log', 'info', 'warn', 'error'].forEach(level => {
window.console[level] = (...args) => {
window._consoleLogs.push({type: level, text: args.join(' ')});
originalConsole[level](...args);
};
});
""")
# Wait a bit for any console logs to be captured
await asyncio.sleep(1)
# Get the captured logs
logs = await context.execute_javascript("window._consoleLogs")
return {
'success': True,
'logs': logs
}
finally:
await context.close()
else:
return {
'success': False,
'error': f'Unknown command: {command}'
}
finally:
await browser.close()
async def main():
# Read command line arguments as JSON
args = json.loads(sys.argv[1])
command = args.get('command')
try:
result = await handle_command(command, args)
except Exception as e:
result = {
'success': False,
'error': str(e)
}
# Output result as JSON
print(json.dumps(result))
if __name__ == "__main__":
asyncio.run(main())