Skip to main content
Glama

MCP Auth

by rubenpenap
index.test.ts (9.4 kB)
import { invariant } from '@epic-web/invariant'
import {
  type JSONRPCMessage,
  JSONRPCMessageSchema,
} from '@modelcontextprotocol/sdk/types.js'
import { test, expect, inject } from 'vitest'

const mcpServerPort = inject('mcpServerPort')
const EPIC_ME_AUTH_SERVER_URL = 'http://localhost:7788'
const mcpServerUrl = `http://localhost:${mcpServerPort}`

test(`prompts are not visible if the user does not have the required scopes`, async () => {
  const tokenResult = await getAuthToken({ scopes: [] })
  const response = await initialize(tokenResult.access_token)
  const sessionId = response.headers.get('mcp-session-id')
  invariant(
    sessionId,
    '🚨 initialization response should have an MCP session ID header',
  )

  // list prompts
  const promptsResponseData = await listPrompts(
    tokenResult.access_token,
    sessionId,
  )

  expect(
    promptsResponseData,
    '🚨 there should be no prompts available',
  ).toEqual([
    {
      error: {
        code: -32601,
        message: 'Method not found',
      },
      id: expect.any(String),
      jsonrpc: '2.0',
    },
  ])
})

test(`prompts are visible if the user has the required scopes`, async () => {
  const tokenResult = await getAuthToken({
    scopes: ['entries:read', 'tags:read'],
  })
  const response = await initialize(tokenResult.access_token)
  const sessionId = response.headers.get('mcp-session-id')
  invariant(
    sessionId,
    '🚨 initialization response should have an MCP session ID header',
  )

  const promptsResponseData = await listPrompts(
    tokenResult.access_token,
    sessionId,
  )

  expect(
    promptsResponseData,
    '🚨 the suggest_tags prompt should be available with entries:read and tags:read scopes',
  ).toEqual([
    {
      id: expect.any(String),
      jsonrpc: '2.0',
      result: {
        prompts: [
          expect.objectContaining({
            name: 'suggest_tags',
          }),
        ],
      },
    },
  ])
})

/** OAuth scopes that a test can request from the auth server. */
type Scopes =
  | 'user:read'
  | 'user:write'
  | 'entries:read'
  | 'entries:write'
  | 'tags:read'
  | 'tags:write'

/**
 * Sends a JSON-RPC `prompts/list` request to the MCP server for an existing
 * session and returns the decoded response.
 *
 * @param accessToken Bearer token sent in the `Authorization` header.
 * @param sessionId Value sent in the `mcp-session-id` header.
 * @returns Whatever {@link handleStreamableResponse} decodes from the reply.
 */
async function listPrompts(
  accessToken: string,
  sessionId: string,
): Promise<unknown> {
  const promptsResponse = await fetch(`${mcpServerUrl}/mcp`, {
    method: 'POST',
    headers: {
      'mcp-session-id': sessionId,
      accept: 'application/json, text/event-stream',
      'Content-Type': 'application/json',
      Authorization: `Bearer ${accessToken}`,
    },
    body: JSON.stringify({
      jsonrpc: '2.0',
      id: crypto.randomUUID(),
      method: 'prompts/list',
    }),
  })
  return handleStreamableResponse(promptsResponse)
}

/**
 * Obtains an access token from the auth server by walking through the whole
 * flow: client registration, the `/test-auth` authorization shortcut, and the
 * authorization-code token exchange with a PKCE verifier.
 *
 * Every step is asserted with `expect`/`invariant`, so a failing step aborts
 * the calling test with a descriptive message.
 *
 * @param scopes Scopes to request; joined with spaces into the `scope` param.
 * @returns The parsed token endpoint response.
 */
