Skip to main content
Glama
server.py64.7 kB
"""MCP server exposing browser-use as tools with native background task support.""" import asyncio import logging import os import re import sys import time import uuid from pathlib import Path from typing import TYPE_CHECKING def _configure_stdio_logging() -> None: """Configure logging for stdio MCP mode - all logs MUST go to stderr. In stdio mode, stdout is reserved exclusively for JSON-RPC messages. Any logging or print() to stdout corrupts the protocol stream. """ # Suppress noisy loggers from dependencies BEFORE they're imported os.environ.setdefault("BROWSER_USE_LOGGING_LEVEL", "warning") # Force all logging to stderr stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")) # Configure root logger root = logging.getLogger() root.handlers = [stderr_handler] root.setLevel(logging.WARNING) # Suppress verbose loggers from dependencies for logger_name in [ "httpx", "httpcore", "urllib3", "asyncio", "playwright", "browser_use", "langchain", "langchain_core", "openai", "anthropic", ]: dep_logger = logging.getLogger(logger_name) dep_logger.setLevel(logging.WARNING) dep_logger.handlers = [stderr_handler] dep_logger.propagate = False # Configure logging BEFORE importing browser_use and other noisy dependencies _configure_stdio_logging() # ruff: noqa: E402 - Intentional late imports after logging configuration from browser_use import Agent, BrowserProfile from browser_use.browser.profile import ProxySettings from fastmcp import FastMCP from fastmcp.dependencies import CurrentContext, Progress from fastmcp.server.context import Context from fastmcp.server.tasks.config import TaskConfig from .config import settings from .exceptions import BrowserError, LLMProviderError from .observability import TaskRecord, TaskStage, TaskStatus, bind_task_context, clear_task_context, get_task_logger, setup_structured_logging from .observability.store import get_task_store from .providers import get_llm from .research.machine import ResearchMachine from .skills import SkillAnalyzer, SkillExecutor, SkillRecorder, SkillRunner, SkillStore from .utils import save_execution_result if TYPE_CHECKING: from browser_use.agent.views import AgentOutput from browser_use.browser.views import BrowserStateSummary # Apply configured log level (may override the default WARNING) logger = logging.getLogger("mcp_server_browser_use") logger.setLevel(getattr(logging, settings.server.logging_level.upper())) # Global registry of running asyncio tasks for cancellation support _running_tasks: dict[str, asyncio.Task] = {} def serve() -> FastMCP: """Create and configure MCP server with background task support.""" # Set up structured logging first setup_structured_logging() server = FastMCP("mcp_server_browser_use") # Initialize skill components (only when skills feature is enabled) skill_store: SkillStore | None = None skill_executor: SkillExecutor | None = None if settings.skills.enabled: skill_store = SkillStore(directory=settings.skills.directory) skill_executor = SkillExecutor() def _get_llm_and_profile(): """Helper to get LLM instance and browser profile.""" llm = get_llm( provider=settings.llm.provider, model=settings.llm.model_name, api_key=settings.llm.get_api_key_for_provider(), base_url=settings.llm.base_url, azure_endpoint=settings.llm.azure_endpoint, azure_api_version=settings.llm.azure_api_version, aws_region=settings.llm.aws_region, ) proxy = None if settings.browser.proxy_server: proxy = ProxySettings(server=settings.browser.proxy_server, bypass=settings.browser.proxy_bypass) profile = BrowserProfile( headless=settings.browser.headless, proxy=proxy, cdp_url=settings.browser.cdp_url, user_data_dir=settings.browser.user_data_dir, ) if settings.browser.cdp_url: logger.info(f"Using external browser via CDP: {settings.browser.cdp_url}") return llm, profile @server.tool(task=TaskConfig(mode="optional")) async def run_browser_agent( task: str, max_steps: int | None = None, skill_name: str | None = None, skill_params: str | dict | None = None, learn: bool = False, save_skill_as: str | None = None, ctx: Context = CurrentContext(), progress: Progress = Progress(), ) -> str: """ Execute a browser automation task using AI. Supports background execution with progress tracking when client requests it. EXECUTION MODE (default): - When skill_name is provided, hints are injected for efficient navigation. LEARNING MODE (learn=True): - Agent executes with API discovery instructions - On success, attempts to extract a reusable skill from the execution - If save_skill_as is provided, saves the learned skill Args: task: Natural language description of what to do in the browser max_steps: Maximum number of agent steps (default from settings) skill_name: Optional skill name to use for hints (execution mode) skill_params: Optional parameters for the skill (JSON string or dict) learn: Enable learning mode - agent focuses on API discovery save_skill_as: Name to save the learned skill (requires learn=True) Returns: Result of the browser automation task. In learning mode, includes skill extraction status. """ # --- Task Tracking Setup --- task_id = str(uuid.uuid4()) task_store = get_task_store() task_record = TaskRecord( task_id=task_id, tool_name="run_browser_agent", status=TaskStatus.PENDING, input_params={"task": task, "max_steps": max_steps, "skill_name": skill_name, "learn": learn}, ) await task_store.create_task(task_record) bind_task_context(task_id, "run_browser_agent") task_logger = get_task_logger() await ctx.info(f"Starting: {task}") logger.info(f"Starting browser agent task: {task[:100]}...") task_logger.info("task_created", task_preview=task[:100]) try: llm, profile = _get_llm_and_profile() except LLMProviderError as e: logger.error(f"LLM initialization failed: {e}") await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) clear_task_context() return f"Error: {e}" # Mark task as running await task_store.update_status(task_id, TaskStatus.RUNNING) await task_store.update_progress(task_id, 0, 0, "Initializing...", TaskStage.INITIALIZING) task_logger.info("task_running") # Determine execution mode skill = None augmented_task = task params_dict: dict = {} if learn and skill_name: # Can't use both learning and existing skill logger.warning("learn=True ignores skill_name - running in learning mode") skill_name = None if learn and skill_executor: # LEARNING MODE: Inject API discovery instructions await ctx.info("Learning mode: Agent will discover APIs") augmented_task = skill_executor.inject_learning_mode(task) logger.info("Learning mode enabled - API discovery instructions injected") elif learn: # Skills disabled - warn and continue without learning await ctx.info("Skills feature disabled - learn parameter ignored") logger.warning("learn=True ignored - skills.enabled is False") learn = False # Disable learning for rest of execution elif skill_name and settings.skills.enabled and skill_store and skill_executor: # EXECUTION MODE: Load skill skill = skill_store.load(skill_name) if skill: # Parse skill params (accepts dict or JSON string) if skill_params: if isinstance(skill_params, dict): params_dict = skill_params elif isinstance(skill_params, str): import json try: parsed = json.loads(skill_params) if isinstance(parsed, dict): params_dict = parsed else: logger.warning(f"skill_params must be an object, got {type(parsed).__name__}") params_dict = {} except json.JSONDecodeError: logger.warning(f"Invalid skill_params JSON: {skill_params}") params_dict = {} else: logger.warning(f"skill_params must be dict or JSON string, got {type(skill_params).__name__}") params_dict = {} # Merge user params with skill parameter defaults merged_params = skill.merge_params(params_dict) # NEW: Try direct execution if skill supports it if skill.supports_direct_execution: await ctx.info(f"Direct execution: {skill.name}") logger.info(f"Attempting direct execution for skill: {skill.name}") try: # Create browser session for fetch execution from browser_use.browser.session import BrowserSession browser_session = BrowserSession(browser_profile=profile) await browser_session.start() try: runner = SkillRunner() run_result = await runner.run(skill, merged_params, browser_session) if run_result.success: # Direct execution succeeded! skill_store.record_usage(skill.name, success=True) await ctx.info("Direct execution completed") logger.info(f"Skill direct execution succeeded: {skill.name}") # Format result import json if isinstance(run_result.data, (dict, list)): final_result = json.dumps(run_result.data, indent=2) else: final_result = str(run_result.data) # Auto-save result if configured if settings.server.results_dir: saved_path = save_execution_result( final_result, prefix=f"skill_{skill.name}", metadata={"skill": skill.name, "params": params_dict, "direct": True}, ) await ctx.info(f"Saved to: {saved_path.name}") # Mark task as completed before returning await task_store.update_status(task_id, TaskStatus.COMPLETED, result=final_result) task_logger.info("task_completed", result_length=len(final_result), direct=True) clear_task_context() return final_result elif run_result.auth_recovery_triggered: # Auth failed - fall back to agent for re-auth await ctx.info("Auth required, falling back to agent...") logger.info("Direct execution needs auth recovery, falling back to agent") # Continue to agent execution below else: # Direct execution failed - fall back to agent await ctx.info(f"Direct failed: {run_result.error}, trying agent...") logger.warning(f"Direct execution failed: {run_result.error}") # Continue to agent execution below finally: await browser_session.stop() except Exception as e: logger.error(f"Direct execution error: {e}") await ctx.info("Direct execution error, trying agent...") # Continue to agent execution below # Inject hints for agent execution (fallback or non-direct skills) augmented_task = skill_executor.inject_hints(task, skill, merged_params) await ctx.info(f"Using skill hints: {skill.name}") logger.info(f"Skill hints injected for: {skill.name}") else: await ctx.info(f"Skill not found: {skill_name}") logger.warning(f"Skill not found: {skill_name}") elif skill_name: # Skills disabled - warn and continue without skill await ctx.info("Skills feature disabled - skill_name parameter ignored") logger.warning(f"skill_name='{skill_name}' ignored - skills.enabled is False") steps = max_steps if max_steps is not None else settings.agent.max_steps await progress.set_total(steps) # Track page changes and navigation for potential skill extraction last_url: str | None = None navigation_urls: list[str] = [] last_db_update: float = 0.0 # Throttle DB writes to once per second async def step_callback( state: "BrowserStateSummary", output: "AgentOutput", step_num: int, ) -> None: nonlocal last_url, last_db_update url_changed = state.url != last_url if url_changed: await ctx.info(f"→ {state.title or state.url}") navigation_urls.append(state.url) last_url = state.url await progress.increment() # Throttle DB updates: only write once per second or on URL change now = time.monotonic() if url_changed or (now - last_db_update) >= 1.0: stage = TaskStage.NAVIGATING if state.url else TaskStage.EXTRACTING message = state.title or state.url or f"Step {step_num}" await task_store.update_progress(task_id, step_num, steps, message[:100], stage) last_db_update = now task_logger.debug("step_completed", step=step_num, url=state.url) # Initialize recorder for learning mode recorder: SkillRecorder | None = None if learn: recorder = SkillRecorder(task=task) # Track recorder attachment for cleanup recorder_attached = False try: agent = Agent( task=augmented_task, llm=llm, browser_profile=profile, max_steps=steps, register_new_step_callback=step_callback, ) # In learning mode, start browser early and attach recorder to CDP if recorder: await ctx.info("Attaching network recorder...") await agent.browser_session.start() await recorder.attach(agent.browser_session) recorder_attached = True logger.info("SkillRecorder attached via CDP for network capture") # Register task for cancellation support agent_task = asyncio.create_task(agent.run()) _running_tasks[task_id] = agent_task try: result = await agent_task finally: _running_tasks.pop(task_id, None) final = result.final_result() or "Task completed without explicit result." # Validate result if skill was used (execution mode) is_valid = True if skill and skill_executor and settings.skills.validate_results: is_valid = skill_executor.validate_result(final, skill) if not is_valid: await ctx.info("Skill validation failed - hints may be outdated") logger.warning(f"Skill validation failed for: {skill.name}") # Handle fallback based on skill config if skill.fallback.strategy == "explore_full": await ctx.info("Falling back to exploration without hints...") # Re-run without hints agent = Agent( task=task, # Original task without hints llm=llm, browser_profile=profile, max_steps=steps, register_new_step_callback=step_callback, ) result = await agent.run() final = result.final_result() or "Task completed without explicit result." is_valid = True # Fallback execution is considered valid # Record skill usage statistics (execution mode) if skill and skill_store: skill_store.record_usage(skill.name, success=is_valid) # LEARNING MODE: Attempt to extract skill from execution skill_extraction_result = "" if learn and final and save_skill_as: await ctx.info("Analyzing execution for skill extraction...") try: # Finalize recorder and get full CDP recording if recorder and recorder_attached: await recorder.finalize() await recorder.detach() recorder_attached = False # Mark as detached recording = recorder.get_recording(result=final) api_count = recorder.api_call_count await ctx.info(f"Captured {api_count} API calls for analysis") logger.info(f"Recording captured: {recorder.request_count} requests, {api_count} API calls") else: # Fallback to simplified recording (shouldn't happen in learn mode) from .skills import SessionRecording recording = SessionRecording( task=task, result=final, navigation_urls=navigation_urls, ) logger.warning("Using simplified recording - recorder was not attached") # Analyze with LLM analyzer = SkillAnalyzer(llm) extracted_skill = await analyzer.analyze(recording) if extracted_skill and skill_store: extracted_skill.name = save_skill_as skill_store.save(extracted_skill) skill_extraction_result = f"\n\n[SKILL LEARNED] Saved as '{save_skill_as}'" await ctx.info(f"Skill saved: {save_skill_as}") logger.info(f"Skill extracted and saved: {save_skill_as}") else: skill_extraction_result = "\n\n[SKILL NOT LEARNED] Could not extract API from execution" await ctx.info("Could not extract skill - no suitable API found") logger.info("Skill extraction failed - no suitable API found") except Exception as e: logger.error(f"Skill extraction failed: {e}") skill_extraction_result = f"\n\n[SKILL EXTRACTION ERROR] {e}" # Auto-save result if results_dir is configured if settings.server.results_dir: saved_path = save_execution_result( final, prefix=f"agent_{task[:20]}", metadata={"task": task, "max_steps": steps, "skill": skill_name, "learn": learn}, ) await ctx.info(f"Saved to: {saved_path.name}") await ctx.info(f"Completed: {final[:100]}") logger.info(f"Agent completed: {final[:100]}...") # Mark task as completed final_result = final + skill_extraction_result await task_store.update_status(task_id, TaskStatus.COMPLETED, result=final_result) task_logger.info("task_completed", result_length=len(final_result)) clear_task_context() return final_result except asyncio.CancelledError: # Task was cancelled - record failure if skill and skill_store: skill_store.record_usage(skill.name, success=False) await task_store.update_status(task_id, TaskStatus.CANCELLED, error="Cancelled by user") task_logger.info("task_cancelled") clear_task_context() raise except Exception as e: # Record failure if skill was used if skill and skill_store: skill_store.record_usage(skill.name, success=False) # Mark task as failed await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) task_logger.error("task_failed", error=str(e)) clear_task_context() logger.error(f"Browser agent failed: {e}") raise BrowserError(f"Browser automation failed: {e}") from e finally: # Ensure CDP listeners are always detached, even if exceptions occurred if recorder and recorder_attached: try: await recorder.detach() logger.info("CDP listeners detached successfully in finally block") except Exception as cleanup_error: # Log exception but don't mask the original error logger.exception(f"Critical: Failed to detach CDP listeners in finally block: {cleanup_error}") @server.tool(task=TaskConfig(mode="optional")) async def run_deep_research( topic: str, max_searches: int | None = None, save_to_file: str | None = None, ctx: Context = CurrentContext(), progress: Progress = Progress(), ) -> str: """ Execute deep research on a topic with progress tracking. Runs as a background task if client requests it, otherwise synchronous. Progress updates are streamed via the MCP task protocol. Args: topic: The research topic or question to investigate max_searches: Maximum number of web searches (default from settings) save_to_file: Optional file path to save the report Returns: The research report as markdown """ # --- Task Tracking Setup --- task_id = str(uuid.uuid4()) task_store = get_task_store() task_record = TaskRecord( task_id=task_id, tool_name="run_deep_research", status=TaskStatus.PENDING, input_params={"topic": topic, "max_searches": max_searches, "save_to_file": save_to_file}, ) await task_store.create_task(task_record) bind_task_context(task_id, "run_deep_research") task_logger = get_task_logger() logger.info(f"Starting deep research on: {topic}") task_logger.info("task_created", topic=topic[:100]) try: llm, profile = _get_llm_and_profile() except LLMProviderError as e: logger.error(f"LLM initialization failed: {e}") await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) clear_task_context() return f"Error: {e}" # Mark task as running await task_store.update_status(task_id, TaskStatus.RUNNING) task_logger.info("task_running") searches = max_searches if max_searches is not None else settings.research.max_searches # Sanitize topic for safe filename safe_topic = re.sub(r"[^\w\s-]", "", topic[:50]).strip().replace(" ", "_") save_path = save_to_file or (f"{settings.research.save_directory}/{safe_topic}.md" if settings.research.save_directory else None) try: # Execute research with progress tracking machine = ResearchMachine( topic=topic, max_searches=searches, save_path=save_path, llm=llm, browser_profile=profile, progress=progress, ctx=ctx, ) # Register task for cancellation support research_task = asyncio.create_task(machine.run()) _running_tasks[task_id] = research_task try: report = await research_task finally: _running_tasks.pop(task_id, None) # Auto-save result if results_dir is configured and no explicit save path if settings.server.results_dir and not save_to_file: saved_path = save_execution_result( report, prefix=f"research_{topic[:20]}", metadata={"topic": topic, "max_searches": searches}, ) await ctx.info(f"Saved to: {saved_path.name}") # Mark task as completed await task_store.update_status(task_id, TaskStatus.COMPLETED, result=report) task_logger.info("task_completed", result_length=len(report)) clear_task_context() return report except asyncio.CancelledError: # Task was cancelled await task_store.update_status(task_id, TaskStatus.CANCELLED, error="Cancelled by user") task_logger.info("task_cancelled") clear_task_context() raise except Exception as e: await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) task_logger.error("task_failed", error=str(e)) clear_task_context() raise # --- Skill Management Tools (only registered when skills.enabled) --- if settings.skills.enabled and skill_store: @server.tool() async def skill_list() -> str: """ List all available browser skills. Returns: JSON list of skill summaries with name, description, and usage stats """ import json assert skill_store is not None # Type narrowing for mypy skills = skill_store.list_all() if not skills: return json.dumps({"skills": [], "message": "No skills found. Use learn=True with save_skill_as to learn new skills."}) return json.dumps( { "skills": [ { "name": s.name, "description": s.description, "success_rate": round(s.success_rate * 100, 1), "usage_count": s.success_count + s.failure_count, "last_used": s.last_used.isoformat() if s.last_used else None, } for s in skills ], "skills_directory": str(skill_store.directory), }, indent=2, ) @server.tool() async def skill_get(skill_name: str) -> str: """ Get full details of a specific skill. Args: skill_name: Name of the skill to retrieve Returns: Full skill definition as YAML """ assert skill_store is not None # Type narrowing for mypy skill = skill_store.load(skill_name) if not skill: return f"Error: Skill '{skill_name}' not found in {skill_store.directory}" return skill_store.to_yaml(skill) @server.tool() async def skill_delete(skill_name: str) -> str: """ Delete a skill by name. Args: skill_name: Name of the skill to delete Returns: Success or error message """ assert skill_store is not None # Type narrowing for mypy if skill_store.delete(skill_name): return f"Skill '{skill_name}' deleted successfully" return f"Error: Skill '{skill_name}' not found" # --- Observability Tools --- # Define tool functions async def _health_check_impl() -> str: """Implementation of health check.""" import json import psutil task_store = get_task_store() running_tasks = await task_store.get_running_tasks() stats = await task_store.get_stats() # Get process stats process = psutil.Process() memory_info = process.memory_info() return json.dumps( { "status": "healthy", "uptime_seconds": round(time.time() - _server_start_time, 1), "memory_mb": round(memory_info.rss / 1024 / 1024, 1), "running_tasks": len(running_tasks), "tasks": [ { "task_id": t.task_id[:8], "tool": t.tool_name, "stage": t.stage.value if t.stage else None, "progress": f"{t.progress_current}/{t.progress_total}", "message": t.progress_message, } for t in running_tasks ], "stats": stats, }, indent=2, ) async def _task_list_impl(limit: int = 20, status_filter: str | None = None) -> str: """Implementation of task list.""" import json task_store = get_task_store() status = None if status_filter: try: status = TaskStatus(status_filter) except ValueError: return f"Error: Invalid status '{status_filter}'. Use: running, completed, failed, pending" tasks = await task_store.get_task_history(limit=limit, status=status) return json.dumps( { "tasks": [ { "task_id": t.task_id[:8], "tool": t.tool_name, "status": t.status.value, "progress": f"{t.progress_current}/{t.progress_total}", "created": t.created_at.isoformat(), "duration_sec": round(t.duration_seconds, 1) if t.duration_seconds else None, } for t in tasks ], "count": len(tasks), }, indent=2, ) async def _task_get_impl(task_id: str) -> str: """Implementation of task get.""" import json task_store = get_task_store() # Try exact match first, then prefix match task = await task_store.get_task(task_id) if not task: # Try prefix match tasks = await task_store.get_task_history(limit=100) for t in tasks: if t.task_id.startswith(task_id): task = t break if not task: return f"Error: Task '{task_id}' not found" return json.dumps( { "task_id": task.task_id, "tool": task.tool_name, "status": task.status.value, "stage": task.stage.value if task.stage else None, "progress": { "current": task.progress_current, "total": task.progress_total, "message": task.progress_message, "percent": task.progress_percent, }, "timestamps": { "created": task.created_at.isoformat(), "started": task.started_at.isoformat() if task.started_at else None, "completed": task.completed_at.isoformat() if task.completed_at else None, "duration_sec": round(task.duration_seconds, 1) if task.duration_seconds else None, }, "input": task.input_params, "result": task.result[:500] if task.result else None, "error": task.error, }, indent=2, ) @server.tool() async def health_check() -> str: """ Health check endpoint with system stats and running task information. Returns: JSON object with server health status, running tasks, and statistics """ return await _health_check_impl() @server.tool() async def task_list( limit: int = 20, status_filter: str | None = None, ) -> str: """ List recent tasks with optional filtering. Args: limit: Maximum number of tasks to return (default 20) status_filter: Optional status filter (running, completed, failed) Returns: JSON list of recent tasks """ return await _task_list_impl(limit, status_filter) @server.tool() async def task_get(task_id: str) -> str: """ Get full details of a specific task. Args: task_id: Task ID (full or prefix) Returns: JSON object with task details, input, and result/error """ return await _task_get_impl(task_id) @server.tool() async def task_cancel(task_id: str) -> str: """ Cancel a running browser agent or research task. Args: task_id: Task ID (full or prefix match) Returns: JSON with success status and message """ import json task_store = get_task_store() # Find by prefix match matched_id = None for full_id in _running_tasks: if full_id.startswith(task_id) or full_id == task_id: matched_id = full_id break if not matched_id: return json.dumps({"success": False, "error": f"Task '{task_id}' not found or not running"}) # Cancel the asyncio task task = _running_tasks[matched_id] task.cancel() # Update status in store await task_store.update_status(matched_id, TaskStatus.CANCELLED, error="Cancelled by user") return json.dumps({"success": True, "task_id": matched_id[:8], "message": "Task cancelled"}) # --- Web Viewer UI --- @server.custom_route(path="/", methods=["GET"]) async def serve_viewer(request): """Serve the web viewer UI for task monitoring.""" from starlette.responses import FileResponse # Get the path to the viewer.html file viewer_path = Path(__file__).parent / "ui" / "viewer.html" if not viewer_path.exists(): from starlette.responses import Response return Response( content="Web viewer not found. Make sure ui/viewer.html exists.", status_code=404, media_type="text/plain", ) return FileResponse(viewer_path, media_type="text/html") @server.custom_route(path="/dashboard", methods=["GET"]) async def serve_dashboard(request): """Serve the dashboard UI for task/skill management.""" from starlette.responses import FileResponse dashboard_path = Path(__file__).parent / "ui" / "dashboard.html" if not dashboard_path.exists(): from starlette.responses import Response return Response( content="Dashboard not found. Make sure ui/dashboard.html exists.", status_code=404, media_type="text/plain", ) return FileResponse(dashboard_path, media_type="text/html") # REST API endpoints for the web viewer (simpler than JSON-RPC for browser) @server.custom_route(path="/api/health", methods=["GET"]) async def api_health(request): """REST endpoint for health check.""" import json from starlette.responses import JSONResponse result = await _health_check_impl() return JSONResponse(json.loads(result)) @server.custom_route(path="/api/tasks", methods=["GET"]) async def api_tasks(request): """REST endpoint for task list.""" import json from starlette.responses import JSONResponse limit = int(request.query_params.get("limit", "20")) status_filter = request.query_params.get("status", None) result = await _task_list_impl(limit=limit, status_filter=status_filter) return JSONResponse(json.loads(result)) @server.custom_route(path="/api/tasks/{task_id}", methods=["GET"]) async def api_task_get(request): """REST endpoint for task details.""" import json from starlette.responses import JSONResponse task_id = request.path_params["task_id"] result = await _task_get_impl(task_id) # Check if it's an error message if result.startswith("Error:"): return JSONResponse({"error": result}, status_code=404) return JSONResponse(json.loads(result)) # REST API endpoints for skills def _get_skill_store() -> SkillStore | None: """Get skill store instance if skills are enabled.""" if settings.skills.enabled: return SkillStore(directory=settings.skills.directory) return None @server.custom_route(path="/api/skills", methods=["GET"]) async def api_skills(request): """REST endpoint for skills list.""" from starlette.responses import JSONResponse store = _get_skill_store() if not store: return JSONResponse({"error": "Skills feature is disabled"}, status_code=503) try: skills = store.list_all() return JSONResponse( { "skills": [ { "name": s.name, "description": s.description, "success_rate": round(s.success_rate * 100, 1), "usage_count": s.success_count + s.failure_count, "last_used": s.last_used.isoformat() if s.last_used else None, } for s in skills ], "count": len(skills), "skills_directory": str(store.directory), } ) except Exception as e: logger.error(f"Failed to list skills: {e}") return JSONResponse({"error": str(e)}, status_code=500) @server.custom_route(path="/api/skills/{name}", methods=["GET"]) async def api_skill_get(request): """REST endpoint for skill details.""" from starlette.responses import JSONResponse store = _get_skill_store() if not store: return JSONResponse({"error": "Skills feature is disabled"}, status_code=503) skill_name = request.path_params["name"] try: skill = store.load(skill_name) if not skill: return JSONResponse({"error": f"Skill '{skill_name}' not found"}, status_code=404) # Return skill as JSON (convert from dict representation) skill_dict = skill.to_dict() return JSONResponse(skill_dict) except Exception as e: logger.error(f"Failed to get skill {skill_name}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @server.custom_route(path="/api/skills/{name}", methods=["DELETE"]) async def api_skill_delete(request): """REST endpoint for skill deletion.""" from starlette.responses import JSONResponse store = _get_skill_store() if not store: return JSONResponse({"error": "Skills feature is disabled"}, status_code=503) skill_name = request.path_params["name"] try: if store.delete(skill_name): return JSONResponse({"success": True, "message": f"Skill '{skill_name}' deleted successfully"}) return JSONResponse({"error": f"Skill '{skill_name}' not found"}, status_code=404) except Exception as e: logger.error(f"Failed to delete skill {skill_name}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @server.custom_route(path="/api/skills/{name}/run", methods=["POST"]) async def api_skill_run(request): """REST endpoint for skill execution. Request body: { "url": "https://example.com", # Optional - can be part of task description "params": {...} # Optional skill parameters } Returns: { "task_id": "abc123...", "message": "Skill execution started" } """ from starlette.responses import JSONResponse if not settings.skills.enabled: return JSONResponse({"error": "Skills feature is disabled"}, status_code=503) skill_name = request.path_params["name"] try: body = await request.json() except Exception as e: return JSONResponse({"error": f"Invalid JSON body: {e}"}, status_code=400) url = body.get("url", "") params = body.get("params", {}) # Build task description task_desc = f"Use the {skill_name} skill" if url: task_desc += f" at {url}" # Create task ID for tracking task_id = str(uuid.uuid4()) task_store = get_task_store() task_record = TaskRecord( task_id=task_id, tool_name=f"skill_run:{skill_name}", status=TaskStatus.PENDING, input_params={"skill_name": skill_name, "url": url, "params": params}, ) await task_store.create_task(task_record) # Start execution in background async def execute_skill() -> None: """Background task to execute the skill.""" bind_task_context(task_id, f"skill_run:{skill_name}") task_logger = get_task_logger() try: llm, profile = _get_llm_and_profile() except LLMProviderError as e: logger.error(f"LLM initialization failed: {e}") await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) clear_task_context() return await task_store.update_status(task_id, TaskStatus.RUNNING) task_logger.info("task_running") try: # Build agent with skill hints augmented_task = task_desc if skill_store: skill = skill_store.load(skill_name) if skill and skill_executor: merged_params = skill.merge_params(params) augmented_task = skill_executor.inject_hints(task_desc, skill, merged_params) agent = Agent( task=augmented_task, llm=llm, browser_profile=profile, max_steps=settings.agent.max_steps, ) # Register for cancellation agent_task = asyncio.create_task(agent.run()) _running_tasks[task_id] = agent_task try: result = await agent_task finally: _running_tasks.pop(task_id, None) final = result.final_result() or "Task completed without explicit result." # Record usage if skill_store: skill_store.record_usage(skill_name, success=True) await task_store.update_status(task_id, TaskStatus.COMPLETED, result=final) task_logger.info("task_completed", result_length=len(final)) except Exception as e: if skill_store: skill_store.record_usage(skill_name, success=False) await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) task_logger.error("task_failed", error=str(e)) logger.error(f"Skill {skill_name} execution failed: {e}") finally: clear_task_context() # Start background task and keep reference to prevent garbage collection bg_task = asyncio.create_task(execute_skill()) # Store task reference to prevent GC _running_tasks[f"{task_id}_bg"] = bg_task return JSONResponse( { "task_id": task_id, "skill_name": skill_name, "message": "Skill execution started", "status_url": f"/api/tasks/{task_id}", }, status_code=202, ) @server.custom_route(path="/api/learn", methods=["POST"]) async def api_learn(request): """REST endpoint for learning mode. Request body: { "task": "Learn how to search on GitHub", "skill_name": "github_search" # Optional - name to save learned skill } Returns: { "task_id": "abc123...", "message": "Learning session started" } """ from starlette.responses import JSONResponse if not settings.skills.enabled: return JSONResponse({"error": "Skills feature is disabled"}, status_code=503) try: body = await request.json() except Exception as e: return JSONResponse({"error": f"Invalid JSON body: {e}"}, status_code=400) task_description = body.get("task") if not task_description: return JSONResponse({"error": "Missing required field: task"}, status_code=400) skill_name = body.get("skill_name") # Create task ID for tracking task_id = str(uuid.uuid4()) task_store = get_task_store() task_record = TaskRecord( task_id=task_id, tool_name="learn", status=TaskStatus.PENDING, input_params={"task": task_description, "skill_name": skill_name}, ) await task_store.create_task(task_record) # Start learning in background async def execute_learn() -> None: """Background task to execute learning mode.""" bind_task_context(task_id, "learn") task_logger = get_task_logger() try: llm, profile = _get_llm_and_profile() except LLMProviderError as e: logger.error(f"LLM initialization failed: {e}") await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) clear_task_context() return await task_store.update_status(task_id, TaskStatus.RUNNING) task_logger.info("task_running") try: # Inject learning mode instructions augmented_task = task_description if skill_executor: augmented_task = skill_executor.inject_learning_mode(task_description) # Initialize recorder for learning mode recorder = SkillRecorder(task=task_description) agent = Agent( task=augmented_task, llm=llm, browser_profile=profile, max_steps=settings.agent.max_steps, ) # Attach recorder to CDP await agent.browser_session.start() await recorder.attach(agent.browser_session) recorder_attached = True # Register for cancellation agent_task = asyncio.create_task(agent.run()) _running_tasks[task_id] = agent_task try: result = await agent_task finally: _running_tasks.pop(task_id, None) final = result.final_result() or "Task completed without explicit result." # Extract skill from execution skill_extraction_result = "" if final and skill_name and skill_store: try: await recorder.finalize() await recorder.detach() recorder_attached = False recording = recorder.get_recording(result=final) # Analyze with LLM analyzer = SkillAnalyzer(llm) extracted_skill = await analyzer.analyze(recording) if extracted_skill: extracted_skill.name = skill_name skill_store.save(extracted_skill) skill_extraction_result = f"\n\n[SKILL LEARNED] Saved as '{skill_name}'" logger.info(f"Skill extracted and saved: {skill_name}") else: skill_extraction_result = "\n\n[SKILL NOT LEARNED] Could not extract API from execution" except Exception as e: logger.error(f"Skill extraction failed: {e}") skill_extraction_result = f"\n\n[SKILL EXTRACTION ERROR] {e}" finally: if recorder_attached: await recorder.detach() final_result = final + skill_extraction_result await task_store.update_status(task_id, TaskStatus.COMPLETED, result=final_result) task_logger.info("task_completed", result_length=len(final_result)) except Exception as e: await task_store.update_status(task_id, TaskStatus.FAILED, error=str(e)) task_logger.error("task_failed", error=str(e)) logger.error(f"Learning session failed: {e}") finally: clear_task_context() # Start background task and keep reference to prevent garbage collection bg_task = asyncio.create_task(execute_learn()) # Store task reference to prevent GC _running_tasks[f"{task_id}_bg"] = bg_task return JSONResponse( { "task_id": task_id, "learning_task": task_description, "skill_name": skill_name, "message": "Learning session started", "status_url": f"/api/tasks/{task_id}", }, status_code=202, ) # --- Server-Sent Events (SSE) Endpoints --- @server.custom_route(path="/api/events", methods=["GET"]) async def api_events(request): """SSE stream for real-time task updates. Streams task status changes and progress updates in real-time. Clients should connect once and listen for events. Event format: data: {"task_id": "...", "status": "...", "progress": {...}, "message": "..."} Heartbeat: : heartbeat Returns: StreamingResponse with text/event-stream content type """ import json from starlette.responses import StreamingResponse task_store = get_task_store() async def event_generator(): """Generate SSE events for task updates.""" try: last_task_states: dict[str, tuple[str, int, str]] = {} # task_id -> (status, progress_current, message) while True: # Get current running tasks running_tasks = await task_store.get_running_tasks() # Stream updates for tasks that changed for task in running_tasks: current_state = ( task.status.value, task.progress_current, task.progress_message or "", ) # Only send if state changed if task.task_id not in last_task_states or last_task_states[task.task_id] != current_state: event_data = { "task_id": task.task_id[:8], "full_task_id": task.task_id, "tool": task.tool_name, "status": task.status.value, "stage": task.stage.value if task.stage else None, "progress": { "current": task.progress_current, "total": task.progress_total, "percent": task.progress_percent, "message": task.progress_message, }, } yield f"data: {json.dumps(event_data)}\n\n" last_task_states[task.task_id] = current_state # Send heartbeat to keep connection alive yield ": heartbeat\n\n" # Wait before next update await asyncio.sleep(2) except asyncio.CancelledError: # Client disconnected logger.debug("SSE client disconnected from /api/events") raise return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering }, ) @server.custom_route(path="/api/tasks/{task_id}/logs", methods=["GET"]) async def api_task_logs(request): """SSE stream for individual task logs. Streams real-time updates for a specific task. Useful for monitoring long-running tasks in detail. Event format: data: {"status": "...", "progress": {...}, "stage": "...", "timestamp": "..."} Returns: StreamingResponse with text/event-stream content type """ import json from starlette.responses import StreamingResponse task_id = request.path_params["task_id"] task_store = get_task_store() # Find task by ID (exact or prefix match) task = await task_store.get_task(task_id) if not task: # Try prefix match tasks = await task_store.get_task_history(limit=100) for t in tasks: if t.task_id.startswith(task_id): task = t break if not task: from starlette.responses import JSONResponse return JSONResponse({"error": f"Task '{task_id}' not found"}, status_code=404) full_task_id = task.task_id async def log_generator(): """Generate SSE events for task-specific updates.""" try: last_state: tuple[str, int, str, str | None] | None = None # (status, progress_current, message, stage) while True: # Fetch latest task state current_task = await task_store.get_task(full_task_id) if not current_task: # Task was deleted or disappeared yield f"data: {json.dumps({'event': 'task_deleted'})}\n\n" break current_state = ( current_task.status.value, current_task.progress_current, current_task.progress_message or "", current_task.stage.value if current_task.stage else None, ) # Send update if state changed if current_state != last_state: # Use the most recent timestamp available timestamp = current_task.completed_at or current_task.started_at or current_task.created_at event_data = { "status": current_task.status.value, "stage": current_task.stage.value if current_task.stage else None, "progress": { "current": current_task.progress_current, "total": current_task.progress_total, "percent": current_task.progress_percent, "message": current_task.progress_message, }, "timestamp": timestamp.isoformat(), } # Include result/error if task completed/failed if current_task.status == TaskStatus.COMPLETED and current_task.result: event_data["result"] = current_task.result[:200] # Truncate for SSE elif current_task.status == TaskStatus.FAILED and current_task.error: event_data["error"] = current_task.error yield f"data: {json.dumps(event_data)}\n\n" last_state = current_state # Stop streaming if task reached terminal state if current_task.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED): yield f"data: {json.dumps({'event': 'task_ended', 'status': current_task.status.value})}\n\n" break # Send heartbeat yield ": heartbeat\n\n" # Wait before next update await asyncio.sleep(1) except asyncio.CancelledError: # Client disconnected logger.debug(f"SSE client disconnected from /api/tasks/{task_id}/logs") raise return StreamingResponse( log_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering }, ) return server # Track server start time for uptime calculation _server_start_time = time.time() server_instance = serve() STDIO_DEPRECATION_MESSAGE = """ ╔══════════════════════════════════════════════════════════════════════════════╗ ║ ⚠️ STDIO TRANSPORT DEPRECATED ║ ╠══════════════════════════════════════════════════════════════════════════════╣ ║ ║ ║ Browser automation tasks take 60-120+ seconds, which causes timeouts ║ ║ with stdio transport. HTTP mode is now required for reliable operation. ║ ║ ║ ╠══════════════════════════════════════════════════════════════════════════════╣ ║ HOW TO MIGRATE ║ ╠══════════════════════════════════════════════════════════════════════════════╣ ║ ║ ║ 1. START THE HTTP SERVER (run this in terminal): ║ ║ ║ ║ uvx mcp-server-browser-use server ║ ║ ║ ║ 2. UPDATE YOUR CLAUDE DESKTOP CONFIG: ║ ║ ║ ║ Option A - Native HTTP (if your client supports it): ║ ║ { ║ ║ "mcpServers": { ║ ║ "browser-use": { ║ ║ "type": "streamable-http", ║ ║ "url": "http://localhost:8000/mcp" ║ ║ } ║ ║ } ║ ║ } ║ ║ ║ ║ Option B - Use mcp-remote bridge (works with any MCP client): ║ ║ { ║ ║ "mcpServers": { ║ ║ "browser-use": { ║ ║ "command": "npx", ║ ║ "args": ["mcp-remote", "http://localhost:8000/mcp"] ║ ║ } ║ ║ } ║ ║ } ║ ║ ║ ║ DOCUMENTATION: https://github.com/AiAscendant/mcp-browser-use ║ ║ ║ ╚══════════════════════════════════════════════════════════════════════════════╝ """ def main() -> None: """Entry point for MCP server.""" transport = settings.server.transport if transport == "stdio": # stdio is deprecated - print migration guide and exit print(STDIO_DEPRECATION_MESSAGE, file=sys.stderr) sys.exit(1) elif transport in ("streamable-http", "sse"): logger.info(f"Starting MCP browser-use server (provider: {settings.llm.provider}, transport: {transport})") logger.info(f"HTTP server at http://{settings.server.host}:{settings.server.port}/mcp") server_instance.run(transport=transport, host=settings.server.host, port=settings.server.port) else: raise ValueError(f"Unknown transport: {transport}") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Saik0s/mcp-browser-use'

If you have feedback or need assistance with the MCP directory API, please join our Discord server