Skip to main content
Glama

Browser-MCP Server

by Euraxluo
browser_fastmcp_server.py (119 kB)
#!/usr/bin/env python3 """ Session-Based Browser-Use FastMCP Server A modern Model Context Protocol (MCP) server that provides advanced browser automation capabilities using the FastMCP framework. Features session-based instance management, TTL cleanup, PDF generation, file downloads, cookie management, and comprehensive browser configuration options. Each MCP session gets its own isolated browser instance automatically. """ import asyncio import logging import os import tempfile import threading import time import uuid import json from contextlib import asynccontextmanager from collections.abc import AsyncIterator from dataclasses import dataclass from typing import cast from pathlib import Path from typing import Optional, Dict, List, Literal from datetime import datetime,timezone from pydantic import BaseModel, Field, field_serializer from fastmcp import FastMCP,Context from fastmcp.server.dependencies import get_context from fastmcp.utilities.types import Image from fastmcp.prompts import Message from fastapi.encoders import jsonable_encoder from urllib.parse import urlparse import markdownify import mimetypes from browser_use.browser import BrowserProfile from browser_use.browser.session import BrowserSession from browser_use.filesystem.file_system import FileSystem from browser_use.dom.service import DomService # === Dowhen monkeypatch for browser-use get_page_info float->int fix === try: import dowhen def callback_special(_frame): print(_frame.f_locals["page_data"]) _frame.f_locals["page_data"]['viewport_width'] = int(_frame.f_locals["page_data"]['viewport_width']) _frame.f_locals["page_data"]['viewport_height'] = int(_frame.f_locals["page_data"]['viewport_height']) _frame.f_locals["page_data"]['page_width'] = int(_frame.f_locals["page_data"]['page_width']) _frame.f_locals["page_data"]['page_height'] = int(_frame.f_locals["page_data"]['page_height']) _frame.f_locals["page_data"]['scroll_x'] = int(_frame.f_locals["page_data"]['scroll_x']) 
_frame.f_locals["page_data"]['scroll_y'] = int(_frame.f_locals["page_data"]['scroll_y']) dowhen.do(callback_special).when( BrowserSession.get_page_info, "viewport_width = page_data['viewport_width']", "-1" ) except Exception as e: print(f"[dowhen patch] failed: {e}") # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # === Configuration Constants === DEFAULT_MAX_INSTANCES = int(os.getenv("BROWSER_MAXIMUM_INSTANCES", "100")) DEFAULT_INSTANCE_TTL = int(os.getenv("BROWSER_INSTANCE_TTL", "1800")) # 30 minutes DEFAULT_EXECUTE_TIMEOUT = int(os.getenv("BROWSER_EXECUTE_TIMEOUT", "30")) # 30 seconds CLEANUP_INTERVAL = int(os.getenv("BROWSER_CLEANUP_INTERVAL", "60")) # 1 minute # === Enhanced Data Models === class BrowserConfig(BaseModel): """Advanced browser configuration options""" headless: bool = Field(default=True, description="Run browser in headless mode") no_sandbox: bool = Field(default=True, description="Disable sandbox (for containers)") user_agent: Optional[str] = Field(default=None, description="Custom user agent string") viewport_width: int = Field(default=1352, description="Browser viewport width") viewport_height: int = Field(default=878, description="Browser viewport height") disable_web_security: bool = Field(default=True, description="Disable CORS for testing") class ElementInfo(BaseModel): """Information about a DOM element""" index: int = Field(description="Element index") tag: str = Field(description="HTML tag name") text: str = Field(description="Element text content") xpath: str = Field(description="Element XPath") attributes: Optional[dict] = Field(description="Element attributes") value: Optional[str] = Field(description="Element value") class PageState(BaseModel): """Current page state information""" url: str = Field(description="Current page URL") title: str = Field(description="Page title") interactive_elements_count: int = Field(description="Number of interactive elements") elements: 
List[ElementInfo] = Field(description="Interactive elements list") class TabInfo(BaseModel): """Browser tab information""" id: int = Field(description="Tab ID") title: str = Field(description="Tab title") url: str = Field(description="Tab URL") is_active: bool = Field(description="Whether this is the active tab") class ScreenshotResult(BaseModel): """Screenshot operation result""" success: bool = Field(description="Whether screenshot was successful") file_path: str = Field(description="Path to screenshot file") filename: str = Field(description="Screenshot filename") size_kb: float = Field(description="File size in KB") format: str = Field(description="Image format") target: str = Field(description="Screenshot target") timestamp: str = Field(description="Creation timestamp") class CookieInfo(BaseModel): """Cookie information""" name: str = Field(description="Cookie name") value: str = Field(description="Cookie value") domain: str = Field(description="Cookie domain") path: str = Field(description="Cookie path") http_only: bool = Field(default=False, description="HTTP only flag") secure: bool = Field(default=False, description="Secure flag") same_site: str = Field(default="Lax", description="SameSite attribute") expires: Optional[datetime] = Field(default=None, description="Expiration time") class DownloadResult(BaseModel): """File download result""" success: bool = Field(description="Whether download was successful") file_path: str = Field(description="Downloaded file path") file_name: str = Field(description="Downloaded filename") size_bytes: int = Field(description="File size in bytes") mime_type: str = Field(description="File MIME type") download_time: float = Field(description="Download time in seconds") class FileInfo(DownloadResult): relative_path: str = Field(description="Relative path") class DropdownOption(BaseModel): index: int = Field(description="Option index") text: str = Field(description="Option text") value: str = Field(description="Option value") 
class DropdownOptionsResult(BaseModel): options: List[DropdownOption] = Field(description="Dropdown options") id: Optional[str] = Field(description="Dropdown ID") name: Optional[str] = Field(description="Dropdown name") class PDFResult(BaseModel): """PDF generation result""" success: bool = Field(description="Whether PDF generation was successful") file_path: str = Field(description="Generated PDF file path") file_name: str = Field(description="PDF filename") size_kb: float = Field(description="File size in KB") source_url: Optional[str] = Field(description="Source URL if applicable") page_count: Optional[int] = Field(description="Number of pages") class SessionInstanceInfo(BaseModel): """Information about the current session's browser instance""" session_id: str = Field(description="Session ID") created_at: datetime = Field(description="Creation timestamp") last_used: datetime = Field(description="Last usage timestamp") config: BrowserConfig = Field(description="Browser configuration") temp_dir: str = Field(description="Temporary directory path") current_url: Optional[str] = Field(description="Current page URL") active_tabs: int = Field(description="Number of active tabs") screenshot_count: int = Field(description="Number of screenshots taken") active_tab_id: Optional[int] = Field(description="Active tab ID") current_title: Optional[str] = Field(description="Current page title") @field_serializer("created_at") def serialize_created_at(self, value: datetime, _info): return value.astimezone(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") @field_serializer("last_used") def serialize_last_used(self, value: datetime, _info): return value.astimezone(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") # === Global Playwright Management === _global_playwright = None async def get_global_playwright(): global _global_playwright if _global_playwright is None: from browser_use.browser.types import async_playwright _global_playwright = await 
async_playwright().start() return _global_playwright async def shutdown_global_playwright(): global _global_playwright if _global_playwright is not None: await _global_playwright.stop() _global_playwright = None # === Session-Based Browser Instance Management === @dataclass class SessionBrowserInstance: """Browser instance tied to a specific MCP session""" session_id: str browser_session: BrowserSession file_system: FileSystem temp_dir: str config: BrowserConfig created_at: datetime last_used: datetime screenshot_history: List[str] def __post_init__(self): if self.screenshot_history is None: self.screenshot_history = [] def update_last_used(self): """Update last used timestamp""" self.last_used = datetime.now() async def cleanup(self): """Clean up instance resources""" try: if self.browser_session: await self.browser_session.close() # Clean up temp directory if self.temp_dir and os.path.exists(self.temp_dir): import shutil shutil.rmtree(self.temp_dir) logger.info(f"Cleaned up session temp directory: {self.temp_dir}") except Exception as e: logger.error(f"Error cleaning up session {self.session_id}: {e}") class SessionBrowserManager: """Session-based browser instance manager""" def __init__(self, max_instances: int = DEFAULT_MAX_INSTANCES, default_ttl: int = DEFAULT_INSTANCE_TTL): self.instances: Dict[str, SessionBrowserInstance] = {} self.max_instances = max_instances self.default_ttl = default_ttl self._lock = threading.RLock() self._cleanup_task: Optional[asyncio.Task] = None self._shutdown = False self._playwright = None # Global playwright instance async def start_playwright(self): if self._playwright is None: self._playwright = await get_global_playwright() async def stop_playwright(self): if self._playwright is not None: await shutdown_global_playwright() self._playwright = None async def start_cleanup_task(self): """Start background cleanup task""" if self._cleanup_task is None: await self.start_playwright() # Start playwright self._cleanup_task = 
asyncio.create_task(self._auto_cleanup()) async def _auto_cleanup(self): """Background task for automatic instance cleanup""" while not self._shutdown: try: await self.cleanup_expired_instances() await asyncio.sleep(CLEANUP_INTERVAL) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in auto cleanup: {e}") await asyncio.sleep(CLEANUP_INTERVAL) async def cleanup_expired_instances(self): """Clean up expired instances""" with self._lock: expired_sessions = [] for session_id, instance in self.instances.items(): if (datetime.now() - instance.last_used).total_seconds() > self.default_ttl: expired_sessions.append(session_id) # Clean up expired instances for session_id in expired_sessions: await self.close_session(session_id) logger.info(f"Auto-cleaned expired session: {session_id}") async def get_or_create_session_instance( self, session_id: str, config: Optional[BrowserConfig] = None ) -> SessionBrowserInstance: """Get existing session instance or create new one""" with self._lock: if session_id in self.instances: instance = self.instances[session_id] instance.update_last_used() return instance # Check if we can create a new instance if len(self.instances) >= self.max_instances: raise Exception(f"Maximum number of instances ({self.max_instances}) exceeded") # Create new instance if config is None: config = BrowserConfig() # Create temp directory temp_dir = tempfile.mkdtemp(prefix=f"browser_session_{session_id}_") # Create session-specific user data directory user_data_dir = Path(temp_dir) / "user_data" user_data_dir.mkdir(parents=True, exist_ok=True) try: # Create browser profile with configuration profile_kwargs = { "headless": config.headless, "downloads_path": temp_dir, "user_data_dir": str(user_data_dir), # Use session-specific user data directory } # Add additional browser arguments based on config browser_args = [] if config.disable_web_security: browser_args.append("--disable-web-security") if config.no_sandbox: 
browser_args.append("--no-sandbox") if config.user_agent: browser_args.append(f"--user-agent={config.user_agent}") profile_kwargs["browser_args"] = browser_args profile_kwargs["chromium_sandbox"] = False profile = BrowserProfile(**profile_kwargs) # Initialize browser session, pass playwright playwright = self._playwright browser_session = BrowserSession(browser_profile=profile, playwright=playwright) await browser_session.start() # Set viewport size current_page = await browser_session.get_current_page() await current_page.set_viewport_size({ "width": config.viewport_width, "height": config.viewport_height }) # Initialize file system file_system = FileSystem(str(Path(temp_dir) / "files")) # Create instance now = datetime.now() instance = SessionBrowserInstance( session_id=session_id, browser_session=browser_session, file_system=file_system, temp_dir=temp_dir, config=config, created_at=now, last_used=now, screenshot_history=[] ) with self._lock: self.instances[session_id] = instance logger.info(f"Created browser instance for session: {session_id}") return instance except Exception as e: # Clean up on failure if os.path.exists(temp_dir): import shutil shutil.rmtree(temp_dir) raise Exception(f"Failed to create browser instance for session {session_id}: {str(e)}") async def get_session_instance(self, session_id: str) -> SessionBrowserInstance: """Get session instance, creating if necessary""" with self._lock: if session_id in self.instances: instance = self.instances[session_id] instance.update_last_used() return instance # Create new instance if not exists return await self.get_or_create_session_instance(session_id) async def close_session(self, session_id: str): """Close and clean up a session instance""" with self._lock: if session_id not in self.instances: return instance = self.instances[session_id] del self.instances[session_id] await instance.cleanup() logger.info(f"Closed browser instance for session: {session_id}") async def close_all(self): """Close all 
session instances""" with self._lock: session_ids = list(self.instances.keys()) for session_id in session_ids: await self.close_session(session_id) async def get_session_info(self, session_id: str) -> SessionInstanceInfo: """Get information about a specific session""" with self._lock: if session_id not in self.instances: raise Exception(f"Session {session_id} not found") instance = self.instances[session_id] # Get current URL and tab count (synchronous info only) current_url = None active_tabs = 0 active_tab_id = None current_title = None try: if instance.browser_session: active_tabs = len(instance.browser_session.tabs) active_tab_id = instance.browser_session.tabs.index(instance.browser_session.current_page) current_title = await instance.browser_session.current_page.title() except: pass return SessionInstanceInfo( session_id=instance.session_id, created_at=instance.created_at, last_used=instance.last_used, config=instance.config, temp_dir=instance.temp_dir, current_url=current_url, active_tabs=active_tabs, screenshot_count=len(instance.screenshot_history), active_tab_id=active_tab_id, current_title=current_title, ) async def get_all_sessions_info(self) -> List[SessionInstanceInfo]: """Get information about all sessions""" with self._lock: return [await self.get_session_info(session_id) for session_id in self.instances.keys()] def get_session_count(self) -> int: """Get current number of active sessions""" with self._lock: return len(self.instances) async def shutdown(self): """Shutdown the manager and clean up all instances""" self._shutdown = True if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass await self.close_all() await self.stop_playwright() # Close playwright logger.info("Session browser manager shut down") # === Global Manager === session_browser_manager = SessionBrowserManager() # === Context Management === @dataclass class SessionBrowserContext: """Context for the session-based browser 
MCP server""" manager: SessionBrowserManager temp_dir: str = "" def __post_init__(self): if not self.temp_dir: self.temp_dir = tempfile.mkdtemp(prefix="browser_mcp_global_") # === Lifespan Management === @asynccontextmanager async def session_browser_lifespan(server: FastMCP) -> AsyncIterator[SessionBrowserContext]: """Manage session-based browser lifecycle""" # print a banner context = SessionBrowserContext(manager=session_browser_manager) logger.info("Starting session-based browser MCP server lifespan") try: # Start background cleanup task await context.manager.start_cleanup_task() yield context finally: # Cleanup on shutdown logger.info("Shutting down session-based browser MCP server") await context.manager.shutdown() # Clean up global temp directory if context.temp_dir and os.path.exists(context.temp_dir): import shutil try: shutil.rmtree(context.temp_dir) logger.info(f"Cleaned up global temp directory: {context.temp_dir}") except Exception as e: logger.error(f"Error cleaning global temp directory: {e}") # Create the FastMCP server with session-based browser lifecycle management mcp = FastMCP("Session-Browser-Use", lifespan=session_browser_lifespan) # === Helper Functions === def get_session_manager() -> SessionBrowserManager: """Get the session browser manager from current context""" ctx = get_context() browser_context: SessionBrowserContext = cast(SessionBrowserContext, ctx.request_context.lifespan_context) return browser_context.manager def generate_session_id() -> str: """Generate a unique session ID (UUID)""" return str(uuid.uuid4()) async def get_session_instance(session_id: str) -> SessionBrowserInstance: """Get or create browser instance for given session_id""" manager = get_session_manager() return await manager.get_session_instance(session_id) # === Session-Based Browser Management Tools === @mcp.tool() async def create_chrome_instance(ctx: Context, headless: bool = True, viewport_width: int = 1024, viewport_height: int = 768) -> SessionInstanceInfo: 
"""Create a new Chrome browser instance and return session_id (UUID)""" manager = get_session_manager() session_id = generate_session_id() logger.info(f"Creating Chrome instance with session_id: {session_id}") await ctx.report_progress(0.2, 1.0, "Creating browser profile") try: await ctx.report_progress(0.5, 1.0, "Starting browser") await manager.get_or_create_session_instance(session_id, BrowserConfig(headless=headless, viewport_width=viewport_width, viewport_height=viewport_height)) await ctx.report_progress(1.0, 1.0, "Browser ready") logger.info(f"Chrome instance created successfully with session_id: {session_id}") return await manager.get_session_info(session_id) except Exception as e: logger.error(f"Failed to create Chrome instance: {str(e)}") raise Exception(f"Failed to create Chrome instance: {str(e)}") @mcp.tool() async def close_instance(ctx: Context, session_id: str) -> str: """Close a specific Chrome browser instance by session_id, will delete the instance and all related data""" manager = get_session_manager() if not session_id: logger.error("session_id is required") raise Exception("session_id is required") try: await manager.close_session(session_id) logger.info(f"Chrome instance {session_id} closed successfully.") return f"Chrome instance {session_id} closed successfully." 
except Exception as e: logger.error(f"Failed to close instance {session_id}: {str(e)}") raise Exception(f"Failed to close instance {session_id}: {str(e)}") @mcp.tool() async def get_instance_info(ctx: Context, session_id: str) -> SessionInstanceInfo: """Get detailed information about a specific browser instance""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.debug(f"Getting instance info for {session_id}") try: manager = get_session_manager() instance = await manager.get_session_instance(session_id) instance.update_last_used() # Get current page info current_url = None current_title = None active_tab_id = None try: current_page = await instance.browser_session.get_current_page() current_url = current_page.url current_title = await current_page.title() active_tab_id = instance.browser_session.tabs.index(current_page) except Exception as e: logger.warning(f"Could not get current page info: {e}") # Count active tabs active_tabs = len(instance.browser_session.tabs) # Count screenshots screenshot_count = len(instance.screenshot_history) info = SessionInstanceInfo( session_id=instance.session_id, created_at=instance.created_at, last_used=instance.last_used, config=instance.config, temp_dir=instance.temp_dir, current_url=current_url, active_tabs=active_tabs, screenshot_count=screenshot_count, active_tab_id=active_tab_id, current_title=current_title ) logger.info(f"Instance info retrieved for {session_id}") return info except Exception as e: logger.error(f"Failed to get instance info: {str(e)}") raise Exception(f"Failed to get instance info: {str(e)}") @mcp.tool() async def check_browser_health(ctx: Context, session_id: str) -> str: """Check the health status of a browser session and provide recovery suggestions""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Checking browser health for instance {session_id}") try: instance = await 
get_session_instance(session_id) browser_session = instance.browser_session health_status = { "session_exists": True, "browser_accessible": False, "current_page_accessible": False, "tabs_count": 0, "current_url": None, "issues": [], "suggestions": [] } # Check if browser session is accessible try: current_page = await browser_session.get_current_page() health_status["browser_accessible"] = True health_status["tabs_count"] = len(browser_session.tabs) if current_page: health_status["current_url"] = current_page.url # Test if page is accessible try: await asyncio.wait_for(current_page.evaluate('1'), timeout=3.0) health_status["current_page_accessible"] = True except asyncio.TimeoutError: health_status["issues"].append("Current page is not responding") health_status["suggestions"].append("Try refresh_page() to reload the current page") except Exception as e: health_status["issues"].append(f"Page evaluation failed: {str(e)}") health_status["suggestions"].append("Try refresh_page() or navigate_to() a new URL") else: health_status["issues"].append("No current page available") health_status["suggestions"].append("Use navigate_to() to go to a new page") except Exception as e: health_status["issues"].append(f"Browser session not accessible: {str(e)}") health_status["suggestions"].append("Browser may have crashed. 
Try creating a new instance with create_chrome_instance()") # Generate health report if health_status["current_page_accessible"]: return f"✅ Browser session is healthy\n" \ f"📊 Tabs: {health_status['tabs_count']}\n" \ f"🌐 Current URL: {health_status['current_url']}\n" \ f"✅ Page is responsive" else: issues_text = "\n".join([f"❌ {issue}" for issue in health_status["issues"]]) suggestions_text = "\n".join([f"💡 {suggestion}" for suggestion in health_status["suggestions"]]) return f"⚠️ Browser session has issues:\n" \ f"📊 Tabs: {health_status['tabs_count']}\n" \ f"🌐 Current URL: {health_status['current_url']}\n" \ f"{issues_text}\n" \ f"🔧 Suggestions:\n{suggestions_text}" except Exception as e: logger.error(f"Health check failed: {str(e)}") return f"❌ Health check failed: {str(e)}\n" \ f"💡 Try creating a new browser instance with create_chrome_instance()" @mcp.tool() async def get_browser_status(ctx: Context) -> List[SessionInstanceInfo]: """Get current browser status, including all instances and their status,only admin can use this tool""" manager = get_session_manager() try: status = await manager.get_all_sessions_info() logger.debug(f"Browser status: running={len(status)}, sessions={manager.get_session_count()}") return status except Exception as e: logger.error(f"Failed to get browser status: {str(e)}") raise Exception(f"Failed to get browser status: {str(e)}") @mcp.tool() async def close_all_instances(ctx: Context) -> str: """Close all browser instances and delete all related data, only admin can use this tool""" manager = get_session_manager() try: await manager.close_all() logger.info("All browser instances closed.") return "All browser instances closed." 
except Exception as e: logger.error(f"Failed to close all instances: {str(e)}") raise Exception(f"Failed to close all instances: {str(e)}") # === Browser Configuration Tools === @mcp.tool() async def set_browser_config( ctx: Context, session_id: str, headless: Optional[bool] = None, no_sandbox: Optional[bool] = None, user_agent: Optional[str] = None, viewport_width: Optional[int] = None, viewport_height: Optional[int] = None, disable_web_security: Optional[bool] = None, ) -> SessionInstanceInfo: """ Set advanced browser configuration for a specific instance. If you want to change the viewport, you can use the set_browser_config tool to change the viewport_width and viewport_height. """ manager = get_session_manager() if not session_id: logger.error("session_id is required") raise Exception("session_id is required") try: # Get current instance instance = await manager.get_session_instance(session_id) config = instance.config need_restart = False # Check if the parameters that need to be restarted have changed if headless is not None and headless != config.headless: config.headless = headless need_restart = True if no_sandbox is not None and no_sandbox != config.no_sandbox: config.no_sandbox = no_sandbox need_restart = True if user_agent is not None and user_agent != config.user_agent: config.user_agent = user_agent need_restart = True if disable_web_security is not None and disable_web_security != config.disable_web_security: config.disable_web_security = disable_web_security need_restart = True # Dynamic modification of viewport related parameters viewport_changed = False if viewport_width is not None and viewport_width != config.viewport_width: config.viewport_width = viewport_width viewport_changed = True if viewport_height is not None and viewport_height != config.viewport_height: config.viewport_height = viewport_height viewport_changed = True # First handle the restart logic if need_restart: await manager.close_session(session_id) await 
manager.get_or_create_session_instance(session_id, config) logger.info(f"Browser instance {session_id} restarted due to config change.") else: # Dynamic modification of viewport if viewport_changed: browser_session = instance.browser_session page = await browser_session.get_current_page() await page.set_viewport_size({ "width": config.viewport_width, "height": config.viewport_height }) logger.info(f"Viewport updated for instance {session_id}: {config.viewport_width}x{config.viewport_height}") logger.info(f"Browser configuration for instance {session_id} updated successfully.") return await manager.get_session_info(session_id) except Exception as e: logger.error(f"Failed to set browser config for instance {session_id}: {str(e)}") raise Exception(f"Failed to set browser config: {str(e)}") @mcp.tool() async def get_browser_config(ctx: Context, session_id: str) -> SessionInstanceInfo: """Get advanced browser configuration for a specific instance""" manager = get_session_manager() if not session_id: logger.error("session_id is required") raise Exception("session_id is required") try: info = await manager.get_session_info(session_id) if info: logger.info(f"Browser configuration for instance {session_id}:") logger.debug(f"- Headless: {info.config.headless}") logger.debug(f"- No Sandbox: {info.config.no_sandbox}") logger.debug(f"- User Agent: {info.config.user_agent}") logger.debug(f"- Viewport Width: {info.config.viewport_width}") logger.debug(f"- Viewport Height: {info.config.viewport_height}") logger.debug(f"- Disable Web Security: {info.config.disable_web_security}") return info else: logger.error(f"Instance {session_id} not found") raise Exception(f"Instance {session_id} not found") except Exception as e: logger.error(f"Failed to get browser config: {str(e)}") raise Exception(f"Failed to get browser config: {str(e)}") # === Navigation Tools === @mcp.tool() async def navigate_to(ctx: Context, session_id: str, url: str, new_tab: bool = False) -> str: """ Navigate to URL 
in current tab or new tab (does not auto-switch to new tab). """ if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Navigating to: {url}") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session if new_tab: page = await browser_session.create_new_tab(url) if page is not None: tab_idx = browser_session.tabs.index(page) result = f"Opened new tab #{tab_idx} with URL: {url}" else: result = f"Opened new tab but failed to get page object for URL: {url}" else: await browser_session.navigate(url) page = await browser_session.get_current_page() result = f"Navigated to: {url}" # Notify that page content has changed await ctx.session.send_resource_list_changed() logger.info(result) return result except Exception as e: logger.error(f"Navigation failed: {str(e)}") raise Exception(f"Navigation failed: {str(e)}") @mcp.tool() async def navigate_back(ctx: Context, session_id: str) -> str: """ Go back in current tab's history. 
""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Navigating back in instance {session_id}") try: instance = await get_session_instance(session_id) await instance.browser_session.go_back() result = "Navigated back" logger.info(result) await ctx.session.send_resource_list_changed() return result except Exception as e: logger.error(f"Navigate back failed: {str(e)}") raise Exception(f"Navigate back failed: {str(e)}") @mcp.tool() async def navigate_forward(ctx: Context, session_id: str) -> str: """Navigate forward in browser history""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Navigating forward in instance {session_id}") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() # Check if we can go forward try: await page.go_forward() await page.wait_for_load_state(state='domcontentloaded', timeout=10_000) current_url = page.url logger.info(f"Successfully navigated forward to: {current_url}") await ctx.session.send_resource_list_changed() return f"Navigated forward to: {current_url}" except Exception as e: logger.error(f"Navigate forward failed: {str(e)}") raise Exception(f"Navigate forward failed: {str(e)}") except Exception as e: logger.error(f"Navigate forward failed: {str(e)}") raise Exception(f"Navigate forward failed: {str(e)}") @mcp.tool() async def refresh_page(ctx: Context, session_id: str) -> str: """Refresh the current page""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Refreshing page in instance {session_id}") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() # Store current URL for logging current_url = page.url # Refresh the page try: await page.reload() await 
page.wait_for_load_state(state='domcontentloaded', timeout=15_000) logger.info(f"Successfully refreshed page: {current_url}") await ctx.session.send_resource_list_changed() return f"Successfully refreshed page: {current_url}" except Exception as e: logger.error(f"Page refresh failed: {str(e)}") raise Exception(f"Page refresh failed: {str(e)}") except Exception as e: logger.error(f"Refresh page failed: {str(e)}") raise Exception(f"Refresh page failed: {str(e)}") @mcp.tool() async def get_page_state(ctx: Context, session_id: str, frames: bool = False) -> PageState: """Get current page state and interactive elements; frames mode is unstable and may not extract all details.""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.debug(f"Getting page state for instance {session_id} (frames={frames})...") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session # First check if the browser session is still valid try: current_page = await browser_session.get_current_page() if not current_page: raise Exception("No current page available") # Test if page is accessible with a simple evaluation try: await asyncio.wait_for(current_page.evaluate('1'), timeout=3.0) except asyncio.TimeoutError: logger.warning("Page evaluation timeout, page may be unresponsive") # Return a minimal state instead of failing completely return PageState( url=current_page.url if hasattr(current_page, 'url') else "unknown", title="Page not accessible", interactive_elements_count=0, elements=[] ) except Exception as e: logger.warning(f"Page evaluation failed: {e}") # Return a minimal state instead of failing completely return PageState( url=current_page.url if hasattr(current_page, 'url') else "unknown", title="Page not accessible", interactive_elements_count=0, elements=[] ) except Exception as e: logger.error(f"Failed to get current page: {e}") raise Exception(f"Browser session is not accessible: {str(e)}") # 
Try to get state summary with retry logic max_retries = 3 for attempt in range(max_retries): try: state_summary = await browser_session.get_state_summary(cache_clickable_elements_hashes=False) break except Exception as e: if attempt == max_retries - 1: logger.error(f"Failed to get state summary after {max_retries} attempts: {e}") # Return a minimal state instead of failing return PageState( url=current_page.url if hasattr(current_page, 'url') else "unknown", title="Page state unavailable", interactive_elements_count=0, elements=[] ) else: logger.warning(f"State summary attempt {attempt + 1} failed, retrying: {e}") await asyncio.sleep(1) # Extract interactive elements info elements = [] try: for idx, element in state_summary.selector_map.items(): elements.append(ElementInfo( index=idx, tag=element.tag_name, text=element.get_all_text_till_next_clickable_element(), xpath=element.xpath, attributes=jsonable_encoder(element.attributes), value=None )) except Exception as e: logger.warning(f"Failed to extract elements: {e}") # Continue with empty elements list warning = None if frames: try: for frame in current_page.frames: if frame.url == current_page.url or frame.url.startswith('data:') or frame.url.startswith('about:'): continue try: dom_service = DomService(frame, logger=logger) frame_state = await dom_service.get_clickable_elements() for _, element in frame_state.selector_map.items(): elements.append(ElementInfo( index=element.index, tag=element.tag_name, text=element.get_all_text_till_next_clickable_element(), xpath=element.xpath, attributes=jsonable_encoder(element.attributes), value=None )) except Exception as e: logger.warning(f"Failed to extract elements from frame {frame.url}: {e}") continue warning = "[frames mode]:This mode is unstable, some iframe/sub-page content may not be fully extracted, and the element index may be inaccurate." 
except Exception as e: logger.warning(f"Failed to process frames: {e}") state = PageState( url=current_page.url, title=state_summary.title + (f"\n{warning}" if warning else ""), interactive_elements_count=len(elements), elements=elements ) logger.info(f"Found {state.interactive_elements_count} interactive elements (frames={frames}) on page: {state.title}") return state except Exception as e: logger.error(f"Failed to get page state: {str(e)}") # Provide more specific error messages if "Page is not accessible" in str(e): raise Exception("The current page is not accessible. The page may have crashed, been closed, or is taking too long to respond. Try navigating to a new page or refreshing the current page.") elif "Browser session is not accessible" in str(e): raise Exception("The browser session is not accessible. The browser may have crashed or been closed. Try creating a new browser instance.") else: raise Exception(f"Failed to get page state: {str(e)}") # === Tab Management Tools === @mcp.tool() async def get_tabs_info(ctx: Context, session_id: str) -> List[TabInfo]: """Get information about all open browser tabs""" manager = get_session_manager() if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.debug(f"Getting tabs information for instance {session_id}...") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session tabs_info = [] current_page = await browser_session.get_current_page() for i, tab in enumerate(browser_session.tabs): title = await tab.title() tabs_info.append(TabInfo( id=i, title=title, url=tab.url, is_active=(tab == current_page) )) logger.info(f"Found {len(tabs_info)} open tabs") return tabs_info except Exception as e: logger.error(f"Failed to get tabs info: {str(e)}") raise Exception(f"Failed to get tabs info: {str(e)}") @mcp.tool() async def close_tab(ctx: Context, session_id: str, page_id: int) -> str: """ Close a specific tab, if the tab is the last 
one, it will goto about:blank """ manager = get_session_manager() if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Closing tab {page_id} in instance {session_id}") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session await browser_session.switch_to_tab(page_id) page = await browser_session.get_current_page() url = page.url await page.close() new_page = await browser_session.get_current_page() new_page_idx = browser_session.tabs.index(new_page) result = f"Closed tab #{page_id} with {url}, now focused on tab #{new_page_idx}" logger.info(result) await ctx.session.send_resource_list_changed() return result except Exception as e: logger.error(f"Close tab failed: {str(e)}") raise Exception(f"Close tab failed: {str(e)}") @mcp.tool() async def switch_tab(ctx: Context, session_id: str, page_id: int) -> TabInfo: """Switch to a specific browser tab and return the tab info object""" manager = get_session_manager() if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Switching to tab {page_id} in instance {session_id}") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session await browser_session.switch_to_tab(page_id) page = await browser_session.get_current_page() # Wait for page to load try: await page.wait_for_load_state(state='domcontentloaded', timeout=5_000) except: logger.warning("Page load timeout, continuing anyway") # Gather tab info for the switched-to tab title = await page.title() tab_info = TabInfo( id=page_id, title=title, url=page.url, is_active=True ) logger.info(f"Switched to tab: {tab_info}") # Notify that page content has changed await ctx.session.send_resource_list_changed() return tab_info except Exception as e: logger.error(f"Switch tab failed: {str(e)}") raise Exception(f"Switch tab failed: {str(e)}") # === Element Interaction Tools === 
# JavaScript that resolves an XPath to its first matching node (or null).
_XPATH_LOOKUP_JS = (
    "(xpath) => document.evaluate(xpath, document, null, "
    "XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue"
)

# JavaScript that returns the `value` of the node matched by an XPath, or null.
_XPATH_VALUE_JS = """(xpath) => {
    const el = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
    return el && ('value' in el) ? el.value : null;
}"""

# JavaScript that describes the node matched by an XPath as plain JSON, or null.
_XPATH_DESCRIBE_JS = """(xpath) => {
    const el = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
    if (!el) {
        return null;
    }
    const attributes = {};
    if (el.attributes) {
        for (const attr of el.attributes) {
            attributes[attr.name] = attr.value;
        }
    }
    return {
        tag: el.tagName || el.nodeName,
        text: el.textContent || '',
        attributes: attributes,
        value: ('value' in el) ? el.value : null
    };
}"""

# JavaScript that sets the value of an input/select located by XPath and fires
# the matching DOM events; returns true on success, false otherwise.
_SET_VALUE_JS = """
({xpath, value}) => {
    const element = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
    if (!element) {
        console.error("Element not found for XPath:", xpath);
        return false;
    }
    console.log("Found element:", element.tagName);
    if (element.tagName === 'SELECT') {
        const options = element.querySelectorAll('option');
        let found = false;
        for (const option of options) {
            console.log("Checking option:", option.value);
            if (option.value === value) {
                option.selected = true;
                found = true;
                break;
            }
        }
        if (!found) {
            console.warn("Option not found:", value);
            return false;
        }
        element.dispatchEvent(new Event('change', { bubbles: true }));
    } else {
        element.value = value;
        element.dispatchEvent(new Event('input', { bubbles: true }));
        element.dispatchEvent(new Event('change', { bubbles: true }));
    }
    return true;
}
"""


async def _read_value_by_xpath(page, xpath: str):
    """Return the first non-null `value` found for xpath across the page's frames, else None."""
    for frame in page.frames:
        try:
            value = await frame.evaluate(_XPATH_VALUE_JS, xpath)
        except Exception:
            # A frame may be detached or reject evaluation; try the next one.
            continue
        if value is not None:
            return value
    return None