async function getAuthToken({ scopes }: { scopes: Array<Scopes> }) {
  const redirectUri = `https://example.com/test-mcp-client`

  const clientRegistrationResponse = await fetch(
    `${EPIC_ME_AUTH_SERVER_URL}/oauth/register`,
    {
      method: 'POST',
      headers: {
        'content-type': 'application/json',
        accept: 'application/json, text/event-stream',
      },
      body: JSON.stringify({
        client_name: 'Test MCP Client',
        redirect_uris: [redirectUri],
      }),
    },
  )
  expect(
    clientRegistrationResponse.ok,
    '🚨 Client registration should succeed',
  ).toBe(true)

  const clientRegistration =
    (await clientRegistrationResponse.json()) as ClientRegistration
  expect(
    clientRegistration.client_id,
    '🚨 Client ID should be returned from registration',
  ).toBeTruthy()

  const { codeVerifier, codeChallenge, codeChallengeMethod } =
    generateCodeChallenge()
  const state = crypto.randomUUID()

  const testAuthUrl = new URL(`${EPIC_ME_AUTH_SERVER_URL}/test-auth`)
  // Use the registered client ID instead of the one from the auth URL
  testAuthUrl.searchParams.set('client_id', clientRegistration.client_id)
  testAuthUrl.searchParams.set('redirect_uri', redirectUri)
  testAuthUrl.searchParams.set('response_type', 'code')
  testAuthUrl.searchParams.set('code_challenge', codeChallenge)
  testAuthUrl.searchParams.set('code_challenge_method', codeChallengeMethod)
  testAuthUrl.searchParams.set('scope', scopes.join(' '))
  testAuthUrl.searchParams.set('state', state)

  const authCodeResponse = await fetch(testAuthUrl.toString())
  expect(authCodeResponse.ok, '🚨 Auth code request should succeed').toBe(true)

  const authResult = (await authCodeResponse.json()) as AuthResult
  expect(
    authResult.redirectTo,
    '🚨 Redirect URL should be returned',
  ).toBeTruthy()

  const redirectUrl = new URL(authResult.redirectTo)
  const authCode = redirectUrl.searchParams.get('code')
  const returnedState = redirectUrl.searchParams.get('state')
  // `invariant` both fails the test and narrows `authCode` to `string`, so no
  // non-null assertion is needed when building the token request below.
  invariant(authCode, '🚨 Auth code should be present in redirect URL')
  expect(returnedState, '🚨 State should be returned').toBe(state)

  const tokenParams = new URLSearchParams({
    grant_type: 'authorization_code',
    code: authCode,
    redirect_uri: redirectUri,
    client_id: clientRegistration.client_id, // Use registered client ID
    code_verifier: codeVerifier,
  })
  // Add client_secret if provided during registration
  if (clientRegistration.client_secret) {
    tokenParams.set('client_secret', clientRegistration.client_secret)
  }

  const tokenResponse = await fetch(`${EPIC_ME_AUTH_SERVER_URL}/oauth/token`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: tokenParams,
  })
  if (!tokenResponse.ok) {
    // Log the body before the assertion below fails, to aid debugging.
    const errorText = await tokenResponse.text()
    console.error('Token exchange failed:', tokenResponse.status, errorText)
  }
  expect(tokenResponse.ok, '🚨 Token exchange should succeed').toBe(true)

  const tokenResult = (await tokenResponse.json()) as TokenResult
  expect(
    tokenResult.access_token,
    '🚨 Access token should be returned',
  ).toBeTruthy()
  expect(
    tokenResult.token_type?.toLowerCase(),
    '🚨 Token type should be Bearer',
  ).toBe('bearer')

  return tokenResult
}

/**
 * Sends the JSON-RPC `initialize` request to the MCP server and asserts the
 * server did not answer with 401.
 *
 * @param accessToken Bearer token sent in the `Authorization` header.
 * @returns The raw `Response`, so callers can read the `mcp-session-id` header.
 */
async function initialize(accessToken: string) {
  const authTestResponse = await fetch(`${mcpServerUrl}/mcp`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Accept: 'application/json, text/event-stream',
      Authorization: `Bearer ${accessToken}`,
    },
    body: JSON.stringify({
      jsonrpc: '2.0',
      id: crypto.randomUUID(),
      method: 'initialize',
      params: {
        protocolVersion: '2024-11-05',
        capabilities: {},
        clientInfo: {
          name: 'Test Client',
          version: '1.0.0',
        },
      },
    }),
  })

  expect(
    authTestResponse.status,
    '🚨 Should not get 401 Unauthorized with valid token',
  ).not.toBe(401)

  return authTestResponse
}

/** Parser state that must survive between reads of an SSE body. */
interface SseParserState {
  /** Trailing text of the last chunk that has not been terminated by `\n`. */
  buffer: string
  /** `data:` values collected for the event currently being assembled. */
  dataLines: Array<string>
}

