MCP Web Browser Server
by random-robbie
Verified
- mcp-web-browser
- src
- mcp_web_browser
from typing import Optional, Union, Any
import asyncio
import base64
import sys
from mcp.server import FastMCP
# Create an MCP server for web browsing
mcp = FastMCP("Web Browser")
# Global browser and page management
_browser = None
_browser_context = None
_current_page = None
_playwright_instance = None
# Dynamic import of Playwright to avoid early import errors
def _import_playwright():
from playwright.async_api import async_playwright
return async_playwright
async def _ensure_browser():
"""Ensure a browser instance is available with SSL validation disabled"""
global _browser, _browser_context, _playwright_instance
if _browser is None:
playwright_module = _import_playwright()
_playwright_instance = await playwright_module().start()
_browser = await _playwright_instance.chromium.launch()
# Create a browser context that ignores HTTPS errors
_browser_context = await _browser.new_context(
ignore_https_errors=True, # Ignore SSL certificate errors
)
return _browser, _browser_context
async def _close_current_page():
"""Close the current page if it exists"""
global _current_page
if _current_page:
try:
await _current_page.close()
except Exception:
pass
_current_page = None
async def _safe_cleanup():
"""Safely clean up browser resources"""
global _browser, _current_page, _browser_context, _playwright_instance
try:
if _current_page:
try:
await _current_page.close()
except Exception:
pass
if _browser_context:
try:
await _browser_context.close()
except Exception:
pass
if _browser:
try:
await _browser.close()
except Exception:
pass
if _playwright_instance:
try:
await _playwright_instance.stop()
except Exception:
pass
except Exception as e:
# Log the error, but don't re-raise
print(f"Error during cleanup: {e}", file=sys.stderr)
finally:
# Reset global variables
_browser = None
_browser_context = None
_current_page = None
_playwright_instance = None
@mcp.tool()
async def browse_to(url: str, context: Optional[Any] = None) -> str:
"""
Navigate to a specific URL and return the page's HTML content.
Args:
url: The full URL to navigate to
context: Optional context object for logging (ignored)
Returns:
The full HTML content of the page
"""
global _current_page, _browser, _browser_context
# Ensure browser is launched with SSL validation disabled
_, browser_context = await _ensure_browser()
# Close any existing page
await _close_current_page()
# Optional logging, but do nothing with context
print(f"Navigating to {url}", file=sys.stderr)
try:
# Create a new page and navigate
_current_page = await browser_context.new_page()
# Additional options to handle various SSL/security issues
await _current_page.goto(url,
wait_until='networkidle',
timeout=30000, # 30 seconds timeout
)
# Get full page content including dynamically loaded JavaScript
page_content = await _current_page.content()
# Optional: extract additional metadata
try:
title = await _current_page.title()
print(f"Page title: {title}", file=sys.stderr)
except Exception:
pass
return page_content
except Exception as e:
print(f"Error navigating to {url}: {e}", file=sys.stderr)
raise
@mcp.tool()
async def extract_text_content(
selector: Optional[str] = None,
context: Optional[Any] = None
) -> str:
"""
Extract text content from the current page, optionally using a CSS selector.
Args:
selector: Optional CSS selector to target specific elements
context: Optional context object for logging (ignored)
Returns:
Extracted text content
"""
global _current_page
if not _current_page:
raise ValueError("No page is currently loaded. Use browse_to first.")
try:
if selector:
# If selector is provided, extract text from matching elements
elements = await _current_page.query_selector_all(selector)
text_content = "\n".join([await el.inner_text() for el in elements])
print(f"Extracted text from selector: {selector}", file=sys.stderr)
else:
# If no selector, extract all visible text from the page
text_content = await _current_page.inner_text('body')
return text_content
except Exception as e:
print(f"Error extracting text: {e}", file=sys.stderr)
raise ValueError(f"Error extracting text: {e}")
@mcp.tool()
async def click_element(
selector: str,
context: Optional[Any] = None
) -> str:
"""
Click an element on the current page.
Args:
selector: CSS selector for the element to click
context: Optional context object for logging (ignored)
Returns:
Confirmation message or error details
"""
global _current_page
if not _current_page:
raise ValueError("No page is currently loaded. Use browse_to first.")
try:
element = await _current_page.query_selector(selector)
if not element:
raise ValueError(f"No element found with selector: {selector}")
await element.click()
print(f"Clicked element: {selector}", file=sys.stderr)
return f"Successfully clicked element: {selector}"
except Exception as e:
print(f"Error clicking element: {e}", file=sys.stderr)
raise ValueError(f"Error clicking element: {e}")
@mcp.tool()
async def get_page_screenshots(
full_page: bool = False,
selector: Optional[str] = None,
context: Optional[Any] = None
) -> str:
"""
Capture screenshot of the current page.
Args:
full_page: Whether to capture the entire page or just the viewport
selector: Optional CSS selector to screenshot a specific element
context: Optional context object for logging (ignored)
Returns:
Base64 encoded screenshot image
"""
global _current_page
if not _current_page:
raise ValueError("No page is currently loaded. Use browse_to first.")
try:
if selector:
element = await _current_page.query_selector(selector)
if not element:
raise ValueError(f"No element found with selector: {selector}")
screenshot_bytes = await element.screenshot()
else:
screenshot_bytes = await _current_page.screenshot(full_page=full_page)
# Convert to base64 for easy transmission
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
print(f"Screenshot captured: {'full page' if full_page else 'viewport'}", file=sys.stderr)
return screenshot_base64
except Exception as e:
print(f"Error capturing screenshot: {e}", file=sys.stderr)
raise ValueError(f"Error capturing screenshot: {e}")
@mcp.tool()
async def get_page_links(context: Optional[Any] = None) -> list[str]:
"""
Extract all links from the current page.
Args:
context: Optional context object for logging (ignored)
Returns:
List of links found on the page
"""
global _current_page
if not _current_page:
raise ValueError("No page is currently loaded. Use browse_to first.")
try:
# Use JavaScript to extract all links
links = await _current_page.evaluate("""
() => {
const links = document.querySelectorAll('a');
return Array.from(links).map(link => link.href);
}
""")
print(f"Extracted {len(links)} links from the page", file=sys.stderr)
return links
except Exception as e:
print(f"Error extracting links: {e}", file=sys.stderr)
raise ValueError(f"Error extracting links: {e}")
@mcp.tool()
async def input_text(
selector: str,
text: str,
context: Optional[Any] = None
) -> str:
"""
Input text into a specific element on the page.
Args:
selector: CSS selector for the input element
text: Text to input
context: Optional context object for logging (ignored)
Returns:
Confirmation message
"""
global _current_page
if not _current_page:
raise ValueError("No page is currently loaded. Use browse_to first.")
try:
element = await _current_page.query_selector(selector)
if not element:
raise ValueError(f"No element found with selector: {selector}")
await element.fill(text)
print(f"Input text into element: {selector}", file=sys.stderr)
return f"Successfully input text into element: {selector}"
except Exception as e:
print(f"Error inputting text: {e}", file=sys.stderr)
raise ValueError(f"Error inputting text: {e}")
def main():
try:
mcp.run()
except Exception as e:
print(f"Error running MCP server: {e}", file=sys.stderr)
finally:
# Use a separate event loop to ensure cleanup
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_safe_cleanup())
loop.close()
except Exception as cleanup_error:
print(f"Cleanup error: {cleanup_error}", file=sys.stderr)
if __name__ == "__main__":
main()