@mcp.tool()
async def click_element_by_xpath(ctx: Context, session_id: str, xpath: str) -> str:
    """Click an interactive element by XPath with confirmation

    Args:
        ctx: MCP request context, used to notify that resources changed.
        session_id: Identifier of the browser session.
        xpath: XPath of the element; every frame of the current page is searched.

    Returns:
        A confirmation message naming the clicked XPath.

    Raises:
        Exception: If an argument is empty, or no frame contained a clickable
            element for the XPath.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")
    if not xpath:
        logger.error("xpath is required")
        raise Exception("xpath is required")

    logger.info(f"Attempting to click element by xpath '{xpath}' in instance {session_id}")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session
        page = await browser_session.get_current_page()

        clicked = False
        for frame in page.frames:
            try:
                handle = await frame.evaluate_handle(_XPATH_LOOKUP_JS, xpath)
                # A handle is returned even for a null result, so its truthiness
                # says nothing; as_element() is None unless a node was matched.
                element = handle.as_element()
                if element is None:
                    continue
                await element.click()
                clicked = True
                break
            except Exception as e:
                logger.debug(f"Click by xpath attempt failed in one frame: {e}")
                continue

        if not clicked:
            # Previously success was reported even when nothing was clicked.
            raise Exception(f"No clickable element found for xpath: {xpath}")

        result = f"Clicked element with xpath: {xpath}"
        logger.info(result)
        await ctx.session.send_resource_list_changed()
        return result

    except Exception as e:
        logger.error(f"Click by xpath failed: {str(e)}")
        raise Exception(f"Click by xpath failed: {str(e)}") from e


@mcp.tool()
async def click_element(ctx: Context, session_id: str, index: int) -> str:
    """Click an interactive element by index with confirmation

    Args:
        ctx: MCP request context, used to notify that resources changed.
        session_id: Identifier of the browser session.
        index: Index of the element as resolved by the browser session.

    Returns:
        A message describing the click; when the click produced a download the
        message contains the download path instead of the element text.

    Raises:
        Exception: If session_id is empty, the element is missing, or the click fails.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Attempting to click element {index} in instance {session_id}")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session

        element_node = await browser_session.get_dom_element_by_index(index)
        if element_node is None:
            error_msg = f"Element with index {index} not accessible"
            logger.error(error_msg)
            raise Exception(error_msg)

        logger.debug(f"Clicking element {index}")
        download_path = await browser_session._click_element_node(element_node)

        if download_path:
            result = f"Clicked element {index} and downloaded file to: {download_path}"
        else:
            element_text = element_node.get_all_text_till_next_clickable_element(max_depth=2)
            result = f"Clicked element {index}: {element_text}"

        logger.info(result)

        # Notify that the page might have changed.
        await ctx.session.send_resource_list_changed()
        return result

    except Exception as e:
        logger.error(f"Click failed: {str(e)}")
        raise Exception(f"Click failed: {str(e)}") from e


