MCP Selenium

by angiejones
Verified
#!/usr/bin/env node
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import pkg from 'selenium-webdriver';
const { Builder, By, Key, until, Actions } = pkg;
import { Options as ChromeOptions } from 'selenium-webdriver/chrome.js';
import { Options as FirefoxOptions } from 'selenium-webdriver/firefox.js';

// Create an MCP server
const server = new McpServer({
  name: "MCP Selenium",
  version: "1.0.0"
});

// Server state: every driver started by this process, keyed by session id,
// plus the id of the session that tools currently operate on.
const state = {
  drivers: new Map(),
  currentSession: null
};

// Helper functions

/**
 * Returns the WebDriver of the current session.
 * @returns {object} The driver registered under `state.currentSession`.
 * @throws {Error} When no browser session is active.
 */
const getDriver = () => {
  const driver = state.drivers.get(state.currentSession);
  if (!driver) {
    throw new Error('No active browser session');
  }
  return driver;
};

/**
 * Translates a locator strategy name and value into a Selenium `By` locator.
 * @param {string} by - Strategy name (case-insensitive): id, css, xpath, name, tag or class.
 * @param {string} value - Value for the strategy.
 * @returns {object} Selenium locator.
 * @throws {Error} When the strategy is not supported.
 */
const getLocator = (by, value) => {
  switch (by.toLowerCase()) {
    case 'id': return By.id(value);
    case 'css': return By.css(value);
    case 'xpath': return By.xpath(value);
    case 'name': return By.name(value);
    // A bare tag name is itself a valid CSS selector.
    case 'tag': return By.css(value);
    case 'class': return By.className(value);
    default: throw new Error(`Unsupported locator strategy: ${by}`);
  }
};

/**
 * Builds a successful single-text tool result.
 * @param {string} text - Message returned to the client.
 * @returns {{content: Array<{type: string, text: string}>}}
 */
const textResult = (text) => ({ content: [{ type: 'text', text }] });

/**
 * Builds a failed tool result. The message text is `<prefix>: <error message>`;
 * `isError` lets MCP clients tell failures from successes without parsing text.
 * @param {string} prefix - Human-readable description of what failed.
 * @param {Error} e - The caught error.
 * @returns {{content: Array<{type: string, text: string}>, isError: boolean}}
 */
const errorResult = (prefix, e) => ({
  content: [{ type: 'text', text: `${prefix}: ${e.message}` }],
  isError: true
});

/**
 * Wraps a tool implementation so any thrown error is reported as a tool
 * result instead of propagating out of the handler.
 * @param {string} errorPrefix - Prefix for the error message.
 * @param {(args: object) => Promise<object>} action - The tool implementation.
 * @returns {(args: object) => Promise<object>} Handler suitable for `server.tool`.
 */
const handle = (errorPrefix, action) => async (args) => {
  try {
    return await action(args);
  } catch (e) {
    return errorResult(errorPrefix, e);
  }
};

/**
 * Waits until an element matching the locator is present in the current
 * session's page and returns it.
 * @param {string} by - Locator strategy name.
 * @param {string} value - Locator value.
 * @param {number} timeout - Maximum wait in milliseconds.
 * @returns {Promise<{driver: object, element: object}>} The driver and the located element.
 */
const locateElement = async (by, value, timeout) => {
  const driver = getDriver();
  const locator = getLocator(by, value);
  const element = await driver.wait(until.elementLocated(locator), timeout);
  return { driver, element };
};

// Lookup of Selenium's named keys, normalized to lowercase without
// underscores so that "Enter", "ENTER", "ArrowLeft" and "arrow_left" all resolve.
const namedKeys = new Map(
  Object.entries(Key)
    .filter(([, code]) => typeof code === 'string')
    .map(([name, code]) => [name.replace(/_/g, '').toLowerCase(), code])
);

/**
 * Resolves a user-supplied key into the value expected by the Actions API.
 * The Actions API accepts only a single code point, so named keys such as
 * "Enter" or "Tab" must be mapped to Selenium's `Key` constants first.
 * @param {string} key - A single character or the name of a special key.
 * @returns {string} The single code point to send.
 * @throws {Error} When the key is neither a single character nor a known name.
 */
const resolveKey = (key) => {
  if ([...key].length === 1) {
    return key;
  }
  const code = namedKeys.get(key.replace(/[\s_-]/g, '').toLowerCase());
  if (code === undefined) {
    throw new Error(`Unsupported key: ${key}`);
  }
  return code;
};

