Skip to main content
Glama

LLM Gateway MCP Server

smart_browser_demo.py (35.5 kB)
#!/usr/bin/env python
"""
DETAILED Demonstration script for the Smart Browser Tools in Ultimate MCP Server,
showcasing browsing, interaction, search, download, macro, and autopilot features.
"""

import asyncio
import logging
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple

# Add project root to path for imports when running as script
# Adjust this relative path if your script structure is different
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
    print(f"INFO: Added {_PROJECT_ROOT} to sys.path")

# Rich imports for enhanced terminal UI
from rich import box, get_console  # noqa: E402
from rich.console import Group  # noqa: E402
from rich.markup import escape  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.table import Table  # noqa: E402
from rich.text import Text  # noqa: E402
from rich.traceback import install as install_rich_traceback  # noqa: E402

# Initialize Rich console
console = get_console()


# Define a fallback logger in case the import fails
def create_fallback_logger(name):
    """Return a stdlib logger named *name* that writes INFO+ records to a stream handler.

    The handler is attached only when the logger has none yet, so calling this
    repeatedly for the same name does not duplicate every log line.
    """
    fallback = logging.getLogger(name)
    fallback.setLevel(logging.INFO)
    if not fallback.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        fallback.addHandler(handler)
    return fallback


# Import Gateway and MCP components
from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.exceptions import ToolError, ToolInputError  # noqa: E402

# Import smart browser tools directly
from ultimate_mcp_server.tools.smart_browser import (  # noqa: E402
    autopilot,
    browse,
    click,
    collect_documentation,
    download,
    download_site_pdfs,
    parallel,
    run_macro,
    search,
    shutdown,
    type_text,
)
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # noqa: E402

# Initialize logger
logger = get_logger("demo.smart_browser")

# Install rich tracebacks
install_rich_traceback(show_locals=True, width=console.width, extra_lines=2)

# --- Configuration ---
# Base directory for Smart Browser outputs
SMART_BROWSER_INTERNAL_BASE = "storage/smart_browser_internal"  # Relative path used by the tool
SMART_BROWSER_DOWNLOADS_BASE = "storage/smart_browser_downloads"  # Default download relative path
DEMO_OUTPUTS_DIR = Path(
    "./sb_demo_outputs"
)  # Local dir for demo-specific outputs like the test HTML

# Example URLs for demo
URL_EXAMPLE = "http://example.com"
URL_BOOKSTORE = "http://books.toscrape.com/"
URL_QUOTES = "http://quotes.toscrape.com/"
URL_PDF_SAMPLE = "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"
URL_GITHUB = "https://github.com/features/copilot"

# Result keys that display_result() renders in dedicated sections (or that are
# internal) and therefore omits from the generic "Result Summary" table.
_DEFAULT_HIDDEN_KEYS = (
    "success",
    "page_state",
    "results",
    "steps",
    "download",
    "final_page_state",
    "documentation",
    "raw_response",
    "raw_llm_response",
    "_display_options",  # Also hide internal options
)


# --- Demo Helper Functions ---
def _as_text(value: Any, default: str = "N/A") -> str:
    """Return ``str(value)``, or *default* when *value* is None.

    ``rich.markup.escape`` only accepts strings; tool results may carry None or
    non-string values, so display code funnels them through this first.
    """
    return default if value is None else str(value)


def timestamp_str(short: bool = False) -> str:
    """Return the current local time as dim Rich markup.

    Args:
        short: When true, emit only ``HH:MM:SS``; otherwise ``YYYY-MM-DD HH:MM:SS``.
    """
    now = time.time()  # Use time.time for consistency
    dt_now = datetime.fromtimestamp(now)
    pattern = "%H:%M:%S" if short else "%Y-%m-%d %H:%M:%S"
    return f"[dim]{dt_now.strftime(pattern)}[/]"


