/**
* MCP 客户端示例 - 使用 HTTP 传输连接到 Lambda 部署的 Athena MCP Server
*/
/**
 * JSON-RPC request envelope that the client serializes and POSTs to the
 * MCP endpoint.
 */
interface McpRequest {
  /** JSON-RPC protocol version string. */
  jsonrpc: string;

  /** Numeric identifier for the request. */
  id: number;

  /** Name of the remote method to invoke, e.g. "tools/list". */
  method: string;

  /** Method-specific parameters; the shape depends on `method`. */
  params: any;
}
/**
 * JSON-RPC response envelope decoded from the MCP endpoint's reply.
 *
 * Both `result` and `error` are optional at the type level; callers decide
 * which one to act on.
 */
interface McpResponse {
  /** JSON-RPC protocol version string. */
  jsonrpc: string;

  /** Numeric identifier carried by the response. */
  id: number;

  /** Method-specific payload, when the server supplies one. */
  result?: any;

  /** Failure description (numeric code plus message), when the server reports one. */
  error?: { code: number; message: string };
}
/**
 * Minimal JSON-RPC client for an Athena MCP server reachable over HTTP POST.
 *
 * Every public method issues exactly one request (`tools/list` or
 * `tools/call`) and resolves with the raw `result` member of the JSON-RPC
 * response. Optional arguments left `undefined` are omitted from the request
 * body because `JSON.stringify` drops `undefined` properties.
 */
class AthenaMcpClient {
  private readonly endpoint: string;

  /** Last request id handed out; incremented before each request is built. */
  private requestId = 0;

  /**
   * @param endpoint URL that receives the JSON-RPC POST requests.
   */
  constructor(endpoint: string) {
    this.endpoint = endpoint;
  }

  /**
   * Sends one JSON-RPC request and unwraps the response.
   *
   * @param method JSON-RPC method name.
   * @param params Method parameters; defaults to an empty object.
   * @returns The `result` member of the JSON-RPC response.
   * @throws Error with message `HTTP error! status: <code>` on a non-2xx reply.
   * @throws Error when the decoded body is not a JSON object.
   * @throws Error with message `MCP Error: <message>` and a numeric `code`
   *   property when the response carries a JSON-RPC error.
   */
  private async sendRequest(method: string, params: any = {}): Promise<any> {
    const request: McpRequest = {
      jsonrpc: "2.0",
      id: ++this.requestId,
      method,
      params,
    };

    const response = await fetch(this.endpoint, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify(request),
    });
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    // Validate the shape before touching members: a `null` body would
    // otherwise surface as an opaque TypeError, and a primitive or array body
    // would silently resolve to `undefined`.
    const payload: unknown = await response.json();
    if (typeof payload !== "object" || payload === null || Array.isArray(payload)) {
      throw new Error("Invalid MCP response: expected a JSON-RPC response object");
    }

    const data = payload as McpResponse;
    if (data.error) {
      // Keep the original message text but expose the JSON-RPC error code so
      // callers can branch on it instead of parsing the message.
      throw Object.assign(new Error(`MCP Error: ${data.error.message}`), {
        code: data.error.code,
      });
    }
    return data.result;
  }

  /** Requests the server's tool catalogue via `tools/list`. */
  async listTools(): Promise<any> {
    return this.sendRequest("tools/list");
  }

  /**
   * Invokes the `run_query` tool.
   *
   * @param database Database name passed to the tool.
   * @param query Query text passed to the tool.
   * @param maxRows Optional `maxRows` tool argument.
   * @param timeoutMs Optional `timeoutMs` tool argument.
   */
  async runQuery(
    database: string,
    query: string,
    maxRows?: number,
    timeoutMs?: number
  ): Promise<any> {
    return this.sendRequest("tools/call", {
      name: "run_query",
      arguments: {
        database,
        query,
        maxRows,
        timeoutMs,
      },
    });
  }

  /**
   * Invokes the `get_status` tool for a query execution.
   *
   * @param queryExecutionId Identifier of the query execution to inspect.
   */
  async getQueryStatus(queryExecutionId: string): Promise<any> {
    return this.sendRequest("tools/call", {
      name: "get_status",
      arguments: {
        queryExecutionId,
      },
    });
  }

  /**
   * Invokes the `get_result` tool for a query execution.
   *
   * @param queryExecutionId Identifier of the query execution to read.
   * @param maxRows Optional `maxRows` tool argument.
   */
  async getQueryResults(queryExecutionId: string, maxRows?: number): Promise<any> {
    return this.sendRequest("tools/call", {
      name: "get_result",
      arguments: {
        queryExecutionId,
        maxRows,
      },
    });
  }

  /** Invokes the `list_saved_queries` tool with no arguments. */
  async listSavedQueries(): Promise<any> {
    return this.sendRequest("tools/call", {
      name: "list_saved_queries",
      arguments: {},
    });
  }

  /**
   * Invokes the `run_saved_query` tool.
   *
   * @param namedQueryId Identifier of the saved query to run.
   * @param databaseOverride Optional `databaseOverride` tool argument.
   * @param maxRows Optional `maxRows` tool argument.
   * @param timeoutMs Optional `timeoutMs` tool argument.
   */
  async runSavedQuery(
    namedQueryId: string,
    databaseOverride?: string,
    maxRows?: number,
    timeoutMs?: number
  ): Promise<any> {
    return this.sendRequest("tools/call", {
      name: "run_saved_query",
      arguments: {
        namedQueryId,
        databaseOverride,
        maxRows,
        timeoutMs,
      },
    });
  }
}
// Usage example
/**
 * Walks through each client call in turn and prints every result as
 * pretty-printed JSON: list tools, run two queries, optionally poll and fetch
 * a query execution, list saved queries, and run the first saved query if
 * one exists. Any error is logged rather than rethrown.
 *
 * @param endpoint URL of the MCP endpoint. The default is a placeholder;
 *   replace it with your API Gateway endpoint.
 */
