aws-athena-mcp
by lishenxydlgzs
Verified
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ErrorCode,
ListToolsRequestSchema,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import { AthenaService } from "./athena.js";
import { QueryInput, AthenaError } from "./types.js";
class AthenaServer {
private server: Server;
private athenaService: AthenaService;
constructor() {
this.server = new Server(
{
name: "aws-athena-mcp",
version: "0.1.0",
},
{
capabilities: {
tools: {},
},
}
);
this.athenaService = new AthenaService();
this.setupToolHandlers();
// Error handling
this.server.onerror = (error) => console.error("[MCP Error]", error);
process.on("SIGINT", async () => {
await this.server.close();
process.exit(0);
});
}
private setupToolHandlers() {
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "run_query",
description: "Execute a SQL query using AWS Athena. Returns full results if query completes before timeout, otherwise returns queryExecutionId.",
inputSchema: {
type: "object",
properties: {
database: {
type: "string",
description: "The Athena database to query",
},
query: {
type: "string",
description: "SQL query to execute",
},
maxRows: {
type: "number",
description: "Maximum number of rows to return (default: 1000)",
minimum: 1,
maximum: 10000,
},
timeoutMs: {
type: "number",
description: "Timeout in milliseconds (default: 60000)",
minimum: 1000,
},
},
required: ["database", "query"],
},
},
{
name: "get_result",
description: "Get results for a completed query. Returns error if query is still running.",
inputSchema: {
type: "object",
properties: {
queryExecutionId: {
type: "string",
description: "The query execution ID",
},
maxRows: {
type: "number",
description: "Maximum number of rows to return (default: 1000)",
minimum: 1,
maximum: 10000,
},
},
required: ["queryExecutionId"],
},
},
{
name: "get_status",
description: "Get the current status of a query execution",
inputSchema: {
type: "object",
properties: {
queryExecutionId: {
type: "string",
description: "The query execution ID",
},
},
required: ["queryExecutionId"],
},
},
],
}));
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
switch (request.params.name) {
case "run_query": {
if (!request.params.arguments ||
typeof request.params.arguments.database !== 'string' ||
typeof request.params.arguments.query !== 'string') {
throw new McpError(
ErrorCode.InvalidParams,
"Missing or invalid required parameters: database (string) and query (string)"
);
}
const queryInput: QueryInput = {
database: request.params.arguments.database,
query: request.params.arguments.query,
maxRows: typeof request.params.arguments.maxRows === 'number' ?
request.params.arguments.maxRows : undefined,
timeoutMs: typeof request.params.arguments.timeoutMs === 'number' ?
request.params.arguments.timeoutMs : undefined,
};
const result = await this.athenaService.executeQuery(queryInput);
return {
content: [
{
type: "text",
text: JSON.stringify(result, null, 2),
},
],
};
}
case "get_result": {
if (!request.params.arguments?.queryExecutionId ||
typeof request.params.arguments.queryExecutionId !== 'string') {
throw new McpError(
ErrorCode.InvalidParams,
"Missing or invalid required parameter: queryExecutionId (string)"
);
}
const maxRows = typeof request.params.arguments.maxRows === 'number' ?
request.params.arguments.maxRows : undefined;
const result = await this.athenaService.getQueryResults(
request.params.arguments.queryExecutionId,
maxRows
);
return {
content: [
{
type: "text",
text: JSON.stringify(result, null, 2),
},
],
};
}
case "get_status": {
if (!request.params.arguments?.queryExecutionId ||
typeof request.params.arguments.queryExecutionId !== 'string') {
throw new McpError(
ErrorCode.InvalidParams,
"Missing or invalid required parameter: queryExecutionId (string)"
);
}
const status = await this.athenaService.getQueryStatus(
request.params.arguments.queryExecutionId
);
return {
content: [
{
type: "text",
text: JSON.stringify(status, null, 2),
},
],
};
}
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${request.params.name}`
);
}
} catch (error) {
if (error && typeof error === "object" && "code" in error && "message" in error) {
const athenaError = error as AthenaError;
return {
content: [
{
type: "text",
text: `Error: ${athenaError.message}`,
},
],
isError: true,
};
}
throw error;
}
});
}
async run() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
console.error("AWS Athena MCP server running on stdio");
}
}
const server = new AthenaServer();
server.run().catch(console.error);