Skip to main content
Glama
cli.py26.2 kB
"""mcp-server-browser-use: Unified CLI for Browser Use MCP Server. Commands: - server: Start HTTP MCP server (default: background daemon, -f for foreground) - stop: Stop the running server daemon - status: Check if server is running - logs: View server logs - install: Install to Claude Desktop config - config: View or modify configuration - skill: Manage browser skills (list, get, delete) """ import json import os import signal import sys from pathlib import Path import typer from rich.console import Console from rich.panel import Panel from rich.table import Table from .config import APP_NAME, CONFIG_FILE, get_default_results_dir, load_config_file, save_config_file, settings from .skills import SkillStore def get_state_dir() -> Path: """Get the state directory for runtime files (e.g. ~/.local/state/mcp-server-browser-use).""" if os.name == "nt": base = Path(os.environ.get("LOCALAPPDATA", Path.home() / ".local/state")).expanduser() else: base = Path(os.environ.get("XDG_STATE_HOME", "~/.local/state")).expanduser() path = base / APP_NAME path.mkdir(parents=True, exist_ok=True) return path SERVER_INFO_FILE = get_state_dir() / "server.json" LOG_FILE = get_state_dir() / "server.log" app = typer.Typer( name="mcp-server-browser-use", help="Browser automation MCP server & CLI", no_args_is_help=False, # Show deprecation message instead of help ) console = Console() def _read_server_info() -> dict | None: """Read server info from file, return None if not exists or invalid.""" if not SERVER_INFO_FILE.exists(): return None try: info = json.loads(SERVER_INFO_FILE.read_text()) # Validate required keys required = {"pid", "host", "port", "transport"} if not required.issubset(info.keys()): return None return info except (json.JSONDecodeError, OSError): return None def _is_process_running(pid: int) -> bool: """Check if a process with given PID is running.""" try: os.kill(pid, 0) return True except OSError: return False def _write_server_info(pid: int, host: str, port: int, transport: str) -> None: """Write server info to file.""" info = {"pid": pid, "host": host, "port": port, "transport": transport} SERVER_INFO_FILE.write_text(json.dumps(info, indent=2)) def _remove_server_info() -> None: """Remove server info file if exists.""" if SERVER_INFO_FILE.exists(): SERVER_INFO_FILE.unlink() @app.command() def server( host: str = typer.Option(None, "--host", "-H", help="Host to bind to"), port: int = typer.Option(None, "--port", "-p", help="Port to bind to"), transport: str = typer.Option( "streamable-http", "--transport", "-t", help="HTTP transport: streamable-http (default) or sse", callback=lambda v: v if v in ("streamable-http", "sse") else typer.BadParameter(f"Invalid transport: {v}"), ), foreground: bool = typer.Option(False, "--foreground", "-f", help="Run in foreground (don't daemonize)"), ) -> None: """Start the HTTP MCP server as a background daemon.""" import subprocess h = host or settings.server.host p = port or settings.server.port # Check if already running info = _read_server_info() if info and _is_process_running(info["pid"]): console.print(f"[yellow]Server already running (PID {info['pid']})[/yellow]") console.print(f" URL: http://{info['host']}:{info['port']}/mcp") console.print("[dim]Use 'mcp-server-browser-use stop' to stop it[/dim]") raise typer.Exit(1) if foreground: # Run in foreground (useful for debugging) from .server import server_instance console.print("[bold green]Starting HTTP MCP server (foreground)[/bold green]") console.print(f" Provider: {settings.llm.provider}") console.print(f" Model: {settings.llm.model_name}") console.print(f" URL: http://{h}:{p}/mcp") _write_server_info(os.getpid(), h, p, transport) try: server_instance.run(transport=transport, host=h, port=p) # type: ignore[arg-type] finally: _remove_server_info() return # Daemonize: spawn subprocess and detach cmd = [ sys.executable, "-m", "mcp_server_browser_use.cli", "server", "--host", h, "--port", str(p), "--transport", transport, "--foreground", ] # Open log file for output LOG_FILE.parent.mkdir(parents=True, exist_ok=True) log_fd = open(LOG_FILE, "a") proc = subprocess.Popen( cmd, stdout=log_fd, stderr=log_fd, stdin=subprocess.DEVNULL, start_new_session=True, cwd=os.getcwd(), env=os.environ.copy(), ) console.print("[bold green]Started HTTP MCP server (background)[/bold green]") console.print(f" PID: {proc.pid}") console.print(f" Provider: {settings.llm.provider}") console.print(f" Model: {settings.llm.model_name}") console.print(f" URL: http://{h}:{p}/mcp") console.print(f" Log: {LOG_FILE}") console.print(f" Info: {SERVER_INFO_FILE}") @app.command() def stop() -> None: """Stop the running server daemon.""" info = _read_server_info() if info is None: console.print("[yellow]No server info file found[/yellow]") raise typer.Exit(1) pid = info["pid"] if not _is_process_running(pid): console.print(f"[yellow]Server process (PID {pid}) not running, cleaning up[/yellow]") _remove_server_info() raise typer.Exit(0) console.print(f"[bold]Stopping server (PID {pid})...[/bold]") try: os.kill(pid, signal.SIGTERM) # Wait briefly for graceful shutdown import time for _ in range(10): if not _is_process_running(pid): break time.sleep(0.5) else: # Force kill if still running console.print("[yellow]Forcing shutdown...[/yellow]") os.kill(pid, signal.SIGKILL) except OSError as e: console.print(f"[red]Failed to stop server: {e}[/red]") raise typer.Exit(1) _remove_server_info() console.print("[green]Server stopped[/green]") @app.command() def status() -> None: """Check if server is running.""" info = _read_server_info() if info is None: console.print("[dim]Server not running (no info file)[/dim]") raise typer.Exit(1) pid = info["pid"] if _is_process_running(pid): console.print(f"[green]Server running (PID {pid})[/green]") console.print(f" URL: http://{info['host']}:{info['port']}/mcp") console.print(f" Transport: {info['transport']}") console.print(f" Log: {LOG_FILE}") else: console.print(f"[yellow]Server not running (stale PID {pid})[/yellow]") _remove_server_info() raise typer.Exit(1) @app.command() def logs( follow: bool = typer.Option(False, "--follow", "-f", help="Follow log output"), lines: int = typer.Option(50, "--lines", "-n", help="Number of lines to show"), ) -> None: """View server logs.""" if not LOG_FILE.exists(): console.print("[yellow]No log file found[/yellow]") raise typer.Exit(1) if follow: import subprocess subprocess.run(["tail", "-f", str(LOG_FILE)], check=False) else: import subprocess subprocess.run(["tail", "-n", str(lines), str(LOG_FILE)], check=False) # --- MCP Client Commands --- def _get_server_url() -> str: """Get the URL of the running server.""" info = _read_server_info() if not info or not _is_process_running(info["pid"]): console.print("[red]Server is not running[/red]") console.print("[dim]Start it with: mcp-server-browser-use server[/dim]") raise typer.Exit(1) return f"http://{info['host']}:{info['port']}/mcp" async def _async_list_tools() -> list[dict]: """List tools from the MCP server.""" from fastmcp import Client url = _get_server_url() async with Client(url) as client: tools = await client.list_tools() return [{"name": t.name, "description": t.description or ""} for t in tools] async def _async_call_tool(tool_name: str, arguments: dict) -> tuple[list, bool]: """Call a tool on the MCP server.""" from fastmcp import Client url = _get_server_url() async with Client(url) as client: result = await client.call_tool(tool_name, arguments) return result.content, result.is_error @app.command() def tools() -> None: """List available tools from the running server.""" import asyncio try: tools_list = asyncio.run(_async_list_tools()) except Exception as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(1) if not tools_list: console.print("[yellow]No tools available[/yellow]") return table = Table(title="Available MCP Tools") table.add_column("Tool", style="cyan") table.add_column("Description", style="white") for tool in tools_list: name = tool.get("name", "") desc = tool.get("description", "")[:80] table.add_row(name, desc) console.print(table) console.print("\n[dim]Use 'mcp-server-browser-use call <tool> [args]' to run a tool[/dim]") @app.command() def call( tool_name: str = typer.Argument(..., help="Name of the tool to call"), args: list[str] = typer.Argument(None, help="Tool arguments as key=value pairs"), ) -> None: """Call a tool on the running server. Examples: mcp-server-browser-use call skill_list mcp-server-browser-use call run_browser_agent task="Go to google.com" mcp-server-browser-use call run_deep_research topic="AI trends 2025" """ import asyncio # Parse key=value arguments tool_args = {} if args: for arg in args: if "=" in arg: key, value = arg.split("=", 1) # Try to parse as JSON for complex types try: import json as json_module tool_args[key] = json_module.loads(value) except json.JSONDecodeError: tool_args[key] = value else: # Positional arg - assume it's the first required param # For convenience: `call run_browser_agent "Go to google.com"` tool_args["task"] = arg console.print(f"[bold blue]Calling {tool_name}...[/bold blue]") if tool_args: console.print(f"[dim]Arguments: {tool_args}[/dim]") try: content, is_error = asyncio.run(_async_call_tool(tool_name, tool_args)) except Exception as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(1) # Display result content for item in content: if hasattr(item, "text"): text = item.text # Try to pretty-print JSON try: import json as json_module parsed = json_module.loads(text) console.print_json(data=parsed) except (json.JSONDecodeError, TypeError): console.print(text) if is_error: console.print("[red]Tool returned an error[/red]") raise typer.Exit(1) console.print("[green]Done[/green]") @app.command() def install() -> None: """Install the MCP server to Claude Desktop configuration. Configures Claude Desktop to run mcp-server-browser-use via stdio transport. """ console.print("[bold blue]Installing to Claude Desktop...[/bold blue]") # Detect config file location if sys.platform == "darwin": config_path = Path.home() / "Library/Application Support/Claude/claude_desktop_config.json" elif sys.platform == "win32": config_path = Path(os.environ.get("APPDATA", "")) / "Claude/claude_desktop_config.json" else: config_path = Path.home() / ".config/Claude/claude_desktop_config.json" if not config_path.exists(): console.print(f"[yellow]Config not found at {config_path}[/yellow]") if typer.confirm("Create config file?"): config_path.parent.mkdir(parents=True, exist_ok=True) config_path.write_text("{}") else: raise typer.Exit(1) # Read existing config data = json.loads(config_path.read_text()) mcp_servers = data.get("mcpServers", {}) cwd = Path.cwd() server_config = { "command": "uv", "args": ["run", "mcp-server-browser-use"], "cwd": str(cwd), } mcp_servers["browser-use"] = server_config data["mcpServers"] = mcp_servers config_path.write_text(json.dumps(data, indent=2)) console.print(f"[green]Configured 'browser-use' server in {config_path}[/green]") console.print("[yellow]Restart Claude Desktop to apply changes.[/yellow]") @app.command("config") def config_cmd( action: str = typer.Argument("view", help="Action: view, set, path, save"), key: str | None = typer.Option(None, "--key", "-k", help="Config key (e.g., llm.provider)"), value: str | None = typer.Option(None, "--value", "-v", help="Value to set"), ) -> None: """View or modify configuration.""" if action == "path": console.print(str(CONFIG_FILE)) return if action == "view": table = Table(title="Current Configuration") table.add_column("Setting", style="cyan") table.add_column("Value", style="green") table.add_column("JSON Key", style="dim") table.add_row("Config File", str(CONFIG_FILE), "-") table.add_row("Results Dir", str(settings.server.results_dir or get_default_results_dir()), "server.results_dir") table.add_row("LLM Provider", settings.llm.provider, "llm.provider") table.add_row("LLM Model", settings.llm.model_name, "llm.model_name") table.add_row("LLM Base URL", settings.llm.base_url or "(default)", "llm.base_url") table.add_row("Browser Headless", str(settings.browser.headless), "browser.headless") table.add_row("Browser Proxy", settings.browser.proxy_server or "(none)", "browser.proxy_server") table.add_row("Max Steps", str(settings.agent.max_steps), "agent.max_steps") table.add_row("Max Searches", str(settings.research.max_searches), "research.max_searches") table.add_row("Server Transport", settings.server.transport, "server.transport") table.add_row("Server Host", settings.server.host, "server.host") table.add_row("Server Port", str(settings.server.port), "server.port") console.print(table) return if action == "save": path = settings.save() console.print(f"[green]Configuration saved to {path}[/green]") return if action == "set": if not key or value is None: console.print("[red]--key and --value required for 'set' action[/red]") raise typer.Exit(1) # Load current file config current = load_config_file() # Parse key path parts = key.split(".") target = current for part in parts[:-1]: target = target.setdefault(part, {}) # Parse value type if value.lower() == "true": parsed_value = True elif value.lower() == "false": parsed_value = False elif value.isdigit(): parsed_value = int(value) else: parsed_value = value target[parts[-1]] = parsed_value save_config_file(current) console.print(f"[green]Set {key} = {parsed_value}[/green]") console.print(f"[dim]Saved to {CONFIG_FILE}[/dim]") return console.print(f"[red]Unknown action: {action}. Use: view, set, path, save[/red]") # --- Skill Management Commands --- skill_app = typer.Typer(help="Manage browser skills") app.add_typer(skill_app, name="skill") @skill_app.command("list") def skill_list() -> None: """List all available skills.""" store = SkillStore(directory=settings.skills.directory) skills = store.list_all() if not skills: console.print(f"[yellow]No skills found in {store.directory}[/yellow]") console.print("\n[dim]Create skills manually or copy examples from:[/dim]") console.print("[dim] examples/skills/[/dim]") return table = Table(title=f"Browser Skills ({store.directory})") table.add_column("Name", style="cyan") table.add_column("Description", style="white") table.add_column("Success Rate", style="green") table.add_column("Usage", style="dim") table.add_column("Last Used", style="dim") for s in skills: usage = s.success_count + s.failure_count rate = f"{s.success_rate * 100:.0f}%" if usage > 0 else "-" last = s.last_used.strftime("%Y-%m-%d") if s.last_used else "-" table.add_row(s.name, s.description[:40], rate, str(usage), last) console.print(table) @skill_app.command("get") def skill_get( name: str = typer.Argument(..., help="Skill name"), ) -> None: """Show full details of a skill.""" store = SkillStore(directory=settings.skills.directory) skill = store.load(name) if not skill: console.print(f"[red]Skill not found: {name}[/red]") console.print(f"[dim]Skills directory: {store.directory}[/dim]") raise typer.Exit(1) console.print(Panel(store.to_yaml(skill), title=f"Skill: {name}", expand=False)) @skill_app.command("delete") def skill_delete( name: str = typer.Argument(..., help="Skill name to delete"), force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"), ) -> None: """Delete a skill.""" store = SkillStore(directory=settings.skills.directory) if not store.exists(name): console.print(f"[red]Skill not found: {name}[/red]") raise typer.Exit(1) if not force: if not typer.confirm(f"Delete skill '{name}'?"): console.print("[dim]Cancelled[/dim]") return store.delete(name) console.print(f"[green]Skill '{name}' deleted[/green]") # --- Observability Commands --- @app.command() def tasks( limit: int = typer.Option(20, "--limit", "-n", help="Number of tasks to show"), status_filter: str | None = typer.Option(None, "--status", "-s", help="Filter by status: running, completed, failed"), tool_filter: str | None = typer.Option(None, "--tool", "-t", help="Filter by tool name"), ) -> None: """List recent tasks with status and progress.""" import asyncio from .observability import TaskStatus from .observability.store import TaskStore async def _list_tasks(): store = TaskStore() await store.initialize() status = None if status_filter: try: status = TaskStatus(status_filter) except ValueError: console.print(f"[red]Invalid status: {status_filter}. Use: running, completed, failed, pending[/red]") raise typer.Exit(1) return await store.get_task_history(limit=limit, status=status, tool_name=tool_filter) try: tasks_list = asyncio.run(_list_tasks()) except Exception as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(1) if not tasks_list: console.print("[yellow]No tasks found[/yellow]") return table = Table(title=f"Recent Tasks (limit={limit})") table.add_column("ID", style="cyan", width=8) table.add_column("Tool", style="white") table.add_column("Status", style="green") table.add_column("Progress", style="yellow") table.add_column("Message", style="dim", max_width=30) table.add_column("Duration", style="dim") table.add_column("Created", style="dim") for t in tasks_list: # Format status with color status_style = { "running": "[blue]running[/blue]", "completed": "[green]completed[/green]", "failed": "[red]failed[/red]", "pending": "[yellow]pending[/yellow]", }.get(t.status.value, t.status.value) progress = f"{t.progress_current}/{t.progress_total}" if t.progress_total > 0 else "-" duration = f"{t.duration_seconds:.1f}s" if t.duration_seconds else "-" created = t.created_at.strftime("%H:%M:%S") if t.created_at else "-" message = (t.progress_message[:27] + "...") if t.progress_message and len(t.progress_message) > 30 else (t.progress_message or "-") table.add_row(t.task_id[:8], t.tool_name, status_style, progress, message, duration, created) console.print(table) @app.command("task") def task_detail( task_id: str = typer.Argument(..., help="Task ID (full or prefix)"), ) -> None: """Show detailed information about a specific task.""" import asyncio from .observability.store import TaskStore async def _get_task(): store = TaskStore() await store.initialize() # Try exact match first task = await store.get_task(task_id) if task: return task # Try prefix match tasks = await store.get_task_history(limit=100) for t in tasks: if t.task_id.startswith(task_id): return t return None try: task = asyncio.run(_get_task()) except Exception as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(1) if not task: console.print(f"[red]Task not found: {task_id}[/red]") raise typer.Exit(1) # Format task details status_style = { "running": "[blue]running[/blue]", "completed": "[green]completed[/green]", "failed": "[red]failed[/red]", "pending": "[yellow]pending[/yellow]", }.get(task.status.value, task.status.value) console.print(f"\n[bold]Task:[/bold] {task.task_id}") console.print(f"[bold]Tool:[/bold] {task.tool_name}") console.print(f"[bold]Status:[/bold] {status_style}") if task.stage: console.print(f"[bold]Stage:[/bold] {task.stage.value}") console.print(f"[bold]Progress:[/bold] {task.progress_current}/{task.progress_total} ({task.progress_percent:.1f}%)") if task.progress_message: console.print(f"[bold]Message:[/bold] {task.progress_message}") console.print(f"\n[bold]Created:[/bold] {task.created_at.isoformat()}") if task.started_at: console.print(f"[bold]Started:[/bold] {task.started_at.isoformat()}") if task.completed_at: console.print(f"[bold]Completed:[/bold] {task.completed_at.isoformat()}") if task.duration_seconds: console.print(f"[bold]Duration:[/bold] {task.duration_seconds:.1f}s") if task.input_params: console.print("\n[bold]Input:[/bold]") for key, value in task.input_params.items(): val_str = str(value)[:100] + "..." if len(str(value)) > 100 else str(value) console.print(f" {key}: {val_str}") if task.result: console.print("\n[bold]Result:[/bold]") result_preview = task.result[:500] + "..." if len(task.result) > 500 else task.result console.print(Panel(result_preview, expand=False)) if task.error: console.print("\n[bold red]Error:[/bold red]") console.print(Panel(task.error, style="red", expand=False)) @app.command() def health() -> None: """Show server health and running task status.""" import asyncio import psutil from .observability.store import TaskStore async def _get_health(): store = TaskStore() await store.initialize() running = await store.get_running_tasks() stats = await store.get_stats() return running, stats # Check if server is running info = _read_server_info() if not info or not _is_process_running(info["pid"]): console.print("[yellow]Server not running[/yellow]") console.print("[dim]Start with: mcp-server-browser-use server[/dim]") else: console.print(f"[green]Server running[/green] (PID {info['pid']})") console.print(f" URL: http://{info['host']}:{info['port']}/mcp") # Get process memory try: proc = psutil.Process(info["pid"]) mem = proc.memory_info().rss / 1024 / 1024 console.print(f" Memory: {mem:.1f} MB") except Exception: pass # Get task stats try: running_tasks, stats = asyncio.run(_get_health()) except Exception as e: console.print(f"[red]Error reading task store: {e}[/red]") raise typer.Exit(1) console.print("\n[bold]Task Statistics:[/bold]") console.print(f" Total tasks: {stats.get('total_tasks', 0)}") console.print(f" Running: {stats.get('running_count', 0)}") console.print(f" Success rate (24h): {stats.get('success_rate_24h', 0):.1f}%") if stats.get("by_tool"): console.print("\n[bold]By Tool:[/bold]") for tool, count in stats["by_tool"].items(): console.print(f" {tool}: {count}") if running_tasks: console.print(f"\n[bold]Running Tasks ({len(running_tasks)}):[/bold]") for t in running_tasks: progress = f"{t.progress_current}/{t.progress_total}" if t.progress_total > 0 else "-" stage = f" ({t.stage.value})" if t.stage else "" console.print(f" [{t.task_id[:8]}] {t.tool_name}{stage} - {progress}") if t.progress_message: console.print(f" → {t.progress_message}") # Default command when no subcommand is given @app.callback(invoke_without_command=True) def main(ctx: typer.Context) -> None: """Browser Use MCP Server & CLI. Run 'server' subcommand to start the HTTP MCP server. """ if ctx.invoked_subcommand is None: # stdio is deprecated - show migration guide from .server import STDIO_DEPRECATION_MESSAGE console.print(STDIO_DEPRECATION_MESSAGE) raise typer.Exit(1) if __name__ == "__main__": app()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Saik0s/mcp-browser-use'

If you have feedback or need assistance with the MCP directory API, please join our Discord server