@mcp.tool()
async def input_text(ctx: Context, session_id: str, index: int, text: str) -> str:
    """Input text into a form field

    Args:
        ctx: MCP request context (not used by this tool).
        session_id: Identifier of the browser session.
        index: Index of the target element.
        text: Text to enter.

    Returns:
        A confirmation message containing the index and the text.

    Raises:
        Exception: If session_id is empty, the element is missing, or input fails.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Inputting text into element {index} in instance {session_id}")
    # Only a 50-character preview is logged.
    logger.debug(f"Text content: {text[:50]}{'...' if len(text) > 50 else ''}")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session

        element_node = await browser_session.get_dom_element_by_index(index)
        if element_node is None:
            error_msg = f"Element with index {index} not found"
            logger.error(error_msg)
            raise Exception(error_msg)

        await browser_session._input_text_element_node(element_node, text)

        result = f"Input text into element {index}: {text}"
        logger.info("Text input successful")
        return result

    except Exception as e:
        logger.error(f"Input text failed: {str(e)}")
        raise Exception(f"Input text failed: {str(e)}") from e


@mcp.tool()
async def set_element_value(ctx: Context, session_id: str, index: int, value: str) -> str:
    """Set value of an input or select element directly, supports frames

    Args:
        ctx: MCP request context (not used by this tool).
        session_id: Identifier of the browser session.
        index: Index of the element in the session's selector map.
        value: Value to assign; for a select it must equal an option's value.

    Returns:
        A confirmation message containing the index and the value.

    Raises:
        Exception: If session_id is empty, the index is unknown, or no frame
            accepted the value.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Setting value for element {index} in instance {session_id}")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session
        page = await browser_session.get_current_page()

        selector_map = await browser_session.get_selector_map()
        if index not in selector_map:
            raise Exception(f"Element with index {index} not found")
        dom_element = selector_map[index]

        # The element's frame is not known here, so try each frame in turn.
        script_args = {"xpath": dom_element.xpath, "value": value}
        for frame in page.frames:
            try:
                success = await frame.evaluate(_SET_VALUE_JS, script_args)
            except Exception as e:
                logger.warning(f"Set value attempt failed in one frame: {e}")
                continue
            if success:
                logger.info(f"Successfully set value for element {index}")
                return f"Set value for element {index}: {value}"

        raise Exception(f"Set value failed: element not found in any frame for index {index}")

    except Exception as e:
        logger.error(f"Set value failed: {str(e)}")
        raise Exception(f"Set value failed: {str(e)}") from e


