Skip to main content
Glama

MCP Auth Server

test_step10.sh (22.1 kB)
#!/bin/bash
# Step 10: Keycloak Integration Test Script
#
# This script tests the MCP server with Keycloak integration. It:
#   1. checks that Keycloak answers on KEYCLOAK_URL,
#   2. starts the MCP server (src/mcp_http/step10.py) in the background,
#   3. obtains password-grant tokens for three test users and asserts the
#      mcp:* scopes and azp claim each token carries,
#   4. calls JSON-RPC methods on the MCP server and checks which ones are
#      allowed or denied for each user,
#   5. fetches the OAuth metadata documents.
#
# Usage:    test_step10.sh [--inspect] [-h|--help]
# Requires: bash, curl, jq, base64, uv
# Exit:     0 when every check passes, non-zero otherwise.

set -e

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Configuration
KEYCLOAK_URL="http://localhost:8080"
KEYCLOAK_REALM="mcp-realm"
MCP_SERVER_URL="http://localhost:9000"
CLIENT_ID="mcp-test-client"
# NOTE(review): CLIENT_SECRET is not sent by any request in this script;
# kept because other tooling may read it - confirm before removing.
CLIENT_SECRET="mcp-secret-key-change-me"

# Global variables
MCP_SERVER_PID=""
INSPECT_MODE=false

# Parse command line arguments
while [[ $# -gt 0 ]]; do
  case $1 in
    --inspect)
      INSPECT_MODE=true
      shift
      ;;
    -h|--help)
      echo "Usage: $0 [OPTIONS]"
      echo ""
      echo "Options:"
      echo " --inspect Pause after each test scenario to examine results"
      echo " -h, --help Show this help message"
      echo ""
      echo "Examples:"
      echo " $0 # Run all tests without pausing"
      echo " $0 --inspect # Run tests with pauses for inspection"
      exit 0
      ;;
    *)
      echo "Unknown option: $1"
      echo "Use -h or --help for usage information"
      exit 1
      ;;
  esac
done

echo -e "${BLUE}=== Step 10: Keycloak Integration Test ===${NC}"
if [ "$INSPECT_MODE" = true ]; then
  echo -e "${YELLOW}Inspection mode enabled - will pause after each test scenario${NC}"
fi

#######################################
# Logging helpers. All of them write to stderr so that functions whose
# stdout is captured (e.g. get_token) can log freely.
# Arguments: $1 - message
#######################################
print_info() {
  echo -e "\033[1;34mℹ️ \033[0m $1" >&2
}

print_status() {
  echo -e "\033[1;32m✅\033[0m $1" >&2
}

print_error() {
  echo -e "\033[1;31m❌\033[0m $1" >&2
}

print_warning() {
  echo -e "${YELLOW}⚠️ $1${NC}" >&2
}

#######################################
# Pretty-prints a JSON document, falling back to the raw text when jq
# cannot parse it.
# Arguments: $1 - text expected to be JSON
# Outputs:   formatted JSON (or "Raw response: ...") on stdout
#######################################
print_json() {
  local text=$1
  printf '%s\n' "$text" | jq '.' 2>/dev/null || echo "Raw response: $text"
}