// Common schemas
const browserOptionsSchema = z.object({
  headless: z.boolean().optional().describe("Run browser in headless mode"),
  arguments: z.array(z.string()).optional().describe("Additional browser arguments")
}).optional();

const locatorStrategies = ["id", "css", "xpath", "name", "tag", "class"];

const locatorSchema = {
  by: z.enum(locatorStrategies).describe("Locator strategy to find element"),
  value: z.string().describe("Value for the locator strategy"),
  timeout: z.number().optional().describe("Maximum time to wait for element in milliseconds")
};

// Browser Management Tools
server.tool(
  "start_browser",
  "launches browser",
  {
    browser: z.enum(["chrome", "firefox"]).describe("Browser to launch (chrome or firefox)"),
    options: browserOptionsSchema
  },
  handle('Error starting browser', async ({ browser, options = {} }) => {
    const builder = new Builder();
    const extraArguments = options.arguments ?? [];
    let driver;

    if (browser === 'chrome') {
      const chromeOptions = new ChromeOptions();
      if (options.headless) {
        chromeOptions.addArguments('--headless=new');
      }
      extraArguments.forEach((arg) => chromeOptions.addArguments(arg));
      driver = await builder
        .forBrowser('chrome')
        .setChromeOptions(chromeOptions)
        .build();
    } else {
      const firefoxOptions = new FirefoxOptions();
      if (options.headless) {
        firefoxOptions.addArguments('--headless');
      }
      extraArguments.forEach((arg) => firefoxOptions.addArguments(arg));
      driver = await builder
        .forBrowser('firefox')
        .setFirefoxOptions(firefoxOptions)
        .build();
    }

    // The new session becomes the one all subsequent tools act on.
    const sessionId = `${browser}_${Date.now()}`;
    state.drivers.set(sessionId, driver);
    state.currentSession = sessionId;

    return textResult(`Browser started with session_id: ${sessionId}`);
  })
);

server.tool(
  "navigate",
  "navigates to a URL",
  {
    url: z.string().describe("URL to navigate to")
  },
  handle('Error navigating', async ({ url }) => {
    const driver = getDriver();
    await driver.get(url);
    return textResult(`Navigated to ${url}`);
  })
);

// Element Interaction Tools
server.tool(
  "find_element",
  "finds an element",
  {
    ...locatorSchema
  },
  handle('Error finding element', async ({ by, value, timeout = 10000 }) => {
    await locateElement(by, value, timeout);
    return textResult('Element found');
  })
);

server.tool(
  "click_element",
  "clicks an element",
  {
    ...locatorSchema
  },
  handle('Error clicking element', async ({ by, value, timeout = 10000 }) => {
    const { element } = await locateElement(by, value, timeout);
    await element.click();
    return textResult('Element clicked');
  })
);

server.tool(
  "send_keys",
  "sends keys to an element, aka typing",
  {
    ...locatorSchema,
    text: z.string().describe("Text to enter into the element")
  },
  handle('Error entering text', async ({ by, value, text, timeout = 10000 }) => {
    const { element } = await locateElement(by, value, timeout);
    // Existing content is cleared first so the element ends up holding exactly `text`.
    await element.clear();
    await element.sendKeys(text);
    return textResult(`Text "${text}" entered into element`);
  })
);

server.tool(
  "get_element_text",
  "gets the text() of an element",
  {
    ...locatorSchema
  },
  handle('Error getting element text', async ({ by, value, timeout = 10000 }) => {
    const { element } = await locateElement(by, value, timeout);
    const text = await element.getText();
    return textResult(text);
  })
);

server.tool(
  "hover",
  "moves the mouse to hover over an element",
  {
    ...locatorSchema
  },
  handle('Error hovering over element', async ({ by, value, timeout = 10000 }) => {
    const { driver, element } = await locateElement(by, value, timeout);
    const actions = driver.actions({ bridge: true });
    await actions.move({ origin: element }).perform();
    return textResult('Hovered over element');
  })
);

server.tool(
  "drag_and_drop",
  "drags an element and drops it onto another element",
  {
    ...locatorSchema,
    targetBy: z.enum(locatorStrategies).describe("Locator strategy to find target element"),
    targetValue: z.string().describe("Value for the target locator strategy")
  },
  handle('Error performing drag and drop', async ({ by, value, targetBy, targetValue, timeout = 10000 }) => {
    const driver = getDriver();
    // Both locators are built before waiting so an unsupported strategy fails fast.
    const sourceLocator = getLocator(by, value);
    const targetLocator = getLocator(targetBy, targetValue);

    const sourceElement = await driver.wait(until.elementLocated(sourceLocator), timeout);
    const targetElement = await driver.wait(until.elementLocated(targetLocator), timeout);

    const actions = driver.actions({ bridge: true });
    await actions.dragAndDrop(sourceElement, targetElement).perform();
    return textResult('Drag and drop completed');
  })
);