@mcp.tool()
async def get_element_info(
    ctx: Context,
    session_id: str,
    index: Optional[int] = None,
    xpath: Optional[str] = None
) -> ElementInfo:
    """
    Get detailed information about a DOM element (supports index or xpath query), can get the value of the element

    Args:
        ctx: MCP request context (not used by this tool).
        session_id: Identifier of the browser session.
        index: Index of the element; takes precedence over xpath when both are given.
        xpath: XPath of the element; every frame of the current page is searched.

    Returns:
        ElementInfo for the element, including its current `value` when it has one.

    Raises:
        Exception: If session_id is empty, neither index nor xpath is given, or
            the element cannot be found.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Getting element info for session {session_id}")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session
        page = await browser_session.get_current_page()

        if index is not None:
            element_node = await browser_session.get_dom_element_by_index(index)
            if element_node is None:
                error_msg = f"Element with index {index} not found"
                logger.error(error_msg)
                raise Exception(error_msg)

            return ElementInfo(
                index=index,
                tag=element_node.tag_name,
                text=element_node.get_all_text_till_next_clickable_element(),
                xpath=element_node.xpath,
                attributes=jsonable_encoder(element_node.attributes),
                value=await _read_value_by_xpath(page, element_node.xpath)
            )

        if xpath is not None:
            # Describe the element inside the page so only plain JSON crosses
            # the boundary; JS handles are neither falsy for a missing node nor
            # usable as field values.
            described = None
            for frame in page.frames:
                try:
                    described = await frame.evaluate(_XPATH_DESCRIBE_JS, xpath)
                except Exception:
                    continue
                if described:
                    break

            if not described:
                error_msg = f"Element with xpath '{xpath}' not found"
                logger.error(error_msg)
                raise Exception(error_msg)

            return ElementInfo(
                index=None,
                tag=described["tag"],
                text=described["text"],
                xpath=xpath,
                attributes=jsonable_encoder(described["attributes"]),
                value=described["value"]
            )

        raise Exception("Either index or xpath must be provided")

    except Exception as e:
        logger.error(f"Failed to get element info: {str(e)}")
        raise Exception(f"Failed to get element info: {str(e)}") from e


@mcp.tool()
async def send_keys(ctx: Context, session_id: str, keys: str) -> str:
    """Send keyboard keys to the browser

    Args:
        ctx: MCP request context (not used by this tool).
        session_id: Identifier of the browser session.
        keys: Key or key combination passed to the page keyboard's press().

    Returns:
        A confirmation message naming the keys.

    Raises:
        Exception: If session_id is empty or the key press fails.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Sending keys '{keys}' in instance {session_id}")

    try:
        instance = await get_session_instance(session_id)
        page = await instance.browser_session.get_current_page()
        await page.keyboard.press(keys)

        result = f"Sent keys: {keys}"
        logger.info(result)
        return result

    except Exception as e:
        logger.error(f"Send keys failed: {str(e)}")
        raise Exception(f"Send keys failed: {str(e)}") from e