#######################################
# Converts a base64url segment (as used in JWTs) to padded standard base64.
# Arguments: $1 - base64url text without padding
# Outputs:   standard base64 text on stdout (no trailing newline)
#######################################
b64url_to_b64() {
  local segment
  # Map the URL-safe alphabet ('-', '_') back to the standard one ('+', '/').
  segment=$(printf '%s' "$1" | tr '_-' '/+')
  # JWT segments drop the '=' padding; base64 -d needs a multiple of 4 chars.
  while (( ${#segment} % 4 != 0 )); do
    segment="${segment}="
  done
  printf '%s' "$segment"
}

#######################################
# Writes the decoded payload (second dot-separated segment) of a JWT.
# Arguments: $1 - JWT
# Outputs:   decoded payload bytes on stdout; nothing if decoding fails
# Returns:   exit status of base64
#######################################
jwt_payload_json() {
  local token=$1
  local payload_b64
  IFS='.' read -r _ payload_b64 _ <<< "$token"
  b64url_to_b64 "$payload_b64" | base64 -d 2>/dev/null
}

#######################################
# Logs the decoded header, payload and key claims of a JWT for debugging.
# The signature segment is not inspected or verified.
# Arguments: $1 - JWT
# Outputs:   diagnostics on stderr
#######################################
decode_jwt() {
  local token=$1
  local header_b64 payload_b64
  local header_b64_padded payload_b64_padded
  local header payload
  local issuer audience subject username scope azp

  print_info "Decoding JWT token..."
  print_info "Token length: ${#token}"
  print_info "Token starts with: ${token:0:20}..."

  # Split the token into parts
  IFS='.' read -r header_b64 payload_b64 _ <<< "$token"
  print_info "Header base64: $header_b64"
  print_info "Payload base64: $payload_b64"

  header_b64_padded=$(b64url_to_b64 "$header_b64")
  payload_b64_padded=$(b64url_to_b64 "$payload_b64")
  print_info "Header base64 (padded): $header_b64_padded"
  print_info "Payload base64 (padded): $payload_b64_padded"

  # Decoding is best-effort: a malformed token is logged as empty fields
  # here and rejected later by the assertions.
  header=$(printf '%s' "$header_b64_padded" | base64 -d 2>/dev/null | jq '.' 2>/dev/null) || true
  print_info "Header: $header"

  payload=$(printf '%s' "$payload_b64_padded" | base64 -d 2>/dev/null | jq '.' 2>/dev/null) || true
  print_info "Payload: $payload"

  # Check key fields
  issuer=$(jq -r '.iss // empty' <<< "$payload") || true
  audience=$(jq -r '.aud // empty' <<< "$payload") || true
  subject=$(jq -r '.sub // empty' <<< "$payload") || true
  username=$(jq -r '.preferred_username // empty' <<< "$payload") || true
  scope=$(jq -r '.scope // empty' <<< "$payload") || true
  azp=$(jq -r '.azp // empty' <<< "$payload") || true

  print_info "Key fields:"
  print_info " Issuer: $issuer"
  print_info " Audience: $audience"
  print_info " Subject: $subject"
  print_info " Username: $username"
  print_info " Scope: $scope"
  print_info " AZP: $azp"
}

#######################################
# Extracts the 'scope' claim from a JWT.
# Arguments: $1 - JWT
# Outputs:   scope string on stdout (empty line if missing or undecodable)
#######################################
extract_scope_claim() {
  local token=$1
  local scope
  # Best-effort: an undecodable token yields an empty scope, which the
  # calling assertion then reports as a mismatch.
  scope=$(jwt_payload_json "$token" | jq -r '.scope // empty' 2>/dev/null) || true
  echo "$scope"
}

#######################################
# Asserts that a token's scope claim equals the expected string exactly.
# Arguments: $1 - JWT, $2 - expected scope string, $3 - user label for logs
# Returns:   exits the script with status 1 on mismatch
#######################################
assert_scopes() {
  local token="$1"
  local expected_scopes="$2"
  local user="$3"
  local actual_scopes
  actual_scopes=$(extract_scope_claim "$token")

  print_info "User: $user"
  print_info "Expected scopes: $expected_scopes"
  print_info "Actual scopes: $actual_scopes"

  if [[ "$actual_scopes" != "$expected_scopes" ]]; then
    print_error "Scope assertion failed for $user: expected '$expected_scopes', got '$actual_scopes'"
    exit 1
  else
    print_status "Scope assertion passed for $user: $actual_scopes"
  fi
}

#######################################
# Asserts that the set of mcp:* scopes in a token equals the expected set.
# Non-mcp scopes are ignored and order does not matter.
# Arguments: $1 - JWT
#            $2 - expected mcp scopes, space separated (e.g. "mcp:read mcp:tools")
#            $3 - user label for logs
# Returns:   exits the script with status 1 on mismatch
#######################################
assert_mcp_scopes() {
  local token="$1"
  local expected_mcp_scopes="$2"
  local user="$3"
  local actual_scopes actual_mcp_scopes expected_sorted s

  actual_scopes=$(extract_scope_claim "$token")

  # Word splitting on the scope strings is intentional: scopes are
  # space-separated tokens. sort | xargs normalizes to one sorted line.
  actual_mcp_scopes=$(
    for s in $actual_scopes; do
      if [[ $s == mcp:* ]]; then
        printf '%s\n' "$s"
      fi
    done | sort | xargs
  )
  expected_sorted=$(
    for s in $expected_mcp_scopes; do
      printf '%s\n' "$s"
    done | sort | xargs
  )

  print_info "User: $user"
  print_info "Expected mcp: scopes: $expected_sorted"
  print_info "Actual mcp: scopes: $actual_mcp_scopes"

  if [[ "$actual_mcp_scopes" != "$expected_sorted" ]]; then
    print_error "MCP scope assertion failed for $user: expected '$expected_sorted', got '$actual_mcp_scopes'"
    exit 1
  else
    print_status "MCP scope assertion passed for $user: $actual_mcp_scopes"
  fi
}

#######################################
# Stops the background MCP server, if this script started one.
# Globals: MCP_SERVER_PID (read)
#######################################
cleanup() {
  if [[ -n "$MCP_SERVER_PID" ]]; then
    print_info "Stopping MCP server (PID: $MCP_SERVER_PID)..."
    # The process may already be gone; failures here are not errors.
    kill "$MCP_SERVER_PID" 2>/dev/null || true
    wait "$MCP_SERVER_PID" 2>/dev/null || true
    print_status "MCP server stopped"
  fi
}

# Set up trap to cleanup on script exit
trap cleanup EXIT

#######################################
# Verifies that Keycloak answers HTTP requests; exits 1 if it does not.
# Globals: KEYCLOAK_URL (read)
#######################################
check_keycloak() {
  print_info "Checking if Keycloak is running..."

  if curl -s "$KEYCLOAK_URL/realms/master" > /dev/null 2>&1; then
    print_status "Keycloak is running"
  else
    print_error "Keycloak is not running. Please start Keycloak first:"
    echo " cd keycloak && docker-compose up -d"
    exit 1
  fi
}

#######################################
# Applies keycloak/config.json to Keycloak via setup_keycloak.py.
# Not called by main; available for manual use.
# Globals: KEYCLOAK_URL (read)
#######################################
setup_keycloak() {
  print_info "Setting up Keycloak with MCP configuration..."

  if [ ! -f "keycloak/config.json" ]; then
    print_error "MCP Keycloak configuration file not found: keycloak/config.json"
    exit 1
  fi

  # Run in a subshell so the caller's working directory is never changed,
  # even if the setup command fails.
  (
    cd keycloak
    uv run python setup_keycloak.py --config config.json --url "$KEYCLOAK_URL" --summary
  )

  print_status "Keycloak setup completed"
}

#######################################
# Starts the MCP server in the background and waits until /health answers.
# Globals: MCP_SERVER_PID (written), MCP_SERVER_URL (read)
# Returns: 0 once the server is ready; exits 1 if it dies or never answers
#######################################
start_mcp_server() {
  print_info "Starting MCP server..."

  # Check if the step10.py file exists
  if [ ! -f "src/mcp_http/step10.py" ]; then
    print_error "MCP server file not found: src/mcp_http/step10.py"
    exit 1
  fi

  # Start the MCP server in the background
  uv run python src/mcp_http/step10.py &
  MCP_SERVER_PID=$!

  print_info "MCP server started with PID: $MCP_SERVER_PID"

  # Wait for server to start
  print_info "Waiting for MCP server to start..."
  local max_attempts=30
  local attempt=1

  while (( attempt <= max_attempts )); do
    if curl -s "$MCP_SERVER_URL/health" > /dev/null 2>&1; then
      print_status "MCP server is ready"
      return 0
    fi

    # Fail fast instead of polling for the full timeout when the
    # background process has already exited (e.g. crash on startup).
    if ! kill -0 "$MCP_SERVER_PID" 2>/dev/null; then
      print_error "MCP server process exited before becoming ready"
      exit 1
    fi

    print_info "Waiting for MCP server... (attempt $attempt/$max_attempts)"
    sleep 2
    attempt=$((attempt + 1))
  done

  print_error "MCP server failed to start within expected time"
  exit 1
}

#######################################
# Requests an access token from Keycloak using the password grant.
# Globals:   KEYCLOAK_URL, KEYCLOAK_REALM, CLIENT_ID (read)
# Arguments: $1 - username, $2 - password, $3 - space-separated scopes
# Outputs:   the access token only, on stdout (no trailing newline);
#            all diagnostics go to stderr
# Returns:   0 on success, 1 if no usable token was returned
#######################################
get_token() {
  local username=$1
  local password=$2
  local scopes=$3
  local token_response access_token

  print_info "Getting token for user: $username with scopes: $scopes"

  # --data-urlencode keeps values containing '&', '+', '=' or spaces intact.
  # A transport failure leaves token_response empty and is reported below.
  token_response=$(curl -s -X POST \
    "$KEYCLOAK_URL/realms/$KEYCLOAK_REALM/protocol/openid-connect/token" \
    -H "Content-Type: application/x-www-form-urlencoded" \
    --data-urlencode "grant_type=password" \
    --data-urlencode "client_id=$CLIENT_ID" \
    --data-urlencode "username=$username" \
    --data-urlencode "password=$password" \
    --data-urlencode "scope=$scopes") || true

  # Debug: Check the token response
  print_info "Token response from Keycloak:"
  print_json "$token_response" >&2

  if ! grep -q "access_token" <<< "$token_response"; then
    print_error "Failed to get token:"
    print_json "$token_response" >&2
    return 1
  fi

  # Extract just the access_token, ensuring it's clean
  access_token=$(jq -r '.access_token // empty' <<< "$token_response") || true

  # Debug: Check the extracted token
  print_info "Extracted token details:"
  print_info "Token length: ${#access_token}"
  print_info "Token starts with: ${access_token:0:20}..."
  print_info "Token ends with: ...${access_token: -20}"

  # Check for common issues
  if [[ "$access_token" == "null" ]]; then
    print_error "Token is null or empty"
    return 1
  fi

  if [[ -z "$access_token" ]]; then
    print_error "Token is empty"
    return 1
  fi

  # Verify it looks like a JWT token (should start with eyJ)
  if [[ ! "$access_token" =~ ^eyJ ]]; then
    print_error "Token doesn't look like a valid JWT (should start with 'eyJ')"
    print_error "Token starts with: ${access_token:0:10}"
    return 1
  fi

  print_status "Token obtained successfully"

  # Output only the token to stdout, nothing else
  echo -n "$access_token"
}

#######################################
# Checks that /health reports Keycloak integration as enabled; exits 1
# otherwise.
# Globals: MCP_SERVER_URL (read)
#######################################
test_mcp_health() {
  print_info "Testing MCP server health..."

  local health_response
  health_response=$(curl -s "$MCP_SERVER_URL/health") || true

  if grep -q "keycloak_integration.*true" <<< "$health_response"; then
    print_status "MCP server health check passed"
    print_json "$health_response"
  else
    print_error "MCP server health check failed:"
    print_json "$health_response"
    exit 1
  fi
}

#######################################
# Sends a request without a bearer token and expects HTTP 401; exits 1
# on any other status.
# Globals: MCP_SERVER_URL (read)
#######################################
test_unauthorized() {
  print_info "Testing MCP server without token (should fail)..."

  local response
  response=$(curl -s -w "%{http_code}" \
    -X POST "$MCP_SERVER_URL/mcp" \
    -H "Content-Type: application/json" \
    -d '{"jsonrpc": "2.0", "id": 1, "method": "ping"}') || true

  # -w appends the 3-digit status code after the body; split them apart.
  local http_code="${response: -3}"
  local body="${response%???}"

  if [ "$http_code" = "401" ]; then
    print_status "Unauthorized request correctly rejected"
  else
    print_error "Expected 401, got $http_code"
    print_json "$body"
    exit 1
  fi
}

#######################################
# Builds a JSON-RPC 2.0 request (id 1) with jq so that the method name is
# escaped correctly and params are validated as JSON.
# Arguments: $1 - method name, $2 - params as JSON text (optional)
# Outputs:   compact JSON request on stdout
# Returns:   non-zero if params is not valid JSON
#######################################
build_jsonrpc_request() {
  local method=$1
  local params=${2:-}

  if [[ -n "$params" ]]; then
    jq -cn --arg method "$method" --argjson params "$params" \
      '{jsonrpc: "2.0", id: 1, method: $method, params: $params}'
  else
    jq -cn --arg method "$method" \
      '{jsonrpc: "2.0", id: 1, method: $method}'
  fi
}

#######################################
# POSTs a JSON body to the MCP endpoint with a bearer token.
# Globals:   MCP_SERVER_URL (read)
# Arguments: $1 - bearer token, $2 - JSON request body
# Outputs:   response body on stdout
# Returns:   exit status of curl
#######################################
mcp_post() {
  local token=$1
  local json_request=$2

  curl -s \
    -X POST "$MCP_SERVER_URL/mcp" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $token" \
    -d "$json_request"
}

#######################################
# Calls a JSON-RPC method with a token and expects a result; exits 1
# otherwise.
# Arguments: $1 - bearer token, $2 - method, $3 - params JSON (optional)
#######################################
test_authorized() {
  local token=$1
  local method=$2
  local params=${3:-}
  local json_request response response_scopes

  print_info "Testing MCP server with token for method: $method"

  # Build the JSON request
  if ! json_request=$(build_jsonrpc_request "$method" "$params"); then
    print_error "Could not build JSON-RPC request for $method (invalid params JSON?)"
    exit 1
  fi

  # Debug: print the JSON being sent
  print_info "Sending JSON: $json_request"

  # Debug: Check token before sending
  print_info "Token being used:"
  print_info "Token length: ${#token}"
  print_info "Token starts with: ${token:0:20}..."
  print_info "Token ends with: ...${token: -20}"

  # Debug: Show the curl command being executed (token truncated)
  print_info "Executing curl command:"
  print_info "curl -s -X POST $MCP_SERVER_URL/mcp -H 'Content-Type: application/json' -H 'Authorization: Bearer ${token:0:20}...' -d '$json_request'"

  # A transport failure leaves response empty and is reported below.
  response=$(mcp_post "$token" "$json_request") || true

  # Debug: print raw response
  print_info "Raw response: $response"

  if grep -q "result" <<< "$response"; then
    print_status "Authorized request successful for $method"
    jq '.result' <<< "$response"

    # Check if scopes are included in response
    if grep -q "userScopes" <<< "$response"; then
      response_scopes=$(jq -r '.result.userScopes[]? // empty' <<< "$response") || true
      print_info "User scopes in response: $response_scopes"
    fi
  else
    print_error "Authorized request failed for $method:"
    print_json "$response"
    exit 1
  fi
}

#######################################
# Calls a JSON-RPC method and checks that it is allowed or denied as
# expected; exits 1 when the outcome differs.
# Arguments: $1 - bearer token
#            $2 - method
#            $3 - "succeed" to expect a result; any other value expects a
#                 Forbidden error
#            $4 - params JSON (optional)
#######################################
test_scope_authorization() {
  local token=$1
  local method=$2
  local should_succeed=$3
  local params=${4:-}
  local json_request response

  print_info "Testing scope authorization for $method (should $should_succeed)..."

  # Build the JSON request
  if ! json_request=$(build_jsonrpc_request "$method" "$params"); then
    print_error "Could not build JSON-RPC request for $method (invalid params JSON?)"
    exit 1
  fi

  # A transport failure leaves response empty and is reported below.
  response=$(mcp_post "$token" "$json_request") || true

  if [ "$should_succeed" = "succeed" ]; then
    if grep -q "result" <<< "$response"; then
      print_status "Scope authorization test passed for $method"
    else
      print_error "Scope authorization test failed for $method (expected success):"
      print_json "$response"
      exit 1
    fi
  else
    if grep -q "error.*Forbidden" <<< "$response"; then
      print_status "Scope authorization test passed for $method (correctly denied)"
    else
      print_error "Scope authorization test failed for $method (expected failure):"
      print_json "$response"
      exit 1
    fi
  fi
}

#######################################
# Asserts that a token's azp claim equals the expected client id; exits 1
# on mismatch.
# Arguments: $1 - JWT, $2 - expected azp value
#######################################
test_azp_verification() {
  local token=$1
  local expected_azp=$2
  local actual_azp

  print_info "Testing AZP claim verification (expected: $expected_azp)..."

  # Decode token to get azp claim; an undecodable token yields an empty
  # value, which fails the comparison below.
  actual_azp=$(jwt_payload_json "$token" | jq -r '.azp // empty' 2>/dev/null) || true

  if [ "$actual_azp" = "$expected_azp" ]; then
    print_status "AZP claim verification passed: $actual_azp"
  else
    print_error "AZP claim verification failed. Expected: $expected_azp, Got: $actual_azp"
    exit 1
  fi
}

#######################################
# In inspection mode, waits for Enter (or 'n' to quit) between scenarios.
# Globals:   INSPECT_MODE (read)
# Arguments: $1 - name of the scenario that just finished
#######################################
pause_for_inspection() {
  local scenario_name="$1"
  local response

  if [ "$INSPECT_MODE" = true ]; then
    echo ""
    echo -e "${BLUE}=== Test scenario completed: $scenario_name ===${NC}"
    echo -e "${YELLOW}Press Enter to continue to the next test scenario, or type 'n' to exit:${NC}"
    # EOF on stdin (non-interactive run) counts as "continue" instead of
    # aborting the whole run under set -e.
    read -r response || response=""

    if [[ "$response" =~ ^[Nn]$ ]]; then
      echo -e "${YELLOW}Exiting tests as requested...${NC}"
      exit 0
    fi
    echo ""
  fi
}

#######################################
# Main test execution: runs every scenario in order and exits non-zero on
# the first hard failure, or at the end if any metadata check failed.
#######################################
main() {
  local admin_token user_token readonly_token
  local token_exit_code
  local protected_resource auth_server_base oauth_metadata_url oauth_metadata
  local metadata_failures=0

  print_info "Starting Step 10 Keycloak integration test..."

  # Check prerequisites
  check_keycloak
  pause_for_inspection "Keycloak health check"

  # Start MCP server
  start_mcp_server
  pause_for_inspection "MCP server startup"

  # Test MCP server health
  test_mcp_health
  pause_for_inspection "MCP server health check"

  # Test unauthorized access
  test_unauthorized
  pause_for_inspection "Unauthorized access test"

  # Test with admin user (full access)
  print_info "=== Testing with admin user (full access) ==="

  # Get admin token with debug
  print_info "About to call get_token..."
  # Capture the status with '||': a bare failing assignment would abort
  # the script under set -e before the status could be inspected.
  token_exit_code=0
  admin_token=$(get_token "mcp-admin" "admin123" "openid profile email mcp:read mcp:tools mcp:prompts") || token_exit_code=$?

  print_info "get_token exit code: $token_exit_code"
  print_info "Captured token length: ${#admin_token}"
  print_info "Captured token starts with: ${admin_token:0:50}..."
  print_info "Captured token ends with: ...${admin_token: -50}"

  if [ "$token_exit_code" -ne 0 ]; then
    print_error "Failed to get admin token"
    exit 1
  fi

  decode_jwt "$admin_token"
  assert_mcp_scopes "$admin_token" "mcp:read mcp:tools mcp:prompts" "mcp-admin"

  # Test azp verification for admin token
  test_azp_verification "$admin_token" "mcp-test-client"

  test_authorized "$admin_token" "ping"
  test_authorized "$admin_token" "tools/list"
  test_authorized "$admin_token" "tools/call" '{"name": "echo", "arguments": {"message": "Hello from admin", "repeat_count": 2}}'
  test_authorized "$admin_token" "prompts/list"
  test_authorized "$admin_token" "prompts/get" '{"name": "echo_prompt", "arguments": {"message": "Admin test"}}'
  pause_for_inspection "Admin user tests (full access)"

  # Test with regular user (limited access)
  print_info "=== Testing with regular user (limited access) ==="

  # Get user token
  if ! user_token=$(get_token "mcp-user" "user123" "openid profile email mcp:read mcp:tools mcp:prompts"); then
    print_error "Failed to get user token"
    exit 1
  fi

  decode_jwt "$user_token"
  assert_mcp_scopes "$user_token" "mcp:read mcp:tools" "mcp-user"

  # Test azp verification for user token
  test_azp_verification "$user_token" "mcp-test-client"

  test_authorized "$user_token" "ping"
  test_authorized "$user_token" "tools/list"
  test_authorized "$user_token" "tools/call" '{"name": "echo", "arguments": {"message": "Hello from user", "repeat_count": 1}}'
  test_scope_authorization "$user_token" "prompts/list" "fail"
  test_scope_authorization "$user_token" "prompts/get" "fail"
  pause_for_inspection "Regular user tests (limited access)"

  # Test with readonly user (minimal access)
  print_info "=== Testing with readonly user (minimal access) ==="

  # Get readonly token
  if ! readonly_token=$(get_token "mcp-readonly" "readonly123" "openid profile email mcp:read mcp:tools mcp:prompts"); then
    print_error "Failed to get readonly token"
    exit 1
  fi

  decode_jwt "$readonly_token"
  assert_mcp_scopes "$readonly_token" "mcp:read" "mcp-readonly"

  # Test azp verification for readonly token
  test_azp_verification "$readonly_token" "mcp-test-client"

  test_authorized "$readonly_token" "ping"
  test_scope_authorization "$readonly_token" "tools/list" "fail"
  test_scope_authorization "$readonly_token" "tools/call" "fail"
  test_scope_authorization "$readonly_token" "prompts/list" "fail"
  test_scope_authorization "$readonly_token" "prompts/get" "fail"
  pause_for_inspection "Readonly user tests (minimal access)"

  # Test OAuth metadata endpoints. Failures here do not stop the remaining
  # metadata checks, but they are counted so the run cannot end by
  # reporting success after printing an error.
  print_info "=== Testing OAuth metadata endpoints ==="

  # Test MCP server's protected resource metadata (RFC9728)
  protected_resource=$(curl -s "$MCP_SERVER_URL/.well-known/oauth-protected-resource") || true
  if grep -q "authorization_servers" <<< "$protected_resource"; then
    print_status "OAuth protected resource metadata endpoint working"
    print_json "$protected_resource"
  else
    print_error "OAuth protected resource metadata endpoint failed"
    print_json "$protected_resource"
    metadata_failures=$((metadata_failures + 1))
  fi

  # Test Keycloak's authorization server metadata (RFC8414)
  auth_server_base=$(jq -r '.authorization_servers[0] // empty' <<< "$protected_resource" 2>/dev/null) || true
  if [[ -n "$auth_server_base" ]]; then
    oauth_metadata_url="${auth_server_base}/.well-known/oauth-authorization-server"
    print_info "Testing Keycloak OAuth 2.0 Authorization Server Metadata at: $oauth_metadata_url"

    oauth_metadata=$(curl -s "$oauth_metadata_url") || true
    if grep -q '"issuer"' <<< "$oauth_metadata"; then
      print_status "Keycloak OAuth 2.0 Authorization Server metadata endpoint working"
      print_json "$oauth_metadata"
    else
      print_error "Keycloak OAuth 2.0 Authorization Server metadata endpoint failed"
      print_json "$oauth_metadata"
      metadata_failures=$((metadata_failures + 1))
    fi
  else
    print_error "No authorization server URL found in protected resource metadata"
    metadata_failures=$((metadata_failures + 1))
  fi
  pause_for_inspection "OAuth metadata endpoints tests"

  if (( metadata_failures > 0 )); then
    print_error "=== Step 10 Keycloak integration test finished with $metadata_failures OAuth metadata failure(s) ==="
    exit 1
  fi

  print_status "=== Step 10 Keycloak integration test completed successfully! ==="
  print_info "Keycloak URL: $KEYCLOAK_URL"
  print_info "Keycloak Realm: $KEYCLOAK_REALM"
  print_info "MCP Server URL: $MCP_SERVER_URL"
  print_info "Test users: mcp-admin, mcp-user, mcp-readonly"
}

# Run main function
main "$@"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/christian-posta/mcp-auth-step-by-step'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.