AWS Resources MCP Server
by baryhuang
- src
- mcp_server_aws_resources
import argparse
import logging
import json
from typing import Any, Dict, List, Optional
import boto3
import os
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
import ast
from operator import itemgetter
logger = logging.getLogger('mcp_aws_resources_server')
def parse_arguments() -> argparse.Namespace:
"""Use argparse to allow values to be set as CLI switches
or environment variables
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--access-key-id', default=os.environ.get('AWS_ACCESS_KEY_ID')
)
parser.add_argument(
'--secret-access-key', default=os.environ.get('AWS_SECRET_ACCESS_KEY')
)
parser.add_argument(
'--session-token', default=os.environ.get('AWS_SESSION_TOKEN')
)
parser.add_argument(
'--profile', default=os.environ.get('AWS_PROFILE')
)
parser.add_argument(
'--region',
default=os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
)
return parser.parse_args()
class CodeExecutor(ast.NodeTransformer):
"""Custom AST NodeTransformer to validate and transform the code"""
def __init__(self):
self.has_result = False
self.imported_modules = set()
def visit_Assign(self, node):
"""Track if 'result' variable is assigned"""
for target in node.targets:
if isinstance(target, ast.Name) and target.id == 'result':
self.has_result = True
return node
def visit_Import(self, node):
"""Track imported modules"""
for alias in node.names:
self.imported_modules.add(alias.name)
return node
def visit_ImportFrom(self, node):
"""Track imported modules"""
self.imported_modules.add(node.module)
return node
class AWSResourceQuerier:
def __init__(self):
"""Initialize AWS session using environment variables"""
args = parse_arguments()
self.session = boto3.Session(
aws_access_key_id=args.access_key_id,
aws_secret_access_key=args.secret_access_key,
aws_session_token=args.session_token,
profile_name=args.profile,
region_name=args.region
)
if (not args.profile and
(not args.access_key_id or not args.secret_access_key)):
logger.warning("AWS credentials not found in environment variables")
def execute_query(self, code_snippet: str) -> str:
"""
Execute a boto3 code snippet and return the results
Args:
code_snippet (str): Python code using boto3 to query AWS resources
Returns:
str: JSON string containing the query results or error message
"""
try:
# Parse the code into an AST
tree = ast.parse(code_snippet)
# Analyze the code
executor = CodeExecutor()
executor.visit(tree)
# Validate imports
allowed_modules = {'boto3', 'operator', 'json', 'datetime', 'pytz', 'dateutil', 're', 'time'}
unauthorized_imports = executor.imported_modules - allowed_modules
if unauthorized_imports:
return json.dumps({
"error": f"Unauthorized imports: {', '.join(unauthorized_imports)}. "
f"Only {', '.join(allowed_modules)} are allowed."
})
# Create execution namespace
local_ns = {
'boto3': boto3,
'session': self.session,
'result': None,
'itemgetter': itemgetter,
'__builtins__': {
name: getattr(__builtins__, name)
for name in [
'dict', 'list', 'tuple', 'set', 'str', 'int', 'float', 'bool',
'len', 'max', 'min', 'sorted', 'filter', 'map', 'sum', 'any', 'all',
'__import__', 'hasattr', 'getattr', 'isinstance', 'print'
]
}
}
# Compile and execute the code
compiled_code = compile(tree, '<string>', 'exec')
exec(compiled_code, local_ns)
# Get the result
result = local_ns.get('result')
# Validate result was set
if not executor.has_result:
return json.dumps({
"error": "Code must set a 'result' variable with the query output"
})
# Convert result to JSON-serializable format
if result is not None:
if hasattr(result, 'to_dict'):
result = result.to_dict()
return json.dumps(result, default=str)
else:
return json.dumps({"error": "Result cannot be None"})
except SyntaxError as e:
logger.error(f"Syntax error in code: {str(e)}")
return json.dumps({"error": f"Syntax error: {str(e)}"})
except Exception as e:
logger.error(f"Error executing query: {str(e)}")
return json.dumps({"error": str(e)})
async def main():
"""Run the AWS Resources MCP server."""
logger.info("Server starting")
aws_querier = AWSResourceQuerier()
server = Server("aws-resources-manager")
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return [
types.Resource(
uri=AnyUrl("aws://query_resources"),
name="AWS Resources Query",
description="Execute boto3 queries to fetch AWS resources",
mimeType="application/json",
)
]
@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> str:
if uri.scheme != "aws":
raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
path = str(uri).replace("aws://", "")
if path == "query_resources":
# Return empty result as this endpoint requires a specific query
return json.dumps({"message": "Please use the query_aws_resources tool to execute specific queries"})
else:
raise ValueError(f"Unknown resource path: {path}")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools"""
return [
types.Tool(
name="query_aws_resources",
description="Execute a boto3 code snippet to query AWS resources",
inputSchema={
"type": "object",
"properties": {
"code_snippet": {
"type": "string",
"description": "Python code using boto3 to query AWS resources. The code should have default execution setting variable named 'result'. Example code: 'result = boto3.client('s3').list_buckets()'"
}
},
"required": ["code_snippet"]
},
)
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict[str, Any] | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""Handle tool execution requests"""
try:
if name == "query_aws_resources":
if not arguments or "code_snippet" not in arguments:
raise ValueError("Missing code_snippet argument")
results = aws_querier.execute_query(arguments["code_snippet"])
return [types.TextContent(type="text", text=str(results))]
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
logger.info("Server running with stdio transport")
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="aws-resources",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())