@mcp.tool()
async def scroll_page(ctx: Context, session_id: str, direction: str = "down") -> str:
    """Scroll page up or down

    Args:
        ctx: MCP request context (not used by this tool).
        session_id: Identifier of the browser session.
        direction: "up" (case-insensitive) scrolls up; any other value scrolls down.

    Returns:
        A confirmation message naming the direction.

    Raises:
        Exception: If session_id is empty or scrolling fails.
    """
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Scrolling {direction} in instance {session_id}")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session
        page = await browser_session.get_current_page()

        # Scroll by one viewport height; negative values scroll up.
        dy = await page.evaluate('() => window.innerHeight')
        if direction.lower() == "up":
            dy = -dy

        try:
            await browser_session._scroll_container(dy)
        except Exception:
            # Fall back to scrolling the window; `except Exception` rather than
            # a bare except so task cancellation is not swallowed.
            await page.evaluate('(y) => window.scrollBy(0, y)', dy)

        result = f"Scrolled {direction} by one page"
        logger.info(result)
        return result

    except Exception as e:
        logger.error(f"Scroll failed: {str(e)}")
        raise Exception(f"Scroll failed: {str(e)}") from e
@mcp.tool() async def upload_file(ctx: Context, session_id: str, index: int, file_path: str) -> str: """Upload file to file input element""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Uploading file to element {index} in instance {session_id}") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session if not os.path.exists(file_path): error_msg = f"File not found: {file_path}" logger.error(error_msg) raise Exception(error_msg) file_upload_dom_el = await browser_session.find_file_upload_element_by_index( index, max_height=3, max_descendant_depth=3 ) if file_upload_dom_el is None: error_msg = f"No file upload element found at index {index}" logger.error(error_msg) raise Exception(error_msg) file_upload_el = await browser_session.get_locate_element(file_upload_dom_el) if file_upload_el is None: error_msg = f"No file upload element found at index {index}" logger.error(error_msg) raise Exception(error_msg) await file_upload_el.set_input_files(file_path) result = f"Successfully uploaded file to index {index}: {file_path}" logger.info(result) return result except Exception as e: logger.error(f"File upload failed: {str(e)}") raise Exception(f"File upload failed: {str(e)}") # === PDF Generation Tools === @mcp.tool() async def generate_pdf( ctx: Context, session_id: str, url: Optional[str] = None, html_content: Optional[str] = None, output_filename: Optional[str] = None, print_background: bool = True, margin_top: float = 0.4, margin_bottom: float = 0.4, margin_left: float = 0.4, margin_right: float = 0.4, paper_width: float = 8.27, paper_height: float = 11.7 ) -> PDFResult: """Generate PDF from current page, URL, or HTML content""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") if url is None and html_content is None: logger.info("Generating PDF from current page") elif url: logger.info(f"Generating PDF from URL: 
{url}") else: logger.info("Generating PDF from HTML content") await ctx.report_progress(0.2, 1.0, "Preparing PDF generation") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() # Navigate to URL if provided if url: await ctx.report_progress(0.3, 1.0, "Navigating to URL") await browser_session.navigate(url) await page.wait_for_load_state(state='domcontentloaded') # Set HTML content if provided if html_content: await ctx.report_progress(0.3, 1.0, "Setting HTML content") await page.set_content(html_content) await page.wait_for_load_state(state='domcontentloaded') await ctx.report_progress(0.5, 1.0, "Generating PDF") # Generate filename if not provided if not output_filename: if url: # Generate filename from URL parsed_url = urlparse(url) filename_base = parsed_url.netloc.replace('.', '_') if len(filename_base) > 50: filename_base = filename_base[:50] else: filename_base = f"page_{uuid.uuid4().hex}" output_filename = f"{filename_base}.pdf" if not output_filename.endswith('.pdf'): output_filename += '.pdf' # Set up PDF options pdf_options = { "path": str(Path(instance.temp_dir) / output_filename), "print_background": print_background, "margin": { "top": f"{margin_top}in", "bottom": f"{margin_bottom}in", "left": f"{margin_left}in", "right": f"{margin_right}in" } } # check if custom paper size is used if paper_width != 8.27 or paper_height != 11.7: pdf_options["width"] = f"{paper_width}in" pdf_options["height"] = f"{paper_height}in" else: pdf_options["format"] = "A4" await ctx.report_progress(0.8, 1.0, "Saving PDF file") # Generate PDF pdf_bytes = await page.pdf(**pdf_options) file_path = Path(instance.temp_dir) / output_filename file_size_kb = len(pdf_bytes) / 1024 file_path.write_bytes(pdf_bytes) await ctx.report_progress(1.0, 1.0, "PDF generation complete") result = PDFResult( success=True, file_path=str(file_path), file_name=output_filename, size_kb=round(file_size_kb, 1), 
source_url=url or page.url, page_count=None # Could be extracted from PDF metadata ) logger.info(f"PDF generated successfully: {output_filename} ({file_size_kb:.1f} KB)") return result except Exception as e: logger.error(f"PDF generation failed: {str(e)}") raise Exception(f"PDF generation failed: {str(e)}") @mcp.tool() async def take_screenshot( ctx: Context, session_id: str, target: Optional[str] = None, width: Optional[int] = None, height: Optional[int] = None, full_page: bool = True, quality: int = 90, format: str = "png" ) -> Image: """ Take screenshot with structured result, it will take screenshot of the target element if target is not None, otherwise it will take screenshot of the full page, otherwise it will take screenshot of the current viewport Screenshot page or element (by target selector, supports CSS/XPath, e.g. 'button', '//button', 'css=button', 'xpath=//button'). See: https://playwright.dev/python/docs/locators#locate-by-css-or-xpath """ if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Taking screenshot for instance {session_id}...") await ctx.report_progress(0.2, 1.0, "Preparing screenshot") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() # Handle target selection if target: try: logger.debug(f"Waiting for target element: {target}") await page.wait_for_selector(target, timeout=5000) except Exception: error_msg = f"Target element '{target}' not found or not visible" logger.error(error_msg) raise Exception(error_msg) await ctx.report_progress(0.4, 1.0, "Configuring viewport") # Set custom viewport size if specified if width and height: await page.set_viewport_size({"width": width, "height": height}) logger.debug(f"Set viewport size: {width}x{height}") elif full_page: # Auto-adjust to full page dimensions try: page_height = await page.evaluate("() => document.documentElement.scrollHeight") 
page_width = await page.evaluate("() => document.documentElement.scrollWidth") await page.set_viewport_size({"width": page_width, "height": page_height}) logger.debug(f"Auto-adjusted viewport: {page_width}x{page_height}") except Exception: logger.warning("Could not auto-adjust viewport, using default") await ctx.report_progress(0.6, 1.0, "Capturing screenshot") # Configure screenshot options screenshot_options = { "full_page": full_page if not target else False, "type": format.lower(), } # Add quality for JPEG if format.lower() == "jpeg": screenshot_options["quality"] = quality # Add clip for specific target if target: try: element = page.locator(target).first bounding_box = await element.bounding_box() if bounding_box: screenshot_options["clip"] = bounding_box except Exception: error_msg = f"Could not get bounding box for target '{target}'" logger.error(error_msg) raise Exception(error_msg) # Take screenshot screenshot_bytes = await page.screenshot(**screenshot_options) await ctx.report_progress(0.8, 1.0, "Saving screenshot") # Save to file filename = f"screenshot_{uuid.uuid4().hex}.{format}" file_path = Path(instance.temp_dir) / filename # Write screenshot to file file_path.write_bytes(screenshot_bytes) # Get file size info file_size = len(screenshot_bytes) file_size_kb = file_size / 1024 # Add to screenshot history instance.screenshot_history.append(str(file_path)) logger.info(f"Screenshot saved: {filename} ({file_size_kb:.1f} KB)") await ctx.report_progress(1.0, 1.0, "Screenshot complete") return Image( data=screenshot_bytes, format=format.lower() ) except Exception as e: logger.error(f"Screenshot failed: {str(e)}") raise Exception(f"Screenshot failed: {str(e)}") # === File Download Tools === @mcp.tool() async def download_file( ctx: Context, session_id: str, url: str, output_filename: Optional[str] = None, timeout: int = 30 ) -> DownloadResult: """ Download any file from URL to the temp directory. 
Use fetch+blob+a.download, if failed, fallback to goto+expect_download, maximum compatibility. """ if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Downloading file via browser: {url}") await ctx.report_progress(0.1, 1.0, "Starting browser-context file download") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() # Generate filename if not output_filename: parsed_url = urlparse(url) output_filename = os.path.basename(parsed_url.path) if not output_filename or '.' not in output_filename: output_filename = f"file_{uuid.uuid4().hex}" download_path = Path(instance.temp_dir) / output_filename await ctx.report_progress(0.3, 1.0, "Triggering browser download (fetch mode)") start_time = time.time() try: await page.evaluate( '''(args) => { fetch(args.url, {credentials: 'include'}) .then(resp => resp.blob()) .then(blob => { const a = document.createElement('a'); a.href = URL.createObjectURL(blob); a.download = args.filename; document.body.appendChild(a); a.click(); setTimeout(() => { URL.revokeObjectURL(a.href); a.remove(); }, 1000); }); }''', {"url": url, "filename": output_filename} ) download_event = await page.wait_for_event("download", timeout=timeout * 1000) await download_event.save_as(download_path) except Exception as fetch_exc: logger.warning(f"fetch+blob+a.download failed, fallback to goto+expect_download: {fetch_exc}") await ctx.report_progress(0.5, 1.0, "Fallback to goto+expect_download") async with page.expect_download(timeout=timeout * 1000) as download_info: await page.goto(url) download = await download_info.value await download.save_as(download_path) end_time = time.time() await ctx.report_progress(1.0, 1.0, "Download complete") file_size = os.path.getsize(download_path) mime_type = mimetypes.guess_type(str(download_path))[0] or "application/octet-stream" return DownloadResult( success=True, 
file_path=str(download_path), file_name=output_filename, size_bytes=file_size, mime_type=mime_type, download_time=round(end_time - start_time, 2) ) except Exception as e: logger.error(f"File download failed: {str(e)}") raise Exception(f"File download failed: {str(e)}") @mcp.tool() async def download_image( ctx: Context, session_id: str, image_url: str, output_filename: Optional[str] = None, timeout: int = 30 ) -> Image: """Download a image from URL, it will download the image to the temp directory, and return the file path, it will open a new tab if the image is not from the same origin as the current page""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Downloading image via browser: {image_url}") await ctx.report_progress(0.1, 1.0, "Starting browser-context image download") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() # Check if the same origin current_url = page.url if hasattr(page, 'url') else await page.evaluate('() => window.location.href') current_origin = urlparse(current_url).scheme + '://' + urlparse(current_url).netloc image_origin = urlparse(image_url).scheme + '://' + urlparse(image_url).netloc if current_origin != image_origin: logger.info(f"Current page origin {current_origin} != image origin {image_origin}, opening new tab.") page = await browser_session.create_new_tab(image_url) # Generate filename parsed_url = urlparse(image_url) if not output_filename: output_filename = os.path.basename(parsed_url.path) if not output_filename or '.' 
not in output_filename: ext = '.jpg' if 'png' in image_url.lower(): ext = '.png' elif 'gif' in image_url.lower(): ext = '.gif' elif 'webp' in image_url.lower(): ext = '.webp' output_filename = f"image_{uuid.uuid4().hex}{ext}" await ctx.report_progress(0.3, 1.0, "Triggering browser download") # Execute fetch+blob+a.download in the page context, pack the parameters as an object await page.evaluate( """ (args) => { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), args.timeout * 1000); fetch(args.url, {credentials: 'include', signal: controller.signal}) .then(resp => resp.blob()) .then(blob => { clearTimeout(timeoutId); const a = document.createElement('a'); a.href = URL.createObjectURL(blob); a.download = args.filename; document.body.appendChild(a); a.click(); setTimeout(() => { URL.revokeObjectURL(a.href); a.remove(); }, 1000); }) .catch(e => { clearTimeout(timeoutId); console.error('Fetch failed or aborted:', e); }); } """, {"url": image_url, "filename": output_filename, "timeout": timeout} ) # Wait for download to complete, listen for download event, pass timeout download_event = await page.wait_for_event("download", timeout=timeout * 1000) download_path = Path(instance.temp_dir) / output_filename await download_event.save_as(download_path) mime_type = mimetypes.guess_type(str(download_path))[0] or "application/octet-stream" # The actual path of the file is unknown, cannot return the local path directly, can only return the filename await ctx.report_progress(1.0, 1.0, "Browser download triggered") logger.info(f"Image download triggered in browser: {output_filename}") return Image( data=download_path.read_bytes(), format=mime_type.split('/')[1] ) except Exception as e: logger.error(f"Browser-context image download failed: {str(e)}") raise Exception(f"Browser-context image download failed: {str(e)}") # === Cookie Management Tools === @mcp.tool() async def set_cookie( ctx: Context, session_id: str, name: str, value: str, 
domain: str, path: str = "/", http_only: bool = False, secure: bool = False, same_site: Optional[Literal['Lax', 'None', 'Strict']] = None, expires: Optional[int] = None, max_age: Optional[int] = None ) -> str: """Set a cookie in the browser""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Setting cookie '{name}' for domain '{domain}'") try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() context = page.context # Validate same_site value valid_same_site = ["Strict", "Lax", "None"] if same_site not in valid_same_site: same_site = "Lax" # Validate SameSite=None requires Secure=true if same_site == "None" and not secure: logger.warning("SameSite=None requires Secure=true, setting secure=True") secure = True # Clean domain domain = domain.lower().strip() if domain.startswith('http://') or domain.startswith('https://'): parsed = urlparse(domain) domain = parsed.netloc # Build cookie object cookie = { "name": name, "value": value, "domain": domain, "path": path, "httpOnly": http_only, "secure": secure, "sameSite": same_site } # Set expiration if max_age is not None: expires = int(time.time()) + max_age if expires is not None: cookie["expires"] = expires # Add cookie to context await context.add_cookies([{ "name": name, "value": value, "domain": domain, "path": path, "httpOnly": http_only, "secure": secure, "sameSite": same_site, "expires": expires if expires is not None else -1 # or omit if None }]) # Format result cookie_info = CookieInfo( name=name, value=value, domain=domain, path=path, http_only=http_only, secure=secure, same_site=same_site, expires=datetime.fromtimestamp(expires) if expires else None ) result = f"Cookie '{name}' set successfully for domain '{domain}'" if expires: result += f" (expires: {cookie_info.expires})" logger.info(result) return result except Exception as e: logger.error(f"Set cookie 
failed: {str(e)}") raise Exception(f"Set cookie failed: {str(e)}") @mcp.tool() async def get_cookies(ctx: Context, session_id: str, domain: Optional[str] = None) -> List[CookieInfo]: """Get cookies from the browser""" if not session_id: logger.error("session_id is required") raise Exception("session_id is required") logger.info(f"Getting cookies" + (f" for domain '{domain}'" if domain else "")) try: instance = await get_session_instance(session_id) browser_session = instance.browser_session page = await browser_session.get_current_page() context = page.context # Get all cookies cookies = await context.cookies() # Filter by domain if specified if domain: domain = domain.lower().strip() cookies = [c for c in cookies if domain in c.get('domain', '')] # Convert to CookieInfo objects cookie_infos = [] for cookie in cookies: cookie_info = CookieInfo( name=cookie.get('name', ''), value=cookie.get('value', ''), domain=cookie.get('domain', ''), path=cookie.get('path', '/'), http_only=cookie.get('httpOnly', False), secure=cookie.get('secure', False), same_site=cookie.get('sameSite', 'Lax'), expires=datetime.fromtimestamp(cookie['expires']) if ('expires' in cookie and cookie['expires']) else None ) cookie_infos.append(cookie_info) logger.info(f"Found {len(cookie_infos)} cookies") return cookie_infos except Exception as e: logger.error(f"Get cookies failed: {str(e)}") return [] # === Utility Tools === @mcp.tool() async def wait(ctx: Context, seconds: int = 3) -> str: """Wait for specified number of seconds with progress""" try: logger.info(f"Waiting for {seconds} seconds...") # Report progress during wait for i in range(seconds): progress = (i + 1) / seconds await ctx.report_progress(progress, 1.0, f"Waiting... 
@mcp.tool()
async def search_bing(ctx: Context, session_id: str, query: str) -> str:
    """Search Bing for a query with progress tracking.

    Args:
        ctx: MCP request context, used for progress reports and for the
            resource-list-changed notification.
        session_id: Identifier of the browser session to drive.
        query: Free-text search terms. They are URL-encoded before being
            placed in the query string.

    Returns:
        A confirmation message naming the query that was searched.

    Raises:
        Exception: If session_id is empty, or if navigation fails (the
            original error text is included in the message).
    """
    # Imported locally because the module only imports urlparse at file level.
    from urllib.parse import quote_plus

    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Searching Bing for: {query} in instance {session_id}")
    await ctx.report_progress(0.3, 1.0, "Navigating to Bing")

    try:
        instance = await get_session_instance(session_id)
        browser_session = instance.browser_session

        # First navigate to Bing
        await browser_session.navigate("https://www.bing.com")
        await ctx.report_progress(0.6, 1.0, "Performing search")

        # Then navigate to search results. The query must be percent-encoded:
        # raw spaces, '&', '#', '+' or non-ASCII text would otherwise truncate
        # or corrupt the q= parameter.
        search_url = f"https://www.bing.com/search?q={quote_plus(query)}"
        await browser_session.navigate(search_url)
        await ctx.report_progress(1.0, 1.0, "Search complete")

        result = f"Searched Bing for: {query}"
        logger.info("Bing search completed")

        # Notify that page content has changed
        await ctx.session.send_resource_list_changed()
        return result
    except Exception as e:
        logger.error(f"Bing search failed: {str(e)}")
        raise Exception(f"Bing search failed: {str(e)}")
@mcp.tool()
async def get_dropdown_options(ctx: Context, session_id: str, index: int) -> DropdownOptionsResult:
    """Get options from a dropdown/select element, it will return the options of the element, and the id and name of the element"""
    if not session_id:
        logger.error("session_id is required")
        raise Exception("session_id is required")

    logger.info(f"Getting dropdown options for element {index} in instance {session_id}")

    # Evaluated in each frame: resolves the XPath and serialises the <select>.
    select_probe_js = """
        (xpath) => {
            const select = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
            if (!select) return null;
            return {
                options: Array.from(select.options).map(opt => ({
                    text: opt.text,
                    value: opt.value,
                    index: opt.index
                })),
                id: select.id,
                name: select.name
            };
        }
    """

    try:
        session = (await get_session_instance(session_id)).browser_session
        current_page = await session.get_current_page()
        elements_by_index = await session.get_selector_map()

        if index not in elements_by_index:
            error_msg = f"Element with index {index} not found"
            logger.error(error_msg)
            raise Exception(error_msg)

        target = elements_by_index[index]

        # The element may live in any frame; the first frame that resolves it wins.
        for frame in current_page.frames:
            try:
                payload = await frame.evaluate(select_probe_js, target.xpath)
                if not payload:
                    continue
                found = DropdownOptionsResult(
                    options=[DropdownOption(**entry) for entry in payload['options']],
                    id=payload.get('id'),
                    name=payload.get('name')
                )
                logger.info(f"Dropdown options retrieved for element {index} in instance {session_id}")
                return found
            except Exception as frame_error:
                logger.error(f"Get dropdown options failed: {str(frame_error)}")

        raise Exception(f"No dropdown options found for element {index} in instance {session_id}")
    except Exception as e:
        logger.error(f"Get dropdown options failed: {str(e)}")
        raise Exception(f"Get dropdown options failed: {str(e)}")
@mcp.resource("browser://instance/{instance_id}/tabs")
async def get_instance_tabs_resource(instance_id: str) -> str:
    """Describe every open tab of one browser instance as plain text.

    Each tab is listed with its positional id, title and URL; the tab that
    equals the session's current page is flagged as active.
    """
    try:
        session_manager = get_session_manager()
        session_instance = await session_manager.get_session_instance(instance_id)
        session = session_instance.browser_session

        active_page = await session.get_current_page()

        # Collect (id, title, url, is_active) for each tab before rendering.
        rows = []
        for tab_id, tab in enumerate(session.tabs):
            tab_title = await tab.title()
            rows.append((tab_id, tab_title, tab.url, tab == active_page))

        if not rows:
            return f"No tabs available in instance {instance_id}"

        lines = [f"Open Tabs in Instance {instance_id} ({len(rows)}):"]
        for tab_id, tab_title, tab_url, is_active in rows:
            status = " (ACTIVE)" if is_active else ""
            lines.extend((
                f" Tab {tab_id}: {tab_title}{status}",
                f" URL: {tab_url}",
            ))
        return "\n".join(lines)
    except Exception as e:
        logger.error(f"Error getting tabs info for instance {instance_id}: {str(e)}")
        raise Exception(f"Error getting tabs info for instance {instance_id}: {str(e)}")
@mcp.resource("browser://instance/{instance_id}/status")
async def get_instance_status_resource(instance_id: str) -> dict:
    """Return a status dictionary for one browser instance.

    Combines the session info reported by the session manager with the
    current page's URL and title and the index of the active tab.
    """
    try:
        session_manager = get_session_manager()
        session_instance = await session_manager.get_session_instance(instance_id)
        session_info = await session_manager.get_session_info(instance_id)

        session = session_instance.browser_session
        page = await session.get_current_page()
        open_tabs = session.tabs

        # Index of the current page among the tabs, or None if it is not listed.
        active_index = open_tabs.index(page) if page in open_tabs else None

        # Filled key by key so values are evaluated in the same order as before.
        report = {}
        report["session_id"] = session_info.session_id
        report["created_at"] = session_info.created_at.isoformat()
        report["last_used"] = session_info.last_used.isoformat()
        if hasattr(session_info.config, 'model_dump'):
            report["config"] = session_info.config.model_dump()
        else:
            report["config"] = session_info.config.dict()
        report["temp_dir"] = session_info.temp_dir
        report["current_url"] = page.url
        report["current_title"] = await page.title()
        report["active_tabs"] = session_info.active_tabs
        report["active_tab_id"] = active_index
        report["screenshot_count"] = session_info.screenshot_count
        return report
    except Exception as e:
        logger.error(f"Error getting instance status for {instance_id}: {str(e)}")
        raise Exception(f"Error getting instance status for {instance_id}: {str(e)}")
str: """Get all files in the instance's temp directory (filename, path, size, type, mtime)""" import os import mimetypes from pathlib import Path from datetime import datetime try: manager = get_session_manager() instance = await manager.get_session_instance(instance_id) temp_dir = instance.temp_dir files_info = [] # only walk temp_dir and not walk subdirs for fname in os.listdir(temp_dir): fpath = os.path.join(temp_dir, fname) if os.path.isfile(fpath): stat = os.stat(fpath) files_info.append({ "file_path": str(fpath), "file_name": str(fname), "size_bytes": int(stat.st_size), "mime_type": str(mimetypes.guess_type(fpath)[0] or "application/octet-stream"), "download_time": float(stat.st_mtime), "relative_path": str(Path(fpath).relative_to(temp_dir)), }) return jsonable_encoder(files_info) except Exception as e: logger.error(f"Error getting files for instance {instance_id}: {str(e)}") raise Exception(f"Error getting files for instance {instance_id}: {str(e)}") @mcp.resource("browser://instance/{instance_id}/cookies") async def get_instance_cookies_resource(instance_id: str) -> dict: """Get all cookies for the instance (structure matches CookieInfo)""" try: manager = get_session_manager() instance = await manager.get_session_instance(instance_id) browser_session = instance.browser_session page = await browser_session.get_current_page() context = page.context cookies = await context.cookies() cookie_infos = [] for cookie in cookies: cookie_infos.append({ "name":cookie.get('name', ''), "value": cookie.get('value', ''), "domain": cookie.get('domain', ''), "path": cookie.get('path', '/'), "http_only": cookie.get('httpOnly', False), "secure": cookie.get('secure', False), "same_site": cookie.get('sameSite', 'Lax'), "expires": (datetime.fromtimestamp(cookie['expires']).isoformat() if ('expires' in cookie and cookie['expires']) else None) }) return cookie_infos except Exception as e: logger.error(f"Error getting cookies for instance {instance_id}: {str(e)}") raise 
@mcp.resource("browser://instance/{instance_id}/file/{relative_path*}")
async def get_instance_file_resource(instance_id: str, relative_path: str):
    """Read one file from an instance's temp directory.

    Args:
        instance_id: Identifier of the browser instance that owns the file.
        relative_path: Path of the file relative to the instance's temp
            directory. It comes from the resource URI and is treated as
            untrusted input.

    Returns:
        The file content as ``str`` (decoded as UTF-8) when the guessed MIME
        type is ``text/*`` or ``application/json``; otherwise the raw ``bytes``.

    Raises:
        PermissionError: If the resolved path lies outside the instance's
            temp directory (e.g. via ``..`` segments, an absolute path, or a
            symlink pointing elsewhere).
        OSError: If the file cannot be opened or read.
    """
    import mimetypes

    manager = get_session_manager()
    instance = await manager.get_session_instance(instance_id)

    # Resolve both paths (including symlinks) so the containment check below
    # compares canonical locations.
    base_dir = os.path.realpath(instance.temp_dir)
    abs_path = os.path.realpath(os.path.join(base_dir, relative_path))

    # Refuse anything that escapes the temp directory. Without this check a
    # crafted relative_path could read arbitrary files on the host.
    try:
        inside_base = os.path.commonpath([base_dir, abs_path]) == base_dir
    except ValueError:
        # Raised e.g. when the paths are on different drives.
        inside_base = False
    if not inside_base:
        logger.error(f"Rejected file access outside temp dir for instance {instance_id}: {relative_path}")
        raise PermissionError(f"Access denied: '{relative_path}' is outside the instance temp directory")

    mime_type, _ = mimetypes.guess_type(abs_path)
    if mime_type is None:
        mime_type = "application/octet-stream"

    if mime_type.startswith("text/") or mime_type == "application/json":
        with open(abs_path, "r", encoding="utf-8") as f:
            return f.read()

    with open(abs_path, "rb") as f:
        return f.read()
