Skip to main content
Glama
authentication.ts (17 kB)
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import axios from "axios";
import { AuthManager, BasicAuthCredentials, TokenAuthCredentials, OAuth2Config } from "../utils/authManager.js";

/** Number of leading token characters that `auth_status` is allowed to reveal. */
const TOKEN_PREVIEW_LENGTH = 10;

/** JWT algorithms that `jwt_vulnerability_check` accepts without a warning. */
const EXPECTED_JWT_ALGORITHMS = ["HS256", "RS256"];

/** Remaining JWT lifetime (in days) above which a "long expiration" issue is reported. */
const MAX_RECOMMENDED_JWT_LIFETIME_DAYS = 30;

const MS_PER_DAY = 1000 * 60 * 60 * 24;

/** One row of the `auth_bypass_check` report. */
interface BypassProbeResult {
  test: string;
  status?: number;
  vulnerable?: boolean;
  authorized?: boolean;
  details: string;
}

/** Wraps a plain string in the single-text-item result shape every tool in this file returns. */
function textResult(text: string) {
  return { content: [{ type: "text" as const, text }] };
}

/** Extracts a printable message from a caught value, which is not guaranteed to be an `Error`. */
function errorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === "object" && error !== null && "message" in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/** Renders the optional "Token expires" line; returns an empty string when no expiry is known. */
function expiryLine(tokenExpiry: { toISOString(): string } | null | undefined): string {
  return tokenExpiry ? `Token expires: ${tokenExpiry.toISOString()}` : "";
}

/**
 * Masks a token for display. Only tokens longer than the preview length get a
 * visible prefix, so a short token is never disclosed in full.
 */
function maskToken(token: string | null | undefined): string {
  if (!token || token.length <= TOKEN_PREVIEW_LENGTH) {
    return "***";
  }
  return `${token.substring(0, TOKEN_PREVIEW_LENGTH)}***`;
}

/**
 * Decodes one base64-encoded JWT segment and parses it as a JSON object.
 *
 * @throws Error when the segment is not valid JSON or does not decode to a JSON object.
 */
function decodeJwtSection(segment: string, label: string): Record<string, unknown> {
  // Node's "base64" decoder also accepts the URL-safe alphabet used by JWTs.
  const decoded: unknown = JSON.parse(Buffer.from(segment, "base64").toString());
  if (typeof decoded !== "object" || decoded === null || Array.isArray(decoded)) {
    throw new Error(`JWT ${label} is not a JSON object`);
  }
  return decoded as Record<string, unknown>;
}

/** Collects the human-readable security findings for a decoded JWT header and payload. */
function findJwtIssues(header: Record<string, unknown>, payload: Record<string, unknown>): string[] {
  const issues: string[] = [];

  // Algorithm: "none" (in any letter case) is critical and is reported once;
  // anything outside the expected set only gets a warning.
  const alg = header.alg;
  if (typeof alg === "string" && alg.toLowerCase() === "none") {
    issues.push("Critical: 'none' algorithm used - authentication can be bypassed");
  } else if (typeof alg !== "string" || !EXPECTED_JWT_ALGORITHMS.includes(alg)) {
    issues.push(`Warning: Unusual algorithm ${String(alg)} - verify if intended`);
  }

  // Expiration: a missing claim, a malformed claim, an expired token and an
  // overly long lifetime are all distinct findings.
  const exp = payload.exp;
  if (!exp) {
    issues.push("High: No expiration claim (exp) - token never expires");
  } else if (typeof exp !== "number") {
    issues.push("High: Invalid expiration claim (exp) - expected a numeric timestamp in seconds");
  } else {
    const expDate = new Date(exp * 1000);
    const now = new Date();
    if (Number.isNaN(expDate.getTime())) {
      // Out-of-range values would otherwise make toISOString() throw.
      issues.push("High: Invalid expiration claim (exp) - value is not a representable timestamp");
    } else if (expDate.getTime() < now.getTime()) {
      issues.push(`Info: Token expired on ${expDate.toISOString()}`);
    } else {
      const daysDiff = Math.floor((expDate.getTime() - now.getTime()) / MS_PER_DAY);
      if (daysDiff > MAX_RECOMMENDED_JWT_LIFETIME_DAYS) {
        issues.push(`Medium: Long expiration time (${daysDiff} days) - consider shorter lifetime`);
      }
    }
  }

  // Recommended claims.
  if (!payload.iat) {
    issues.push("Low: Missing 'issued at' claim (iat)");
  }
  if (!payload.iss) {
    issues.push("Low: Missing 'issuer' claim (iss)");
  }
  if (!payload.sub) {
    issues.push("Low: Missing 'subject' claim (sub)");
  }

  return issues;
}