server.tool(
  "double_click",
  "performs a double click on an element",
  {
    ...locatorSchema
  },
  handle('Error performing double click', async ({ by, value, timeout = 10000 }) => {
    const { driver, element } = await locateElement(by, value, timeout);
    const actions = driver.actions({ bridge: true });
    await actions.doubleClick(element).perform();
    return textResult('Double click performed');
  })
);

server.tool(
  "right_click",
  "performs a right click (context click) on an element",
  {
    ...locatorSchema
  },
  handle('Error performing right click', async ({ by, value, timeout = 10000 }) => {
    const { driver, element } = await locateElement(by, value, timeout);
    const actions = driver.actions({ bridge: true });
    await actions.contextClick(element).perform();
    return textResult('Right click performed');
  })
);

server.tool(
  "press_key",
  "simulates pressing a keyboard key",
  {
    key: z.string().describe("Key to press (e.g., 'Enter', 'Tab', 'a', etc.)")
  },
  handle('Error pressing key', async ({ key }) => {
    const driver = getDriver();
    // keyDown/keyUp reject anything that is not a single code point, so
    // names like 'Enter' have to be translated before being sent.
    const code = resolveKey(key);
    const actions = driver.actions({ bridge: true });
    await actions.keyDown(code).keyUp(code).perform();
    return textResult(`Key '${key}' pressed`);
  })
);

server.tool(
  "upload_file",
  "uploads a file using a file input element",
  {
    ...locatorSchema,
    filePath: z.string().describe("Absolute path to the file to upload")
  },
  handle('Error uploading file', async ({ by, value, filePath, timeout = 10000 }) => {
    const { element } = await locateElement(by, value, timeout);
    // Typing the path into the file input is how the file gets selected.
    await element.sendKeys(filePath);
    return textResult('File upload initiated');
  })
);

server.tool(
  "take_screenshot",
  "captures a screenshot of the current page",
  {
    outputPath: z.string().optional().describe("Optional path where to save the screenshot. If not provided, returns base64 data.")
  },
  handle('Error taking screenshot', async ({ outputPath }) => {
    const driver = getDriver();
    const screenshot = await driver.takeScreenshot();

    if (outputPath) {
      const fs = await import('fs');
      // The screenshot is a base64 string; decode it while writing.
      await fs.promises.writeFile(outputPath, screenshot, 'base64');
      return textResult(`Screenshot saved to ${outputPath}`);
    }

    return {
      content: [
        { type: 'text', text: 'Screenshot captured as base64:' },
        { type: 'text', text: screenshot }
      ]
    };
  })
);

server.tool(
  "close_session",
  "closes the current browser session",
  {},
  handle('Error closing session', async () => {
    const driver = getDriver();
    const sessionId = state.currentSession;
    try {
      await driver.quit();
    } finally {
      // Drop the session even when quit() fails; otherwise every later
      // tool call would keep using a driver that can no longer be closed.
      state.drivers.delete(sessionId);
      if (state.currentSession === sessionId) {
        state.currentSession = null;
      }
    }
    return textResult(`Browser session ${sessionId} closed`);
  })
);

// Resources
server.resource(
  "browser-status",
  // The callbacks object is required by ResourceTemplate; `list: undefined`
  // declares that this template does not enumerate resources.
  new ResourceTemplate("browser-status://current", { list: undefined }),
  async (uri) => ({
    contents: [{
      uri: uri.href,
      text: state.currentSession
        ? `Active browser session: ${state.currentSession}`
        : "No active browser session"
    }]
  })
);

/**
 * Cleanup handler: quits every browser this process started, resets the
 * server state and exits. Failures to quit an individual browser are logged
 * and do not prevent the remaining browsers from being closed.
 */
async function cleanup() {
  for (const [sessionId, driver] of state.drivers) {
    try {
      await driver.quit();
    } catch (e) {
      console.error(`Error closing browser session ${sessionId}:`, e);
    }
  }
  state.drivers.clear();
  state.currentSession = null;
  process.exit(0);
}

process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);

// Start the server
const transport = new StdioServerTransport();
await server.connect(transport);