#!/usr/bin/env node
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import fs from "fs";
import path from "path";
import { Auth } from "@aws-amplify/auth";
// Parse command line arguments
const args = process.argv.slice(2);
let amplifyOutputsPath: string | null = null;
// Read credentials from environment variables instead of command line
const username: string | null = process.env.AMPLIFY_USERNAME || null;
const password: string | null = process.env.AMPLIFY_PASSWORD || null;
// Look for the amplify outputs path argument
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (arg === "--amplify-outputs") {
amplifyOutputsPath = args[++i];
} else if (arg === "--help" || arg === "-h") {
console.log(`
Amplify Data API MCP Server
Options:
--amplify-outputs <path> Path to the amplify_outputs.json file
--help, -h Show this help message
Environment Variables:
AMPLIFY_USERNAME Cognito username for automatic login
AMPLIFY_PASSWORD Cognito password for automatic login
`);
process.exit(0);
}
}
// Validate amplify outputs path
if (!amplifyOutputsPath) {
console.error(
"Error: Amplify outputs path is required (--amplify-outputs <path>)"
);
process.exit(1);
}
// Resolve to absolute path
const absolutePath = path.resolve(process.cwd(), amplifyOutputsPath);
// Check if file exists
if (!fs.existsSync(absolutePath)) {
console.error(`Error: Amplify outputs file not found at ${absolutePath}`);
process.exit(1);
}
console.log(
`Starting Amplify Data API MCP server with outputs file: ${absolutePath}`
);
// Load the Amplify outputs file
let amplifyOutputs;
try {
const fileContent = fs.readFileSync(absolutePath, "utf8");
amplifyOutputs = JSON.parse(fileContent);
} catch (error: unknown) {
console.error(
`Error reading Amplify outputs file: ${
error instanceof Error ? error.message : String(error)
}`
);
process.exit(1);
}
// Extract key information from outputs
const apiUrl = amplifyOutputs.data?.url;
const region = amplifyOutputs.data?.aws_region || "us-east-1";
const defaultAuthType =
amplifyOutputs.data?.default_authorization_type || "API_KEY";
const authTypes = amplifyOutputs.data?.authorization_types || [];
const models = amplifyOutputs.data?.model_introspection?.models || {};
const enums = amplifyOutputs.data?.model_introspection?.enums || {};
// Validate required fields
if (!apiUrl) {
console.error("Error: Missing API URL in Amplify outputs file");
process.exit(1);
}
// Initialize authentication with Cognito if available
let isAuthInitialized = false;
if (amplifyOutputs.auth) {
const {
user_pool_id,
aws_region: authRegion,
user_pool_client_id,
identity_pool_id,
} = amplifyOutputs.auth;
if (user_pool_id && user_pool_client_id) {
try {
Auth.configure({
region: authRegion || region,
userPoolId: user_pool_id,
userPoolWebClientId: user_pool_client_id,
identityPoolId: identity_pool_id,
});
isAuthInitialized = true;
console.log("Cognito authentication configured successfully");
} catch (error: unknown) {
console.error(
`Error configuring Cognito authentication: ${
error instanceof Error ? error.message : String(error)
}`
);
}
}
}
// Create server instance
const server = new McpServer({
name: "amplify-data-api",
version: "1.0.0",
capabilities: {
resources: {},
tools: {},
},
});
// Current authentication state
let currentUser: any = null;
let idToken: string | null = null;
// Store credentials globally for re-login purposes
let storedUsername: string | null = null;
let storedPassword: string | null = null;
// Helper function to make authenticated GraphQL requests
async function executeGraphQLQuery(
query: string,
variables: Record<string, any> = {}
): Promise<any> {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
// Add authentication header if available
if (idToken) {
headers["Authorization"] = idToken;
}
try {
const response = await fetch(apiUrl, {
method: "POST",
headers,
body: JSON.stringify({
query,
variables,
}),
});
// Check for 401 Unauthorized status specifically
if (response.status === 401) {
console.error(
"Received 401 Unauthorized, attempting to refresh authentication"
);
return await handleAuthRefresh(query, variables);
}
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result = await response.json();
// Also check response for auth errors
if (
result.errors &&
result.errors.some(
(e: any) =>
e.errorType === "UnauthorizedException" ||
e.message?.includes("Authentication failed") ||
e.message?.includes("token is expired")
)
) {
console.error("Authentication error in GraphQL response");
return await handleAuthRefresh(query, variables);
}
return result;
} catch (error: unknown) {
console.error("Error executing GraphQL query:", error);
throw error;
}
}
// Separate function to handle authentication refresh and retry
async function handleAuthRefresh(
query: string,
variables: Record<string, any> = {}
): Promise<any> {
// Try to refresh session or re-login
if (isAuthInitialized) {
try {
console.error("Attempting to refresh session...");
// First try to refresh the session without parameters
await Auth.currentSession();
const session = await Auth.currentSession();
idToken = session.getIdToken().getJwtToken();
console.error("Session refreshed successfully");
// Retry the query with the new token
return executeGraphQLQuery(query, variables);
} catch (refreshError) {
console.error("Session refresh failed:", refreshError);
// If we have stored or environment credentials, try to login again
const loginUsername = storedUsername || username;
const loginPassword = storedPassword || password;
if (loginUsername && loginPassword) {
try {
console.error(`Attempting re-login for user: ${loginUsername}`);
const user = await Auth.signIn(loginUsername, loginPassword);
currentUser = user;
const session = await Auth.currentSession();
idToken = session.getIdToken().getJwtToken();
console.error("Re-login successful");
// Retry the query with the new token
return executeGraphQLQuery(query, variables);
} catch (loginError) {
console.error("Re-login failed:", loginError);
throw new Error(
"Authentication expired and automatic re-login failed"
);
}
} else {
throw new Error(
"Authentication expired and no stored credentials for re-login"
);
}
}
} else {
throw new Error("Authentication system not initialized");
}
}
// Utility function to generate GraphQL fields for a model
function generateGraphQLFields(modelName: string, depth = 0): string {
if (depth > 1) return "id"; // Reduced depth limit from 2 to 1
const model = models[modelName];
if (!model) return "id";
const fields = Object.entries(model.fields)
.map(([fieldName, field]: [string, any]) => {
// Skip read-only fields for mutations
if (field.isReadOnly) return null;
// Handle scalar fields
if (!field.type.model && !field.type.enum) {
return fieldName;
}
// Handle enum fields
if (field.type.enum) {
return fieldName;
}
// Handle model associations - simplified to avoid complex relationships
if (field.type.model) {
// For connections and arrays, just include the ID to avoid validation errors
if (depth >= 1 || field.isArray) {
return null; // Skip nested connections entirely
}
// For direct associations at depth 0, include only id and basic fields
return `${fieldName} { id name }`;
}
return null;
})
.filter(Boolean);
return fields.join("\n");
}
// Tool: Authenticate with Cognito
server.tool(
"login",
"Login with Cognito username and password",
{
username: z.string().describe("Cognito username (usually email)"),
password: z.string().describe("Cognito password"),
},
async ({ username, password }) => {
if (!isAuthInitialized) {
return {
content: [
{
type: "text",
text: "Error: Cognito authentication is not configured",
},
],
};
}
try {
const user = await Auth.signIn(username, password);
currentUser = user;
// Get the current session to extract tokens
const session = await Auth.currentSession();
idToken = session.getIdToken().getJwtToken();
// Store credentials for potential re-login
storedUsername = username;
storedPassword = password;
return {
content: [
{
type: "text",
text: `Successfully logged in as ${username}`,
},
],
};
} catch (error: unknown) {
return {
content: [
{
type: "text",
text: `Login failed: ${
error instanceof Error ? error.message : String(error)
}`,
},
],
};
}
}
);
// Tool: Get current user info
server.tool(
"get-current-user",
"Get information about the currently logged in user",
{},
async () => {
if (!currentUser) {
return {
content: [
{
type: "text",
text: "Not logged in. Use the login tool to authenticate first.",
},
],
};
}
try {
// Get the current authenticated user
const userInfo = await Auth.currentUserInfo();
return {
content: [
{
type: "text",
text: `Current User:\n${JSON.stringify(userInfo, null, 2)}`,
},
],
};
} catch (error: unknown) {
return {
content: [
{
type: "text",
text: `Error getting user info: ${
error instanceof Error ? error.message : String(error)
}`,
},
],
};
}
}
);
// Tool: List all data models
server.tool(
"list-models",
"List all data models in the Amplify app",
{},
async () => {
if (Object.keys(models).length === 0) {
return {
content: [
{
type: "text",
text: "No data models found in the Amplify outputs file",
},
],
};
}
const modelDetailsArray = Object.entries(models).map(
([name, model]: [string, any]) => {
const fields = Object.entries(model.fields)
.map(([fieldName, field]: [string, any]) => {
const typeInfo = field.type.model
? `${field.type.model} (model)`
: field.type.enum
? `${field.type.enum} (enum)`
: field.type;
return ` - ${fieldName}: ${typeInfo}${
field.isRequired ? " (required)" : ""
}${field.isArray ? " (array)" : ""}`;
})
.join("\n");
return `Model: ${name}\nFields:\n${fields}`;
}
);
return {
content: [
{
type: "text",
text: `Data Models:\n\n${modelDetailsArray.join("\n\n")}`,
},
],
};
}
);
// Tool: List all enums
server.tool(
"list-enums",
"List all enum types in the Amplify app",
{},
async () => {
if (Object.keys(enums).length === 0) {
return {
content: [
{
type: "text",
text: "No enum types found in the Amplify outputs file",
},
],
};
}
const enumDetailsArray = Object.entries(enums).map(
([name, enumType]: [string, any]) => {
const values = enumType.values
.map((value: string) => ` - ${value}`)
.join("\n");
return `Enum: ${name}\nValues:\n${values}`;
}
);
return {
content: [
{
type: "text",
text: `Enum Types:\n\n${enumDetailsArray.join("\n\n")}`,
},
],
};
}
);
// Tool: Get model details
server.tool(
"get-model-details",
"Get detailed information about a specific data model",
{
modelName: z.string().describe("Name of the model to get details for"),
},
async ({ modelName }) => {
const model = models[modelName];
if (!model) {
return {
content: [
{
type: "text",
text: `Model '${modelName}' not found. Use list-models to see available models.`,
},
],
};
}
// Extract field details
const fields = Object.entries(model.fields)
.map(([fieldName, field]: [string, any]) => {
let typeInfo = field.type.model
? `${field.type.model} (model)`
: field.type.enum
? `${field.type.enum} (enum)`
: field.type;
let relationshipInfo = "";
if (field.association) {
relationshipInfo = `\n Association: ${field.association.connectionType}`;
if (field.association.targetNames) {
relationshipInfo += `\n Target fields: ${field.association.targetNames.join(
", "
)}`;
}
}
return ` - ${fieldName}:\n Type: ${typeInfo}${
field.isRequired ? "\n Required: Yes" : ""
}${field.isArray ? "\n Array: Yes" : ""}${relationshipInfo}`;
})
.join("\n\n");
// Extract auth rules
const authRules = model.attributes
.filter((attr: any) => attr.type === "auth")
.flatMap((attr: any) =>
attr.properties.rules.map((rule: any) => {
return ` - Provider: ${rule.provider}\n Allow: ${
rule.allow
}\n Operations: ${rule.operations.join(", ")}`;
})
)
.join("\n\n");
return {
content: [
{
type: "text",
text: `Model: ${modelName}\n\nFields:\n${fields}\n\nAuthorization Rules:\n${authRules}`,
},
],
};
}
);
// Tool: Execute a custom GraphQL query
server.tool(
"run-graphql",
"Execute a custom GraphQL query or mutation",
{
query: z.string().describe("GraphQL query or mutation to execute"),
variables: z
.string()
.optional()
.describe("JSON string of variables for the query"),
},
async ({ query, variables }) => {
try {
// Parse variables if provided
let variablesObj = {};
if (variables) {
try {
variablesObj = JSON.parse(variables);
} catch (e: unknown) {
return {
content: [
{
type: "text",
text: `Invalid JSON variables: ${
e instanceof Error ? e.message : String(e)
}`,
},
],
};
}
}
// Even if not authenticated, we'll now try the request anyway
// as the automatic re-login will handle authentication if needed
const result = await executeGraphQLQuery(query, variablesObj);
return {
content: [
{
type: "text",
text: `Query Result:\n\n${JSON.stringify(result, null, 2)}`,
},
],
};
} catch (error: unknown) {
return {
content: [
{
type: "text",
text: `Error executing GraphQL query: ${
error instanceof Error ? error.message : String(error)
}`,
},
],
};
}
}
);
// Tool: Get relationship details
server.tool(
"get-relationships",
"Get relationships for a specific model",
{
modelName: z
.string()
.describe("Name of the model to get relationships for"),
},
async ({ modelName }) => {
const model = models[modelName];
if (!model) {
return {
content: [
{
type: "text",
text: `Model '${modelName}' not found. Use list-models to see available models.`,
},
],
};
}
// Extract relationships from model
const relationships = Object.entries(model.fields)
.filter(([_, field]: [string, any]) => field.association)
.map(([fieldName, field]: [string, any]) => {
const association = field.association;
return {
fieldName,
type: field.type.model,
relationshipType: association.connectionType,
associatedWith: association.associatedWith?.join(", ") || "N/A",
targetNames: association.targetNames?.join(", ") || "N/A",
};
});
if (relationships.length === 0) {
return {
content: [
{
type: "text",
text: `No relationships found for model: ${modelName}`,
},
],
};
}
const relationshipText = relationships
.map((rel) => {
return `Field: ${rel.fieldName}\nType: ${rel.type}\nRelationship: ${rel.relationshipType}\nAssociated With: ${rel.associatedWith}\nTarget Names: ${rel.targetNames}`;
})
.join("\n\n");
return {
content: [
{
type: "text",
text: `Relationships for ${modelName}:\n\n${relationshipText}`,
},
],
};
}
);
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Amplify Data API MCP Server running on stdio");
// Attempt automatic login if credentials are provided
if (isAuthInitialized && username && password) {
try {
console.error(`Attempting automatic login for user: ${username}`);
const user = await Auth.signIn(username, password);
currentUser = user;
// Get the current session to extract tokens
const session = await Auth.currentSession();
idToken = session.getIdToken().getJwtToken();
// Store credentials for potential re-login
storedUsername = username;
storedPassword = password;
console.error("Automatic login successful");
} catch (error: unknown) {
console.error(
`Automatic login failed: ${
error instanceof Error ? error.message : String(error)
}`
);
console.error(
"You can still use the login tool to authenticate manually"
);
}
}
}
main().catch((error) => {
console.error("Fatal error in main():", error);
process.exit(1);
});