/** Formats one bypass-probe result as the multi-line text block used in the report. */
function formatProbeResult(result: BypassProbeResult): string {
  const lines = [`Test: ${result.test}`];
  if (result.status !== undefined) {
    lines.push(`Status: ${result.status}`);
  }
  if (result.vulnerable !== undefined) {
    lines.push(`Vulnerable: ${result.vulnerable}`);
  }
  if (result.authorized !== undefined) {
    lines.push(`Authorized: ${result.authorized}`);
  }
  lines.push(`Details: ${result.details}`);
  return `${lines.join("\n")}\n`;
}

/**
 * Register authentication security testing tools.
 *
 * Registers the following tools on `server`:
 * - `basic_auth`, `token_auth`, `oauth2_auth`, `api_login`: configure the shared
 *   `AuthManager` singleton with credentials.
 * - `auth_status`, `clear_auth`: inspect or reset the current authentication state.
 * - `jwt_vulnerability_check`: decode a JWT (without verifying its signature) and
 *   report weak settings.
 * - `auth_bypass_check`: probe an endpoint with missing, invalid and empty
 *   credentials and compare against an authenticated request.
 *
 * Every tool reports failures as a text result rather than throwing.
 *
 * @param server MCP server instance the tools are registered on.
 */
export function registerAuthenticationTools(server: McpServer): void {
  // Basic authentication tool
  server.tool(
    "basic_auth",
    {
      username: z.string().describe("Username for authentication"),
      password: z.string().describe("Password for authentication"),
    },
    async ({ username, password }) => {
      try {
        const authManager = AuthManager.getInstance();
        const authState = await authManager.setBasicAuth({ username, password });

        return textResult(
          `Successfully set Basic authentication with username: ${username}\nAuthentication type: ${authState.type}\nHeader: Authorization: Basic ***`
        );
      } catch (error) {
        return textResult(`Error setting Basic authentication: ${errorMessage(error)}`);
      }
    }
  );

  // Token authentication tool
  server.tool(
    "token_auth",
    {
      token: z.string().describe("Authentication token"),
      token_type: z.string().default("Bearer").describe("Token type (Bearer, JWT, etc.)"),
      refresh_token: z.string().optional().describe("Refresh token (if available)"),
      expires_in: z.number().optional().describe("Token expiration time in seconds"),
    },
    async ({ token, token_type, refresh_token, expires_in }) => {
      try {
        const authManager = AuthManager.getInstance();
        const authState = await authManager.setTokenAuth({
          token,
          tokenType: token_type,
          refreshToken: refresh_token,
          expiresIn: expires_in,
        });

        return textResult(
          `Successfully set Token authentication\nAuthentication type: ${authState.type}\nToken type: ${token_type}\nHeader: Authorization: ${token_type} ***\n${expiryLine(authState.tokenExpiry)}`
        );
      } catch (error) {
        return textResult(`Error setting Token authentication: ${errorMessage(error)}`);
      }
    }
  );

  // OAuth2 authentication tool
  server.tool(
    "oauth2_auth",
    {
      client_id: z.string().describe("OAuth2 client ID"),
      client_secret: z.string().optional().describe("OAuth2 client secret"),
      token_url: z.string().url().describe("OAuth2 token endpoint URL"),
      authorization_url: z.string().url().optional().describe("OAuth2 authorization endpoint URL (for authorization code flow)"),
      grant_type: z.enum(['client_credentials', 'password', 'authorization_code', 'refresh_token']).default('client_credentials').describe("OAuth2 grant type"),
      username: z.string().optional().describe("Username (for password grant type)"),
      password: z.string().optional().describe("Password (for password grant type)"),
      scope: z.string().optional().describe("OAuth2 scope"),
      redirect_uri: z.string().optional().describe("Redirect URI (for authorization code flow)"),
    },
    async ({ client_id, client_secret, token_url, authorization_url, grant_type, username, password, scope, redirect_uri }) => {
      try {
        const authManager = AuthManager.getInstance();

        // Validate required parameters for specific grant types
        if (grant_type === 'password' && (!username || !password)) {
          throw new Error("Username and password are required for password grant type");
        }

        if (grant_type === 'authorization_code' && !redirect_uri) {
          throw new Error("Redirect URI is required for authorization code grant type");
        }

        // Configure OAuth2
        const config: OAuth2Config = {
          clientId: client_id,
          clientSecret: client_secret,
          tokenUrl: token_url,
          authorizationUrl: authorization_url || "",
          // Cast to the config's own field type instead of `any`, so the rest
          // of the object literal stays type-checked.
          grantType: grant_type as OAuth2Config["grantType"],
          username,
          password,
          scope,
          redirectUri: redirect_uri,
        };

        const authState = await authManager.authenticateWithOAuth2(config);

        return textResult(
          `Successfully authenticated with OAuth2\nGrant type: ${grant_type}\nToken type: ${authState.token ? 'Bearer' : 'Unknown'}\n${expiryLine(authState.tokenExpiry)}`
        );
      } catch (error) {
        return textResult(`Error authenticating with OAuth2: ${errorMessage(error)}`);
      }
    }
  );

  // Custom API login tool
  server.tool(
    "api_login",
    {
      login_url: z.string().url().describe("API login endpoint URL"),
      credentials: z.record(z.string()).describe("Login credentials as key-value pairs"),
      method: z.enum(['post', 'get']).default('post').describe("HTTP method to use"),
      token_path: z.string().default("token").describe("Path to token in the response (e.g., 'data.accessToken')"),
      token_prefix: z.string().default("Bearer").describe("Token prefix to use in Authorization header"),
      header_name: z.string().default("Authorization").describe("Header name to use for the token"),
    },
    async ({ login_url, credentials, method, token_path, token_prefix, header_name }) => {
      try {
        const authManager = AuthManager.getInstance();
        const authState = await authManager.authenticateWithApi(
          login_url,
          credentials,
          {
            method,
            tokenPath: token_path,
            tokenPrefix: token_prefix,
            headerName: header_name,
          }
        );

        return textResult(
          `Successfully authenticated with API login\nEndpoint: ${login_url}\nToken header: ${header_name}: ${token_prefix} ***\n${expiryLine(authState.tokenExpiry)}`
        );
      } catch (error) {
        return textResult(`Error logging in to API: ${errorMessage(error)}`);
      }
    }
  );

  // Get current auth status
  server.tool(
    "auth_status",
    {},
    async () => {
      const authManager = AuthManager.getInstance();
      const authState = authManager.getAuthState();

      if (authState.type === 'none') {
        return textResult("No authentication configured. Use basic_auth, token_auth, oauth2_auth, or api_login to authenticate.");
      }

      let statusText = `Current authentication type: ${authState.type}\n`;

      if (authState.type === 'basic') {
        statusText += `Username: ${authState.username}\n`;
        statusText += `Authentication header: Authorization: Basic ***\n`;
      } else if (authState.type === 'token' || authState.type === 'oauth2') {
        // Never print the raw token; short tokens are fully masked.
        statusText += `Token: ${maskToken(authState.token)}\n`;

        if (authState.refreshToken) {
          statusText += `Refresh token: Available\n`;
        }

        if (authState.tokenExpiry) {
          const now = new Date();
          const isExpired = now > authState.tokenExpiry;
          statusText += `Token expires: ${authState.tokenExpiry.toISOString()} (${isExpired ? 'EXPIRED' : 'Valid'})\n`;
        }

        if (authState.headers) {
          // Header names only; values may contain credentials.
          statusText += `Authentication headers: ${Object.keys(authState.headers).join(', ')}\n`;
        }
      }

      return textResult(statusText);
    }
  );

  // Clear authentication
  server.tool(
    "clear_auth",
    {},
    async () => {
      const authManager = AuthManager.getInstance();
      authManager.clearAuth();

      return textResult("Authentication cleared. The server is no longer authenticated.");
    }
  );

  // Test for JWT weakness
  server.tool(
    "jwt_vulnerability_check",
    {
      jwt_token: z.string().describe("JWT token to analyze for vulnerabilities"),
    },
    async ({ jwt_token }) => {
      try {
        // Split the token
        const parts = jwt_token.split(".");
        if (parts.length !== 3) {
          return textResult("Invalid JWT format. Expected 3 parts (header.payload.signature).");
        }

        // The signature (third part) is deliberately not verified: this tool
        // only inspects the claims the token makes.
        const header = decodeJwtSection(parts[0], "header");
        const payload = decodeJwtSection(parts[1], "payload");

        const issues = findJwtIssues(header, payload);
        const summary = issues.length > 0
          ? `Security Issues:\n${issues.join("\n")}`
          : "No security issues detected.";

        return textResult(
          `JWT Analysis Results:\n\nHeader: ${JSON.stringify(header, null, 2)}\n\nPayload: ${JSON.stringify(payload, null, 2)}\n\n${summary}`
        );
      } catch (error) {
        return textResult(`Error analyzing JWT: ${errorMessage(error)}`);
      }
    }
  );

  // Test for authentication bypass
  server.tool(
    "auth_bypass_check",
    {
      endpoint: z.string().url().describe("API endpoint to test"),
      auth_header: z.string().optional().describe("Authentication header name (if different from standard)"),
      auth_token: z.string().optional().describe("Authentication token (if not using the currently authenticated session)"),
      http_method: z.enum(["GET", "POST", "PUT", "DELETE", "PATCH"]).default("GET").describe("HTTP method to use"),
      use_session_auth: z.boolean().default(true).describe("Whether to use the current session authentication if available"),
    },
    async ({ endpoint, auth_header, auth_token, http_method, use_session_auth }) => {
      const results: BypassProbeResult[] = [];
      const authManager = AuthManager.getInstance();
      const currentAuthState = authManager.getAuthState();
      const hasCurrentAuth = currentAuthState.type !== 'none' && use_session_auth;

      const method = http_method.toLowerCase();
      const headerName = auth_header || "Authorization";
      // Every status code is a data point here, so axios must never reject on 4xx/5xx.
      const acceptAnyStatus = () => true;

      try {
        // Probes run sequentially so the target sees them in a fixed order.

        // Test 1: No authentication
        const noAuthResponse = await axios({
          method,
          url: endpoint,
          validateStatus: acceptAnyStatus,
        });

        results.push({
          test: "No Authentication",
          status: noAuthResponse.status,
          vulnerable: noAuthResponse.status < 400, // Vulnerable if not returning an error status
          details: `Response without authentication returned status code ${noAuthResponse.status}`,
        });

        // Test 2: Invalid token
        const invalidTokenResponse = await axios({
          method,
          url: endpoint,
          headers: {
            [headerName]: "Bearer invalid_token_here",
          },
          validateStatus: acceptAnyStatus,
        });

        results.push({
          test: "Invalid Token",
          status: invalidTokenResponse.status,
          vulnerable: invalidTokenResponse.status < 400,
          details: `Response with invalid token returned status code ${invalidTokenResponse.status}`,
        });

        // Test 3: Empty token
        const emptyTokenResponse = await axios({
          method,
          url: endpoint,
          headers: {
            [headerName]: "",
          },
          validateStatus: acceptAnyStatus,
        });

        results.push({
          test: "Empty Token",
          status: emptyTokenResponse.status,
          vulnerable: emptyTokenResponse.status < 400,
          details: `Response with empty token returned status code ${emptyTokenResponse.status}`,
        });

        // Test 4: If we have current auth or a provided token, test with valid auth.
        // Session auth takes precedence over an explicitly supplied auth_token.
        if (hasCurrentAuth || auth_token) {
          let authHeaders = {};

          if (hasCurrentAuth) {
            authHeaders = authManager.getAuthHeaders();
          } else if (auth_token) {
            authHeaders = {
              [headerName]: `Bearer ${auth_token}`,
            };
          }

          const validAuthResponse = await axios({
            method,
            url: endpoint,
            headers: authHeaders,
            validateStatus: acceptAnyStatus,
          });

          results.push({
            test: "Valid Authentication",
            status: validAuthResponse.status,
            authorized: validAuthResponse.status < 400,
            details: `Response with valid authentication returned status code ${validAuthResponse.status}`,
          });

          // Check if we get the same successful response with and without auth
          const authBypassRisk =
            noAuthResponse.status === validAuthResponse.status &&
            noAuthResponse.status < 400 &&
            JSON.stringify(noAuthResponse.data) === JSON.stringify(validAuthResponse.data);

          if (authBypassRisk) {
            results.push({
              test: "Authentication Effectiveness",
              vulnerable: true,
              details: "CRITICAL: Endpoint returns the same response with and without authentication. Authentication may be ineffective.",
            });
          }
        }

        return textResult(
          `Authentication Bypass Test Results for ${endpoint}:\n\n${results.map(formatProbeResult).join("\n")}`
        );
      } catch (error) {
        return textResult(`Error testing authentication bypass: ${errorMessage(error)}`);
      }
    }
  );
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ricauts/CyberMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.