athena_create_cloudtrail_table
Create an Athena table with partition projection for querying CloudTrail logs. Requires CloudTrail S3 bucket, account details, and an output bucket for query results.
Instructions
Create an Athena table for CloudTrail logs with partition projection.
<IMPORTANT>
Before using this tool, ask the user for the OUTPUT bucket unless it is already provided.
This is necessary to create the table correctly. If the API call fails, interrupt the process and ask the user for the OUTPUT BUCKET.
</IMPORTANT>
Parameters:
cloudtrail_bucket (str): The S3 bucket for CloudTrail logs - you can retrieve it using the 'cloudtrail_describe_trails' tool.
is_org_trail (bool): Indicates if the trail is for the organization.
account_id (str): Your AWS account ID - you can retrieve it (for example, via the STS GetCallerIdentity API).
output_bucket (str): S3 bucket URI (e.g. 's3://my-athena-query-results/') for query results - must be different from cloudtrail_bucket. Ask the user if not specified.
output_region (str): The AWS region for the output bucket - use 'us-east-1' if not specified.
partition_region (str): The region of the events to be queried. It is used to create the S3 path for the Athena table.
database (str): Athena database name to be used.
Returns:
str: An empty result if successful, or an error message if there was an issue.
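For illustration, a hypothetical set of arguments for this tool might look like the following; the bucket names, account ID, and regions are placeholders, not values from the source:

```python
# Hypothetical arguments for athena_create_cloudtrail_table.
# Bucket names, account ID, and regions below are placeholders.
arguments = {
    "cloudtrail_bucket": "my-cloudtrail-logs",         # from 'cloudtrail_describe_trails'
    "is_org_trail": False,
    "account_id": "123456789012",
    "output_bucket": "s3://my-athena-query-results/",  # full S3 URI, differs from cloudtrail_bucket
    "output_region": "us-east-1",
    "partition_region": "us-east-1",
    "database": "default",
}

# Note the asymmetry: cloudtrail_bucket is a bare bucket name,
# while output_bucket is a full S3 URI.
assert arguments["output_bucket"].startswith("s3://")
```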
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| account_id | Yes | | |
| cloudtrail_bucket | Yes | | |
| database | No | | default |
| is_org_trail | Yes | | |
| output_bucket | Yes | | |
| output_region | Yes | | |
| partition_region | Yes | | |
Input Schema (JSON Schema)
{
"properties": {
"account_id": {
"title": "Account Id",
"type": "string"
},
"cloudtrail_bucket": {
"title": "Cloudtrail Bucket",
"type": "string"
},
"database": {
"default": "default",
"title": "Database",
"type": "string"
},
"is_org_trail": {
"title": "Is Org Trail",
"type": "boolean"
},
"output_bucket": {
"title": "Output Bucket",
"type": "string"
},
"output_region": {
"title": "Output Region",
"type": "string"
},
"partition_region": {
"title": "Partition Region",
"type": "string"
}
},
"required": [
"cloudtrail_bucket",
"is_org_trail",
"account_id",
"output_bucket",
"output_region",
"partition_region"
],
"title": "athena_create_cloudtrail_tableArguments",
"type": "object"
}
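The schema above can be checked client-side before invoking the tool. A minimal stdlib-only sketch; the `REQUIRED` and `TYPES` tables are transcribed from the schema, and `validate_arguments` is my name, not part of the server:

```python
# Minimal client-side check mirroring the JSON Schema above.
# REQUIRED/TYPES are transcribed from the schema; this is not server code.
REQUIRED = ["cloudtrail_bucket", "is_org_trail", "account_id",
            "output_bucket", "output_region", "partition_region"]
TYPES = {name: str for name in REQUIRED + ["database"]}
TYPES["is_org_trail"] = bool  # the only non-string field

def validate_arguments(args: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = [f"missing required field: {name}" for name in REQUIRED if name not in args]
    problems += [
        f"field {name} should be {TYPES[name].__name__}"
        for name, value in args.items()
        if name in TYPES and not isinstance(value, TYPES[name])
    ]
    return problems
```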
Implementation Reference
- server.py:147-284 (handler): The handler function decorated with @mcp.tool(), which creates an Athena table for CloudTrail logs. It generates a unique table name, constructs the S3 location depending on whether the trail is an organization trail, builds a detailed CREATE EXTERNAL TABLE query with partition projection, and executes it using the run_athena_query helper.

```python
@mcp.tool()
async def athena_create_cloudtrail_table(
    cloudtrail_bucket: str,
    is_org_trail: bool,
    account_id: str,
    output_bucket: str,
    output_region: str,
    partition_region: str,
    database: str = "default",
) -> str:
    """
    Create an Athena table for CloudTrail logs with partition projection.
    <IMPORTANT>
    Before using this tool ask the user for OUTPUT bucket, unless it is provided.
    This is necessary to create the table correctly. If the API fails, interrupt
    the process and ask the user for the OUTPUT BUCKET.
    </IMPORTANT>
    Parameters:
        cloudtrail_bucket (str): The S3 bucket for CloudTrail logs - you can
            retrieve it using the 'cloudtrail_describe_trails' tool.
        is_org_trail (bool): Indicates if the trail is for the organization.
        account_id (str): Your AWS account ID - you can retrieve it.
        output_bucket (str): Ask the user if not specified, S3 bucket URI
            (e.g. 's3://my-athena-query-results/') for query results -
            different from cloudtrail_bucket.
        output_region (str): The AWS region for the output bucket - use
            'us-east-1' if not specified.
        partition_region (str): The region of the events to be queried. It is
            used to create the S3 path for the Athena table.
        database (str): Athena database name to be used.
    Returns:
        str: An empty result if successful, or an error message if there was an issue.
    """
    # Craft the Athena table name with date+hour suffix
    current_time = datetime.datetime.now()
    date_hour_suffix = current_time.strftime("%Y%m%d_%H%M")
    global athena_table
    athena_table = f"cloudtrail_logs_pp_{date_hour_suffix}"

    # Get the organization ID if the trail is for the organization.
    if is_org_trail:
        org_id = get_organization_details().get('Id')
        trail_location = f's3://{cloudtrail_bucket}/AWSLogs/{org_id}/{account_id}/CloudTrail/{partition_region}/'
    else:
        trail_location = f's3://{cloudtrail_bucket}/AWSLogs/{account_id}/CloudTrail/{partition_region}/'

    # Set the start date for partition projection (yyyy/MM/dd, 10 days ago).
    start_date = (datetime.datetime.now() - datetime.timedelta(days=10)).strftime("%Y/%m/%d")

    # Construct the CREATE TABLE query with partition projection.
    query = f"""
    CREATE EXTERNAL TABLE {athena_table}(
        eventversion STRING,
        useridentity STRUCT<
            type: STRING,
            principalid: STRING,
            arn: STRING,
            accountid: STRING,
            invokedby: STRING,
            accesskeyid: STRING,
            username: STRING,
            onbehalfof: STRUCT<
                userid: STRING,
                identitystorearn: STRING>,
            sessioncontext: STRUCT<
                attributes: STRUCT<
                    mfaauthenticated: STRING,
                    creationdate: STRING>,
                sessionissuer: STRUCT<
                    type: STRING,
                    principalid: STRING,
                    arn: STRING,
                    accountid: STRING,
                    username: STRING>,
                ec2roledelivery: STRING,
                webidfederationdata: STRUCT<
                    federatedprovider: STRING,
                    attributes: map<string,string>>
            >
        >,
        eventtime STRING,
        eventsource STRING,
        eventname STRING,
        awsregion STRING,
        sourceipaddress STRING,
        useragent STRING,
        errorcode STRING,
        errormessage STRING,
        requestparameters STRING,
        responseelements STRING,
        additionaleventdata STRING,
        requestid STRING,
        eventid STRING,
        readonly STRING,
        resources ARRAY<STRUCT<
            arn: STRING,
            accountid: STRING,
            type: STRING>>,
        eventtype STRING,
        apiversion STRING,
        recipientaccountid STRING,
        serviceeventdetails STRING,
        sharedeventid STRING,
        vpcendpointid STRING,
        vpcendpointaccountid STRING,
        eventcategory STRING,
        addendum STRUCT<
            reason: STRING,
            updatedfields: STRING,
            originalrequestid: STRING,
            originaleventid: STRING>,
        sessioncredentialfromconsole STRING,
        edgedevicedetails STRING,
        tlsdetails STRUCT<
            tlsversion: STRING,
            ciphersuite: STRING,
            clientprovidedhostheader: STRING>
    )
    PARTITIONED BY (
        `timestamp` string)
    ROW FORMAT SERDE
        'org.apache.hive.hcatalog.data.JsonSerDe'
    STORED AS INPUTFORMAT
        'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
    OUTPUTFORMAT
        'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
        '{trail_location}'
    TBLPROPERTIES (
        'projection.enabled'='true',
        'projection.timestamp.format'='yyyy/MM/dd',
        'projection.timestamp.interval'='1',
        'projection.timestamp.interval.unit'='DAYS',
        'projection.timestamp.range'='{start_date},NOW',
        'projection.timestamp.type'='date',
        'storage.location.template'='{trail_location}/${{timestamp}}')
    """
    try:
        athena_client = boto3.client('athena', region_name=output_region)
        result = await run_athena_query(athena_client, database, output_bucket, query)
        return result
    except Exception as e:
        return f"Error creating Athena table: {str(e)}"
```
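The S3 path and partition-projection date logic from the handler can be isolated as pure functions for illustration; the function names below are mine, not part of server.py:

```python
import datetime
from typing import Optional

def build_trail_location(cloudtrail_bucket: str, account_id: str,
                         partition_region: str, org_id: Optional[str] = None) -> str:
    """Mirror of the handler's S3 path logic: organization trails nest the
    organization ID between AWSLogs/ and the account ID."""
    if org_id:
        return f"s3://{cloudtrail_bucket}/AWSLogs/{org_id}/{account_id}/CloudTrail/{partition_region}/"
    return f"s3://{cloudtrail_bucket}/AWSLogs/{account_id}/CloudTrail/{partition_region}/"

def projection_start_date(today: datetime.date) -> str:
    """Partition projection starts 10 days back, formatted yyyy/MM/dd."""
    return (today - datetime.timedelta(days=10)).strftime("%Y/%m/%d")
```

Limiting the projection range to the last 10 days keeps Athena from enumerating date partitions for which no prefixes exist.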
- server.py:148-156 (schema): Function signature and docstring defining the input schema (parameters with types and descriptions) and output (str: success or error message). Used by FastMCP for the tool schema.

```python
async def athena_create_cloudtrail_table(
    cloudtrail_bucket: str,
    is_org_trail: bool,
    account_id: str,
    output_bucket: str,
    output_region: str,
    partition_region: str,
    database: str = "default",
) -> str:
```
- server.py:147-147 (registration): The @mcp.tool() decorator registers this function as an MCP tool.

```python
@mcp.tool()
```
- server.py:99-138 (helper): Helper function used by the tool to execute Athena queries, wait for completion, and retrieve JSON results.

```python
async def run_athena_query(athena_client, database: str, output_bucket: str, query: str):
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_bucket}
    )
    query_execution_id = response['QueryExecutionId']

    # Wait for the query to complete
    max_attempts = 10
    attempts = 0
    while attempts < max_attempts:
        status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        state = status['QueryExecution']['Status']['State']
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(3)
        attempts += 1

    if state != 'SUCCEEDED':
        raise Exception(f"Athena query {query_execution_id} did not succeed; state: {state}")

    # Retrieve query results
    results_paginator = athena_client.get_paginator('get_query_results')
    result_rows = []
    for page in results_paginator.paginate(QueryExecutionId=query_execution_id):
        result_rows.extend(page['ResultSet']['Rows'])

    # Check if there are results
    if len(result_rows) < 2:
        return "No results found."

    # Extract the header, then process data rows (skipping the header row)
    # and convert to JSON.
    header = [col['VarCharValue'] for col in result_rows[0]['Data']]
    data = []
    for row in result_rows[1:]:
        row_data = {}
        for idx, col in enumerate(row['Data']):
            row_data[header[idx]] = col.get('VarCharValue', '')
        data.append(row_data)
    return json.dumps(data, indent=2)
```
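The result-parsing step of the helper can be exercised without AWS. A sketch with a synthetic ResultSet in the shape Athena's GetQueryResults API returns; `rows_to_json` and the sample rows are illustrative, not server code:

```python
import json

def rows_to_json(result_rows: list) -> str:
    """Same transformation run_athena_query applies: the first row is the
    header; remaining rows become a list of dicts, missing cells -> ''."""
    if len(result_rows) < 2:
        return "No results found."
    header = [col["VarCharValue"] for col in result_rows[0]["Data"]]
    data = []
    for row in result_rows[1:]:
        data.append({header[i]: col.get("VarCharValue", "")
                     for i, col in enumerate(row["Data"])})
    return json.dumps(data, indent=2)

# Synthetic rows mimicking an Athena GetQueryResults ResultSet.
sample = [
    {"Data": [{"VarCharValue": "eventname"}, {"VarCharValue": "awsregion"}]},
    {"Data": [{"VarCharValue": "ConsoleLogin"}, {}]},  # missing cell -> ''
]
```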
- server.py:139-146 (helper): Helper function called when is_org_trail=True to get the AWS Organization ID for constructing the S3 path.

```python
def get_organization_details():
    """
    Returns the Organization object from AWS Organizations.
    """
    client = boto3.client('organizations')
    resp = client.describe_organization()
    return resp['Organization']
```
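Once the table exists, filtering on the projected `timestamp` partition (format yyyy/MM/dd) lets Athena prune to a single day's S3 prefix instead of scanning the whole trail. A sketch of building such a follow-up query; the function name, table name, and filter values are placeholders, not part of the tool:

```python
def build_lookup_query(table: str, day: str, event_name: str) -> str:
    """Build a SELECT that filters on the projected `timestamp` partition
    column so Athena only scans that day's S3 prefix. The table name, day
    (yyyy/MM/dd), and event name are caller-supplied placeholders."""
    return (
        f"SELECT eventtime, eventname, useridentity.arn AS actor, sourceipaddress "
        f"FROM {table} "
        f"WHERE \"timestamp\" = '{day}' AND eventname = '{event_name}' "
        f"LIMIT 50"
    )
```

Note that `timestamp` is double-quoted in the SELECT because it is a reserved word in Athena DML, whereas the CREATE TABLE DDL above uses backticks.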