def truncate_text_by_lines(text: str, max_lines: int = 50) -> str:
    """Truncate *text* to its first and last lines when it exceeds *max_lines*.

    Returns "" for falsy input and the text unchanged when it already fits.
    Otherwise keeps ``max_lines // 2`` (at least 1) lines from each end with a
    ``[...TRUNCATED...]`` marker between them; for ``max_lines == 1`` only the
    first line plus the marker is returned.
    """
    if not text:
        return ""
    lines = text.splitlines()
    if len(lines) <= max_lines:
        return text
    # Handle edge case where max_lines is 1
    if max_lines == 1:
        return lines[0] + "\n[...TRUNCATED...]"
    # Ensure we keep at least one line from each end
    half_lines = max(1, max_lines // 2)
    # Return first half, separator, and last half
    return "\n".join(lines[:half_lines] + ["[...TRUNCATED...]"] + lines[-half_lines:])


def format_value(key: str, value: Any, detail_level: int = 1) -> str:
    """Format a result value for display, returning a string that may contain Rich markup.

    Args:
        key: Name of the field; selects special formatting for durations
            (``*time_seconds``, ``duration_ms``) and ``size_bytes``.
        value: The value to render.
        detail_level: 0 collapses lists/dicts to a count, 1 shows a short
            preview, 2 or more shows a longer preview.

    Returns:
        A display string. Plain values are escaped; values produced here carry
        intentional markup.
    """
    # Keys reached through the dict-preview recursion may not be strings.
    key_lc = str(key).lower()

    if value is None:
        return "[dim]None[/]"  # Keep markup
    if isinstance(value, bool):
        return "[green]Yes[/]" if value else "[red]No[/]"  # Keep markup

    # Durations must be handled before the generic float branch; otherwise float
    # durations would be rendered as bare numbers and this branch never reached.
    if key_lc.endswith("time_seconds") or key_lc == "duration_ms":
        try:
            val_s = float(value) / 1000.0 if key_lc == "duration_ms" else float(value)
            return f"[green]{val_s:.3f}s[/]"  # Keep markup
        except (ValueError, TypeError):
            return escape(str(value))  # Fallback for non-numeric time values

    if isinstance(value, float):
        return f"{value:.3f}"  # Return simple string

    if key_lc == "size_bytes" and isinstance(value, int):
        if value < 0:
            return "[dim]N/A[/]"
        if value >= 1024 * 1024:
            return f"{value / (1024 * 1024):.2f} MB"
        if value >= 1024:
            return f"{value / 1024:.2f} KB"
        return f"{value} Bytes"  # Return simple string

    if isinstance(value, list):
        if not value:
            return "[dim]Empty List[/]"  # Keep markup
        list_len = len(value)
        if detail_level < 1:
            return f"[List with {list_len} items]"
        preview_count = 3 if detail_level < 2 else 5
        suffix = f" [dim]... ({list_len} items total)[/]" if list_len > preview_count else ""
        previews = [
            str(format_value(f"{key}[{i}]", item, detail_level=0))
            for i, item in enumerate(value[:preview_count])
        ]
        return f"[{', '.join(previews)}]{suffix}"  # Returns string with markup

    if isinstance(value, dict):
        if not value:
            return "[dim]Empty Dict[/]"  # Keep markup
        dict_len = len(value)
        if detail_level < 1:
            return f"[Dict with {dict_len} keys]"
        preview_count = 4 if detail_level < 2 else 8
        preview_keys = list(value.keys())[:preview_count]
        suffix = f" [dim]... ({dict_len} keys total)[/]" if dict_len > preview_count else ""
        items_preview = [
            # Key repr for clarity, value formatted recursively
            f"{repr(k)}: {str(format_value(k, value[k], detail_level=0))}"
            for k in preview_keys
        ]
        return f"{{{'; '.join(items_preview)}}}{suffix}"  # Returns string with markup

    if isinstance(value, str):
        value_truncated = truncate_text_by_lines(value, 30)  # Truncate by lines first
        preview_len = 300 if detail_level < 2 else 600
        suffix = ""
        # Check length after line truncation
        if len(value_truncated) > preview_len:
            value_display = value_truncated[:preview_len]
            suffix = "[dim]... (truncated)[/]"  # Keep markup
        else:
            value_display = value_truncated
        # Heuristic: a string containing "[", "]" and "/" is assumed to already
        # carry Rich markup and is passed through unescaped.
        if "[" in value_display and "]" in value_display and "/" in value_display:
            return value_display + suffix
        # Safe to escape plain strings
        return escape(value_display) + suffix

    # Fallback: escape the string representation of other types
    return escape(str(value))


def display_page_state(state: Dict[str, Any], title: str = "Page State"):
    """Print a panel summarising a ``page_state`` dict.

    Reads the ``url``, ``title``, ``main_text`` and ``elements`` keys of *state*;
    missing or None values are rendered as placeholders rather than raising.
    At most 15 elements are listed.
    """
    panel_content = []
    url = _as_text(state.get("url"))
    page_title = _as_text(state.get("title"))
    panel_content.append(
        Text.from_markup(f"[bold cyan]URL:[/bold cyan] [link={url}]{escape(url)}[/link]")
    )
    panel_content.append(Text.from_markup(f"[bold cyan]Title:[/bold cyan] {escape(page_title)}"))

    main_text = state.get("main_text", "")
    if main_text:
        truncated_text = truncate_text_by_lines(main_text, 15)
        panel_content.append(Text.from_markup("\n[bold cyan]Main Text Summary:[/bold cyan]"))
        panel_content.append(Panel(escape(truncated_text), border_style="dim", padding=(0, 1)))

    elements = state.get("elements", [])
    if elements:
        elements_table = Table(
            title=Text.from_markup(f"Interactive Elements ({len(elements)} found)"),
            box=box.MINIMAL,
            show_header=True,
            padding=(0, 1),
            border_style="blue",
        )
        elements_table.add_column("ID", style="magenta", no_wrap=True)
        elements_table.add_column("Tag", style="cyan")
        elements_table.add_column("Role", style="yellow")
        elements_table.add_column("Text Preview", style="white", max_width=60)
        elements_table.add_column("BBox", style="dim")

        preview_count = 15
        for elem in elements[:preview_count]:
            elem_text_raw = _as_text(elem.get("text"), "")
            elem_text_preview = escape(
                elem_text_raw[:60] + ("..." if len(elem_text_raw) > 60 else "")
            )
            bbox = elem.get("bbox") or []
            if isinstance(bbox, (list, tuple)) and len(bbox) == 4:
                bbox_str = f"({bbox[0]}x{bbox[1]}, {bbox[2]}w{bbox[3]}h)"
            else:
                bbox_str = "[Invalid Bbox]"
            elements_table.add_row(
                str(elem.get("id", "?")),
                str(elem.get("tag", "?")),
                str(elem.get("role", "")),
                elem_text_preview,  # Pass escaped preview string
                bbox_str,
            )
        if len(elements) > preview_count:
            elements_table.add_row(
                "...",
                Text.from_markup(f"[dim]{len(elements) - preview_count} more...[/]"),
                "",
                "",
                "",
            )
        panel_content.append(Text.from_markup("\n[bold cyan]Elements:[/bold cyan]"))
        panel_content.append(elements_table)

    console.print(
        Panel(
            Group(*panel_content),
            title=Text.from_markup(title),
            border_style="blue",
            padding=(1, 2),
            expand=False,
        )
    )


def display_result(
    title: str, result: Dict[str, Any], display_options: Optional[Dict] = None
) -> None:
    """Display operation result with enhanced formatting using Rich.

    Args:
        title: Heading shown in the rule above the result.
        result: Tool result dict. ``success``/``error``/``error_code`` drive the
            status panel; ``page_state``/``final_page_state``, ``results`` (with
            ``query``), ``download``, ``steps`` and ``file_path`` (with
            ``pages_collected``) each get a dedicated section.
        display_options: Optional ``hide_keys`` / ``detail_level`` settings.
            Options stored in ``result["_display_options"]`` take precedence
            key-by-key over these.
    """
    # Merge rather than replace: an empty "_display_options" on the result must
    # not discard options the caller passed explicitly.
    effective_display_options = {
        **(display_options or {}),
        **(result.get("_display_options") or {}),
    }
    console.print(
        Rule(
            Text.from_markup(f"[bold cyan]{escape(title)}[/] {timestamp_str(short=True)}"),
            style="cyan",
        )
    )

    success = result.get("success", False)
    detail_level = effective_display_options.get("detail_level", 1)
    hide_keys_set = set(effective_display_options.get("hide_keys", _DEFAULT_HIDDEN_KEYS))

    # --- Status Panel ---
    status_panel_content = Text.from_markup(
        f"Status: {'[bold green]Success[/]' if success else '[bold red]Failed[/]'}\n"
    )
    if not success:
        error_code = result.get("error_code", "N/A")
        error_msg = result.get("error", "Unknown error")
        status_panel_content.append(
            Text.from_markup(f"Error Code: [yellow]{escape(str(error_code))}[/]\n")
        )
        status_panel_content.append(
            Text.from_markup(f"Message: [red]{escape(str(error_msg))}[/]\n")
        )
    console.print(
        Panel(
            status_panel_content,
            title="Operation Status",
            border_style="green" if success else "red",
            padding=(0, 1) if success else (1, 2),
            expand=False,
        )
    )

    # --- Top Level Details ---
    details_table = Table(
        title="Result Summary", box=box.MINIMAL, show_header=False, padding=(0, 1)
    )
    details_table.add_column("Key", style="cyan", justify="right", no_wrap=True)
    details_table.add_column("Value", style="white")
    has_details = False
    for key, value in result.items():
        if key in hide_keys_set or key.startswith("_"):
            continue
        # formatted value is already string/markup
        details_table.add_row(
            escape(str(key)), format_value(key, value, detail_level=detail_level)
        )
        has_details = True
    if has_details:
        console.print(details_table)

    # --- Special Section Displays ---
    # Page State
    if "page_state" in result and isinstance(result["page_state"], dict):
        display_page_state(result["page_state"], title="Page State After Action")
    elif "final_page_state" in result and isinstance(result["final_page_state"], dict):
        display_page_state(result["final_page_state"], title="Final Page State")

    # Search Results
    if "results" in result and isinstance(result["results"], list) and "query" in result:
        search_results = result["results"]
        search_table = Table(
            title=Text.from_markup(
                f"Search Results for '{escape(str(result['query']))}' "
                f"({len(search_results)} found)"
            ),
            box=box.ROUNDED,
            show_header=True,
            padding=(0, 1),
        )
        search_table.add_column("#", style="dim")
        search_table.add_column("Title", style="cyan")
        search_table.add_column("URL", style="blue", no_wrap=False)
        search_table.add_column("Snippet", style="white", no_wrap=False)
        for i, item in enumerate(search_results, 1):
            # Distinct local names: do not clobber the `title` parameter.
            item_title = truncate_text_by_lines(item.get("title", ""), 3)
            item_snippet = truncate_text_by_lines(item.get("snippet", ""), 5)
            item_url = _as_text(item.get("url"), "")
            search_table.add_row(
                str(i),
                escape(item_title),
                f"[link={item_url}]{escape(item_url)}[/link]",
                escape(item_snippet),
            )
        console.print(search_table)

    # Download Result
    if "download" in result and isinstance(result["download"], dict):
        dl_info = result["download"]
        dl_table = Table(
            title="Download Details", box=box.MINIMAL, show_header=False, padding=(0, 1)
        )
        dl_table.add_column("Metric", style="cyan", justify="right")
        dl_table.add_column("Value", style="white")
        dl_table.add_row("File Path", escape(_as_text(dl_info.get("file_path"))))
        dl_table.add_row("File Name", escape(_as_text(dl_info.get("file_name"))))
        dl_table.add_row("SHA256", escape(_as_text(dl_info.get("sha256"))))
        dl_table.add_row("Size", format_value("size_bytes", dl_info.get("size_bytes", -1)))
        dl_table.add_row("Source URL", escape(_as_text(dl_info.get("url"))))
        dl_table.add_row(
            "Tables Extracted",
            format_value("tables_extracted", dl_info.get("tables_extracted", False)),
        )
        if dl_info.get("tables"):
            # format_value handles potential markup in table preview string
            dl_table.add_row("Table Preview", format_value("tables", dl_info.get("tables")))
        console.print(
            Panel(dl_table, title="Download Result", border_style="green", padding=(1, 2))
        )

    # Macro/Autopilot Steps
    if "steps" in result and isinstance(result["steps"], list):
        steps = result["steps"]
        steps_table = Table(
            title=Text.from_markup(f"Macro/Autopilot Steps ({len(steps)} executed)"),
            box=box.ROUNDED,
            show_header=True,
            padding=(0, 1),
        )
        steps_table.add_column("#", style="dim")
        steps_table.add_column("Action/Tool", style="cyan")
        steps_table.add_column("Arguments/Hint", style="white", no_wrap=False)
        steps_table.add_column("Status", style="yellow")
        steps_table.add_column("Result/Error", style="white", no_wrap=False)
        status_keys = ("action", "tool", "success", "result", "error", "step", "duration_ms")
        for i, step in enumerate(steps, 1):
            action = step.get("action", step.get("tool", "?"))
            args = step.get("args")  # Check if 'args' exists
            if args is None:
                # If no 'args', use the step itself excluding status keys
                args = {k: v for k, v in step.items() if k not in status_keys}
            args_preview = format_value("args", args, detail_level=0)
            status = "[green]OK[/]" if step.get("success", False) else "[red]FAIL[/]"
            outcome = step.get("result", step.get("error", ""))
            outcome_preview = format_value("outcome", outcome, detail_level=0)
            steps_table.add_row(
                str(i), escape(str(action)), args_preview, status, outcome_preview
            )
        console.print(steps_table)

    # Documentation (assuming it's stored under 'file_path' key now)
    if (
        "file_path" in result and result.get("pages_collected") is not None
    ):  # Check for doc collection result structure
        doc_file_path = result.get("file_path")
        pages_collected = result.get("pages_collected")
        # Guard the comparison: a non-numeric count would raise TypeError.
        has_pages = isinstance(pages_collected, (int, float)) and pages_collected > 0
        if doc_file_path and has_pages:
            content_to_display = (
                f"[dim]Documentation saved to: {escape(str(doc_file_path))}[/]"
            )
            try:
                with open(doc_file_path, "r", encoding="utf-8") as f:
                    content = f.read(1500)  # Read preview
                content_to_display += f"\n\n[bold]File Preview ({len(content)} chars):[/]\n"
                content_to_display += escape(content) + "\n[dim]...[/]"
            except Exception as e:
                content_to_display += f"\n[yellow]Could not read file preview: {escape(str(e))}[/]"
            console.print(
                Panel(
                    Text.from_markup(content_to_display),
                    title=f"Collected Documentation ({pages_collected} pages)",
                    border_style="magenta",
                    padding=(1, 2),
                )
            )

    console.print()  # Add spacing


async def safe_tool_call(
    operation_name: str,
    tool_func: Callable[..., Awaitable[Any]],
    *args,
    tracker: Optional[CostTracker] = None,
    **kwargs,
) -> Tuple[bool, Dict[str, Any]]:
    """Safely call a tool function, handling exceptions and logging.

    Args:
        operation_name: Human-readable label used in console output and logs.
        tool_func: Awaitable tool to invoke with ``*args`` / ``**kwargs``.
        tracker: Accepted for call-site symmetry; not used by this function.
        **kwargs: Forwarded to *tool_func*, except ``display_options``, which is
            removed and stored on the returned dict under ``_display_options``.

    Returns:
        ``(success, result)``. On any exception, or when the tool returns a
        non-dict, *result* is a synthesized error dict with ``success=False``,
        ``error`` and ``error_code``.
    """
    console.print(
        f"\n[cyan]Calling Tool:[/][bold] {escape(operation_name)}[/] {timestamp_str(short=True)}"
    )
    display_options = kwargs.pop("display_options", {})

    # Build a compact repr of kwargs so large payloads do not flood the log.
    log_args_repr = {}
    MAX_ARG_LEN = 100
    for k, v in kwargs.items():
        try:
            if isinstance(v, (str, bytes)) and len(v) > MAX_ARG_LEN:
                log_args_repr[k] = f"{type(v).__name__}(len={len(v)})"
            elif isinstance(v, (list, dict)) and len(v) > 10:
                log_args_repr[k] = f"{type(v).__name__}(len={len(v)})"
            else:
                log_args_repr[k] = repr(v)
        except Exception:  # Handle potential errors during repr()
            log_args_repr[k] = f"<{type(v).__name__} repr_error>"
    logger.debug(f"Executing {operation_name} with args: {args}, kwargs: {log_args_repr}")

    def _failure(error: str, error_code: str, **extra: Any) -> Tuple[bool, Dict[str, Any]]:
        """Build the standard ``(False, error_dict)`` return value."""
        payload: Dict[str, Any] = {"success": False, "error": error}
        payload.update(extra)
        payload["error_code"] = error_code
        payload["_display_options"] = display_options
        return False, payload

    try:
        # Call the tool function directly
        result = await tool_func(*args, **kwargs)
        if not isinstance(result, dict):
            logger.error(f"Tool '{operation_name}' returned non-dict type: {type(result)}")
            return _failure(
                f"Tool returned unexpected type: {type(result).__name__}", "INTERNAL_ERROR"
            )
        # Store display options within the result for the display function
        result["_display_options"] = display_options
        logger.debug(f"Tool '{operation_name}' completed.")
        # Add success=True if missing and no error key present (should usually be set by tool)
        if "success" not in result and "error" not in result:
            result["success"] = True
        return result.get("success", False), result
    except ToolInputError as e:
        logger.warning(f"Input error for {operation_name}: {e}")
        return _failure(str(e), getattr(e, "error_code", "INPUT_ERROR"))
    except ToolError as e:
        logger.error(f"Tool error during {operation_name}: {e}", exc_info=True)
        return _failure(str(e), getattr(e, "error_code", "TOOL_ERROR"))
    except Exception as e:
        logger.error(f"Unexpected error during {operation_name}: {e}", exc_info=True)
        tb_str = traceback.format_exc(limit=1)
        return _failure(
            f"{type(e).__name__}: {e}\n{tb_str}",
            "UNEXPECTED_ERROR",
            error_type=type(e).__name__,
        )


# --- Demo Sections ---
async def demo_section_1_browse(gateway, tracker: CostTracker) -> None:
    """Demo 1: browse two pages, one of them waiting for a selector.

    *gateway* is not used by this section.
    """
    console.print(Rule("[bold green]Demo 1: Basic Browsing[/]", style="green"))
    logger.info("Starting Demo Section 1: Basic Browsing")

    # 1a: Browse Example.com
    _, result = await safe_tool_call(
        "Browse Example.com", browse, url=URL_EXAMPLE, tracker=tracker
    )
    display_result("Browse Example.com", result)

    # 1b: Browse Bookstore (wait for specific element)
    _, result = await safe_tool_call(
        "Browse Bookstore (wait for footer)",
        browse,
        url=URL_BOOKSTORE,
        wait_for_selector="footer.footer",
        tracker=tracker,
    )
    display_result("Browse Bookstore (Wait)", result)


async def demo_section_2_interaction(gateway, tracker: CostTracker) -> None:
    """Demo 2: type into the bookstore search form, then click the first result.

    Stops early when the initial page load fails. *gateway* is not used.
    """
    console.print(Rule("[bold green]Demo 2: Page Interaction[/]", style="green"))
    logger.info("Starting Demo Section 2: Page Interaction")

    # 2a: Search on Bookstore
    console.print(f"--- Scenario: Search for 'Science' on {URL_BOOKSTORE} ---")
    success, initial_state_res = await safe_tool_call(
        "Load Bookstore Search Page",
        browse,
        url=URL_BOOKSTORE,
        tracker=tracker,
    )
    if not success:
        console.print("[red]Cannot proceed with interaction demo, failed to load page.[/]")
        return
    display_result("Bookstore Initial State", initial_state_res)

    # Fill the search form using task hints
    fields_to_type = [
        {"task_hint": "The search input field", "text": "Science", "enter": False},
    ]
    success, fill_res = await safe_tool_call(
        "Type into Bookstore Search Form",
        type_text,
        url=URL_BOOKSTORE,
        fields=fields_to_type,
        submit_hint="The search button",
        wait_after_submit_ms=1500,
        tracker=tracker,
    )
    display_result("Type into Bookstore Search Form", fill_res)

    # 2b: Click the first search result (if successful)
    if success:
        console.print("--- Scenario: Click the first search result ---")
        current_url = fill_res.get("page_state", {}).get("url", URL_BOOKSTORE)
        _, click_res = await safe_tool_call(
            "Click First Book Result",
            click,
            url=current_url,
            task_hint="The link for the first book shown in the results list",
            wait_ms=1000,
            tracker=tracker,
        )
        display_result("Click First Book Result", click_res)


async def demo_section_3_search(gateway, tracker: CostTracker) -> None:
    """Demo 3: run the same query on Bing and DuckDuckGo. *gateway* is not used."""
    console.print(Rule("[bold green]Demo 3: Web Search[/]", style="green"))
    logger.info("Starting Demo Section 3: Web Search")
    search_query = "latest advancements in large language models"

    # 3a: Search Bing
    _, result = await safe_tool_call(
        "Search Bing",
        search,
        query=search_query,
        engine="bing",
        max_results=5,
        tracker=tracker,
    )
    display_result(f"Search Bing: '{search_query}'", result)

    # 3b: Search DuckDuckGo
    _, result = await safe_tool_call(
        "Search DuckDuckGo",
        search,
        query=search_query,
        engine="duckduckgo",
        max_results=5,
        tracker=tracker,
    )
    display_result(f"Search DuckDuckGo: '{search_query}'", result)


async def demo_section_4_download(gateway, tracker: CostTracker) -> None:
    """Demo 4: crawl a site for PDFs, then download a PDF by clicking a link.

    The click scenario writes a temporary HTML page under ``DEMO_OUTPUTS_DIR``
    and removes it afterwards. *gateway* is not used.
    """
    console.print(Rule("[bold green]Demo 4: File Download[/]", style="green"))
    logger.info("Starting Demo Section 4: File Download")

    # Ensure local demo output dir exists
    demo_outputs_dir_abs = DEMO_OUTPUTS_DIR.resolve(
        strict=False
    )  # Resolve to absolute, allow non-existent
    demo_outputs_dir_abs.mkdir(parents=True, exist_ok=True)  # Ensure it exists after resolving

    # Create the parent directory for PDF downloads if it doesn't exist
    pdf_parent_dir = "storage/smart_browser_site_pdfs"
    console.print(f"[cyan]Creating parent directory for PDFs: {pdf_parent_dir}[/cyan]")
    from ultimate_mcp_server.tools.filesystem import create_directory

    parent_dir_result = await create_directory(path=pdf_parent_dir)
    if not parent_dir_result.get("success", False):
        # Escape the tool-supplied error so stray brackets cannot break the markup.
        dir_error = escape(str(parent_dir_result.get("error", "Unknown error")))
        console.print(
            f"[yellow]Warning: Could not create parent directory: {dir_error}[/yellow]"
        )
    else:
        console.print(
            f"[green]Successfully created parent directory: {pdf_parent_dir}[/green]"
        )

    # 4a: Download PDFs from a site
    console.print("--- Scenario: Find and Download PDFs from Example.com ---")
    _, result = await safe_tool_call(
        "Download PDFs from Example.com",
        download_site_pdfs,
        start_url=URL_EXAMPLE,
        max_depth=1,
        max_pdfs=5,
        dest_subfolder="example_com_pdfs",
        tracker=tracker,
    )
    display_result("Download PDFs from Example.com", result)
    if result.get("pdf_count", 0) == 0:
        console.print("[yellow]Note: No PDFs found on example.com as expected.[/]")

    # 4b: Click-based download
    download_page_content = f"""
    <!DOCTYPE html>
    <html><head><title>Download Test</title></head>
    <body><h1>Download Page</h1>
    <p>Click the link to download a dummy PDF.</p>
    <a href="{URL_PDF_SAMPLE}" id="downloadLink">Download Dummy PDF Now</a>
    <p>Another paragraph.</p>
    </body></html>
    """
    download_page_path = demo_outputs_dir_abs / "download_test.html"
    try:
        download_page_path.write_text(download_page_content, encoding="utf-8")
        local_url = download_page_path.as_uri()

        console.print("\n--- Scenario: Click a link to download a file ---")
        _, result = await safe_tool_call(
            "Click to Download PDF",
            download,
            url=local_url,
            task_hint="The 'Download Dummy PDF Now' link",
            dest_dir="storage/sb_demo_outputs/clicked_downloads",  # Adjusted path
            tracker=tracker,
        )
        display_result("Click to Download PDF", result)
    except Exception as e:
        console.print(
            f"[red]Error setting up or running click-download demo: {escape(str(e))}[/]"
        )
    finally:
        # Best-effort cleanup of the temporary page; a failure here is harmless.
        if download_page_path.exists():
            try:
                download_page_path.unlink()
            except OSError:
                pass


async def demo_section_5_macro(gateway, tracker: CostTracker) -> None:
    """Demo 5: run a natural-language macro against the bookstore. *gateway* is not used."""
    console.print(Rule("[bold green]Demo 5: Execute Macro[/]", style="green"))
    logger.info("Starting Demo Section 5: Execute Macro")
    macro_task = f"Go to {URL_BOOKSTORE}, search for 'History', find the book 'Sapiens: A Brief History of Humankind', and click its link."
    console.print("--- Scenario: Execute Macro ---")
    console.print(f"[italic]Task:[/italic] {escape(macro_task)}")

    _, result = await safe_tool_call(
        "Execute Bookstore Search Macro",
        run_macro,
        url=URL_BOOKSTORE,
        task=macro_task,
        max_rounds=5,
        tracker=tracker,
    )
    display_result("Execute Bookstore Search Macro", result)


async def demo_section_6_autopilot(gateway, tracker: CostTracker) -> None:
    """Demo 6: run the autopilot on a multi-step research task. *gateway* is not used."""
    console.print(Rule("[bold green]Demo 6: Autopilot[/]", style="green"))
    logger.info("Starting Demo Section 6: Autopilot")
    autopilot_task = "Search the web for the official documentation URL of the 'httpx' Python library, then browse that URL and summarize the main page content."
    console.print("--- Scenario: Autopilot ---")
    console.print(f"[italic]Task:[/italic] {escape(autopilot_task)}")

    _, result = await safe_tool_call(
        "Run Autopilot: Find httpx Docs",
        autopilot,
        task=autopilot_task,
        max_steps=8,
        scratch_subdir="autopilot_demo",
        tracker=tracker,
    )
    display_result("Run Autopilot: Find httpx Docs", result)
    if result.get("run_log"):
        console.print(f"[dim]Autopilot run log saved to: {result['run_log']}[/]")


async def demo_section_7_parallel(gateway, tracker: CostTracker) -> None:
    """Demo 7: fetch page state for several URLs in parallel and print one panel each.

    *gateway* is not used.
    """
    console.print(Rule("[bold green]Demo 7: Parallel Processing[/]", style="green"))
    logger.info("Starting Demo Section 7: Parallel Processing")
    urls_to_process = [
        URL_EXAMPLE,
        URL_BOOKSTORE,
        URL_QUOTES,
        "http://httpbin.org/delay/1",
        "https://webscraper.io/test-sites/e-commerce/static",
    ]
    console.print("--- Scenario: Get Page State for Multiple URLs in Parallel ---")
    console.print(f"[dim]URLs:[/dim] {urls_to_process}")

    success, result = await safe_tool_call(
        "Parallel Get Page State",
        parallel,
        urls=urls_to_process,
        action="get_state",  # Only 'get_state' supported currently
        # max_tabs=3 # Can override default here if needed
        tracker=tracker,
    )

    # Custom display for parallel results
    console.print(Rule("[bold cyan]Parallel Processing Results[/]", style="cyan"))
    if success:
        console.print(f"Total URLs Processed: {result.get('processed_count', 0)}")
        console.print(f"Successful: {result.get('successful_count', 0)}")
        console.print("-" * 20)
        for i, item_result in enumerate(result.get("results", [])):
            url = _as_text(item_result.get("url"), f"URL {i + 1}")
            item_success = item_result.get("success", False)
            if item_success:
                state = item_result.get("page_state") or {}
                content = (
                    f"Title: {escape(_as_text(state.get('title')))}\n"
                    f"Elements Found: {len(state.get('elements') or [])}"
                )
            else:
                item_error = _as_text(item_result.get("error"), "Unknown")
                content = f"[red]Error:[/red] {escape(item_error)}"
            console.print(
                Panel(
                    content,
                    title=f"Result for: {escape(url)}",
                    border_style="green" if item_success else "red",
                    padding=(0, 1),
                    expand=False,
                )
            )
    else:
        console.print(
            Panel(
                f"[red]Parallel processing tool call failed:[/red]\n{escape(_as_text(result.get('error'), '?'))}",
                border_style="red",
            )
        )
    console.print()


async def demo_section_8_docs(gateway, tracker: CostTracker) -> None:
    """Demo 8: collect documentation pages for a package. *gateway* is not used."""
    console.print(Rule("[bold green]Demo 8: Documentation Collection[/]", style="green"))
    logger.info("Starting Demo Section 8: Documentation Collection")
    package_name = "fastapi"  # Use a different package
    console.print(f"--- Scenario: Collect Documentation for '{package_name}' ---")

    _, result = await safe_tool_call(
        f"Collect Docs: {package_name}",
        collect_documentation,
        package=package_name,
        max_pages=15,
        rate_limit_rps=2.0,
        tracker=tracker,
    )
    # display_result renders the file_path / pages_collected section
    display_result(f"Collect Docs: {package_name}", result)


# --- Main Function ---
async def main() -> int:
    """Run the SmartBrowser tools demo.

    Initializes the gateway providers, runs every demo section in order, and
    always attempts to shut the Smart Browser down afterwards.

    Returns:
        0 on success, 1 when a demo section raised an unexpected exception.
    """
    console.print(Rule("[bold magenta]Smart Browser Tools Demo[/bold magenta]"))
    exit_code = 0

    # Ensure local demo output directory exists
    DEMO_OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
    console.print(f"[dim]Demo-specific outputs will be saved in: {DEMO_OUTPUTS_DIR}[/]")

    try:
        # --- Initialize Gateway for providers only ---
        console.print("[cyan]Initializing MCP Gateway...[/]")
        gateway = Gateway("smart-browser-demo")
        console.print("[cyan]Initializing Providers (for LLM tools)...[/]")
        await gateway._initialize_providers()

        # --- Initialize Smart Browser module ---
        console.print("[cyan]Initializing Smart Browser tool...[/]")
        # await initialize()

        # Initialize CostTracker
        tracker = CostTracker()

        # Run Demo Sections (passing gateway and tracker)
        await demo_section_1_browse(gateway, tracker)
        await demo_section_2_interaction(gateway, tracker)
        await demo_section_3_search(gateway, tracker)
        await demo_section_4_download(gateway, tracker)
        await demo_section_5_macro(gateway, tracker)
        # The autopilot section can be intensive; comment out the next line to skip it.
        await demo_section_6_autopilot(gateway, tracker)
        await demo_section_7_parallel(gateway, tracker)
        await demo_section_8_docs(gateway, tracker)

        console.print(Rule("[bold magenta]Demo Complete[/bold magenta]"))

    except Exception as e:
        logger.critical(f"Demo failed with critical error: {e}", exc_info=True)
        console.print("[bold red]CRITICAL ERROR DURING DEMO:[/]")
        console.print_exception(show_locals=True)
        exit_code = 1
    finally:
        # Shutdown Smart Browser
        console.print("[cyan]Shutting down Smart Browser tool...[/]")
        try:
            await shutdown()
        except Exception as e:
            logger.error(f"Error during Smart Browser shutdown: {e}")

    return exit_code


if __name__ == "__main__":
    # Ensure the script is run with asyncio
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        console.print("\n[yellow]Demo interrupted by user. Shutting down...[/]")
        # Try to run shutdown asynchronously even on keyboard interrupt
        try:
            asyncio.run(shutdown())
        except Exception as e:
            print(f"Error during emergency shutdown: {e}")
        sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dicklesworthstone/llm_gateway_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.