**Multi-Step Reasoning and Validation** - For workflows (e.g., login, form filling), always validate after each step: - After navigation: wait() + get_page_state() - After input: get_page_state() to confirm the field is filled - After clicking: wait() + get_page_state() to check for page changes - For batch/loop actions, always wait() and refresh get_page_state() between steps to avoid index drift. 4. **Element Targeting** - Prefer using element indices from get_page_state() for click/input actions. - For highly dynamic or complex DOMs, use click_element_by_xpath with a precise XPath (get from get_page_state() or get_element_info()). - If the DOM changes, always refresh the element map before acting. 5. **Tab and Context Awareness** - When opening new tabs (navigate_to with new_tab=True), use get_tabs_info() to find the new tab's id, then switch_tab() before further actions. - After switching tabs, always get_page_state() to refresh context. - When closing tabs, verify the remaining tab(s) and their URLs with get_tabs_info(). - For multi-tab/multi-session, always check the current context (tab/session) before performing actions. 6. **Form Input and Validation** - After input_text or set_element_value, take_screenshot() for debugging and validation. - If input is not reflected, check if the element is visible and editable, or use set_element_value for direct assignment. 7. **File Operations** - After download_file, download_image, or upload_file, check the file existence in temp_dir. - For batch downloads, wait() between actions to avoid race conditions. 8. **Cookie and Session Handling** - After set_cookie(), refresh or navigate to ensure the cookie is applied. - Use get_cookies() to verify session state before performing authenticated actions; if lost, re-set cookies as needed. 9. **Evidence and Traceability** - After critical actions (e.g., form submit, navigation), take_screenshot() for evidence. 
- Use extract_content() to verify that the expected text/content is present. - For complex flows, take_screenshot() at each step to create a traceable operation log. 10. **Exception Handling and Self-Healing** - On any tool failure, first refresh state (get_page_state()), then retry. - Wrap all actions in try/except; on failure, take a screenshot and get_page_state() for diagnostics. - If repeated failures, consider re-navigating or reloading the page. 11. **Page Accessibility Issues** - If get_page_state() returns "Page not accessible", use check_browser_health() to diagnose the issue. - For unresponsive pages, try refresh_page() to reload the current page. - If the browser session is corrupted, create a new instance with create_chrome_instance(). - Always check browser health before starting complex automation workflows. 12. **Dynamic Content and Timeouts** - For elements that load asynchronously, use a loop with get_page_state() and wait() until the element is found or a timeout is reached. - Example: while not found_element and not timeout: get_page_state(); wait(1) 13. **General LLM Reasoning Principles** - Never hardcode indices or assume static DOM—always fetch fresh state. - Use explicit validation after each step. - If unsure, prefer to over-explain your reasoning and check state before acting. - When encountering "Page is not accessible" errors, use the recovery tools: check_browser_health(), refresh_page(), or create_chrome_instance(). **Summary for LLMs:** Treat the browser as a dynamic, stateful environment. Always observe (get_page_state), wait for changes, validate before acting, and recover from errors by refreshing state. Use evidence (screenshots, content extraction) to confirm success. Think step by step, and never assume the page is static! Learn from the test cases: wait for what you need, check before you act, and always handle the unexpected! 
**Recovery Tools for Page Issues:** - check_browser_health(): Diagnose browser session problems - refresh_page(): Reload the current page when it's unresponsive - create_chrome_instance(): Start fresh when the session is corrupted """ logger.info("Browser automation tips provided") return tips # === Resources === @mcp.resource("browser://status") async def get_browser_status_resource() -> str: """Get current browser status as a resource, only admin can use this tool""" try: manager = get_session_manager() status = await manager.get_all_sessions_info() status_text = [] status_text.append(f"Session-Based Browser Manager Status:") status_text.append(f"- Active Sessions: {len(status)}") status_text.append(f"- Max Sessions: {manager.max_instances}") status_text.append(f"- Default TTL: {manager.default_ttl}s") status_text.append("") if status: status_text.append("Active Sessions:") for info in status: status_text.append(f" Session {info.session_id[:8]}... ({info.config.headless and 'headless' or 'visible'})") status_text.append(f" Created: {info.created_at.strftime('%Y-%m-%d %H:%M:%S')}") status_text.append(f" Last Used: {info.last_used.strftime('%Y-%m-%d %H:%M:%S')}") status_text.append(f" Tabs: {info.active_tabs}") status_text.append(f" Screenshots: {info.screenshot_count}") status_text.append("") else: status_text.append("No active sessions") return "\n".join(status_text) except Exception as e: return f"Error getting browser status: {str(e)}" @mcp.resource("browser://instances") async def get_instances_resource() -> str: """Get detailed information about all browser instances, only admin can use this tool""" try: manager = get_session_manager() instances = await manager.get_all_sessions_info() if not instances: return "No browser sessions active" instances_text = [] instances_text.append(f"Browser Sessions ({len(instances)}):") instances_text.append("") for info in instances: instances_text.append(f"Session ID: {info.session_id}") instances_text.append(f" Status: 
Active") instances_text.append(f" Created: {info.created_at}") instances_text.append(f" Last Used: {info.last_used}") instances_text.append(f" Temp Dir: {info.temp_dir}") instances_text.append(f" Configuration:") instances_text.append(f" Headless: {info.config.headless}") instances_text.append(f" No Sandbox: {info.config.no_sandbox}") instances_text.append(f" User Agent: {info.config.user_agent}") instances_text.append(f" Viewport Width: {info.config.viewport_width}") instances_text.append(f" Viewport Height: {info.config.viewport_height}") instances_text.append(f" Disable Web Security: {info.config.disable_web_security}") instances_text.append("") return "\n".join(instances_text) except Exception as e: return f"Error getting instances info: {str(e)}" @mcp.resource("browser://help") async def get_help_resource() -> str: """Get concise help: all tools and resources, grouped and described.""" help_text = """ 🧭 **Browser MCP API Reference** --- **Session Management** - create_chrome_instance: Create a new browser session - close_instance: Close a specific session - get_instance_info: Get info for a session - get_browser_status: List all sessions - close_all_instances: Close all sessions **Browser Configuration** - set_browser_config: Set browser config (headless, viewport, etc.) 
- get_browser_config: Get current config **Navigation** - navigate_to: Go to a URL (optionally new tab) - navigate_back: Go back in history - navigate_forward: Go forward in history - get_page_state: Get current page and elements **Tab Management** - get_tabs_info: List all open tabs - close_tab: Close a tab - switch_tab: Switch to a tab **Element Interaction** - click_element_by_xpath: Click element by XPath - click_element: Click element by index - input_text: Input text into a field - set_element_value: Set value of input/select - get_element_info: Get info for an element - send_keys: Send keyboard keys - scroll_page: Scroll page up/down - upload_file: Upload file to input **PDF & Screenshot** - generate_pdf: Save page as PDF - take_screenshot: Capture screenshot **File Download/Upload** - download_file: Download file from URL - download_image: Download image from URL **Cookie Management** - set_cookie: Set a browser cookie - get_cookies: Get cookies **Utilities** - wait: Wait for seconds - search_bing: Bing search - extract_content: Extract text from page - get_dropdown_options: Get select options - browser_tips: Best practices for automation --- **Resources** - browser://status: Manager and sessions status - browser://instances: All sessions info - browser://instance/{id}/page: Session page info - browser://instance/{id}/tabs: Session tabs - browser://instance/{id}/screenshots: Session screenshots - browser://instance/{id}/status: Session status (detailed) - browser://instance/{id}/files: Session temp files - browser://instance/{id}/cookies: Session cookies - browser://instance/{id}/file/{relative_path}: Read a file in session temp - browser://help: This help """ return help_text # === Prompts === @mcp.prompt() def web_testing(url: str, test_scenario: str) -> str: """Generate a prompt for web testing scenarios""" return f"""I need to test this website: {url} Test scenario: {test_scenario} 🧪 Enhanced Web Testing Plan: 🎯 Setup & Initialization: 1. 
start_browser() → Create isolated test instance 2. set_browser_config() → Configure for testing (viewport, user agent, etc.) 3. navigate_to(instance_id, "{url}") → Navigate to target 4. take_screenshot(instance_id) → Document initial state 🔍 Page Analysis: - get_page_state(instance_id) → Analyze interactive elements - extract_content(instance_id, "relevant_terms") → Check page content - get_tabs_info(instance_id) → Monitor tab behavior 📋 Test Execution Strategy: Based on scenario: {test_scenario} For Functional Testing: - Test all interactive elements via click_element() - Validate form inputs with input_text() and set_element_value() - Test file uploads with upload_file() - Verify dropdown options with get_dropdown_options() For Security Testing: - Test cookie behavior with set_cookie() and get_cookies() - Validate HTTPS behavior with secure cookies - Test session management across tabs For Performance Testing: - Monitor page load times - Test with different viewport sizes - Capture screenshots at different stages 📊 Evidence Collection: - take_screenshot() at each test step - generate_pdf() for comprehensive documentation - extract_content() to verify text changes - Save test artifacts to instance temp directory ✅ Validation Steps: - Screenshot comparison for visual regression - Content extraction for text validation - Tab management for popup/redirect testing - Cookie inspection for session testing 🔧 Advanced Testing Features: - Multiple instances for concurrent testing - Custom browser configurations per test - Automated cleanup with TTL management - Comprehensive error logging Please execute this systematically with proper instance management!""" @mcp.prompt() def data_extraction(url: str, data_type: str) -> str: """Generate a prompt for extracting data from web pages""" return f"""I need to extract {data_type} from this website: {url} 🔍 Advanced Data Extraction Plan: 🚀 Setup & Navigation: 1. start_browser() → Create extraction instance 2. 
def user_message(content: str):
    """Build a prompt Message carrying *content* with the "user" role."""
    message = Message(content, role="user")
    return message


