#!/bin/bash
# 测试 Cognito OAuth 认证的脚本
# 使用方法: ./test-cognito-auth.sh
set -e
echo "=== AWS Athena MCP - Cognito OAuth 测试 ==="
echo ""
# 从 CloudFormation 获取配置
STACK_NAME="aws-athena-mcp-stack"
echo "1. 获取 Cognito 配置..."
CLIENT_ID=$(aws cloudformation describe-stacks --stack-name $STACK_NAME --query "Stacks[0].Outputs[?OutputKey=='CognitoAppClientId'].OutputValue" --output text)
TOKEN_URL=$(aws cloudformation describe-stacks --stack-name $STACK_NAME --query "Stacks[0].Outputs[?OutputKey=='CognitoTokenUrl'].OutputValue" --output text)
API_ENDPOINT=$(aws cloudformation describe-stacks --stack-name $STACK_NAME --query "Stacks[0].Outputs[?OutputKey=='ApiEndpoint'].OutputValue" --output text)
USER_POOL_ID=$(aws cloudformation describe-stacks --stack-name $STACK_NAME --query "Stacks[0].Outputs[?OutputKey=='CognitoUserPoolId'].OutputValue" --output text)
echo "Client ID: $CLIENT_ID"
echo "Token URL: $TOKEN_URL"
echo "API Endpoint: $API_ENDPOINT"
echo ""
# 获取 Client Secret
echo "2. 获取 Client Secret..."
CLIENT_SECRET=$(aws cognito-idp describe-user-pool-client --user-pool-id $USER_POOL_ID --client-id $CLIENT_ID --query "UserPoolClient.ClientSecret" --output text)
if [ -z "$CLIENT_SECRET" ]; then
echo "错误: 无法获取 Client Secret"
exit 1
fi
echo "Client Secret: ${CLIENT_SECRET:0:10}..."
echo ""
# 获取 Access Token
echo "3. 获取 OAuth Access Token..."
AUTH_HEADER=$(echo -n "$CLIENT_ID:$CLIENT_SECRET" | base64)
TOKEN_RESPONSE=$(curl -s -X POST "$TOKEN_URL" \
-H "Content-Type: application/x-www-form-urlencoded" \
-H "Authorization: Basic $AUTH_HEADER" \
-d "grant_type=client_credentials&scope=athena-mcp-api/read+athena-mcp-api/write")
ACCESS_TOKEN=$(echo $TOKEN_RESPONSE | jq -r '.access_token')
if [ "$ACCESS_TOKEN" == "null" ] || [ -z "$ACCESS_TOKEN" ]; then
echo "错误: 无法获取 Access Token"
echo "响应: $TOKEN_RESPONSE"
exit 1
fi
echo "Access Token: ${ACCESS_TOKEN:0:20}..."
echo ""
# 测试 MCP 协议
echo "4. 测试 MCP initialize..."
INIT_RESPONSE=$(curl -s -X POST "$API_ENDPOINT" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $ACCESS_TOKEN" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "test-client",
"version": "1.0.0"
}
}
}')
echo "Initialize 响应:"
echo $INIT_RESPONSE | jq '.'
echo ""
# 测试 tools/list
echo "5. 测试 tools/list..."
TOOLS_RESPONSE=$(curl -s -X POST "$API_ENDPOINT" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $ACCESS_TOKEN" \
-d '{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list"
}')
echo "Tools 列表:"
echo $TOOLS_RESPONSE | jq '.result.tools[] | {name: .name, description: .description}'
echo ""
# 测试未授权访问
echo "6. 测试未授权访问(应该失败)..."
UNAUTH_RESPONSE=$(curl -s -w "\nHTTP_CODE:%{http_code}" -X POST "$API_ENDPOINT" \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/list"
}')
HTTP_CODE=$(echo "$UNAUTH_RESPONSE" | grep "HTTP_CODE:" | cut -d: -f2)
echo "HTTP 状态码: $HTTP_CODE"
if [ "$HTTP_CODE" == "401" ]; then
echo "✓ 未授权访问被正确拒绝"
else
echo "✗ 警告: 未授权访问未被拒绝 (状态码: $HTTP_CODE)"
fi
echo ""
echo "=== 测试完成 ==="
echo ""
echo "保存以下信息用于客户端配置:"
echo "CLIENT_ID=$CLIENT_ID"
echo "CLIENT_SECRET=$CLIENT_SECRET"
echo "TOKEN_URL=$TOKEN_URL"
echo "API_ENDPOINT=$API_ENDPOINT"