import axios from 'axios';
/**
* AWS Athena MCP client example - authenticates with Cognito OAuth
*/
/**
 * Connection settings consumed by AthenaMcpClient.
 */
interface CognitoConfig {
/** OAuth client id; sent as the user part of the HTTP Basic credentials to `tokenUrl`. */
clientId: string;
/** OAuth client secret; sent as the password part of the HTTP Basic credentials to `tokenUrl`. */
clientSecret: string;
/** URL that receives the `grant_type=client_credentials` token request. */
tokenUrl: string;
/** URL that receives the MCP JSON-RPC POST requests. */
apiEndpoint: string;
}
/**
 * Shape of the JSON body the client expects back from the token endpoint.
 */
interface TokenResponse {
/** Token sent as `Authorization: Bearer <token>` on MCP requests. */
access_token: string;
/** Token lifetime; the client treats this value as seconds when computing the cache expiry. */
expires_in: number;
/** Token type reported by the server (not read by the code in this file). */
token_type: string;
}
/**
 * Minimal MCP client that talks JSON-RPC 2.0 over HTTP POST to `config.apiEndpoint`.
 *
 * Every request carries a bearer token obtained from `config.tokenUrl` with the
 * OAuth client-credentials grant. The token is cached in memory and refreshed
 * shortly before it expires.
 */
class AthenaMcpClient {
  /** Preferred number of seconds to stop using a token before its real expiry. */
  private static readonly EXPIRY_MARGIN_SECONDS = 300;

  private config: CognitoConfig;
  private accessToken: string | null = null;
  /** Epoch milliseconds after which the cached token must no longer be used. */
  private tokenExpiry: number = 0;
  /** Source of JSON-RPC request ids; unique per client instance. */
  private nextRequestId = 1;

  constructor(config: CognitoConfig) {
    this.config = config;
  }

  /**
   * Returns a cached access token, or fetches a new one when none is cached
   * or the cached one is (about to be) expired.
   *
   * @returns The bearer token string.
   * @throws Error when the token endpoint response has no `access_token`;
   *         HTTP/network failures propagate from axios.
   */
  private async getAccessToken(): Promise<string> {
    // Reuse the cached token while it is still considered valid.
    if (this.accessToken && Date.now() < this.tokenExpiry) {
      return this.accessToken;
    }

    // Request a new token, authenticating with HTTP Basic credentials.
    const auth = Buffer.from(
      `${this.config.clientId}:${this.config.clientSecret}`
    ).toString('base64');
    const response = await axios.post<TokenResponse>(
      this.config.tokenUrl,
      'grant_type=client_credentials&scope=athena-mcp-api/read+athena-mcp-api/write',
      {
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded',
          'Authorization': `Basic ${auth}`,
        },
      }
    );

    const payload = response.data;
    const token = payload ? payload.access_token : undefined;
    if (!token) {
      // Fail here instead of sending "Bearer undefined" to the API.
      throw new Error('Token endpoint response did not contain an access_token');
    }

    const reported = payload.expires_in;
    const lifetimeSeconds =
      typeof reported === 'number' && Number.isFinite(reported) && reported > 0 ? reported : 0;
    // Expire early, but never by more than half the lifetime; otherwise a token
    // living 300 seconds or less would never be cached at all.
    const marginSeconds = Math.min(
      AthenaMcpClient.EXPIRY_MARGIN_SECONDS,
      lifetimeSeconds / 2
    );

