Skip to main content
Glama
mcpServer.js56 kB
// src/server/mcpServer.js const { McpServer } = require('@modelcontextprotocol/sdk/server/mcp.js'); const { StdioServerTransport } = require('@modelcontextprotocol/sdk/server/stdio.js'); const { SSEServerTransport } = require('@modelcontextprotocol/sdk/server/sse.js'); const express = require('express'); const rateLimit = require('express-rate-limit'); const { z } = require('zod'); const { v4: uuidv4 } = require('uuid'); // Import uuid for connection IDs const config = require('../../config'); const { // Schemas conductResearchSchema, researchFollowUpSchema, getPastResearchSchema, rateResearchReportSchema, listResearchHistorySchema, getReportContentSchema, getServerStatusSchema, // Import schema for status tool listModelsSchema, // New: schema for listing models submitResearchSchema, searchSchema, querySchema, exportReportsSchema, importReportsSchema, backupDbSchema, dbHealthSchema, reindexVectorsSchema, searchWebSchema, fetchUrlSchema, indexTextsSchema, indexUrlSchema, searchIndexSchema, indexStatusSchema, listToolsSchema, searchToolsSchema, researchSchema, dateTimeSchema, calcSchema, retrieveSchema, // New: schema for retrieve tool getJobStatusSchema, cancelJobSchema, // Functions conductResearch, researchFollowUp, getPastResearch, rateResearchReport, listResearchHistory, getReportContent, getServerStatus, // Import function for status tool listModels, // New: function for listing models getJobStatusTool, cancelJobTool, exportReports, importReports, backupDb, dbHealth, reindexVectorsTool, searchWeb, fetchUrl, index_texts, index_url, search_index, index_status, listToolsTool, searchToolsTool, researchTool, dateTimeTool, calcTool, retrieveTool, // New: function for retrieve tool } = require('./tools'); const dbClient = require('../utils/dbClient'); // Import dbClient const nodeFetch = require('node-fetch'); const cors = require('cors'); // Create MCP server with proper capabilities declaration per MCP spec 2025-06-18 const server = new McpServer({ name: config.server.name, version: config.server.version, capabilities: { tools: {}, prompts: { listChanged: true }, resources: { subscribe: true, listChanged: true } } }); // MODE-based tool exposure const MODE = (config.mcp?.mode || 'ALL').toUpperCase(); const ALWAYS_ON = new Set(['ping','get_server_status','job_status','get_job_status','cancel_job']); const AGENT_ONLY = new Set(['agent']); const MANUAL_SET = new Set([ 'research','conduct_research','submit_research','research_follow_up', 'retrieve','search','query', 'get_report','get_report_content','history','list_research_history' ]); function shouldExpose(name) { if (ALWAYS_ON.has(name)) return true; if (MODE === 'AGENT') return AGENT_ONLY.has(name); if (MODE === 'MANUAL') return MANUAL_SET.has(name); return true; // ALL } function register(name, schema, handler) { if (shouldExpose(name)) server.tool(name, schema, handler); } // Permissive parameter normalizer to accept loose single-string inputs (e.g., random_string) function extractRawParam(input) { if (typeof input === 'string') return input; if (input && typeof input === 'object') { for (const key of ['random_string', 'raw', 'text', 'payload']) { if (typeof input[key] === 'string' && input[key].trim()) return input[key]; } } return null; } function tryParseJson(text) { try { const obj = JSON.parse(text); if (obj && typeof obj === 'object') return obj; } catch (_) {} return null; } function parseKeyVals(raw) { const out = {}; const s = String(raw || '').trim(); // Handle JSON upfront const j = tryParseJson(s); if (j) return j; // Quick patterns const lower = s.toLowerCase(); // Extract sql: rest of string after first occurrence if (/(^|[ ,;\n\t])sql\s*[:=]/i.test(s)) { const idx = s.toLowerCase().indexOf('sql'); const after = s.slice(idx).replace(/^sql\s*[:=]\s*/i, ''); out.sql = after.trim(); } // Generic key:value pairs (stop at next key) const regex = /(\w+)\s*[:=]\s*([^,;\n]+)(?=\s*[,;\n]|$)/g; let m; while ((m = regex.exec(s)) !== null) { const k = m[1]; const v = m[2].trim(); if (out[k] === undefined) out[k] = v; } // If nothing parsed, return a hint object if (Object.keys(out).length === 0) return { _raw: s }; return out; } function toNumberOr(value, fallback) { const n = Number(value); return Number.isFinite(n) ? n : fallback; } function toBoolean(value, fallback = false) { if (typeof value === 'boolean') return value; const s = String(value).trim().toLowerCase(); if (['true','1','yes','y','on'].includes(s)) return true; if (['false','0','no','n','off'].includes(s)) return false; return fallback; } function normalizeParamsForTool(toolName, params) { // If already a structured object without loose fields, pass through if (params && typeof params === 'object' && !('random_string' in params) && !('raw' in params) && !('text' in params)) { return params; } const raw = extractRawParam(params); const parsed = raw !== null ? parseKeyVals(raw) : (params || {}); const s = typeof raw === 'string' ? raw.trim() : ''; switch (toolName) { case 'calc': if (parsed && parsed.expr) return parsed; return { expr: s || String(parsed._raw || '') }; case 'date_time': if (parsed && parsed.format) return parsed; if (/^(iso|rfc|epoch|unix)$/i.test(s)) return { format: s.toLowerCase() === 'unix' ? 'epoch' : s.toLowerCase() }; return {}; case 'job_status': case 'get_job_status': if (parsed && parsed.job_id) return parsed; return { job_id: s || String(parsed._raw || '') }; case 'cancel_job': if (parsed && parsed.job_id) return parsed; return { job_id: s || String(parsed._raw || '') }; case 'get_report': case 'get_report_content': if (parsed && parsed.reportId) return parsed; if (parsed && parsed.id) return { reportId: parsed.id }; return { reportId: s || String(parsed._raw || '') }; case 'history': case 'list_research_history': if (parsed && (parsed.limit || parsed.queryFilter)) { const out = { ...parsed }; if (out.limit !== undefined) out.limit = toNumberOr(out.limit, 10); return out; } if (/^\d+$/.test(s)) return { limit: Number(s) }; return s ? { queryFilter: s } : {}; case 'retrieve': { const out = { ...parsed }; // Mode detection const isSql = /mode\s*[:=]\s*sql/i.test(s) || /^\s*select\s/i.test(s) || (out.sql && !out.query); if (isSql) { out.mode = 'sql'; out.sql = out.sql || (parsed._raw ? parsed._raw : s); // params support via JSON only; leave as-is if present } else { out.mode = out.mode || 'index'; out.query = out.query || (parsed._raw ? parsed._raw : s); } if (out.k !== undefined) out.k = toNumberOr(out.k, 10); return out; } case 'research': case 'submit_research': case 'conduct_research': if (parsed && (parsed.query || parsed.q)) { const out = { ...parsed }; if (out.q && !out.query) out.query = out.q; if (out.cost && !out.costPreference) out.costPreference = out.cost; if (out.async !== undefined) out.async = toBoolean(out.async, true); return out; } return s ? { query: s } : {}; case 'list_tools': case 'search_tools': if (parsed && parsed.query) return parsed; return s ? { query: s } : {}; case 'get_server_status': return {}; // no params default: // Best-effort passthrough return parsed && Object.keys(parsed).length ? parsed : {}; } } // Register prompts using latest MCP spec with proper protocol handlers if (config.mcp?.features?.prompts) { const prompts = new Map([ ['planning_prompt', { name: 'planning_prompt', description: 'Generate sophisticated multi-agent research plan using advanced XML tagging and domain-aware query decomposition', arguments: [ { name: 'query', description: 'Research query to decompose into specialized sub-queries', required: true }, { name: 'domain', description: 'Primary domain: general, technical, reasoning, search, creative', required: false }, { name: 'complexity', description: 'Query complexity: simple, moderate, complex', required: false }, { name: 'maxAgents', description: 'Maximum number of research agents (1-10)', required: false } ] }], ['synthesis_prompt', { name: 'synthesis_prompt', description: 'Synthesize ensemble research results with rigorous citation framework and confidence scoring', arguments: [ { name: 'query', description: 'Original research query for synthesis context', required: true }, { name: 'results', description: 'JSON string of research results to synthesize', required: true }, { name: 'outputFormat', description: 'Output format: report, briefing, bullet_points', required: false }, { name: 'audienceLevel', description: 'Target audience: beginner, intermediate, expert', required: false } ] }], ['research_workflow_prompt', { name: 'research_workflow_prompt', description: 'Complete research workflow: planning → parallel execution → synthesis with quality controls', arguments: [ { name: 'topic', description: 'Research topic or question', required: true }, { name: 'costBudget', description: 'Cost preference: low, high', required: false }, { name: 'async', description: 'Use async job processing: true, false', required: false } ] }] ]); server.setPromptRequestHandlers({ list: async () => ({ prompts: Array.from(prompts.values()) }), get: async (request) => { const prompt = prompts.get(request.params.name); if (!prompt) throw new Error(`Prompt not found: ${request.params.name}`); const { query, domain, complexity, maxAgents, results, outputFormat, audienceLevel, topic, costBudget, async } = request.params.arguments || {}; switch (request.params.name) { case 'planning_prompt': if (!query) { return { description: prompt.description, messages: [{ role: 'assistant', content: [{ type: 'text', text: 'Please provide a query parameter to generate a research plan.' }] }] }; } const p = require('../agents/planningAgent'); const planResult = await p.planResearch(query, { domain, complexity, maxAgents }, null, 'prompt'); return { description: prompt.description, messages: [{ role: 'assistant', content: [{ type: 'text', text: planResult }] }] }; case 'synthesis_prompt': const c = require('../agents/contextAgent'); let parsedResults = []; try { parsedResults = results ? JSON.parse(results) : []; } catch (e) { parsedResults = []; } let synthesisResult = ''; for await (const ch of c.contextualizeResultsStream(query, parsedResults, [], { includeSources: true, outputFormat: outputFormat || 'report', audienceLevel: audienceLevel || 'intermediate' }, 'prompt')) { if (ch.content) synthesisResult += ch.content; } return { description: prompt.description, messages: [{ role: 'assistant', content: [{ type: 'text', text: synthesisResult }] }] }; case 'research_workflow_prompt': const safeTopic = topic || '[your_topic]'; const workflowGuide = ` # Research Workflow for: ${safeTopic} ## 1. Planning Phase \`\`\`mcp planning_prompt { "query": "${safeTopic}", "domain": "auto-detect", "complexity": "auto-assess" } \`\`\` ## 2. Research Execution ${async === 'true' ? ` \`\`\`mcp submit_research { "query": "${safeTopic}", "costPreference": "${costBudget || 'low'}" } get_job_status { "job_id": "[returned_job_id]" } \`\`\` ` : ` \`\`\`mcp conduct_research { "query": "${safeTopic}", "costPreference": "${costBudget || 'low'}" } \`\`\` `} ## 3. Quality Assurance \`\`\`mcp get_past_research { "query": "${safeTopic}", "limit": 5 } search { "q": "${safeTopic}", "scope": "reports" } \`\`\` ## 4. Follow-up Analysis \`\`\`mcp research_follow_up { "originalQuery": "${safeTopic}", "followUpQuestion": "[your_specific_question]" } \`\`\` `; return { description: prompt.description, messages: [{ role: 'assistant', content: [{ type: 'text', text: workflowGuide }] }] }; default: throw new Error(`Unknown prompt: ${request.params.name}`); } } }); } // Register resources using latest MCP spec with proper protocol handlers and URI templates if (config.mcp?.features?.resources) { const resources = new Map([ ['mcp://specs/core', { uri: 'mcp://specs/core', name: 'MCP Core Specification', description: 'Canonical Model Context Protocol specification links and references', mimeType: 'application/json' }], ['mcp://tools/catalog', { uri: 'mcp://tools/catalog', name: 'Available Tools Catalog', description: 'Live MCP tools catalog with lightweight params for client UIs', mimeType: 'application/json' }], ['mcp://patterns/workflows', { uri: 'mcp://patterns/workflows', name: 'Research Workflow Patterns', description: 'Sophisticated tool chaining patterns for multi-agent research orchestration', mimeType: 'application/json' }], ['mcp://examples/multimodal', { uri: 'mcp://examples/multimodal', name: 'Multimodal Research Examples', description: 'Advanced examples for vision-capable research with dynamic model routing', mimeType: 'application/json' }], ['mcp://use-cases/domains', { uri: 'mcp://use-cases/domains', name: 'Domain-Specific Use Cases', description: 'Comprehensive use cases across technical, creative, and analytical domains', mimeType: 'application/json' }], ['mcp://optimization/caching', { uri: 'mcp://optimization/caching', name: 'Caching & Cost Optimization', description: 'Advanced caching strategies and cost-effective model selection patterns', mimeType: 'application/json' }] ]); // Helper function to generate domain-specific use cases const generateDomainUseCases = async () => { return { technical_research: { domain: "Technical Analysis", problem: "Understanding complex system architectures and implementation patterns", workflow: { step1: { tool: "search_web", params: { query: "microservices architecture patterns 2025" } }, step2: { tool: "fetch_url", params: { url: "authoritative_source_url" } }, step3: { tool: "conduct_research", params: { query: "Compare microservices vs monolithic architectures", textDocuments: ["fetched_content"] } }, step4: { tool: "research_follow_up", params: { originalQuery: "architecture comparison", followUpQuestion: "What are the security implications?" } } }, expected_outcome: "Comprehensive technical analysis with authoritative citations" }, business_intelligence: { domain: "Market Research & Analysis", problem: "Gathering competitive intelligence and market trends", workflow: { step1: { tool: "search_web", params: { query: "AI market trends Q3 2025" } }, step2: { tool: "submit_research", params: { query: "AI market competitive landscape analysis", costPreference: "low" } }, step3: { tool: "get_job_status", params: { job_id: "monitor_async_job" } }, step4: { tool: "get_past_research", params: { query: "AI market", limit: 3 } } }, expected_outcome: "Market intelligence report with trend analysis and competitive positioning" }, creative_synthesis: { domain: "Creative Content & Strategy", problem: "Developing innovative solutions and creative strategies", workflow: { step1: { tool: "conduct_research", params: { query: "innovative UX design patterns 2025", costPreference: "high" } }, step2: { tool: "search", params: { q: "UX design", scope: "reports" } }, step3: { tool: "research_follow_up", params: { originalQuery: "UX patterns", followUpQuestion: "How do these apply to AI interfaces?" } } }, expected_outcome: "Creative strategy recommendations with design inspiration" } }; }; server.setResourceRequestHandlers({ list: async () => ({ resources: Array.from(resources.values()) }), read: async (request) => { const uri = request.params.uri; const resource = resources.get(uri); if (!resource) throw new Error(`Resource not found: ${uri}`); let content; switch (uri) { case 'mcp://specs/core': content = { spec: 'https://spec.modelcontextprotocol.io/specification/2025-03-26/', jsonrpc: 'https://www.jsonrpc.org/specification', org: 'https://github.com/modelcontextprotocol', docs: 'https://modelcontextprotocol.io/', sdk: 'https://github.com/modelcontextprotocol/sdk', implementations: { openrouter_agents: 'https://github.com/wheattoast11/openrouter-deep-research', anthropic_examples: 'https://github.com/modelcontextprotocol/servers' } }; break; case 'mcp://tools/catalog': try { const text = await require('./tools').listToolsTool({ limit: 200, semantic: false }); content = JSON.parse(text); } catch (_) { content = { tools: [] }; } break; case 'mcp://patterns/workflows': content = { basic_patterns: [ { name: 'Search → Fetch → Research', steps: ['search_web { query }', 'fetch_url { url }', 'conduct_research { query, textDocuments:[content] }'], use_case: 'Web research with source verification' }, { name: 'Knowledge Base Query → Research', steps: ['search { q, scope:"reports" }', 'get_past_research { query }', 'conduct_research { query }'], use_case: 'Building on previous research' }, { name: 'Async Research Pipeline', steps: ['submit_research { query }', 'get_job_status { job_id }', 'get_report_content { reportId }'], use_case: 'Long-running comprehensive research' } ], advanced_patterns: [ { name: 'Multimodal Research Chain', steps: ['conduct_research { query, images:[...] }', 'research_follow_up { originalQuery, followUpQuestion }'], use_case: 'Vision-assisted analysis with iterative refinement' }, { name: 'Cost-Optimized Research', steps: ['list_models', 'conduct_research { query, costPreference:"low" }', 'rate_research_report'], use_case: 'Budget-conscious research with quality feedback' } ] }; break; case 'mcp://examples/multimodal': content = { vision_research: { conduct_research: { query: 'Analyze the technical architecture diagram and explain the data flow patterns', images: [{ url: 'data:image/png;base64,...', detail: 'high' }], costPreference: 'low', audienceLevel: 'expert' } }, document_analysis: { conduct_research: { query: 'Synthesize key findings from the research papers', textDocuments: [{ name: 'paper1.pdf', content: '...' }], structuredData: [{ name: 'results.csv', type: 'csv', content: 'metric,value\\n...' }] } } }; break; case 'mcp://use-cases/domains': content = await generateDomainUseCases(); break; case 'mcp://optimization/caching': content = { strategies: { result_caching: { description: 'Cache research results with semantic similarity matching', ttl_seconds: 3600, implementation: 'In-memory NodeCache + PGLite semantic search' }, model_routing: { description: 'Route queries to cost-effective models based on complexity', models: { simple: ['deepseek/deepseek-chat-v3.1', 'qwen/qwen3-coder'], complex: ['x-ai/grok-4', 'morph/morph-v3-large'], vision: ['z-ai/glm-4.5v', 'google/gemini-2.5-flash'] } }, batch_processing: { description: 'Process multiple queries in parallel with bounded concurrency', parallelism: 4, cost_savings: '60-80% through efficient resource utilization' } } }; break; default: throw new Error(`Unknown resource: ${uri}`); } return { contents: [{ uri: resource.uri, mimeType: resource.mimeType, text: JSON.stringify(content, null, 2) }] }; } }); } // Register tools (minimal unified set) register( "research", researchSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('research', params); const text = await researchTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error research: ${e.message}` }], isError: true }; } } ); register( "agent", require('./tools').agentSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('agent', params); const text = await require('./tools').agentTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error agent: ${e.message}` }], isError: true }; } } ); register( "ping", require('./tools').pingSchema, async (params) => { try { const text = await require('./tools').pingTool(params); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error ping: ${e.message}` }], isError: true }; } } ); register( "job_status", getJobStatusSchema, async (params) => { try { const norm = normalizeParamsForTool('job_status', params); const text = await getJobStatusTool(norm); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error job_status: ${e.message}` }], isError: true }; } } ); register( "cancel_job", cancelJobSchema, async (params) => { try { const norm = normalizeParamsForTool('cancel_job', params); const text = await cancelJobTool(norm); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error cancel_job: ${e.message}` }], isError: true }; } } ); register( "retrieve", retrieveSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('retrieve', params); const text = await retrieveTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error retrieve: ${e.message}` }], isError: true }; } } ); register( "get_report", getReportContentSchema, async (params, exchange) => { const startTime = Date.now(); const requestId = `req-${startTime}-${Math.random().toString(36).substring(2, 7)}`; try { const norm = normalizeParamsForTool('get_report', params); const result = await getReportContent(norm, exchange, requestId); return { content: [{ type: 'text', text: result }] }; } catch (e) { return { content: [{ type: 'text', text: `Error get_report: ${e.message}` }], isError: true }; } } ); register( "history", listResearchHistorySchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('history', params); const text = await listResearchHistory(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error history: ${e.message}` }], isError: true }; } } ); register( "date_time", dateTimeSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('date_time', params); const text = await dateTimeTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error date_time: ${e.message}` }], isError: true }; } } ); register( "calc", calcSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('calc', params); const text = await calcTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error calc: ${e.message}` }], isError: true }; } } ); register( "list_tools", listToolsSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('list_tools', params); const text = await listToolsTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error list_tools: ${e.message}` }], isError: true }; } } ); register( "search_tools", searchToolsSchema, async (params, exchange) => { try { const norm = normalizeParamsForTool('search_tools', params); const text = await searchToolsTool(norm, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error search_tools: ${e.message}` }], isError: true }; } } ); register( "get_server_status", getServerStatusSchema, async (params, exchange) => { try { const text = await getServerStatus({}, exchange, `req-${Date.now()}`); return { content: [{ type: 'text', text }] }; } catch (e) { return { content: [{ type: 'text', text: `Error get_server_status: ${e.message}` }], isError: true }; } } ); // Back-compat aliases → unified tools register("submit_research", submitResearchSchema, async (p, ex) => { try { const norm = normalizeParamsForTool('submit_research', p); const t = await researchTool({ ...norm, async: true }, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error submit_research: ${e.message}`}], isError:true }; }}); register("get_job_status", getJobStatusSchema, async (p) => { try { const norm = normalizeParamsForTool('get_job_status', p); const t = await getJobStatusTool(norm); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error get_job_status: ${e.message}`}], isError:true }; }}); register("search", searchSchema, async (p, ex) => { try { const norm = normalizeParamsForTool('search', p); const t = await retrieveTool({ mode: 'index', query: norm.q || norm.query, k: norm.k, scope: norm.scope, rerank: norm.rerank }, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error search: ${e.message}`}], isError:true }; }}); register("query", querySchema, async (p, ex) => { try { const norm = normalizeParamsForTool('query', p); const t = await retrieveTool({ mode: 'sql', sql: norm.sql, params: norm.params, explain: norm.explain }, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error query: ${e.message}`}], isError:true }; }}); register("conduct_research", researchSchema, async (p, ex) => { try { const norm = normalizeParamsForTool('conduct_research', p); const t = await researchTool({ ...norm, async: false }, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error conduct_research: ${e.message}`}], isError:true }; }}); register("get_report_content", getReportContentSchema, async (p, ex) => { try { const norm = normalizeParamsForTool('get_report_content', p); const t = await getReportContent(norm, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error get_report_content: ${e.message}`}], isError:true }; }}); register("list_research_history", listResearchHistorySchema, async (p, ex) => { try { const norm = normalizeParamsForTool('list_research_history', p); const t = await listResearchHistory(norm, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error list_research_history: ${e.message}`}], isError:true }; }}); register("research_follow_up", researchFollowUpSchema, async (p, ex) => { try { const norm = normalizeParamsForTool('research_follow_up', p); const t = await researchFollowUp(norm, ex, `req-${Date.now()}`); return { content: [{ type: 'text', text: t }] }; } catch (e){ return { content: [{ type: 'text', text: `Error research_follow_up: ${e.message}`}], isError:true }; }}); // Set up transports based on environment const setupTransports = async () => { let lastSseTransport = null; // Variable to hold the last SSE transport const sseConnections = new Map(); // Map to store active SSE connections // For command-line usage, use STDIO if (process.argv.includes('--stdio')) { // console.error('Starting MCP server with STDIO transport'); // Commented out: Logs interfere with STDIO JSON-RPC const transport = new StdioServerTransport(); // console.error('Attempting server.connect(transport)...'); // Commented out: Logs interfere with STDIO JSON-RPC await server.connect(transport); // console.error('server.connect(transport) completed.'); // Commented out: Logs interfere with STDIO JSON-RPC return; // Exit after setting up stdio, don't proceed to HTTP setup } else { // Only setup HTTP/SSE if --stdio is NOT specified // For HTTP usage, set up Express with SSE and optional Streamable HTTP const app = express(); const port = config.server.port; // OAuth2/JWT placeholder: use AUTH_JWKS_URL or fallback to API key until configured const serverApiKey = config.server.apiKey; const jwksUrl = process.env.AUTH_JWKS_URL || null; const expectedAudience = process.env.AUTH_EXPECTED_AUD || 'mcp-server'; app.use(cors({ origin: '*', exposedHeaders: ['Mcp-Session-Id'], allowedHeaders: ['Content-Type', 'authorization', 'mcp-session-id'] })); // Rate limiting middleware - production hardening const limiter = rateLimit({ windowMs: 60 * 1000, // 1 minute max: 100, // 100 requests per minute per IP message: { error: 'Too many requests, please try again later.' }, standardHeaders: true, // Return rate limit info in `RateLimit-*` headers legacyHeaders: false, // Disable `X-RateLimit-*` headers }); app.use(limiter); // Request size limits app.use(express.json({ limit: '10mb' })); // Enforce HTTPS in production when required if (config.server.requireHttps) { app.use((req, res, next) => { const proto = req.headers['x-forwarded-proto'] || req.protocol; if (proto !== 'https') return res.status(400).json({ error: 'HTTPS required' }); next(); }); } // Authentication Middleware (JWT first, fallback API key if configured) const authenticate = async (req, res, next) => { const allowNoAuth = process.env.ALLOW_NO_API_KEY === 'true'; const authHeader = req.headers.authorization || ''; if (!authHeader.startsWith('Bearer ')) { if (allowNoAuth) return next(); return res.status(401).json({ error: 'Unauthorized: Missing bearer token' }); } const token = authHeader.split(' ')[1]; if (jwksUrl) { try { // Lazy import jose to keep dep optional const { createRemoteJWKSet, jwtVerify } = require('jose'); const JWKS = createRemoteJWKSet(new URL(jwksUrl)); const { payload } = await jwtVerify(token, JWKS, { audience: expectedAudience }); if (!payload || (expectedAudience && payload.aud !== expectedAudience && !Array.isArray(payload.aud))) { return res.status(403).json({ error: 'Forbidden: invalid token audience' }); } return next(); } catch (e) { if (!serverApiKey) { return res.status(403).json({ error: 'Forbidden: JWT verification failed' }); } // Fall through to API key if configured } } if (serverApiKey && token === serverApiKey) return next(); if (allowNoAuth) return next(); return res.status(403).json({ error: 'Forbidden: Auth failed' }); }; console.error(`Starting MCP server with HTTP/SSE transport on port ${port}`); // Use error if (jwksUrl) { console.error(`[${new Date().toISOString()}] OAuth2/JWT auth ENABLED (JWKS=${jwksUrl}, aud=${expectedAudience}).`); } else if (serverApiKey) { console.error(`[${new Date().toISOString()}] API key fallback ENABLED for HTTP transport.`); } else if (process.env.ALLOW_NO_API_KEY === 'true') { console.error(`[${new Date().toISOString()}] SECURITY WARNING: Authentication DISABLED for HTTP transport (ALLOW_NO_API_KEY=true).`); // Use error, keep as warning level } else { console.error(`[${new Date().toISOString()}] CRITICAL: SERVER_API_KEY not set and ALLOW_NO_API_KEY!=true. HTTP transport may fail.`); // Keep error } // Streamable HTTP transport (preferred) guarded by feature flag if (require('../../config').mcp.transport.streamableHttpEnabled) { try { const { StreamableHTTPServerTransport } = require('@modelcontextprotocol/sdk/server/streamableHttp.js'); app.all('/mcp', authenticate, async (req, res) => { const transport = new StreamableHTTPServerTransport({ enableDnsRebindingProtection: true, allowedHosts: ['127.0.0.1', 'localhost'], allowedOrigins: ['http://localhost', 'http://127.0.0.1'] }); res.on('close', () => transport.close()); await server.connect(transport); await transport.handleRequest(req, res, req.body); }); } catch (e) { console.error('StreamableHTTP transport not available:', e.message); } } // Endpoint for SSE - Apply authentication middleware // Endpoint for SSE - Apply authentication middleware app.get('/sse', authenticate, async (req, res) => { const connectionId = uuidv4(); // Generate a unique ID for this connection console.error(`[${new Date().toISOString()}] New SSE connection established with ID: ${connectionId}`); // Use error // Set headers for SSE res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', }); const transport = new SSEServerTransport('/messages', res); // Pass the response object sseConnections.set(connectionId, transport); // Store transport keyed by ID lastSseTransport = transport; // Keep track of the last one for the simple POST handler try { await server.connect(transport); // Connect the server to this specific transport console.error(`[${new Date().toISOString()}] MCP Server connected to SSE transport for connection ID: ${connectionId}`); } catch (error) { console.error(`[${new Date().toISOString()}] Error connecting MCP Server to SSE transport for ID ${connectionId}:`, error); sseConnections.delete(connectionId); // Clean up on connection error if (!res.writableEnded) { res.end(); } return; // Stop further processing for this request } // Handle client disconnect req.on('close', () => { console.error(`[${new Date().toISOString()}] SSE connection closed for ID: ${connectionId}`); sseConnections.delete(connectionId); if (lastSseTransport === transport) { lastSseTransport = null; // Clear if it was the last one } // Optionally notify the server instance if needed, though transport might handle this // server.disconnect(transport); // If SDK supports targeted disconnect }); }); // Job events SSE per job id app.get('/jobs/:jobId/events', authenticate, async (req, res) => { const { jobId } = req.params; res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' }); let lastEventId = 0; const send = (type, data) => { try { res.write(`event: ${type}\n`); res.write(`data: ${JSON.stringify(data)}\n\n`);} catch(_){} }; send('open', { ok: true, jobId }); const timer = setInterval(async () => { try { const events = await dbClient.getJobEvents(jobId, lastEventId, 200); for (const ev of events) { lastEventId = ev.id; send(ev.event_type || 'message', ev); } const j = await dbClient.getJobStatus(jobId); if (!j || j.status === 'succeeded' || j.status === 'failed' || j.status === 'canceled') { send('complete', j || { jobId, status: 'unknown' }); clearInterval(timer); if (!res.writableEnded) res.end(); } } catch (e) { send('error', { message: e.message }); } }, 1000); req.on('close', () => { clearInterval(timer); }); }); // Simple HTTP job submission for testing and automation app.post('/jobs', authenticate, express.json(), async (req, res) => { try { const params = req.body || {}; const jobId = await dbClient.createJob('research', params); await dbClient.appendJobEvent(jobId, 'submitted', { query: params.query || params.q || '' }); res.json({ job_id: jobId }); } catch (e) { res.status(500).json({ error: e.message }); } }); // Lightweight JSON metrics app.get('/metrics', authenticate, async (req, res) => { try { const embedderReady = dbClient.isEmbedderReady(); const dbInitialized = dbClient.isDbInitialized(); const dbPathInfo = dbClient.getDbPathInfo(); const rows = await dbClient.executeQuery(`SELECT status, COUNT(*) AS n FROM jobs GROUP BY status`, []); const recent = await dbClient.executeQuery(`SELECT id, type, status, created_at, finished_at FROM jobs ORDER BY created_at DESC LIMIT 25`, []); // Aggregate usage totals from recent reports const usageRows = await dbClient.executeQuery(`SELECT research_metadata FROM reports ORDER BY id DESC LIMIT 200`, []); const usageTotals = { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 }; try { for (const r of usageRows) { const meta = typeof r.research_metadata === 'string' ? JSON.parse(r.research_metadata) : r.research_metadata; const t = meta?.usage?.totals; if (!t) continue; usageTotals.prompt_tokens += Number(t.prompt_tokens||0); usageTotals.completion_tokens += Number(t.completion_tokens||0); usageTotals.total_tokens += Number(t.total_tokens||0); } } catch(_) {} if ((req.headers['accept'] || '').includes('text/plain')) { res.setHeader('Content-Type','text/plain; version=0.0.4'); const lines = []; lines.push(`# HELP jobs_total Number of jobs by status`); lines.push(`# TYPE jobs_total gauge`); for (const r of rows) lines.push(`jobs_total{status="${r.status}"} ${Number(r.n||0)}`); lines.push(`# HELP embedder_ready Whether embedder is initialized`); lines.push(`# TYPE embedder_ready gauge`); lines.push(`embedder_ready ${embedderReady?1:0}`); lines.push(`# HELP db_initialized Whether DB is initialized`); lines.push(`# TYPE db_initialized gauge`); lines.push(`db_initialized ${dbInitialized?1:0}`); lines.push(`# HELP tokens_prompt Total prompt tokens from recent reports`); lines.push(`# TYPE tokens_prompt counter`); lines.push(`tokens_prompt ${usageTotals.prompt_tokens}`); lines.push(`# HELP tokens_completion Total completion tokens from recent reports`); lines.push(`# TYPE tokens_completion counter`); lines.push(`tokens_completion ${usageTotals.completion_tokens}`); lines.push(`# HELP tokens_total Total tokens from recent reports`); lines.push(`# TYPE tokens_total counter`); lines.push(`tokens_total ${usageTotals.total_tokens}`); return res.end(lines.join('\n') + '\n'); } res.json({ time: new Date().toISOString(), database: { initialized: dbInitialized, storageType: dbPathInfo }, embedder: { ready: embedderReady }, jobs: rows, recent, usageTotals, }); } catch (e) { res.status(500).json({ error: e.message }); } }); // Health endpoint for monitoring (no auth required) app.get('/health', async (req, res) => { try { const dbInitialized = dbClient.isDbInitialized(); const embedderReady = dbClient.isEmbedderReady(); const healthy = dbInitialized; // Minimum: DB must be initialized res.status(healthy ? 200 : 503).json({ status: healthy ? 'healthy' : 'unhealthy', version: config.server.version, timestamp: new Date().toISOString(), checks: { database: dbInitialized ? 'ok' : 'not_initialized', embedder: embedderReady ? 'ready' : 'initializing' } }); } catch (e) { res.status(503).json({ status: 'unhealthy', error: e.message, timestamp: new Date().toISOString() }); } }); // Server discovery endpoint for MCP clients (no auth required per MCP draft spec Nov 2025) app.get('/.well-known/mcp-server', (req, res) => { res.json({ name: config.server.name, version: config.server.version, description: 'OpenRouter MCP server for multi-agent deep research', specification: '2025-06-18', specificationDraft: '2025-11-25', capabilities: { tools: {}, prompts: { listChanged: true }, resources: { subscribe: true, listChanged: true }, async: true, // Supports async operations via job system streaming: true, // Supports SSE streaming authentication: ['jwt', 'bearer', 'optional'] }, transports: [ { type: 'stdio', command: 'npx @terminals-tech/openrouter-agents --stdio', description: 'Standard I/O transport for IDE integration' }, { type: 'sse', endpoint: '/sse', messageEndpoint: '/messages', description: 'Server-Sent Events transport with per-connection routing' }, { type: 'http', endpoint: '/mcp', description: 'StreamableHTTP transport (if enabled)' } ], endpoints: { health: '/health', metrics: '/metrics', jobs: '/jobs', jobEvents: '/jobs/:jobId/events', discovery: '/.well-known/mcp-server', ui: '/ui' }, extensions: { 'async-operations': { version: '1.0', description: 'Long-running async operations via job system', endpoints: { submit: '/jobs', status: 'tool:job_status', cancel: 'tool:cancel_job', events: '/jobs/:jobId/events' } }, 'knowledge-base': { version: '1.0', description: 'Semantic knowledge base with hybrid BM25+vector search', features: ['vector-search', 'bm25', 'hybrid-fusion', 'llm-rerank'] }, 'multi-agent': { version: '1.0', description: 'Multi-agent orchestration (planning → research → synthesis)', features: ['domain-aware-planning', 'ensemble-execution', 'streaming-synthesis'] } }, contact: { name: 'Tej Desai', email: 'admin@terminals.tech', url: 'https://terminals.tech' }, repository: 'https://github.com/wheattoast11/openrouter-deep-research-mcp', homepage: 'https://terminals.tech' }); }); // About endpoint for directory metadata app.get('/about', (req, res) => { res.json({ name: config.server.name, version: config.server.version, author: 'Tej Desai', email: 'admin@terminals.tech', homepage: 'https://terminals.tech', privacy: 'https://terminals.tech/privacy', support: 'admin@terminals.tech' }); }); // Minimal static UI placeholder (can be replaced later) app.get('/ui', (req, res) => { res.setHeader('Content-Type', 'text/html'); res.end(`<!doctype html><html><head><meta charset="utf-8"><title>MCP Jobs</title><style> :root{--ok:#22c55e;--warn:#f59e0b;--err:#ef4444;--muted:#6b7280;--bg:#0b1220;--card:#0f172a;--fg:#e5e7eb;--chip:#1f2937} body{font-family:Inter,system-ui,Segoe UI,Roboto,Arial,sans-serif;margin:0;background:var(--bg);color:var(--fg)} header{display:flex;gap:8px;align-items:center;padding:12px 16px;background:linear-gradient(180deg,#0b1220,#0b122000)} input,button{border-radius:10px;border:1px solid #334155;background:#0b1220;color:var(--fg);padding:8px 10px} button{background:#1d4ed8;border-color:#1e40af;cursor:pointer} main{display:grid;grid-template-columns: 1.2fr 0.8fr;gap:12px;padding:12px} .card{background:var(--card);border:1px solid #1f2a44;border-radius:14px;box-shadow:0 8px 24px #0008;overflow:hidden} .title{padding:10px 12px;font-weight:600;border-bottom:1px solid #1f2a44;background:#0b1224} .lane{display:flex;flex-direction:column;gap:8px;padding:10px;max-height:64vh;overflow:auto} .row{display:flex;align-items:center;gap:8px} .chip{background:var(--chip);border:1px solid #374151;padding:2px 8px;border-radius:999px;font-size:12px;color:#cbd5e1} .ok{color:var(--ok)}.warn{color:var(--warn)}.err{color:var(--err)} .log{white-space:pre-wrap;font-family:ui-monospace,monospace;padding:10px;max-height:64vh;overflow:auto} </style></head><body> <header> <strong>MCP Job Stream</strong> <input id="job" placeholder="job_id" style="width:320px"> <button id="go">Connect</button> <span id="status" class="chip">idle</span> </header> <main> <section class="card"> <div class="title">Agents</div> <div class="lane" id="agents"></div> </section> <section class="card"> <div class="title">Synthesis</div> <div class="log" id="log"></div> </section> </main> <script> const logEl=document.getElementById('log'); const agentsEl=document.getElementById('agents'); const statusEl=document.getElementById('status'); const rows=new Map(); function addAgentRow(id,text,cls){ let r=rows.get(id); if(!r){ r=document.createElement('div'); r.className='row'; r.innerHTML='<span class="chip">agent '+id+'</span><span class="chip" id="st"></span><span id="q" class="muted"></span>'; agentsEl.appendChild(r); rows.set(id,r); } r.querySelector('#st').textContent=text; r.querySelector('#st').className='chip '+(cls||''); } function appendLog(s){ logEl.textContent += s; logEl.scrollTop = logEl.scrollHeight; } document.getElementById('go').onclick=()=>{ logEl.textContent=''; agentsEl.textContent=''; rows.clear(); statusEl.textContent='connecting…'; const id=document.getElementById('job').value.trim(); if(!id){ alert('enter job id'); return; } const es=new EventSource('/jobs/'+id+'/events'); es.addEventListener('open', e=>{ statusEl.textContent='open'; statusEl.className='chip'; }); es.addEventListener('progress', e=>{ /* generic progress hook */ }); es.addEventListener('agent_started', e=>{ const p=JSON.parse(e.data).payload||JSON.parse(e.data); addAgentRow(p.agent_id,'started','warn'); }); es.addEventListener('agent_completed', e=>{ const p=JSON.parse(e.data).payload||JSON.parse(e.data); addAgentRow(p.agent_id, p.ok===false?'failed':'done', p.ok===false?'err':'ok'); }); es.addEventListener('agent_usage', e=>{ const p=JSON.parse(e.data).payload||JSON.parse(e.data); addAgentRow(p.agent_id, 'tokens:'+ (p.usage?.total_tokens||'?'), ''); }); es.addEventListener('synthesis_token', e=>{ const d=JSON.parse(e.data).payload||JSON.parse(e.data); appendLog(d.content); }); es.addEventListener('synthesis_error', e=>{ const d=JSON.parse(e.data).payload||JSON.parse(e.data); appendLog('\n[error] '+(d.error||'')+'\n'); }); es.addEventListener('report_saved', e=>{ const d=JSON.parse(e.data).payload||JSON.parse(e.data); appendLog('\n[report] id='+d.report_id+'\n'); }); es.addEventListener('complete', e=>{ statusEl.textContent='complete'; es.close(); }); es.onerror=()=>{ statusEl.textContent='error'; statusEl.className='chip err'; }; }; </script> </body></html>`); }); // Client demo: tool catalog UI (client-side), server performs semantic ranking app.get('/client/tools', async (req, res) => { try { const q = typeof req.query.q === 'string' ? req.query.q : undefined; const limit = req.query.limit ? Math.max(1, Math.min(200, Number(req.query.limit))) : undefined; const semantic = req.query.semantic === 'false' ? false : true; const text = await require('./tools').listToolsTool({ query: q, limit: limit || 50, semantic }); res.setHeader('Content-Type', 'application/json'); res.end(text); } catch (e) { res.status(500).json({ error: e.message }); } }); // Endpoint for messages with per-connection routing and authentication // Supports both legacy (no connectionId) and new path/query param routing app.post(['/messages', '/messages/:connectionId'], authenticate, express.json(), (req, res) => { // Prefer explicit connectionId via route param or query const routeId = req.params.connectionId; const queryId = req.query.connectionId; const connectionId = routeId || queryId || null; if (connectionId) { const transport = sseConnections.get(connectionId); if (!transport) { console.error(`[${new Date().toISOString()}] Received POST /messages for unknown connectionId: ${connectionId}`); return res.status(404).json({ error: 'Unknown connectionId' }); } console.error(`[${new Date().toISOString()}] Routing POST /messages to connectionId: ${connectionId}`); return transport.handlePostMessage(req, res); } // Legacy behavior: fall back to last transport if no connectionId provided if (!lastSseTransport) { console.error(`[${new Date().toISOString()}] Received POST /messages without connectionId and no active SSE transport found.`); return res.status(500).json({ error: 'No active SSE transport available' }); } console.error(`[${new Date().toISOString()}] Handling legacy POST /messages via last active SSE transport.`); return lastSseTransport.handlePostMessage(req, res); }); // Start server app.listen(port, () => { console.error(`MCP server listening on port ${port}`); // Use error }); } // Close the else block for HTTP setup }; // Start the server setupTransports().catch(error => { console.error('Failed to start MCP server:', error.message); // Keep error process.exit(1); }); // Start in-process job worker (async function startJobWorker(){ const { concurrency, heartbeatMs } = require('../../config').jobs; const runners = Array.from({ length: Math.max(1, concurrency) }, () => (async function loop(){ while (true) { try { const job = await dbClient.claimNextJob(); if (!job) { await new Promise(r=>setTimeout(r, 750)); continue; } const jobId = job.id; await dbClient.appendJobEvent(jobId, 'started', {}); const hb = setInterval(()=> dbClient.heartbeatJob(jobId).catch(()=>{}), Math.max(1000, heartbeatMs)); try { if (job.type === 'research') { // Reuse conductResearch flow but stream events via job events const params = typeof job.params === 'string' ? JSON.parse(job.params) : job.params; // Minimal bridge: send progress chunks into job events const exchange = { progressToken: 'job', sendProgress: ({ value }) => dbClient.appendJobEvent(jobId, 'progress', value || {}) }; const resultText = await require('./tools').conductResearch(params, exchange, jobId); await dbClient.setJobStatus(jobId, 'succeeded', { result: { message: resultText }, finished: true }); await dbClient.appendJobEvent(jobId, 'completed', { message: resultText }); // Optional webhook notification try { if (params?.notify) { await nodeFetch(params.notify, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ job_id: jobId, status: 'succeeded', message: resultText }) }).catch(()=>{}); } } catch (_) {} } else { await dbClient.setJobStatus(jobId, 'failed', { result: { error: 'Unknown job type' }, finished: true }); await dbClient.appendJobEvent(jobId, 'error', { message: 'Unknown job type' }); // Notify if requested try { const params = typeof job.params === 'string' ? JSON.parse(job.params) : job.params; if (params?.notify) { await nodeFetch(params.notify, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ job_id: jobId, status: 'failed', error: 'Unknown job type' }) }).catch(()=>{}); } } catch (_) {} } } catch (e) { await dbClient.setJobStatus(jobId, 'failed', { result: { error: e.message }, finished: true }); await dbClient.appendJobEvent(jobId, 'error', { message: e.message }); try { const params = typeof job.params === 'string' ? JSON.parse(job.params) : job.params; if (params?.notify) { await nodeFetch(params.notify, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ job_id: jobId, status: 'failed', error: e.message }) }).catch(()=>{}); } } catch (_) {} } finally { clearInterval(hb); } } catch (_) { await new Promise(r=>setTimeout(r, 1000)); } } })()); await Promise.allSettled(runners); })();

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wheattoast11/openrouter-deep-research-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server