async function main(
  endpoint: string = "https://your-api-id.execute-api.us-east-1.amazonaws.com/prod/mcp"
): Promise<void> {
  const client = new AthenaMcpClient(endpoint);
  try {
    // 1. List all available tools
    console.log("=== 可用工具 ===");
    const tools = await client.listTools();
    console.log(JSON.stringify(tools, null, 2));

    // 2. Run a simple query
    console.log("\n=== 执行查询: SHOW DATABASES ===");
    const queryResult = await client.runQuery("default", "SHOW DATABASES");
    console.log(JSON.stringify(queryResult, null, 2));

    // 3. Run a query with a timeout
    console.log("\n=== 执行长时间查询 ===");
    const longQuery = await client.runQuery(
      "my_database",
      "SELECT * FROM large_table LIMIT 100",
      100,
      30000 // 30-second timeout
    );

    // The `in` operator throws a TypeError on non-objects, so confirm the
    // shape before probing for the execution id.
    // NOTE(review): this only inspects the top level of the result; confirm
    // that the server reports `queryExecutionId` there rather than inside
    // `content[0].text`.
    const hasExecutionId =
      typeof longQuery === "object" && longQuery !== null && "queryExecutionId" in longQuery;

    if (hasExecutionId) {
      console.log(`查询超时,执行 ID: ${longQuery.queryExecutionId}`);

      // 4. Check the query status
      console.log("\n=== 检查查询状态 ===");
      const status = await client.getQueryStatus(longQuery.queryExecutionId);
      console.log(JSON.stringify(status, null, 2));

      // 5. Fetch the query results (if the query has finished).
      // The regex tolerates whitespace around the colon so both compact and
      // pretty-printed JSON text match; a missing text field skips the step.
      const statusText: unknown = status?.content?.[0]?.text;
      const succeeded =
        typeof statusText === "string" && /"state"\s*:\s*"SUCCEEDED"/.test(statusText);
      if (succeeded) {
        console.log("\n=== 获取查询结果 ===");
        const results = await client.getQueryResults(longQuery.queryExecutionId);
        console.log(JSON.stringify(results, null, 2));
      }
    } else {
      console.log("查询立即完成:");
      console.log(JSON.stringify(longQuery, null, 2));
    }

    // 6. List saved queries
    console.log("\n=== 保存的查询 ===");
    const savedQueries = await client.listSavedQueries();
    console.log(JSON.stringify(savedQueries, null, 2));

    // 7. Run a saved query (if there is one). Skipped when the result carries
    // no text payload; malformed JSON still propagates to the catch below.
    const savedText: unknown = savedQueries?.content?.[0]?.text;
    if (typeof savedText === "string") {
      const savedQueriesData = JSON.parse(savedText);
      const namedQueries: unknown = savedQueriesData?.namedQueries;
      if (Array.isArray(namedQueries) && namedQueries.length > 0) {
        const firstQuery = namedQueries[0];
        console.log(`\n=== 执行保存的查询: ${firstQuery.name} ===`);
        const savedQueryResult = await client.runSavedQuery(firstQuery.id);
        console.log(JSON.stringify(savedQueryResult, null, 2));
      }
    }
  } catch (error) {
    console.error("错误:", error);
  }
}
// Run the example only when this file is the program entry point.
if (require.main === module) {
  // Attach a rejection handler so the promise is never left floating: an
  // unexpected failure is logged and reflected in the process exit code.
  main().catch((error: unknown) => {
    console.error("错误:", error);
    process.exitCode = 1;
  });
}
export { AthenaMcpClient };