Skip to main content
Glama

Cisco MCP Pods Server

by ruegreen
test-server.js15 kB
#!/usr/bin/env node /** * Cisco MCP Pods Server - Local Test Suite * Tests both SSE and Streamable HTTP transports, health checks, and tool connectivity */ const config = { serverUrl: 'http://localhost:1013', serverPath: '/CiscoMCPPods', }; // ANSI color codes const colors = { reset: '\x1b[0m', bright: '\x1b[1m', dim: '\x1b[2m', red: '\x1b[31m', green: '\x1b[32m', yellow: '\x1b[33m', blue: '\x1b[34m', magenta: '\x1b[35m', cyan: '\x1b[36m', white: '\x1b[37m', }; const symbols = { success: '✓', error: '✗', info: 'ℹ', rocket: '🚀', check: '🔍', server: '🖥️', warning: '⚠️', http: '📡', sse: '📨', }; let testResults = { passed: 0, failed: 0, tests: [], }; let detectedTransport = null; // Will be 'sse' or 'http' function log(message, color = colors.reset) { console.log(`${color}${message}${colors.reset}`); } function logTest(name, passed, details = '') { const symbol = passed ? symbols.success : symbols.error; const color = passed ? colors.green : colors.red; const status = passed ? 'PASS' : 'FAIL'; log(` ${symbol} ${name} - ${color}${status}${colors.reset}`, color); if (details) { log(` ${colors.dim}${details}${colors.reset}`); } testResults.tests.push({ name, passed, details }); if (passed) testResults.passed++; else testResults.failed++; } function printHeader(title) { const line = '═'.repeat(60); log(`\n${colors.cyan}${line}${colors.reset}`); log(`${colors.cyan}${colors.bright} ${title}${colors.reset}`); log(`${colors.cyan}${line}${colors.reset}\n`); } function printSummary() { const total = testResults.passed + testResults.failed; const successRate = ((testResults.passed / total) * 100).toFixed(1); printHeader('Test Summary'); log(` Total Tests: ${colors.bright}${total}${colors.reset}`); log(` ${colors.green}Passed: ${testResults.passed}${colors.reset}`); log(` ${colors.red}Failed: ${testResults.failed}${colors.reset}`); log(` Success Rate: ${colors.cyan}${successRate}%${colors.reset}`); if (testResults.failed === 0) { log(`\n ${symbols.success} ${colors.green}${colors.bright}All tests passed!${colors.reset}\n`); } else { log(`\n ${symbols.error} ${colors.red}${colors.bright}Some tests failed${colors.reset}\n`); log(`${colors.yellow}Failed Tests:${colors.reset}`); testResults.tests .filter(t => !t.passed) .forEach(t => log(` - ${t.name}: ${t.details}`, colors.red)); log(''); } } async function testHealthEndpoint() { printHeader(`${symbols.check} Health Check Test`); try { const url = `${config.serverUrl}${config.serverPath}/health`; log(` Testing: ${colors.dim}${url}${colors.reset}`); const response = await fetch(url); const data = await response.json(); const healthOk = response.ok && data.status === 'healthy'; logTest( 'Health endpoint responding', healthOk, healthOk ? `Service: ${data.service}` : `Status: ${response.status}` ); const hasCorrectService = data.service === 'cisco-mcp-retail'; logTest( 'Service name correct', hasCorrectService, `Expected: cisco-mcp-retail, Got: ${data.service}` ); const hasApiUrl = !!data.apiBaseUrl; logTest( 'API base URL configured', hasApiUrl, `URL: ${data.apiBaseUrl}` ); // Detect transport type from health response if (data.transport) { if (data.transport === 'streamable-http') { detectedTransport = 'http'; log(`\n ${symbols.http} ${colors.cyan}Detected: Streamable HTTP transport${colors.reset}`); } else if (data.transport === 'sse') { detectedTransport = 'sse'; log(`\n ${symbols.sse} ${colors.cyan}Detected: SSE transport${colors.reset}`); } } return healthOk; } catch (error) { logTest('Health endpoint responding', false, error.message); return false; } } async function testStreamableHTTP() { printHeader(`${symbols.http} Streamable HTTP Tests`); // Load API key from .env file let apiKey = null; try { const fs = await import('fs'); const path = await import('path'); const { fileURLToPath } = await import('url'); const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const envContent = fs.readFileSync(path.join(__dirname, '.env'), 'utf8'); const match = envContent.match(/MCP_API_KEY=(.+)/); apiKey = match ? match[1].trim() : null; } catch (error) { log(` ${symbols.warning} ${colors.yellow}Could not load API key from .env${colors.reset}`); } try { // Test 1: Initialize session const initUrl = `${config.serverUrl}${config.serverPath}/mcp`; log(` Testing: ${colors.dim}POST ${initUrl}${colors.reset}`); const initHeaders = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream', }; // Add API key if available if (apiKey) { initHeaders['X-API-Key'] = apiKey; } const initResponse = await fetch(initUrl, { method: 'POST', headers: initHeaders, body: JSON.stringify({ jsonrpc: '2.0', id: 1, method: 'initialize', params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'test-client', version: '1.0.0', }, }, }), signal: AbortSignal.timeout(5000), // 5 second timeout }); const initData = await initResponse.json(); const sessionId = initResponse.headers.get('mcp-session-id'); const initOk = initResponse.ok && initData.result; logTest( 'MCP initialization', initOk, initOk ? `Protocol: ${initData.result.protocolVersion}` : `Status: ${initResponse.status}` ); const hasSessionId = !!sessionId; logTest( 'Session ID assigned', hasSessionId, hasSessionId ? `Session: ${sessionId.substring(0, 8)}...` : 'No session ID in response' ); if (!sessionId) { log(` ${symbols.warning} ${colors.yellow}Cannot continue Streamable HTTP tests without session ID${colors.reset}\n`); return false; } // Test 2: List tools with session const toolsUrl = `${config.serverUrl}${config.serverPath}/mcp`; const toolsHeaders = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream', 'Mcp-Session-Id': sessionId, }; // Add API key if available if (apiKey) { toolsHeaders['X-API-Key'] = apiKey; } const toolsResponse = await fetch(toolsUrl, { method: 'POST', headers: toolsHeaders, body: JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'tools/list', params: {}, }), signal: AbortSignal.timeout(5000), // 5 second timeout }); const toolsData = await toolsResponse.json(); const toolsOk = toolsResponse.ok && toolsData.result && toolsData.result.tools; logTest( 'Tools list request (with session)', toolsOk, toolsOk ? `Found ${toolsData.result.tools.length} tools` : `Status: ${toolsResponse.status}` ); // Test 3: SSE stream connection const sseController = new AbortController(); const sseTimeout = setTimeout(() => sseController.abort(), 3000); try { const sseHeaders = { 'Accept': 'text/event-stream', 'Mcp-Session-Id': sessionId, }; // Add API key if available if (apiKey) { sseHeaders['X-API-Key'] = apiKey; } const sseResponse = await fetch(`${config.serverUrl}${config.serverPath}/mcp`, { method: 'GET', headers: sseHeaders, signal: sseController.signal, }); clearTimeout(sseTimeout); const sseOk = sseResponse.ok; logTest( 'SSE stream connection', sseOk, sseOk ? 'Stream established' : `Status: ${sseResponse.status}` ); sseController.abort(); } catch (error) { clearTimeout(sseTimeout); if (error.name !== 'AbortError') { logTest('SSE stream connection', false, error.message); } } return initOk; } catch (error) { logTest('Streamable HTTP tests', false, error.message); return false; } } async function testSSEConnection() { printHeader(`${symbols.sse} SSE Connection Test`); return new Promise((resolve) => { const url = `${config.serverUrl}${config.serverPath}/sse`; log(` Testing: ${colors.dim}${url}${colors.reset}`); const controller = new AbortController(); const timeout = setTimeout(() => { controller.abort(); logTest('SSE endpoint timeout', false, 'Connection timed out after 5s'); resolve(false); }, 5000); fetch(url, { signal: controller.signal, headers: { 'Accept': 'text/event-stream', }, }) .then(response => { clearTimeout(timeout); const connectionOk = response.ok; logTest( 'SSE connection established', connectionOk, `Status: ${response.status}` ); const hasCorrectType = response.headers.get('content-type')?.includes('text/event-stream'); logTest( 'SSE content-type header', hasCorrectType, `Content-Type: ${response.headers.get('content-type')}` ); controller.abort(); resolve(connectionOk); }) .catch(error => { clearTimeout(timeout); if (error.name !== 'AbortError') { logTest('SSE connection established', false, error.message); } resolve(false); }); }); } async function testRootEndpoint() { printHeader(`${symbols.info} Server Info Test`); try { const url = `${config.serverUrl}/`; log(` Testing: ${colors.dim}${url}${colors.reset}`); const response = await fetch(url); const data = await response.json(); const rootOk = response.ok; logTest( 'Root endpoint responding', rootOk, `Name: ${data.name}` ); const hasEndpoints = data.endpoints && Object.keys(data.endpoints).length > 0; logTest( 'Endpoint information available', hasEndpoints, hasEndpoints ? `Endpoints: ${Object.keys(data.endpoints).join(', ')}` : 'No endpoints found' ); const hasTransport = !!data.transport; logTest( 'Transport type declared', hasTransport, hasTransport ? `Transport: ${data.transport}` : 'No transport info' ); return rootOk; } catch (error) { logTest('Root endpoint responding', false, error.message); return false; } } async function testMessageEndpoint() { printHeader(`${symbols.rocket} Message Endpoint Test (SSE)`); try { const url = `${config.serverUrl}${config.serverPath}/messages`; log(` Testing: ${colors.dim}POST ${url}${colors.reset}`); const response = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ jsonrpc: '2.0', id: 1, method: 'tools/list', params: {}, }), }); const messageOk = response.ok; logTest( 'Message endpoint accepting requests', messageOk, `Status: ${response.status}` ); if (messageOk) { const data = await response.json(); const hasTools = data.result && data.result.tools; logTest( 'Tools list returned', hasTools, hasTools ? `Found ${data.result.tools.length} tools` : 'No tools in response' ); } return messageOk; } catch (error) { logTest('Message endpoint accepting requests', false, error.message); return false; } } async function checkServerRunning() { printHeader(`${symbols.server} Server Status Check`); try { const response = await fetch(`${config.serverUrl}${config.serverPath}/health`, { signal: AbortSignal.timeout(3000), }); if (response.ok) { log(` ${symbols.success} ${colors.green}Server is running on port 3010${colors.reset}\n`); return true; } else { log(` ${symbols.error} ${colors.red}Server responded with status ${response.status}${colors.reset}\n`); return false; } } catch (error) { log(` ${symbols.error} ${colors.red}Server is not running or not reachable${colors.reset}`); log(` ${colors.yellow}${symbols.info} Start the server with:${colors.reset}`); log(` ${colors.cyan}npm run start:sse${colors.reset} (for SSE transport)`); log(` ${colors.cyan}npm run start:http${colors.reset} (for Streamable HTTP transport)${colors.reset}\n`); return false; } } async function runAllTests() { console.clear(); log(`${colors.cyan}${colors.bright}`); log('╔═══════════════════════════════════════════════════════════╗'); log('║ ║'); log('║ 🚀 CISCO MCP RETAIL SERVER TEST SUITE 🚀 ║'); log('║ ║'); log('╚═══════════════════════════════════════════════════════════╝'); log(colors.reset); log(`${colors.dim} Server: ${config.serverUrl}${config.serverPath}${colors.reset}`); log(`${colors.dim} Time: ${new Date().toLocaleString()}${colors.reset}\n`); // Check if server is running first const serverRunning = await checkServerRunning(); if (!serverRunning) { log(`${colors.red}${colors.bright}Cannot run tests - server is not running!${colors.reset}\n`); process.exit(1); } // Run common tests await testHealthEndpoint(); await testRootEndpoint(); // Run transport-specific tests based on detected transport if (detectedTransport === 'http') { log(`\n${colors.cyan}${colors.bright}Running Streamable HTTP transport tests...${colors.reset}`); await testStreamableHTTP(); } else if (detectedTransport === 'sse') { log(`\n${colors.cyan}${colors.bright}Running SSE transport tests...${colors.reset}`); await testSSEConnection(); await testMessageEndpoint(); } else { log(`\n${colors.yellow}${symbols.warning} Could not detect transport type, running both test suites...${colors.reset}`); // Try SSE first await testSSEConnection(); await testMessageEndpoint(); // Then try Streamable HTTP await testStreamableHTTP(); } // Print summary printSummary(); // Exit with appropriate code process.exit(testResults.failed > 0 ? 1 : 0); } // Run tests runAllTests().catch(error => { log(`\n${symbols.error} ${colors.red}Test suite crashed: ${error.message}${colors.reset}\n`); process.exit(1); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ruegreen/CiscoMCPPods'

If you have feedback or need assistance with the MCP directory API, please join our Discord server