Skip to main content
Glama

Scenic MCP

tools.ex•26.7 kB
defmodule ScenicMcp.Tools do @moduledoc """ Tool handlers for MCP server. Provides handlers for: - Keyboard input - Mouse input - Screenshot capture - Viewport inspection All functions return either `{:ok, result}` or `{:error, reason}` tuples for consistent error handling. """ require Logger # ======================================================================== # Scenic Process Lookup # ======================================================================== @doc """ Get the PID of the configured viewport process. Returns `{:ok, pid}` if found, or `{:error, reason}` if not found. """ @spec viewport_pid() :: {:ok, pid()} | {:error, String.t()} def viewport_pid do viewport_name = ScenicMcp.Config.viewport_name() case Process.whereis(viewport_name) do nil -> {:error, "Unable to find Scenic viewport process ':#{viewport_name}'. " <> "Ensure your Scenic application is running and the viewport is registered with this name. " <> "You can configure a different name with: config :scenic_mcp, viewport_name: :your_name"} pid -> {:ok, pid} end end @doc """ Get the viewport state. Returns `{:ok, state}` if successful, or `{:error, reason}` if the viewport cannot be found or the state cannot be retrieved. """ @spec viewport_state() :: {:ok, map()} | {:error, String.t()} def viewport_state do with {:ok, pid} <- viewport_pid() do state = :sys.get_state(pid, 5000) {:ok, state} end catch :exit, reason -> {:error, "Failed to get viewport state: #{inspect(reason)}"} end @doc """ Get the PID of the configured driver process. Returns `{:ok, pid}` if found, or `{:error, reason}` if not found. """ @spec driver_pid() :: {:ok, pid()} | {:error, String.t()} def driver_pid do driver_name = ScenicMcp.Config.driver_name() case Process.whereis(driver_name) do pid when is_pid(pid) -> {:ok, pid} _otherwise -> {:error, "Unable to find Scenic driver process ':#{driver_name}'"} end catch :exit, reason -> {:error, "Failed to find driver process: #{inspect(reason)}"} end @doc """ Get the driver state. Returns `{:ok, state}` if successful, or `{:error, reason}` if the driver cannot be found or the state cannot be retrieved. """ @spec driver_state() :: {:ok, any()} | {:error, String.t()} def driver_state do with {:ok, pid} <- driver_pid() do state = :sys.get_state(pid, 5000) {:ok, state} end catch :exit, reason -> {:error, "Failed to get driver state: #{inspect(reason)}"} end # ======================================================================== # Input Handling # ======================================================================== @doc """ Send input to the Scenic driver. Returns `{:ok, :sent}` if successful, or `{:error, reason}` if the driver cannot be found or the input cannot be sent. """ @spec send_input(any()) :: {:ok, :sent} | {:error, String.t()} def send_input(input) do with {:ok, driver_struct} <- driver_state() do Scenic.Driver.send_input(driver_struct, input) {:ok, :sent} end catch :exit, reason -> {:error, "Failed to send input: #{inspect(reason)}"} end # ======================================================================== # Tool Handlers (called from server.ex) # ======================================================================== @doc """ Handle keyboard input. Accepts either: - `%{"text" => string}` - Type text character by character - `%{"key" => string, "modifiers" => list}` - Press special key with optional modifiers Returns `{:ok, result_map}` or `{:error, reason}`. """ @spec handle_send_keys(map()) :: {:ok, map()} | {:error, String.t()} def handle_send_keys(%{"text" => text}) when is_binary(text) do with {:ok, driver_struct} <- driver_state() do text |> String.graphemes() |> Enum.each(fn char -> codepoint = char |> String.to_charlist() |> List.first() Scenic.Driver.send_input(driver_struct, {:codepoint, {codepoint, []}}) end) {:ok, %{status: "ok", message: "Text sent: #{text}"}} end end def handle_send_keys(%{"key" => key} = params) when is_binary(key) do with {:ok, driver_struct} <- driver_state() do modifiers = parse_modifiers(Map.get(params, "modifiers", [])) key_atom = normalize_key(key) # Key state: 1 = press, 0 = release Scenic.Driver.send_input(driver_struct, {:key, {key_atom, 1, modifiers}}) Process.sleep(10) Scenic.Driver.send_input(driver_struct, {:key, {key_atom, 0, modifiers}}) {:ok, %{status: "ok", message: "Key sent: #{key}"}} end end def handle_send_keys(_params) do {:error, "Invalid parameters: must provide either 'text' or 'key' parameter"} end @doc """ Handle mouse movement. Accepts `%{"x" => number, "y" => number}`. Returns `{:ok, result_map}` or `{:error, reason}`. """ @spec handle_mouse_move(map()) :: {:ok, map()} | {:error, String.t()} def handle_mouse_move(%{"x" => x, "y" => y}) do with {:ok, driver_struct} <- driver_state() do Scenic.Driver.send_input(driver_struct, {:cursor_pos, {x, y}}) {:ok, %{status: "ok", message: "Mouse moved to (#{x}, #{y})"}} end end def handle_mouse_move(_params) do {:error, "Invalid parameters: must provide 'x' and 'y' coordinates"} end @doc """ Handle mouse clicks. Accepts `%{"x" => number, "y" => number, "button" => string}`. Button is optional and defaults to "left". Returns `{:ok, result_map}` or `{:error, reason}`. """ @spec handle_mouse_click(map()) :: {:ok, map()} | {:error, String.t()} def handle_mouse_click(%{"x" => x, "y" => y} = params) do with {:ok, driver_struct} <- driver_state() do button = parse_button(Map.get(params, "button", "left")) # Move to position Scenic.Driver.send_input(driver_struct, {:cursor_pos, {x, y}}) # Click - Scenic format: {:cursor_button, {button, state, modifiers, coords}} # state: 1 = press, 0 = release Scenic.Driver.send_input(driver_struct, {:cursor_button, {button, 1, [], {x, y}}}) Process.sleep(10) Scenic.Driver.send_input(driver_struct, {:cursor_button, {button, 0, [], {x, y}}}) {:ok, %{status: "ok", message: "Mouse clicked at (#{x}, #{y})"}} end end def handle_mouse_click(_params) do {:error, "Invalid parameters: must provide 'x' and 'y' coordinates"} end @doc """ Get viewport graph information. Returns `{:ok, result_map}` with viewport structure information, or `{:error, reason}` if the viewport cannot be inspected. """ @spec handle_get_scenic_graph(any()) :: {:ok, map()} | {:error, String.t()} def handle_get_scenic_graph(_args \\ nil) do with {:ok, vp_state} <- viewport_state() do case vp_state do %{script_table: script_table} = state when script_table != nil -> scripts = :ets.tab2list(script_table) visual_description = build_scene_description(scripts) semantic_info = build_semantic_description(Map.get(state, :semantic_table)) {:ok, %{ status: "ok", script_count: length(scripts), visual_description: visual_description, semantic_elements: semantic_info, raw_scripts: Enum.map(scripts, fn {id, _compiled, _pid} -> id {id, _compiled} -> id end) }} _ -> {:error, "No script table found in viewport state. The viewport may not be fully initialized. State keys: #{inspect(Map.keys(vp_state))}"} end end end @doc """ Find clickable elements in the current viewport. Returns `{:ok, result_map}` with a list of clickable elements and their center coordinates, or `{:error, reason}` if elements cannot be found. Optional params: - `filter`: Filter by element ID (matches against atom keys) """ @spec find_clickable_elements(map()) :: {:ok, map()} | {:error, String.t()} def find_clickable_elements(params) do with {:ok, vp_state} <- viewport_state() do case Map.get(vp_state, :semantic_table) do nil -> {:error, "No semantic table found - the viewport may not have semantic DOM enabled"} semantic_table -> # Flatten nested elements structure and keep best version (prefer :_root_) flat_elements = :ets.tab2list(semantic_table) |> Enum.flat_map(fn {graph_key, data} -> timestamp = Map.get(data, :timestamp, 0) case Map.get(data, :elements) do elements when is_map(elements) -> Enum.map(elements, fn {id, element_info} -> {id, element_info, timestamp, graph_key} end) _ -> [] end end) # Keep the best version of each ID (prefer :_root_, otherwise latest timestamp) |> Enum.group_by(fn {id, _element_info, _timestamp, _graph_key} -> id end) |> Enum.map(fn {_id, versions} -> # Write debug to file debug_data = versions |> Enum.map(fn {id, info, ts, gk} -> bounds = Map.get(info, :bounds) || Map.get(Map.get(info, :semantic, %{}), :bounds) " {#{inspect(id)}, #{inspect(gk)}, ts: #{ts}, bounds: #{inspect(bounds)}}\n" end) |> Enum.join("") File.write!("/tmp/scenic_mcp_debug.txt", "Versions:\n#{debug_data}", [:append]) best_version = Enum.max_by(versions, fn {_id, _info, ts, graph_key} -> root_priority = if graph_key in [:_root_, "_root_"], do: 1_000_000_000_000, else: 0 root_priority + ts end) {id, element_info, _timestamp, graph_key} = best_version File.write!("/tmp/scenic_mcp_debug.txt", "Selected: #{inspect(id)} from #{inspect(graph_key)}, bounds: #{inspect(Map.get(element_info, :bounds))}\n\n", [:append]) {id, element_info} end) filter = Map.get(params, "filter") clickable_elements = flat_elements |> Enum.filter(fn {_id, element_info} -> # Check for clickable in semantic (component registration) or top-level (manual registration) Map.get(element_info, :clickable, false) || (element_info |> Map.get(:semantic, %{}) |> Map.get(:clickable, false)) end) |> maybe_filter_by_id(filter) |> Enum.map(fn {id, element_info} -> # Calculate center coordinates from bounds and transforms semantic = Map.get(element_info, :semantic, %{}) # Check for bounds at top level first (manual registration), then in semantic (component) bounds = Map.get(element_info, :bounds) || Map.get(semantic, :bounds) transforms = Map.get(element_info, :transforms, %{}) center = calculate_center_with_transforms(bounds, transforms) %{ id: inspect(id), raw_id: id, type: Map.get(element_info, :type) || Map.get(semantic, :type, :unknown), bounds: bounds, center: center, transforms: transforms, data: semantic } |> sanitize_for_json() end) {:ok, %{ status: "ok", count: length(clickable_elements), elements: clickable_elements }} end end end @doc """ Click on an element by its semantic ID. This is a high-level convenience function similar to Playwright's `page.click(selector)`. It finds the element, calculates its center, and clicks it automatically. Params: - `element_id`: The semantic ID to click (string or atom, e.g., ":load_component_button") Returns `{:ok, result_map}` with click details, or `{:error, reason}` if element not found. """ @spec click_element(map()) :: {:ok, map()} | {:error, String.t()} def click_element(%{"element_id" => element_id}) when is_binary(element_id) do with {:ok, result} <- find_clickable_elements(%{"filter" => element_id}), element <- List.first(result.elements) do if element do # Handle both atom and string keys (after sanitization) center = get_in_sanitized(element, [:center]) || get_in_sanitized(element, ["center"]) case center do %{"x" => x, "y" => y} when is_number(x) and is_number(y) -> # Click at the element's center case handle_mouse_click(%{"x" => x, "y" => y}) do {:ok, _click_result} -> {:ok, %{ status: "ok", message: "Clicked element #{element_id}", element: element, clicked_at: %{x: x, y: y} }} {:error, reason} -> {:error, "Failed to click element: #{reason}"} end _ -> {:error, "Element found but has no valid center coordinates: #{inspect(center)}"} end else {:error, "Element '#{element_id}' not found or not clickable"} end end end def click_element(_params) do {:error, "Invalid parameters: must provide 'element_id' parameter"} end @doc """ Move mouse to hover over an element by its semantic ID. Similar to Playwright's hover functionality - finds the element and moves the mouse to its center without clicking. Params: - `element_id`: The semantic ID to hover over (string or atom) Returns `{:ok, result_map}` with hover details, or `{:error, reason}` if element not found. """ @spec hover_element(map()) :: {:ok, map()} | {:error, String.t()} def hover_element(%{"element_id" => element_id}) when is_binary(element_id) do with {:ok, result} <- find_clickable_elements(%{"filter" => element_id}), element <- List.first(result.elements) do if element do # Handle both atom and string keys (after sanitization) center = get_in_sanitized(element, [:center]) || get_in_sanitized(element, ["center"]) case center do %{"x" => x, "y" => y} when is_number(x) and is_number(y) -> case handle_mouse_move(%{"x" => x, "y" => y}) do {:ok, _move_result} -> {:ok, %{ status: "ok", message: "Hovering over element #{element_id}", element: element, position: %{x: x, y: y} }} {:error, reason} -> {:error, "Failed to move mouse to element: #{reason}"} end _ -> {:error, "Element found but has no valid center coordinates: #{inspect(center)}"} end else {:error, "Element '#{element_id}' not found"} end end end def hover_element(_params) do {:error, "Invalid parameters: must provide 'element_id' parameter"} end @doc """ Capture a screenshot of the Scenic application. Accepts `%{"format" => "path" | "base64", "filename" => string}`. Both parameters are optional. Returns `{:ok, result_map}` with screenshot information, or `{:error, reason}` if the screenshot cannot be captured. """ @spec take_screenshot(map()) :: {:ok, map()} | {:error, String.t()} def take_screenshot(params) do format = Map.get(params, "format", "path") filename = Map.get(params, "filename") path = if filename do if String.ends_with?(filename, ".png") do filename else filename <> ".png" end else timestamp = DateTime.utc_now() |> DateTime.to_string() |> String.replace(~r/[:\s]/, "_") "/tmp/scenic_screenshot_#{timestamp}.png" end with {:ok, vp_pid} <- viewport_pid(), {:ok, driver_pid} <- get_driver_from_viewport(vp_pid), driver_state <- :sys.get_state(driver_pid), :ok <- Scenic.Driver.Local.screenshot(driver_state, path) do if format == "base64" do case File.read(path) do {:ok, binary} -> encoded = Base.encode64(binary) {:ok, %{ status: "ok", path: path, format: "base64", data: encoded, size: byte_size(binary) }} {:error, reason} -> {:error, "Failed to read screenshot file: #{inspect(reason)}"} end else {:ok, %{status: "ok", path: path, format: "path"}} end else {:error, reason} -> {:error, reason} other -> {:error, "Screenshot failed: #{inspect(other)}"} end end # ======================================================================== # Helper Functions # ======================================================================== defp get_driver_from_viewport(vp_pid) do case :sys.get_state(vp_pid) do %{driver_pids: [driver_pid | _]} -> {:ok, driver_pid} _ -> {:error, "No driver found in viewport state. Ensure your Scenic driver is properly configured."} end catch :exit, reason -> {:error, "Failed to get driver from viewport: #{inspect(reason)}"} end defp build_scene_description(scripts) do scripts |> Enum.map(fn {id, _compiled_script, _pid} -> {id, nil} {id, _compiled_script} -> {id, nil} other -> {other, nil} end) |> Enum.map(fn {id, _} -> component_name = case id do {name, _uid} when is_atom(name) -> Atom.to_string(name) name when is_atom(name) -> Atom.to_string(name) _ -> inspect(id) end %{ id: inspect(id), component: component_name } end) |> Enum.group_by(& &1.component) |> Enum.map(fn {component, items} -> "#{component} (#{length(items)} instances)" end) |> Enum.join(", ") end defp build_semantic_description(nil), do: %{count: 0, elements: [], summary: "No semantic DOM available"} defp build_semantic_description(semantic_table) do try do # Each entry in semantic_table is {graph_key, %{elements: %{id => element_info}, timestamp: ...}} # We need to flatten the nested elements map # IMPORTANT: Keep only the LATEST version of each element ID (by timestamp) raw_table = :ets.tab2list(semantic_table) IO.inspect(raw_table |> Enum.map(fn {key, data} -> {key, Map.keys(Map.get(data, :elements, %{})), Map.get(data, :timestamp)} end), label: "DEBUG: Raw semantic table entries") flat_elements = raw_table |> Enum.flat_map(fn {graph_key, data} -> timestamp = Map.get(data, :timestamp, 0) # Extract the nested elements map case Map.get(data, :elements) do elements when is_map(elements) -> Enum.map(elements, fn {id, element_info} -> {id, element_info, timestamp, graph_key} end) _ -> [] end end) # Keep the best version of each ID # Prefer entries from :_root_ graph, otherwise take the latest by timestamp |> Enum.group_by(fn {id, _element_info, _timestamp, _graph_key} -> id end) |> Enum.map(fn {_id, versions} -> # Prefer _root_ entries, otherwise take latest timestamp best_version = Enum.max_by(versions, fn {_id, _info, ts, graph_key} -> root_priority = if graph_key in [:_root_, "_root_"], do: 1_000_000_000_000, else: 0 root_priority + ts end) {id, element_info, _timestamp, _graph_key} = best_version {id, element_info} end) summary = flat_elements |> Enum.map(fn {_id, element_info} -> element_info |> Map.get(:semantic, %{}) |> Map.get(:type, :unknown) |> to_string() end) |> Enum.frequencies() |> Enum.map(fn {type, count} -> "#{count} #{type}" end) |> Enum.join(", ") %{ count: length(flat_elements), elements: Enum.map(flat_elements, fn {id, element_info} -> element_info |> Map.put(:key, inspect(id)) |> sanitize_for_json() end), summary: summary, by_type: Enum.frequencies( Enum.map(flat_elements, fn {_id, element_info} -> Map.get(element_info, :semantic, %{}) |> Map.get(:type, :unknown) end) ), clickable_count: Enum.count(flat_elements, fn {_id, element_info} -> element_info |> Map.get(:semantic, %{}) |> Map.get(:clickable, false) end) } rescue _ -> %{count: 0, elements: [], summary: "Error reading semantic table"} end end defp normalize_key(key) do case String.downcase(key) do "enter" -> :key_enter "escape" -> :key_escape "tab" -> :key_tab "backspace" -> :key_backspace "delete" -> :key_delete "space" -> :key_space "up" -> :key_up "down" -> :key_down "left" -> :key_left "right" -> :key_right "home" -> :key_home "end" -> :key_end "page_up" -> :key_pageup "page_down" -> :key_pagedown "f1" -> :key_f1 "f2" -> :key_f2 "f3" -> :key_f3 "f4" -> :key_f4 "f5" -> :key_f5 "f6" -> :key_f6 "f7" -> :key_f7 "f8" -> :key_f8 "f9" -> :key_f9 "f10" -> :key_f10 "f11" -> :key_f11 "f12" -> :key_f12 other -> String.to_atom("key_" <> other) end end defp parse_modifiers(modifiers) when is_list(modifiers) do modifiers |> Enum.filter(&(&1 in ["shift", "ctrl", "alt", "cmd", "meta"])) end defp parse_modifiers(_), do: [] defp parse_button(button) when is_binary(button) do case String.downcase(button) do "left" -> :btn_left "right" -> :btn_right "middle" -> :btn_middle _ -> :btn_left end end defp parse_button(_), do: :btn_left defp calculate_center(bounds) when is_map(bounds) do # Bounds format: %{left: x, top: y, width: w, height: h} left = Map.get(bounds, :left, 0) top = Map.get(bounds, :top, 0) width = Map.get(bounds, :width, 0) height = Map.get(bounds, :height, 0) %{ x: left + width / 2, y: top + height / 2 } end defp calculate_center(_), do: %{x: 0, y: 0} defp calculate_center_with_transforms(bounds, transforms) when is_map(bounds) do # Calculate local center local_center = calculate_center(bounds) # DEBUG: See what transforms we're getting IO.inspect(transforms, label: "DEBUG: Transforms structure") # Apply translate transform if present # Transforms can be in format: %{translate: {x, y}} or %{pin: ..., translate: ...} translate = case Map.get(transforms, :translate) do {tx, ty} when is_number(tx) and is_number(ty) -> {tx, ty} _ -> {0, 0} end {tx, ty} = translate IO.inspect({tx, ty}, label: "DEBUG: Applied translate") %{ x: local_center.x + tx, y: local_center.y + ty } end defp calculate_center_with_transforms(bounds, _transforms) do calculate_center(bounds) end defp maybe_filter_by_id(elements, nil), do: elements defp maybe_filter_by_id(elements, filter) when is_binary(filter) do # Try to match the filter against the element key # Support both ":atom_name" and "atom_name" formats filter_atom = filter |> String.trim_leading(":") |> String.to_atom() Enum.filter(elements, fn {key, _data} -> key == filter_atom or inspect(key) =~ filter end) end defp maybe_filter_by_id(elements, _), do: elements # Helper to get values from maps that may have atom or string keys defp get_in_sanitized(map, [key | rest]) when is_map(map) do value = Map.get(map, key) || Map.get(map, to_string(key)) if rest == [] do value else get_in_sanitized(value, rest) end end defp get_in_sanitized(_, _), do: nil # Recursively sanitize data structures to be JSON-encodable # Converts tuples, atoms, and other non-JSON types to strings/basic types defp sanitize_for_json(data) when is_map(data) do data |> Enum.map(fn {k, v} -> {sanitize_for_json(k), sanitize_for_json(v)} end) |> Enum.into(%{}) end defp sanitize_for_json(data) when is_list(data) do Enum.map(data, &sanitize_for_json/1) end defp sanitize_for_json(data) when is_tuple(data) do # Convert tuples to string representation inspect(data) end defp sanitize_for_json(data) when is_atom(data) and data != nil and data != true and data != false do # Convert atoms (except nil, true, false) to strings Atom.to_string(data) end defp sanitize_for_json(data) when is_binary(data) or is_number(data) or is_boolean(data) or is_nil(data) do # These types are already JSON-safe data end defp sanitize_for_json(data) do # Fallback for any other types (PIDs, refs, etc.) inspect(data) end # This one here is the real milk in the tea! Here we map what actions we receive to tool calls def handle_action(%{"action" => "inspect_viewport"} = _actn) do ScenicMcp.Tools.handle_get_scenic_graph() end def handle_action(%{"action" => "send_keys"} = actn) do ScenicMcp.Tools.handle_send_keys(actn) end def handle_action(%{"action" => "send_mouse_move"} = actn) do ScenicMcp.Tools.handle_mouse_move(actn) end def handle_action(%{"action" => "send_mouse_click"} = actn) do ScenicMcp.Tools.handle_mouse_click(actn) end def handle_action(%{"action" => "take_screenshot"} = actn) do ScenicMcp.Tools.take_screenshot(actn) end def handle_action(%{"action" => "find_clickable"} = actn) do ScenicMcp.Tools.find_clickable_elements(actn) end def handle_action(%{"action" => "click_element"} = actn) do ScenicMcp.Tools.click_element(actn) end def handle_action(%{"action" => "hover_element"} = actn) do ScenicMcp.Tools.hover_element(actn) end def handle_action(%{"action" => _action}) do {:error, "Unknown command"} end def handle_action(_) do {:error, "Invalid action format - must include 'action' key"} end end

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scenic-contrib/scenic_mcp_experimental'

If you have feedback or need assistance with the MCP directory API, please join our Discord server