Skip to main content
Glama

PubNub MCP Server

by pubnub
test_sse.js8.83 kB
#!/usr/bin/env node import { EventSource } from 'eventsource'; const SERVER_URL = 'http://localhost:3000'; const MCP_ENDPOINT = `${SERVER_URL}/mcp`; // ANSI color codes const colors = { green: '\x1b[32m', red: '\x1b[31m', yellow: '\x1b[33m', blue: '\x1b[34m', reset: '\x1b[0m' }; function log(message, color = 'reset') { console.log(`${colors[color]}${message}${colors.reset}`); } // Test 1: Initialize session async function testInitialize() { log('\n=== Test 1: Initialize Session ===', 'blue'); try { const initRequest = { jsonrpc: '2.0', method: 'initialize', params: { protocolVersion: '0.1.0', capabilities: { tools: {}, sampling: {} }, clientInfo: { name: 'test-client', version: '1.0.0' } }, id: 1 }; const response = await fetch(MCP_ENDPOINT, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' }, body: JSON.stringify(initRequest) }); const sessionId = response.headers.get('mcp-session-id'); const responseText = await response.text(); // Parse SSE format response let result; if (responseText.startsWith('event:')) { const dataLine = responseText.split('\n').find(line => line.startsWith('data:')); if (dataLine) { result = JSON.parse(dataLine.substring(5).trim()); } } else { result = JSON.parse(responseText); } log(`✓ Initialization successful`, 'green'); log(` Session ID: ${sessionId}`, 'yellow'); log(` Server name: ${result.result?.serverInfo?.name}`, 'yellow'); log(` Server version: ${result.result?.serverInfo?.version}`, 'yellow'); return sessionId; } catch (error) { log(`✗ Initialization failed: ${error.message}`, 'red'); throw error; } } // Test 2: Test SSE connection async function testSSE(sessionId) { log('\n=== Test 2: SSE Connection ===', 'blue'); return new Promise((resolve, reject) => { const eventSource = new EventSource(MCP_ENDPOINT, { headers: { 'mcp-session-id': sessionId } }); eventSource.onopen = () => { log('✓ SSE connection opened', 'green'); }; eventSource.onmessage = (event) => { log(`✓ Received SSE message: ${event.data}`, 'green'); }; eventSource.onerror = (error) => { // Don't reject on SSE errors, just log them // SSE will reconnect automatically log(`⚠ SSE connection issue (this is normal during testing)`, 'yellow'); }; // Keep connection open for 5 seconds to observe any server-sent events setTimeout(() => { eventSource.close(); log('✓ SSE connection closed after 5 seconds', 'yellow'); resolve(); }, 5000); }); } // Test 3: List available tools async function testListTools(sessionId) { log('\n=== Test 3: List Available Tools ===', 'blue'); try { const listToolsRequest = { jsonrpc: '2.0', method: 'tools/list', params: {}, id: 2 }; const response = await fetch(MCP_ENDPOINT, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream', 'mcp-session-id': sessionId }, body: JSON.stringify(listToolsRequest) }); const responseText = await response.text(); // Parse SSE format response let result; if (responseText.startsWith('event:')) { const dataLine = responseText.split('\n').find(line => line.startsWith('data:')); if (dataLine) { result = JSON.parse(dataLine.substring(5).trim()); } } else { result = JSON.parse(responseText); } log(`✓ Tools list retrieved`, 'green'); log(` Available tools:`, 'yellow'); result.result?.tools?.forEach(tool => { log(` - ${tool.name}: ${tool.description?.substring(0, 60)}...`, 'yellow'); }); return result.result?.tools || []; } catch (error) { log(`✗ List tools failed: ${error.message}`, 'red'); throw error; } } // Test 4: Call a tool async function testCallTool(sessionId) { log('\n=== Test 4: Call Tool (read_pubnub_resources) ===', 'blue'); try { const callToolRequest = { jsonrpc: '2.0', method: 'tools/call', params: { name: 'read_pubnub_resources', arguments: { document: 'pubnub_concepts' } }, id: 3 }; const response = await fetch(MCP_ENDPOINT, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream', 'mcp-session-id': sessionId }, body: JSON.stringify(callToolRequest) }); const responseText = await response.text(); // Parse SSE format response let result; if (responseText.startsWith('event:')) { const dataLine = responseText.split('\n').find(line => line.startsWith('data:')); if (dataLine) { result = JSON.parse(dataLine.substring(5).trim()); } } else { result = JSON.parse(responseText); } if (result.result) { log(`✓ Tool call successful`, 'green'); log(` Content length: ${result.result.content?.[0]?.text?.length || 0} characters`, 'yellow'); log(` First 100 chars: ${result.result.content?.[0]?.text?.substring(0, 100)}...`, 'yellow'); } else if (result.error) { log(`✗ Tool call returned error: ${result.error.message}`, 'red'); } return result; } catch (error) { log(`✗ Tool call failed: ${error.message}`, 'red'); throw error; } } // Test 5: Publish a PubNub message async function testPublishMessage(sessionId) { log('\n=== Test 5: Publish PubNub Message ===', 'blue'); try { const publishRequest = { jsonrpc: '2.0', method: 'tools/call', params: { name: 'publish_pubnub_message', arguments: { channel: 'test-channel', message: 'Hello from SSE test!' } }, id: 4 }; const response = await fetch(MCP_ENDPOINT, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream', 'mcp-session-id': sessionId }, body: JSON.stringify(publishRequest) }); const responseText = await response.text(); // Parse SSE format response let result; if (responseText.startsWith('event:')) { const dataLine = responseText.split('\n').find(line => line.startsWith('data:')); if (dataLine) { result = JSON.parse(dataLine.substring(5).trim()); } } else { result = JSON.parse(responseText); } if (result.result) { log(`✓ Message published successfully`, 'green'); log(` ${result.result.content?.[0]?.text}`, 'yellow'); } else if (result.error) { log(`✗ Publish failed: ${result.error.message}`, 'red'); } return result; } catch (error) { log(`✗ Publish failed: ${error.message}`, 'red'); throw error; } } // Test 6: Close session async function testCloseSession(sessionId) { log('\n=== Test 6: Close Session ===', 'blue'); try { const response = await fetch(MCP_ENDPOINT, { method: 'DELETE', headers: { 'mcp-session-id': sessionId } }); if (response.ok) { log(`✓ Session closed successfully`, 'green'); } else { log(`✗ Failed to close session: ${response.status} ${response.statusText}`, 'red'); } } catch (error) { log(`✗ Close session failed: ${error.message}`, 'red'); throw error; } } // Main test runner async function runTests() { log('\n🚀 Starting PubNub MCP SSE Tests', 'blue'); log(`Testing server at: ${SERVER_URL}`, 'yellow'); try { // Check if server is running try { await fetch(SERVER_URL); } catch (error) { log(`\n✗ Server is not running at ${SERVER_URL}`, 'red'); log(` Please start the server with: HTTP_PORT=3000 node index.js`, 'yellow'); process.exit(1); } // Run tests const sessionId = await testInitialize(); // Run SSE test in parallel with other tests const ssePromise = testSSE(sessionId); await testListTools(sessionId); await testCallTool(sessionId); await testPublishMessage(sessionId); // Wait for SSE test to complete await ssePromise; await testCloseSession(sessionId); log('\n✅ All tests completed successfully!', 'green'); } catch (error) { log(`\n❌ Tests failed: ${error.message}`, 'red'); process.exit(1); } } // Run the tests runTests();

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pubnub/pubnub-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server