Skip to main content
Glama

DataForSEO MCP Server

index-sse-http.ts11.8 kB
import express, { Request as ExpressRequest, Response, NextFunction } from 'express'; import { randomUUID } from "node:crypto"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { z } from 'zod'; import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; import { DataForSEOClient, DataForSEOConfig } from '../core/client/dataforseo.client.js'; import { EnabledModulesSchema, isModuleEnabled } from '../core/config/modules.config.js'; import { BaseModule, ToolDefinition } from '../core/modules/base.module.js'; import { name, version } from '../core/utils/version.js'; import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js'; import { ModuleLoaderService } from '../core/utils/module-loader.js'; import { initializeFieldConfiguration } from '../core/config/field-configuration.js'; // Initialize field configuration if provided initializeFieldConfiguration(); console.error('Starting DataForSEO MCP Server...'); console.error(`Server name: ${name}, version: ${version}`); /** * This example server demonstrates backwards compatibility with both: * 1. The deprecated HTTP+SSE transport (protocol version 2024-11-05) * 2. The Streamable HTTP transport (protocol version 2025-03-26) * * It maintains a single MCP server instance but exposes two transport options: * - /mcp: The new Streamable HTTP endpoint (supports GET/POST/DELETE) * - /sse: The deprecated SSE endpoint for older clients (GET to establish stream) * - /messages: The deprecated POST endpoint for older clients (POST to send messages) */ // Configuration constants const CONNECTION_TIMEOUT = 30000; // 30 seconds const CLEANUP_INTERVAL = 60000; // 1 minute // Extended request interface to include auth properties interface Request extends ExpressRequest { username?: string; password?: string; } // Transport interface with timestamp interface TransportWithTimestamp { transport: StreamableHTTPServerTransport | SSEServerTransport; lastActivity: number; } // Store transports by session ID const transports: Record<string, TransportWithTimestamp> = {}; // Cleanup function for stale connections function cleanupStaleConnections() { const now = Date.now(); Object.entries(transports).forEach(([sessionId, { transport, lastActivity }]) => { if (now - lastActivity > CONNECTION_TIMEOUT) { console.log(`Cleaning up stale connection for session ${sessionId}`); try { transport.close(); } catch (error) { console.error(`Error closing transport for session ${sessionId}:`, error); } delete transports[sessionId]; } }); } // Start periodic cleanup const cleanupInterval = setInterval(cleanupStaleConnections, CLEANUP_INTERVAL); function getServer(username: string | undefined, password: string | undefined): McpServer { const server = new McpServer({ name, version, }, { capabilities: { logging: {} } }); // Initialize DataForSEO client const dataForSEOConfig: DataForSEOConfig = { username: username || "", password: password || "", }; const dataForSEOClient = new DataForSEOClient(dataForSEOConfig); console.error('DataForSEO client initialized'); // Parse enabled modules from environment const enabledModules = EnabledModulesSchema.parse(process.env.ENABLED_MODULES); // Initialize modules const modules: BaseModule[] = ModuleLoaderService.loadModules(dataForSEOClient, enabledModules); // Register module tools modules.forEach(module => { const tools = module.getTools(); Object.entries(tools).forEach(([name, tool]) => { const typedTool = tool as ToolDefinition; const schema = z.object(typedTool.params); server.tool( name, typedTool.description, schema.shape, typedTool.handler ); }); }); return server; } // Create Express application const app = express(); app.use(express.json()); // Basic Auth Middleware const basicAuth = (req: Request, res: Response, next: NextFunction) => { const authHeader = req.headers.authorization; if (!authHeader || !authHeader.startsWith('Basic ')) { next(); return; } const base64Credentials = authHeader.split(' ')[1]; const credentials = Buffer.from(base64Credentials, 'base64').toString('utf-8'); const [username, password] = credentials.split(':'); if (!username || !password) { console.error('Invalid credentials'); res.status(401).json({ jsonrpc: "2.0", error: { code: -32001, message: "Invalid credentials" }, id: null }); return; } req.username = username; req.password = password; next(); }; //============================================================================= // STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26) //============================================================================= app.post('/http', basicAuth, async (req: Request, res: Response) => { // In stateless mode, create a new instance of transport and server for each request // to ensure complete isolation. A single instance would cause request ID collisions // when multiple clients connect concurrently. try { console.error(Date.now().toLocaleString()) // Handle credentials if (!req.username && !req.password) { const envUsername = process.env.DATAFORSEO_USERNAME; const envPassword = process.env.DATAFORSEO_PASSWORD; if (!envUsername || !envPassword) { console.error('No DataForSEO credentials provided'); res.status(401).json({ jsonrpc: "2.0", error: { code: -32001, message: "Authentication required. Provide DataForSEO credentials." }, id: null }); return; } req.username = envUsername; req.password = envPassword; } const server = getServer(req.username, req.password); console.error(Date.now().toLocaleString()) const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({ sessionIdGenerator: undefined }); await server.connect(transport); console.error('handle request'); await transport.handleRequest(req , res, req.body); console.error('end handle request'); req.on('close', () => { console.error('Request closed'); transport.close(); server.close(); }); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message: 'Internal server error', }, id: null, }); } } }); app.get('/http', async (req: Request, res: Response) => { console.error('Received GET MCP request'); res.status(405).json({ jsonrpc: "2.0", error: { code: -32000, message: "Method not allowed." }, id: null }); }); app.delete('/http', async (req: Request, res: Response) => { console.error('Received DELETE MCP request'); res.status(405).json({ jsonrpc: "2.0", error: { code: -32000, message: "Method not allowed." }, id: null }); }); //============================================================================= // DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05) //============================================================================= app.get('/sse', basicAuth, async (req: Request, res: Response) => { console.log('Received GET request to /sse (deprecated SSE transport)'); // Handle credentials if (!req.username && !req.password) { const envUsername = process.env.DATAFORSEO_USERNAME; const envPassword = process.env.DATAFORSEO_PASSWORD; if (!envUsername || !envPassword) { console.error('No DataForSEO credentials provided'); res.status(401).json({ jsonrpc: "2.0", error: { code: -32001, message: "Authentication required. Provide DataForSEO credentials." }, id: null }); return; } req.username = envUsername; req.password = envPassword; } const transport = new SSEServerTransport('/messages', res); // Store transport with timestamp transports[transport.sessionId] = { transport, lastActivity: Date.now() }; // Handle connection cleanup const cleanup = () => { try { transport.close(); } catch (error) { console.error(`Error closing transport for session ${transport.sessionId}:`, error); } delete transports[transport.sessionId]; }; res.on("error", cleanup); req.on("error", cleanup); req.socket.on("error", cleanup); req.socket.on("timeout", cleanup); // Set socket timeout req.socket.setTimeout(CONNECTION_TIMEOUT); const server = getServer(req.username, req.password); await server.connect(transport); }); app.post("/messages", basicAuth, async (req: Request, res: Response) => { const sessionId = req.query.sessionId as string; // Handle credentials if (!req.username && !req.password) { const envUsername = process.env.DATAFORSEO_USERNAME; const envPassword = process.env.DATAFORSEO_PASSWORD; if (!envUsername || !envPassword) { res.status(401).json({ jsonrpc: "2.0", error: { code: -32001, message: "Authentication required. Provide DataForSEO credentials." }, id: null }); return; } req.username = envUsername; req.password = envPassword; } const transportData = transports[sessionId]; if (!transportData) { res.status(400).send('No transport found for sessionId'); return; } if (!(transportData.transport instanceof SSEServerTransport)) { res.status(400).json({ jsonrpc: '2.0', error: { code: -32000, message: 'Bad Request: Session exists but uses a different transport protocol', }, id: null, }); return; } // Update last activity timestamp transportData.lastActivity = Date.now(); await transportData.transport.handlePostMessage(req, res, req.body); }); // Start the server const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : 3000; const server = app.listen(PORT, () => { console.log(`DataForSEO MCP Server with SSE compatibility listening on port ${PORT}`); console.log(` ============================================== SUPPORTED TRANSPORT OPTIONS: 1. Streamable Http (Protocol version: 2025-03-26) Endpoint: /http (POST) 2. Http + SSE (Protocol version: 2024-11-05) Endpoints: /sse (GET) and /messages (POST) Usage: - Establish SSE stream with GET to /sse - Send requests with POST to /messages?sessionId=<id> ============================================== `); }); // Handle server shutdown process.on('SIGINT', async () => { console.log('Shutting down server...'); // Clear cleanup interval clearInterval(cleanupInterval); // Close HTTP server server.close(); // Close all active transports for (const sessionId in transports) { try { console.log(`Closing transport for session ${sessionId}`); await transports[sessionId].transport.close(); delete transports[sessionId]; } catch (error) { console.error(`Error closing transport for session ${sessionId}:`, error); } } console.log('Server shutdown complete'); process.exit(0); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ravinwebsurgeon/seo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server