browser-use MCP Server
by williamvd4
Verified
- browser-use-mcp-server
- server
"""
Browser Use MCP Server
This module implements an MCP (Model-Control-Protocol) server for browser automation
using the browser_use library. It provides functionality to interact with a browser instance
via an async task queue, allowing for long-running browser tasks to be executed asynchronously
while providing status updates and results.
The server supports Server-Sent Events (SSE) for web-based interfaces.
"""
import os
import click
import asyncio
import uuid
from datetime import datetime
from dotenv import load_dotenv
import logging
import json
import traceback
# Import from browser-use library
from browser_use.browser.context import BrowserContextConfig, BrowserContext
from browser_use.browser.browser import Browser, BrowserConfig
from browser_use import Agent
# Import MCP server components
from mcp.server.lowlevel import Server
import mcp.types as types
# Import LLM provider
from langchain_openai import ChatOpenAI
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Task storage for async operations
task_store = {}
# Flag to track browser context health
browser_context_healthy = True
# Store global browser context and configuration
browser = None
context = None
config = None
async def reset_browser_context():
"""
Reset the browser context to a clean state.
This function attempts to close the existing context and create a new one.
If that fails, it tries to recreate the entire browser instance.
"""
global context, browser, browser_context_healthy, config
logger.info("Resetting browser context")
try:
# Try to close the existing context
try:
await context.close()
except Exception as e:
logger.warning(f"Error closing browser context: {str(e)}")
# Create a new context
context = BrowserContext(browser=browser, config=config)
browser_context_healthy = True
logger.info("Browser context reset successfully")
except Exception as e:
logger.error(f"Failed to reset browser context: {str(e)}")
browser_context_healthy = False
# If we can't reset the context, try to reset the browser
try:
await browser.close()
# Recreate browser with same configuration
browser_config = BrowserConfig(
extra_chromium_args=[
"--no-sandbox",
"--disable-gpu",
"--disable-software-rasterizer",
"--disable-dev-shm-usage",
"--remote-debugging-port=9222",
],
)
# Only set chrome_instance_path if we have a path from environment or command line
chrome_path = os.environ.get("CHROME_PATH")
if chrome_path:
browser_config.chrome_instance_path = chrome_path
browser = Browser(config=browser_config)
context = BrowserContext(browser=browser, config=config)
browser_context_healthy = True
logger.info("Browser reset successfully")
except Exception as e:
logger.error(f"Failed to reset browser: {str(e)}")
browser_context_healthy = False
async def check_browser_health():
"""
Check if the browser context is healthy by attempting to access the current page.
If the context is unhealthy, attempts to reset it.
Returns:
bool: True if the browser context is healthy, False otherwise.
"""
global browser_context_healthy
if not browser_context_healthy:
await reset_browser_context()
return browser_context_healthy
try:
# Simple health check - try to get the current page
await context.get_current_page()
return True
except Exception as e:
logger.warning(f"Browser health check failed: {str(e)}")
browser_context_healthy = False
await reset_browser_context()
return browser_context_healthy
async def run_browser_task_async(task_id, url, action, llm):
"""
Run a browser task asynchronously and store the result.
This function executes a browser automation task with the given URL and action,
and updates the task store with progress and results.
Args:
task_id (str): Unique identifier for the task
url (str): URL to navigate to
action (str): Action to perform after navigation
llm: Language model to use for browser agent
"""
try:
# Update task status to running
task_store[task_id]["status"] = "running"
task_store[task_id]["start_time"] = datetime.now().isoformat()
task_store[task_id]["progress"] = {
"current_step": 0,
"total_steps": 0,
"steps": [],
}
# Reset browser context to ensure a clean state
await reset_browser_context()
# Check browser health
if not await check_browser_health():
task_store[task_id]["status"] = "failed"
task_store[task_id]["end_time"] = datetime.now().isoformat()
task_store[task_id]["error"] = (
"Browser context is unhealthy and could not be reset"
)
return
# Define step callback function with the correct signature
async def step_callback(browser_state, agent_output, step_number):
# Update progress in task store
task_store[task_id]["progress"]["current_step"] = step_number
task_store[task_id]["progress"]["total_steps"] = max(
task_store[task_id]["progress"]["total_steps"], step_number
)
# Add step info with minimal details
step_info = {"step": step_number, "time": datetime.now().isoformat()}
# Add goal if available
if agent_output and hasattr(agent_output, "current_state"):
if hasattr(agent_output.current_state, "next_goal"):
step_info["goal"] = agent_output.current_state.next_goal
# Add to progress steps
task_store[task_id]["progress"]["steps"].append(step_info)
# Log progress
logger.info(f"Task {task_id}: Step {step_number} completed")
# Define done callback function with the correct signature
async def done_callback(history):
# Log completion
logger.info(f"Task {task_id}: Completed with {len(history.history)} steps")
# Add final step
current_step = task_store[task_id]["progress"]["current_step"] + 1
task_store[task_id]["progress"]["steps"].append(
{
"step": current_step,
"time": datetime.now().isoformat(),
"status": "completed",
}
)
# Use the existing browser context with callbacks
agent = Agent(
task=f"First, navigate to {url}. Then, {action}",
llm=llm,
browser_context=context,
register_new_step_callback=step_callback,
register_done_callback=done_callback,
)
# Run the agent with a reasonable step limit
ret = await agent.run(max_steps=10)
# Get the final result
final_result = ret.final_result()
# Check if we have a valid result
if final_result and hasattr(final_result, "raise_for_status"):
final_result.raise_for_status()
result_text = str(final_result.text)
else:
result_text = (
str(final_result) if final_result else "No final result available"
)
# Gather essential information from the agent history
is_successful = ret.is_successful()
has_errors = ret.has_errors()
errors = ret.errors()
urls_visited = ret.urls()
action_names = ret.action_names()
extracted_content = ret.extracted_content()
steps_taken = ret.number_of_steps()
# Create a focused response with the most relevant information
response_data = {
"final_result": result_text,
"success": is_successful,
"has_errors": has_errors,
"errors": [str(err) for err in errors if err],
"urls_visited": [str(url) for url in urls_visited if url],
"actions_performed": action_names,
"extracted_content": extracted_content,
"steps_taken": steps_taken,
}
# Store the result
task_store[task_id]["status"] = "completed"
task_store[task_id]["end_time"] = datetime.now().isoformat()
task_store[task_id]["result"] = response_data
except Exception as e:
logger.error(f"Error in async browser task: {str(e)}")
tb = traceback.format_exc()
# Mark the browser context as unhealthy
global browser_context_healthy
browser_context_healthy = False
# Store the error
task_store[task_id]["status"] = "failed"
task_store[task_id]["end_time"] = datetime.now().isoformat()
task_store[task_id]["error"] = str(e)
task_store[task_id]["traceback"] = tb
finally:
# Always try to reset the browser context to a clean state after use
try:
current_page = await context.get_current_page()
await current_page.goto("about:blank")
except Exception as e:
logger.warning(f"Error resetting page state: {str(e)}")
browser_context_healthy = False
async def cleanup_old_tasks():
"""
Periodically clean up old completed tasks to prevent memory leaks.
This function runs continuously in the background, removing tasks that have been
completed or failed for more than 1 hour to conserve memory.
"""
while True:
try:
# Sleep first to avoid cleaning up tasks too early
await asyncio.sleep(3600) # Run cleanup every hour
current_time = datetime.now()
tasks_to_remove = []
# Find completed tasks older than 1 hour
for task_id, task_data in task_store.items():
if (
task_data["status"] in ["completed", "failed"]
and "end_time" in task_data
):
end_time = datetime.fromisoformat(task_data["end_time"])
hours_elapsed = (current_time - end_time).total_seconds() / 3600
if hours_elapsed > 1: # Remove tasks older than 1 hour
tasks_to_remove.append(task_id)
# Remove old tasks
for task_id in tasks_to_remove:
del task_store[task_id]
if tasks_to_remove:
logger.info(f"Cleaned up {len(tasks_to_remove)} old tasks")
except Exception as e:
logger.error(f"Error in task cleanup: {str(e)}")
def create_mcp_server(llm, task_expiry_minutes=60):
"""
Create and configure an MCP server for browser interaction.
Args:
llm: The language model to use for browser agent
task_expiry_minutes (int): Minutes after which tasks are considered expired
Returns:
Server: Configured MCP server instance
"""
# Create MCP server instance
app = Server("browser_use")
@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
global browser_context_healthy
# Handle browser_use tool
if name == "browser_use":
# Check required arguments
if "url" not in arguments:
raise ValueError("Missing required argument 'url'")
if "action" not in arguments:
raise ValueError("Missing required argument 'action'")
# Generate a task ID
task_id = str(uuid.uuid4())
# Initialize task in store
task_store[task_id] = {
"id": task_id,
"status": "pending",
"url": arguments["url"],
"action": arguments["action"],
"created_at": datetime.now().isoformat(),
}
# Start task in background
asyncio.create_task(
run_browser_task_async(
task_id, arguments["url"], arguments["action"], llm
)
)
# Estimate task duration
estimated_seconds = 60 # Default estimate of 60 seconds
# Return task ID immediately with explicit sleep instruction
return [
types.TextContent(
type="text",
text=json.dumps(
{
"task_id": task_id,
"status": "pending",
"message": f"Browser task started. Please wait for {estimated_seconds} seconds, then check the result using browser_get_result or the resource URI. Always wait exactly 5 seconds between status checks.",
"estimated_time": f"{estimated_seconds} seconds",
"resource_uri": f"resource://browser_task/{task_id}",
"sleep_command": "sleep 5",
"instruction": "Use the terminal command 'sleep 5' to wait 5 seconds between status checks. IMPORTANT: Always use exactly 5 seconds, no more and no less.",
},
indent=2,
),
)
]
# Handle browser_get_result tool
elif name == "browser_get_result":
# Get result of async task
if "task_id" not in arguments:
raise ValueError("Missing required argument 'task_id'")
task_id = arguments["task_id"]
if task_id not in task_store:
return [
types.TextContent(
type="text",
text=json.dumps(
{"error": "Task not found", "task_id": task_id}, indent=2
),
)
]
# Get the current task data
task_data = task_store[task_id].copy()
# If task is still running, add simple guidance
if task_data["status"] == "running":
# Add a simple next check suggestion
progress = task_data.get("progress", {})
current_step = progress.get("current_step", 0)
if current_step > 0:
# Simple message based on current step
task_data["message"] = (
f"Task is running (step {current_step}). Wait 5 seconds before checking again."
)
task_data["sleep_command"] = "sleep 5"
task_data["instruction"] = (
"Use the terminal command 'sleep 5' to wait 5 seconds before checking again. IMPORTANT: Always use exactly 5 seconds, no more and no less."
)
else:
task_data["message"] = (
"Task is starting. Wait 5 seconds before checking again."
)
task_data["sleep_command"] = "sleep 5"
task_data["instruction"] = (
"Use the terminal command 'sleep 5' to wait 5 seconds before checking again. IMPORTANT: Always use exactly 5 seconds, no more and no less."
)
# Return current task status and result if available
return [
types.TextContent(type="text", text=json.dumps(task_data, indent=2))
]
else:
raise ValueError(f"Unknown tool: {name}")
@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="browser_use",
description="Performs a browser action and returns a task ID for async execution",
inputSchema={
"type": "object",
"required": ["url", "action"],
"properties": {
"url": {
"type": "string",
"description": "URL to navigate to",
},
"action": {
"type": "string",
"description": "Action to perform in the browser",
},
},
},
),
types.Tool(
name="browser_get_result",
description="Gets the result of an asynchronous browser task",
inputSchema={
"type": "object",
"required": ["task_id"],
"properties": {
"task_id": {
"type": "string",
"description": "ID of the task to get results for",
}
},
},
),
]
@app.list_resources()
async def list_resources() -> list[types.Resource]:
# List all completed tasks as resources
resources = []
for task_id, task_data in task_store.items():
if task_data["status"] in ["completed", "failed"]:
resources.append(
types.Resource(
uri=f"resource://browser_task/{task_id}",
title=f"Browser Task Result: {task_id[:8]}",
description=f"Result of browser task for URL: {task_data.get('url', 'unknown')}",
)
)
return resources
@app.read_resource()
async def read_resource(uri: str) -> list[types.ResourceContents]:
# Extract task ID from URI
if not uri.startswith("resource://browser_task/"):
return [
types.ResourceContents(
type="text",
text=json.dumps(
{"error": f"Invalid resource URI: {uri}"}, indent=2
),
)
]
task_id = uri.replace("resource://browser_task/", "")
if task_id not in task_store:
return [
types.ResourceContents(
type="text",
text=json.dumps({"error": f"Task not found: {task_id}"}, indent=2),
)
]
# Return task data
return [
types.ResourceContents(
type="text", text=json.dumps(task_store[task_id], indent=2)
)
]
# Add cleanup_old_tasks function to app for later scheduling
app.cleanup_old_tasks = cleanup_old_tasks
return app
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
"--chrome-path",
default=None,
help="Path to Chrome executable",
)
@click.option(
"--window-width",
default=1280,
help="Browser window width",
)
@click.option(
"--window-height",
default=1100,
help="Browser window height",
)
@click.option(
"--locale",
default="en-US",
help="Browser locale",
)
@click.option(
"--task-expiry-minutes",
default=60,
help="Minutes after which tasks are considered expired",
)
def main(
port: int,
chrome_path: str,
window_width: int,
window_height: int,
locale: str,
task_expiry_minutes: int,
) -> int:
"""
Run the browser-use MCP server.
This function initializes the browser context, creates the MCP server,
and runs it with the SSE transport.
"""
global browser, context, config, browser_context_healthy
# Use Chrome path from command line arg, environment variable, or None
chrome_executable_path = chrome_path or os.environ.get("CHROME_PATH")
if chrome_executable_path:
logger.info(f"Using Chrome path: {chrome_executable_path}")
else:
logger.info(
"No Chrome path specified, letting Playwright use its default browser"
)
# Initialize browser context
try:
# Browser context configuration
config = BrowserContextConfig(
wait_for_network_idle_page_load_time=0.6,
maximum_wait_page_load_time=1.2,
minimum_wait_page_load_time=0.2,
browser_window_size={"width": window_width, "height": window_height},
locale=locale,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36",
highlight_elements=True,
viewport_expansion=0,
)
# Initialize browser and context directly
browser_config = BrowserConfig(
extra_chromium_args=[
"--no-sandbox",
"--disable-gpu",
"--disable-software-rasterizer",
"--disable-dev-shm-usage",
"--remote-debugging-port=9222",
],
)
# Only set chrome_instance_path if we actually found a path
if chrome_executable_path:
browser_config.chrome_instance_path = chrome_executable_path
browser = Browser(config=browser_config)
context = BrowserContext(browser=browser, config=config)
browser_context_healthy = True
logger.info("Browser context initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize browser context: {str(e)}")
return 1
# Initialize LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
# Create MCP server
app = create_mcp_server(
llm=llm,
task_expiry_minutes=task_expiry_minutes,
)
# Set up SSE transport
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Mount, Route
import uvicorn
sse = SseServerTransport("/messages/")
async def handle_sse(request):
try:
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
except Exception as e:
logger.error(f"Error in handle_sse: {str(e)}")
# Ensure browser context is reset if there's an error
asyncio.create_task(reset_browser_context())
raise
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
# Add a startup event to initialize the browser
@starlette_app.on_event("startup")
async def startup_event():
logger.info("Starting browser context...")
await reset_browser_context()
logger.info("Browser context started")
# Start background task cleanup
asyncio.create_task(app.cleanup_old_tasks())
logger.info("Task cleanup process scheduled")
# Add a shutdown event to clean up browser resources
@starlette_app.on_event("shutdown")
async def shutdown_event():
logger.info("Shutting down browser context...")
try:
await browser.close()
logger.info("Browser context closed successfully")
except Exception as e:
logger.error(f"Error closing browser: {str(e)}")
# Run uvicorn server
uvicorn.run(starlette_app, host="0.0.0.0", port=port)
return 0
if __name__ == "__main__":
main()