# AWS MCP Server
# - src
# - mcp_server_aws
import os
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Sequence
from functools import lru_cache
import base64
import io
import boto3
import asyncio
from dotenv import load_dotenv
import mcp.server.stdio
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.types import Resource, Tool, TextContent, ImageContent, EmbeddedResource
from pydantic import AnyUrl
from .tools import get_aws_tools
from .utils import get_dynamodb_type
# Load variables from a local .env file (python-dotenv) so the os.getenv()
# lookups further down can see them.
load_dotenv()
# Root logging is configured at INFO, so the logger.debug() calls in this
# module are not emitted unless the level is lowered elsewhere.
logging.basicConfig(level=logging.INFO)
# Named logger shared by everything in this module.
logger = logging.getLogger("aws-mcp-server")
def custom_json_serializer(obj):
    """Fallback encoder passed as ``default=`` to ``json.dumps``.

    Converts the non-JSON values that show up in AWS responses:

    * ``datetime`` -> ISO-8601 string
    * ``bytes`` / ``bytearray`` (e.g. DynamoDB binary attributes) ->
      base64-encoded ASCII string

    Args:
        obj: The object ``json.dumps`` could not serialize on its own.

    Returns:
        A JSON-serializable string representation of ``obj``.

    Raises:
        TypeError: If ``obj`` is of any other type, as the ``default=``
            protocol requires.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, (bytes, bytearray)):
        # Raw bytes have no JSON form; base64 matches how this server already
        # accepts binary payloads (see the base64-decoded S3 upload content).
        return base64.b64encode(obj).decode("ascii")
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
class AWSManager:
    """Create boto3 clients and keep an in-memory audit trail of operations."""

    def __init__(self):
        logger.info("Initializing AWSManager")
        # Chronological audit records appended by log_operation(); each entry
        # has "timestamp", "service", "operation" and "parameters" keys.
        self.audit_entries: list[dict] = []
        # Per-instance client cache keyed by (service_name, region_name) as
        # passed by the caller. A plain dict is used instead of lru_cache so
        # the cache lives and dies with this instance.
        self._clients: dict = {}

    def get_boto3_client(self, service_name: str, region_name: str = None):
        """Get a boto3 client using explicit credentials if available.

        Clients are created once per ``(service_name, region_name)`` pair and
        reused on later calls. Failed creations are not cached.

        Args:
            service_name: boto3 service identifier, e.g. ``"s3"``.
            region_name: Region to use. Falls back to the ``AWS_REGION``
                environment variable, then to ``"us-east-1"``.

        Returns:
            The boto3 client for the requested service.

        Raises:
            RuntimeError: If the client cannot be created; the original
                exception is attached as ``__cause__``.
        """
        cache_key = (service_name, region_name)
        cached = self._clients.get(cache_key)
        if cached is not None:
            return cached

        try:
            logger.info(f"Creating boto3 client for service: {service_name}")
            region_name = region_name or os.getenv("AWS_REGION", "us-east-1")
            # Only reachable when AWS_REGION is set to an empty string.
            if not region_name:
                raise ValueError(
                    "AWS region is not specified and not set in the environment.")

            aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
            aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")

            if aws_access_key and aws_secret_key:
                logger.debug("Using explicit AWS credentials")
                session = boto3.Session(
                    aws_access_key_id=aws_access_key,
                    aws_secret_access_key=aws_secret_key,
                    # Temporary (STS) credentials are rejected without their
                    # session token; None keeps long-lived keys working.
                    aws_session_token=os.getenv("AWS_SESSION_TOKEN") or None,
                    region_name=region_name,
                )
            else:
                logger.debug("Using default AWS credential chain")
                session = boto3.Session(region_name=region_name)

            client = session.client(service_name)
        except Exception as e:
            logger.error(
                f"Failed to create boto3 client for {service_name}: {e}")
            raise RuntimeError(f"Failed to create boto3 client: {e}") from e

        self._clients[cache_key] = client
        return client

    def _synthesize_audit_log(self) -> str:
        """Generate the formatted audit log from the recorded entries.

        Returns:
            A fixed placeholder sentence when nothing has been recorded,
            otherwise a header followed by one section per entry.
        """
        logger.debug("Synthesizing audit log")
        if not self.audit_entries:
            return "No AWS operations have been performed yet."

        sections = ["📋 AWS Operations Audit Log 📋\n\n"]
        for entry in self.audit_entries:
            parameters = json.dumps(entry['parameters'], indent=2)
            sections.append(
                f"[{entry['timestamp']}]\n"
                f"Service: {entry['service']}\n"
                f"Operation: {entry['operation']}\n"
                f"Parameters: {parameters}\n"
            )
            sections.append("-" * 50 + "\n")
        return "".join(sections)

    def log_operation(self, service: str, operation: str, parameters: dict) -> None:
        """Log an AWS operation to the audit log.

        Args:
            service: Service label, e.g. ``"s3"``.
            operation: Operation label, e.g. ``"bucket_create"``.
            parameters: Arguments of the operation; stored by reference.
        """
        # Local import: the file only imports ``datetime`` from the module.
        from datetime import timezone

        logger.info(
            f"Logging operation - Service: {service}, Operation: {operation}")
        audit_entry = {
            # utcnow() is deprecated; dropping tzinfo keeps the previous
            # naive-UTC ISO format of the timestamps.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "service": service,
            "operation": operation,
            "parameters": parameters
        }
        self.audit_entries.append(audit_entry)
async def main():
    """Start the AWS MCP server and serve requests over stdio.

    Builds one ``AWSManager`` and one MCP ``Server``, registers the resource
    and tool handlers defined below, then runs the server on the stdio
    transport until ``server.run`` returns.

    Note:
        The boto3 client calls in the tool handlers are not awaited; they run
        synchronously inside the handler coroutine.
    """
    logger.info("Starting AWS MCP Server")

    aws = AWSManager()
    server = Server("aws-mcp-server")

    logger.debug("Registering handlers")

    @server.list_resources()
    async def handle_list_resources() -> list[Resource]:
        """Advertise the single audit-log resource."""
        logger.debug("Handling list_resources request")
        return [
            Resource(
                uri=AnyUrl("audit://aws-operations"),
                name="AWS Operations Audit Log",
                description="A log of all AWS operations performed through this server",
                mimeType="text/plain",
            )
        ]

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        """Return the formatted audit log for ``audit://aws-operations``.

        Raises:
            ValueError: If the URI scheme is not ``audit`` or the path is not
                ``aws-operations``.
        """
        logger.debug(f"Handling read_resource request for URI: {uri}")
        if uri.scheme != "audit":
            logger.error(f"Unsupported URI scheme: {uri.scheme}")
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

        path = str(uri).replace("audit://", "")
        if path != "aws-operations":
            logger.error(f"Unknown resource path: {path}")
            raise ValueError(f"Unknown resource path: {path}")

        return aws._synthesize_audit_log()

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available AWS tools"""
        logger.debug("Handling list_tools request")
        return get_aws_tools()

    async def handle_s3_operations(aws: AWSManager, name: str, arguments: dict) -> list[TextContent]:
        """Handle S3-specific operations.

        Args:
            aws: Manager used to obtain the S3 client and record the audit entry.
            name: Tool name, e.g. ``"s3_bucket_create"``.
            arguments: Tool arguments; which keys are read depends on ``name``
                (``bucket_name``, ``object_key``, ``file_content``).

        Returns:
            A single ``TextContent``: the UTF-8 decoded object body for
            ``s3_object_read``, otherwise the JSON-formatted client response.

        Raises:
            ValueError: If ``name`` is not a known S3 operation.
        """
        s3_client = aws.get_boto3_client('s3')
        operation = name.removeprefix("s3_")
        response = None

        if name == "s3_bucket_create":
            create_kwargs = {"Bucket": arguments["bucket_name"]}
            # Use the region the client is actually bound to. S3 rejects an
            # explicit LocationConstraint of us-east-1, so it is only sent for
            # other regions.
            region = s3_client.meta.region_name
            if region and region != "us-east-1":
                create_kwargs["CreateBucketConfiguration"] = {
                    'LocationConstraint': region
                }
            response = s3_client.create_bucket(**create_kwargs)
        elif name == "s3_bucket_list":
            response = s3_client.list_buckets()
        elif name == "s3_bucket_delete":
            response = s3_client.delete_bucket(Bucket=arguments["bucket_name"])
        elif name == "s3_object_upload":
            # file_content arrives base64-encoded; upload_fileobj returns None,
            # so the reported result for uploads is JSON null.
            response = s3_client.upload_fileobj(
                io.BytesIO(base64.b64decode(arguments["file_content"])),
                arguments["bucket_name"],
                arguments["object_key"])
        elif name == "s3_object_delete":
            response = s3_client.delete_object(
                Bucket=arguments["bucket_name"],
                Key=arguments["object_key"]
            )
        elif name == "s3_object_list":
            response = s3_client.list_objects_v2(
                Bucket=arguments["bucket_name"])
        elif name == "s3_object_read":
            logger.info(f"Reading object: {arguments['object_key']}")
            response = s3_client.get_object(
                Bucket=arguments["bucket_name"],
                Key=arguments["object_key"]
            )
            content = response['Body'].read().decode('utf-8')
            # Reads return early, so record the audit entry here; otherwise
            # they would be missing from the operations log.
            aws.log_operation("s3", operation, arguments)
            return [TextContent(type="text", text=content)]
        else:
            raise ValueError(f"Unknown S3 operation: {name}")

        aws.log_operation("s3", operation, arguments)
        payload = json.dumps(response, indent=2, default=custom_json_serializer)
        return [TextContent(type="text", text=f"Operation Result:\n{payload}")]

    async def _dynamodb_batch_write(dynamodb_client, arguments: dict) -> dict:
        """Put or delete ``arguments["items"]`` in chunks of 25 with retries.

        Each chunk is sent with ``batch_write_item``; unprocessed requests are
        retried up to three times with exponential backoff (1s, 2s, 4s). A
        chunk whose first call raises is counted as failed in full and the
        remaining chunks are still attempted.

        Returns:
            Summary dict with ``total_items``, ``processed_items``,
            ``failed_items`` (count) and ``failed_items_details``.

        Raises:
            ValueError: If no items are given or ``operation`` is neither
                ``"put"`` nor ``"delete"``.
        """
        table_name = arguments["table_name"]
        operation = arguments["operation"]
        items = arguments["items"]

        if not items:
            raise ValueError("No items provided for batch operation")
        if operation not in ("put", "delete"):
            # Previously this produced empty requests that failed per batch.
            raise ValueError(f"Unsupported batch operation: {operation}")

        batch_size = 25
        max_retries = 3
        total_items = len(items)
        processed_items = 0
        failed_items = []

        for start in range(0, total_items, batch_size):
            batch = items[start:start + batch_size]

            if operation == "put":
                requests = [
                    {'PutRequest': {'Item': {
                        k: get_dynamodb_type(v) for k, v in item.items()}}}
                    for item in batch
                ]
            else:
                requests = []
                for item in batch:
                    key_attrs = arguments.get(
                        "key_attributes", list(item.keys()))
                    requests.append({'DeleteRequest': {'Key': {
                        k: get_dynamodb_type(item[k]) for k in key_attrs}}})

            # None means even the first call for this chunk did not complete.
            unprocessed = None
            try:
                response = dynamodb_client.batch_write_item(
                    RequestItems={table_name: requests})
                unprocessed = response.get('UnprocessedItems', {})

                for attempt in range(max_retries):
                    if not unprocessed:
                        break
                    await asyncio.sleep(2 ** attempt)
                    retry_response = dynamodb_client.batch_write_item(
                        RequestItems=unprocessed)
                    unprocessed = retry_response.get('UnprocessedItems', {})
            except Exception as e:
                logger.error(f"Error processing batch: {str(e)}")
                if unprocessed is None:
                    failed_items.extend(batch)
                    continue
                # A retry failed: the last known unprocessed set is what is
                # still outstanding, so fall through and account for it.

            # Count after the retries so processed + failed == total.
            leftover = unprocessed.get(table_name, [])
            processed_items += len(batch) - len(leftover)
            failed_items.extend(
                request['PutRequest']['Item'] if 'PutRequest' in request
                else request['DeleteRequest']['Key']
                for request in leftover
            )

        return {
            "total_items": total_items,
            "processed_items": processed_items,
            "failed_items": len(failed_items),
            "failed_items_details": failed_items if failed_items else None
        }

    async def handle_dynamodb_operations(aws: AWSManager, name: str, arguments: dict) -> list[TextContent]:
        """Handle DynamoDB-specific operations.

        Args:
            aws: Manager used to obtain the DynamoDB client and record the
                audit entry.
            name: Tool name, e.g. ``"dynamodb_item_put"``.
            arguments: Tool arguments; which keys are read depends on ``name``.

        Returns:
            A single ``TextContent`` holding the JSON-formatted result.

        Raises:
            ValueError: If ``name`` is not a known DynamoDB operation, or the
                batch-write arguments are invalid.
        """
        dynamodb_client = aws.get_boto3_client('dynamodb')
        response = None

        if name == "dynamodb_table_create":
            response = dynamodb_client.create_table(
                TableName=arguments["table_name"],
                KeySchema=arguments["key_schema"],
                AttributeDefinitions=arguments["attribute_definitions"],
                BillingMode="PAY_PER_REQUEST"
            )
        elif name == "dynamodb_table_describe":
            response = dynamodb_client.describe_table(
                TableName=arguments["table_name"])
        elif name == "dynamodb_table_list":
            response = dynamodb_client.list_tables()
        elif name == "dynamodb_table_delete":
            response = dynamodb_client.delete_table(
                TableName=arguments["table_name"])
        elif name == "dynamodb_table_update":
            update_params = {
                "TableName": arguments["table_name"],
                "AttributeDefinitions": arguments["attribute_definitions"]
            }
            response = dynamodb_client.update_table(**update_params)
        elif name == "dynamodb_describe_ttl":
            response = dynamodb_client.describe_time_to_live(
                TableName=arguments["table_name"]
            )
        elif name == "dynamodb_update_ttl":
            response = dynamodb_client.update_time_to_live(
                TableName=arguments["table_name"],
                TimeToLiveSpecification={
                    'Enabled': arguments["ttl_enabled"],
                    'AttributeName': arguments["ttl_attribute"]
                }
            )
        elif name == "dynamodb_item_put":
            response = dynamodb_client.put_item(
                TableName=arguments["table_name"],
                Item=arguments["item"]
            )
        elif name == "dynamodb_item_get":
            response = dynamodb_client.get_item(
                TableName=arguments["table_name"],
                Key=arguments["key"]
            )
        elif name == "dynamodb_item_update":
            response = dynamodb_client.update_item(
                TableName=arguments["table_name"],
                Key=arguments["key"],
                AttributeUpdates=arguments["item"]
            )
        elif name == "dynamodb_item_delete":
            response = dynamodb_client.delete_item(
                TableName=arguments["table_name"],
                Key=arguments["key"]
            )
        elif name == "dynamodb_item_query":
            response = dynamodb_client.query(
                TableName=arguments["table_name"],
                KeyConditionExpression=arguments["key_condition"],
                ExpressionAttributeValues=arguments["expression_values"]
            )
        elif name == "dynamodb_item_scan":
            scan_params = {"TableName": arguments["table_name"]}

            if "filter_expression" in arguments:
                scan_params["FilterExpression"] = arguments["filter_expression"]

                # Expression attributes are only consulted together with a
                # filter expression.
                if "expression_attributes" in arguments:
                    attrs = arguments["expression_attributes"]
                    if "names" in attrs:
                        scan_params["ExpressionAttributeNames"] = attrs["names"]
                    if "values" in attrs:
                        scan_params["ExpressionAttributeValues"] = attrs["values"]

            response = dynamodb_client.scan(**scan_params)
        elif name == "dynamodb_batch_get":
            response = dynamodb_client.batch_get_item(
                RequestItems=arguments["request_items"]
            )
        elif name == "dynamodb_item_batch_write":
            response = await _dynamodb_batch_write(dynamodb_client, arguments)
        elif name == "dynamodb_batch_execute":
            parameter_sets = arguments.get("parameters") or []
            statements = []
            for index, statement in enumerate(arguments["statements"]):
                entry = {'Statement': statement}
                params = parameter_sets[index] if index < len(parameter_sets) else None
                # The API requires at least one element in Parameters, so the
                # key is omitted for statements without parameters.
                if params:
                    entry['Parameters'] = params
                statements.append(entry)
            response = dynamodb_client.batch_execute_statement(
                Statements=statements
            )
        else:
            raise ValueError(f"Unknown DynamoDB operation: {name}")

        aws.log_operation("dynamodb", name.removeprefix("dynamodb_"), arguments)
        payload = json.dumps(response, indent=2, default=custom_json_serializer)
        return [TextContent(type="text", text=f"Operation Result:\n{payload}")]

    @server.call_tool()
    async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Handle AWS tool operations.

        Dispatches on the tool-name prefix (``s3_`` or ``dynamodb_``).

        Raises:
            ValueError: If ``arguments`` is not a dict.
            RuntimeError: If the tool is unknown or the operation fails; the
                underlying exception is attached as ``__cause__``.
        """
        logger.info(f"Handling tool call: {name}")
        logger.debug(f"Tool arguments: {arguments}")

        if not isinstance(arguments, dict):
            logger.error("Invalid arguments: not a dictionary")
            raise ValueError("Invalid arguments")

        try:
            if name.startswith("s3_"):
                return await handle_s3_operations(aws, name, arguments)
            elif name.startswith("dynamodb_"):
                return await handle_dynamodb_operations(aws, name, arguments)
            else:
                raise ValueError(f"Unknown tool: {name}")
        except Exception as e:
            logger.error(f"Operation failed: {str(e)}")
            raise RuntimeError(f"Operation failed: {str(e)}") from e

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="mcp-server-aws",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
# Script entry point: start the server's event loop only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())