    this.accessToken = token;
    this.tokenExpiry = Date.now() + (lifetimeSeconds - marginSeconds) * 1000;
    return token;
  }

  /**
   * Sends one MCP JSON-RPC request and returns its `result` member.
   *
   * @param method JSON-RPC method name.
   * @param params Optional JSON-RPC params; omitted from the payload when undefined.
   * @returns The `result` member of the JSON-RPC response.
   * @throws Error `MCP Error: <message>` when the response carries an `error` member.
   */
  private async sendRequest(method: string, params?: any): Promise<any> {
    const token = await this.getAccessToken();

    // A counter (rather than a timestamp) keeps ids distinct even for requests
    // issued within the same millisecond.
    const id = this.nextRequestId;
    this.nextRequestId += 1;

    const response = await axios.post(
      this.config.apiEndpoint,
      {
        jsonrpc: '2.0',
        id,
        method,
        params,
      },
      {
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`,
        },
      }
    );

    const body = response.data;
    if (body === null || body === undefined) {
      return undefined;
    }
    if (body.error) {
      throw new Error(`MCP Error: ${body.error.message}`);
    }
    return body.result;
  }

  /**
   * Invokes an MCP tool via `tools/call` and parses the first content item's
   * `text` as JSON.
   *
   * @param name Tool name.
   * @param args Tool arguments; undefined values are dropped by JSON serialization.
   * @returns The parsed JSON value.
   * @throws Error when the result is flagged `isError` or has no text content;
   *         SyntaxError when the text is not valid JSON.
   */
  private async callTool(name: string, args: Record<string, unknown>): Promise<any> {
    const result = await this.sendRequest('tools/call', {
      name,
      arguments: args,
    });

    const firstItem =
      result && Array.isArray(result.content) ? result.content[0] : undefined;
    const text =
      firstItem && typeof firstItem.text === 'string' ? firstItem.text : undefined;

    if (result && result.isError) {
      // On tool failure the text is an error description, not a JSON payload.
      const details = text !== undefined ? text : 'no error details returned';
      throw new Error(`MCP tool "${name}" failed: ${details}`);
    }
    if (text === undefined) {
      throw new Error(`MCP tool "${name}" returned no text content`);
    }
    return JSON.parse(text);
  }

  /**
   * Initializes the MCP connection.
   *
   * @returns The server's `initialize` result.
   */
  async initialize(): Promise<any> {
    return this.sendRequest('initialize', {
      protocolVersion: '2024-11-05',
      capabilities: {},
      clientInfo: {
        name: 'athena-mcp-client',
        version: '1.0.0',
      },
    });
  }

  /**
   * Lists the tools the server exposes.
   *
   * @returns The server's `tools/list` result.
   */
  async listTools(): Promise<any> {
    return this.sendRequest('tools/list');
  }

  /**
   * Runs an Athena query through the `run_query` tool.
   *
   * @param database Database to run the query against.
   * @param query SQL text.
   * @param maxRows Optional row limit forwarded to the tool.
   * @returns The tool's JSON payload.
   */
  async runQuery(database: string, query: string, maxRows?: number): Promise<any> {
    return this.callTool('run_query', { database, query, maxRows });
  }

  /**
   * Fetches the results of a query through the `get_result` tool.
   *
   * @param queryExecutionId Id of the query execution.
   * @param maxRows Optional row limit forwarded to the tool.
   * @returns The tool's JSON payload.
   */
  async getResult(queryExecutionId: string, maxRows?: number): Promise<any> {
    return this.callTool('get_result', { queryExecutionId, maxRows });
  }

  /**
   * Fetches the status of a query through the `get_status` tool.
   *
   * @param queryExecutionId Id of the query execution.
   * @returns The tool's JSON payload.
   */
  async getStatus(queryExecutionId: string): Promise<any> {
    return this.callTool('get_status', { queryExecutionId });
  }
}
// Usage example
/**
 * Demo flow: initialize the MCP connection, list the tools, submit a sample
 * Athena query and, when the server answers with a `queryExecutionId`, poll
 * its status until it leaves RUNNING/QUEUED before fetching the rows.
 *
 * Failures are logged to the console rather than rethrown.
 */
async function main(): Promise<void> {
  try {
    // Settings come from environment variables; stop early with a clear message
    // instead of sending requests to an empty URL with empty credentials.
    const requiredEnv = [
      'COGNITO_CLIENT_ID',
      'COGNITO_CLIENT_SECRET',
      'COGNITO_TOKEN_URL',
      'API_ENDPOINT',
    ];
    const missingEnv = requiredEnv.filter(name => !process.env[name]);
    if (missingEnv.length > 0) {
      console.error('错误:', `缺少环境变量: ${missingEnv.join(', ')}`);
      return;
    }

    const config: CognitoConfig = {
      clientId: process.env.COGNITO_CLIENT_ID || '',
      clientSecret: process.env.COGNITO_CLIENT_SECRET || '',
      tokenUrl: process.env.COGNITO_TOKEN_URL || '',
      apiEndpoint: process.env.API_ENDPOINT || '',
    };
    const client = new AthenaMcpClient(config);

    // 1. Initialize
    console.log('1. 初始化 MCP 连接...');
    const initResult = await client.initialize();
    console.log('服务器信息:', initResult.serverInfo);

    // 2. List tools
    console.log('\n2. 获取可用工具...');
    const tools = await client.listTools();
    console.log('可用工具:', tools.tools.map((t: any) => t.name));

    // 3. Run a query
    console.log('\n3. 执行 Athena 查询...');
    const queryResult = await client.runQuery(
      'quickdemo',
      'SELECT * FROM dws_product_daily ORDER BY sales_amount DESC LIMIT 5',
      10
    );

    if (!queryResult.queryExecutionId) {
      // The server returned the rows directly.
      console.log('查询结果:', queryResult.rows);
      return;
    }

    console.log('查询已提交:', queryResult.queryExecutionId);

    // Poll until the query leaves RUNNING/QUEUED, but give up after a bounded
    // number of attempts so a stuck query cannot hang the script forever.
    const pollIntervalMs = 2000;
    const maxPolls = 150; // about 5 minutes at 2 seconds per poll
    let status: any;
    let stillPending = true;
    for (let poll = 0; poll < maxPolls && stillPending; poll += 1) {
      await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
      status = await client.getStatus(queryResult.queryExecutionId);
      console.log('查询状态:', status.state);
      stillPending = status.state === 'RUNNING' || status.state === 'QUEUED';
    }

    if (status.state === 'SUCCEEDED') {
      const result = await client.getResult(queryResult.queryExecutionId, 10);
      console.log('查询结果:', result.rows);
    } else if (stillPending) {
      console.error('错误:', `查询超时, 当前状态: ${status.state}`);
    } else {
      // Any other terminal state: report it instead of exiting silently.
      console.error('错误:', `查询未成功, 状态: ${status.state}`);
    }
  } catch (error) {
    console.error('错误:', error);
  }
}
// Run the example only when this file is executed directly.
if (module === require.main) {
  void main();
}
export { AthenaMcpClient, CognitoConfig };