/**
 * Feeds one decoded chunk into the SSE parser and returns the data payloads
 * of every event completed by that chunk.
 *
 * `state` is mutated: an unterminated line stays in `state.buffer` and the
 * data lines of an unfinished event stay in `state.dataLines`, so an event
 * split across several chunks is still delivered once its blank line arrives.
 * Lines are split on `\n` (a trailing `\r` is stripped); fields other than
 * `data` are ignored; events with an empty payload are skipped.
 */
function drainSseEvents(state: SseParserState, chunk: string): Array<string> {
  state.buffer += chunk
  const lines = state.buffer.split('\n')
  // The last element is an incomplete line (or '' if the chunk ended on '\n').
  state.buffer = lines.pop() ?? ''

  const payloads: Array<string> = []
  for (const rawLine of lines) {
    const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine
    if (line.trim() === '') {
      // Blank line: dispatch the event assembled so far, if it carries data.
      const payload = state.dataLines.join('\n')
      if (payload) payloads.push(payload)
      state.dataLines = []
    } else if (line.startsWith('data:')) {
      // A single space after the colon is optional and not part of the value.
      const value = line.slice('data:'.length)
      state.dataLines.push(value.startsWith(' ') ? value.slice(1) : value)
    }
    // Ignore other SSE fields like 'event:', 'id:', etc.
  }
  return payloads
}

/**
 * Decodes an MCP HTTP response that is either plain JSON or an SSE stream.
 *
 * For `text/event-stream` responses the body is read until the first event
 * whose data parses as a JSON-RPC message; the stream is then cancelled (so a
 * server that keeps the connection open cannot hang the test) and the message
 * is returned in a one-element array. Events that fail to parse are logged and
 * skipped. If the stream ends first, the array is empty. For any other content
 * type the result of `response.json()` is returned as-is.
 *
 * @throws Error if an SSE response has no body, or if reading the stream fails
 *   (wrapped as `SSE stream disconnected: …`).
 */
async function handleStreamableResponse(response: Response): Promise<unknown> {
  if (!response.headers.get('content-type')?.includes('text/event-stream')) {
    return response.json()
  }

  const stream = response.body
  if (!stream) {
    throw new Error('No response body available for streaming')
  }

  const messages: Array<JSONRPCMessage> = []
  try {
    // Create a pipeline: binary stream -> text decoder
    const reader = stream.pipeThrough(new TextDecoderStream()).getReader()
    // Kept outside the read loop so partial events survive chunk boundaries.
    const state: SseParserState = { buffer: '', dataLines: [] }

    while (true) {
      const result = await reader.read()
      if (result.done) break

      for (const payload of drainSseEvents(state, result.value)) {
        try {
          messages.push(JSONRPCMessageSchema.parse(JSON.parse(payload)))
        } catch (error) {
          console.error('Failed to parse SSE message:', error)
          // Continue processing other messages even if one fails
          continue
        }
        // Close the connection after receiving the first message
        // to prevent hanging if the server doesn't close the stream
        void reader.cancel().catch(() => {})
        return messages
      }
    }
  } catch (error) {
    console.error('SSE stream error:', error)
    throw new Error(`SSE stream disconnected: ${error}`)
  }

  return messages
}

// TypeScript interfaces for API responses
interface AuthServerConfig {
  authorization_endpoint: string
  token_endpoint: string
  [key: string]: unknown
}

interface ClientRegistration {
  client_id: string
  client_secret?: string
  [key: string]: unknown
}

interface AuthResult {
  redirectTo: string
  [key: string]: unknown
}

interface TokenResult {
  access_token: string
  token_type: string
  [key: string]: unknown
}

/**
 * Helper function to generate a PKCE challenge.
 *
 * The verifier is 32 random bytes encoded as unpadded base64url. The `plain`
 * method is used for simplicity, so the challenge equals the verifier.
 */
function generateCodeChallenge() {
  const randomBytes = crypto.getRandomValues(new Uint8Array(32))
  const codeVerifier = btoa(String.fromCharCode(...randomBytes))
    .replace(/\+/g, '-')
    .replace(/\//g, '_')
    .replace(/=/g, '')

  return {
    codeVerifier,
    codeChallenge: codeVerifier, // For simplicity, using plain method
    codeChallengeMethod: 'plain',
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rubenpenap/mcp-auth'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.