def assistant_message(content: str):
    """Build a prompt Message carrying *content* with the "assistant" role."""
    message = Message(content, role="assistant")
    return message
need to fill out a form on {url}"), user_message(f"Form data to enter: {form_data}"), assistant_message("🤖 I'll help you automate the form filling process using the enhanced browser automation system!"), assistant_message("📋 Here's my systematic approach:"), assistant_message("1. **Instance Creation**: start_browser() to create an isolated session"), assistant_message("2. **Navigation**: navigate_to(instance_id, url) to reach the form"), assistant_message("3. **Page Analysis**: get_page_state(instance_id) to identify form fields"), assistant_message("4. **Documentation**: take_screenshot(instance_id) for before state"), assistant_message("5. **Form Interaction**:"), assistant_message(" - Use input_text() for text fields"), assistant_message(" - Use set_element_value() for direct value setting"), assistant_message(" - Use click_element() for checkboxes/radio buttons"), assistant_message(" - Use get_dropdown_options() + click_element() for select menus"), assistant_message(" - Use upload_file() for file inputs"), assistant_message("6. **Validation**: take_screenshot() after filling"), assistant_message("7. **Submission**: click_element() on submit button"), assistant_message("8. **Verification**: Capture results and clean up"), assistant_message("🔧 **Advanced Features I'll Use**:"), assistant_message("- Smart element detection and interaction"), assistant_message("- Screenshot documentation at each step"), assistant_message("- Error handling and retry logic"), assistant_message("- Session management with cookies if needed"), assistant_message("- PDF generation for final documentation"), user_message("Perfect! Please proceed step by step and document each action with screenshots. Let me know if you need any clarification about the form data."), ] @mcp.prompt() def automation_troubleshooting() -> str: """Generate a prompt for troubleshooting browser automation issues""" return """🔧 Browser Automation Troubleshooting Guide Having issues with browser automation? 
Here's a systematic approach to diagnose and fix common problems: 🚨 Common Issues & Solutions: **Instance Management Issues:** - Problem: "Instance not found" errors - Solution: Check get_browser_status() and use valid instance_id - Use get_instance_info(instance_id) to verify instance state **Element Interaction Failures:** - Problem: Elements not clickable or not found - Solution: 1. Call get_page_state(instance_id) to refresh element map 2. Take screenshot for visual debugging 3. Use wait() for page loading 4. Check element visibility with targeted screenshots **Navigation Issues:** - Problem: Pages not loading or timing out - Solution: 1. Increase timeout settings in browser config 2. Use wait() after navigation 3. Check network connectivity 4. Try navigate_back() and retry **Performance Problems:** - Problem: Slow automation or timeouts - Solution: 1. Enable headless mode for better performance 2. Disable images and unnecessary features 3. Use smaller viewport sizes 4. Close unused instances **Authentication & Session Issues:** - Problem: Login state not persisting - Solution: 1. Use set_cookie() to manage sessions 2. Check get_cookies() for session cookies 3. Ensure proper domain and path settings 4. Use secure flags for HTTPS sites 🛠️ Debugging Workflow: 1. Check browser_tips() for best practices 2. Use browser://status resource for overview 3. Take screenshots at each step for visual debugging 4. Use extract_content() to verify page state 5. Check browser://help for comprehensive guidance 🔍 Advanced Debugging: - Enable browser logging in config - Use multiple instances to isolate issues - Generate PDFs for comprehensive documentation - Monitor TTL and cleanup timing Need specific help? Describe your issue and I'll provide targeted solutions!"""

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Euraxluo/browser-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.