
athena_create_cloudtrail_table

Create an Athena table with partition projection to query CloudTrail logs stored in S3, enabling efficient log analysis in AWS environments.

Instructions

Create an Athena table for CloudTrail logs with partition projection.

<IMPORTANT> Before using this tool, ask the user for the OUTPUT bucket unless it is provided; it is required to create the table correctly. If the API call fails, interrupt the process and ask the user for the OUTPUT BUCKET. </IMPORTANT>

Parameters:

  • cloudtrail_bucket (str): The S3 bucket holding the CloudTrail logs; it can be retrieved with the 'cloudtrail_describe_trails' tool.
  • is_org_trail (bool): Whether the trail is an organization trail.
  • account_id (str): The AWS account ID.
  • output_bucket (str): S3 bucket URI for query results (e.g. 's3://my-athena-query-results/'); must be different from cloudtrail_bucket. Ask the user if not specified.
  • output_region (str): The AWS region of the output bucket; use 'us-east-1' if not specified.
  • partition_region (str): The region of the events to be queried; used to build the S3 path for the Athena table.
  • database (str): The Athena database name to use.

Returns:

  str: An empty result if successful, or an error message if there was an issue.

Input Schema

Name               Required  Description                                                  Default
cloudtrail_bucket  Yes       S3 bucket holding the CloudTrail logs
is_org_trail       Yes       Whether the trail is an organization trail
account_id         Yes       AWS account ID
output_bucket      Yes       S3 bucket URI for query results (not the CloudTrail bucket)
output_region      Yes       AWS region of the output bucket
partition_region   Yes       Region of the events to query
database           No        Athena database name                                         default
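A hypothetical input payload matching the schema above (bucket names, regions, and the account ID are invented for illustration):

```python
# Example tool input; all concrete values here are placeholders.
example_input = {
    "cloudtrail_bucket": "my-cloudtrail-logs",         # from 'cloudtrail_describe_trails'
    "is_org_trail": False,
    "account_id": "123456789012",
    "output_bucket": "s3://my-athena-query-results/",  # must differ from cloudtrail_bucket
    "output_region": "us-east-1",
    "partition_region": "eu-west-1",
    "database": "default",                             # optional; defaults to "default"
}

required = {"cloudtrail_bucket", "is_org_trail", "account_id",
            "output_bucket", "output_region", "partition_region"}
missing = required - example_input.keys()
```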

Implementation Reference

  • The main handler for the 'athena_create_cloudtrail_table' tool. Decorated with @mcp.tool() for registration, it generates a unique table name, constructs the S3 location for the CloudTrail logs (accounting for organization trails), builds a CREATE EXTERNAL TABLE query with partition projection, and executes it via the run_athena_query helper.
    @mcp.tool()
    async def athena_create_cloudtrail_table(
        cloudtrail_bucket: str,
        is_org_trail: bool,
        account_id: str,
        output_bucket: str,
        output_region: str,
        partition_region: str,
        database: str = "default",
    ) -> str:
        """
        Create an Athena table for CloudTrail logs with partition projection.

        <IMPORTANT> Before using this tool ask the user for OUTPUT bucket, unless it is
        provided. This is necessary to create the table correctly. If the API fails,
        interrupt the process and ask the user for the OUTPUT BUCKET. </IMPORTANT>

        Parameters:
            cloudtrail_bucket (str): The S3 bucket for CloudTrail logs - you can
                retrieve it using the 'cloudtrail_describe_trails' tool.
            is_org_trail (bool): Indicates if the trail is for the organization.
            account_id (str): Your AWS account ID - you can retrieve it.
            output_bucket (str): Ask the user if not specified, S3 bucket URI
                (e.g. 's3://my-athena-query-results/') for query results - different
                from cloudtrail_bucket.
            output_region (str): The AWS region for the output bucket - use
                'us-east-1' if not specified.
            partition_region (str): The region of the events to be queried. It is
                used to create the S3 path for the Athena table.
            database (str): Athena database name to be used.

        Returns:
            str: An empty result if successful, or an error message if there was an issue.
        """
        # Craft a unique Athena table name with a date+hour suffix.
        current_time = datetime.datetime.now()
        date_hour_suffix = current_time.strftime("%Y%m%d_%H%M")
        global athena_table
        athena_table = f"cloudtrail_logs_pp_{date_hour_suffix}"

        # Organization trails nest the logs under the organization ID.
        if is_org_trail:
            org_id = get_organization_details().get('Id')
            trail_location = (
                f's3://{cloudtrail_bucket}/AWSLogs/{org_id}/{account_id}/'
                f'CloudTrail/{partition_region}/'
            )
        else:
            trail_location = f's3://{cloudtrail_bucket}/AWSLogs/{account_id}/CloudTrail/{partition_region}/'

        # Start partition projection 10 days back (format: yyyy/MM/dd).
        start_date = (datetime.datetime.now() - datetime.timedelta(days=10)).strftime("%Y/%m/%d")

        # CREATE TABLE query with partition projection over the event date.
        # Note: trail_location already ends with '/', so the storage.location.template
        # appends the date suffix directly to avoid a double slash in the S3 path.
        query = f"""
        CREATE EXTERNAL TABLE {athena_table}(
            eventversion STRING,
            useridentity STRUCT<
                type: STRING,
                principalid: STRING,
                arn: STRING,
                accountid: STRING,
                invokedby: STRING,
                accesskeyid: STRING,
                username: STRING,
                onbehalfof: STRUCT<
                    userid: STRING,
                    identitystorearn: STRING>,
                sessioncontext: STRUCT<
                    attributes: STRUCT<
                        mfaauthenticated: STRING,
                        creationdate: STRING>,
                    sessionissuer: STRUCT<
                        type: STRING,
                        principalid: STRING,
                        arn: STRING,
                        accountid: STRING,
                        username: STRING>,
                    ec2roledelivery: STRING,
                    webidfederationdata: STRUCT<
                        federatedprovider: STRING,
                        attributes: map<string,string>>
                >
            >,
            eventtime STRING,
            eventsource STRING,
            eventname STRING,
            awsregion STRING,
            sourceipaddress STRING,
            useragent STRING,
            errorcode STRING,
            errormessage STRING,
            requestparameters STRING,
            responseelements STRING,
            additionaleventdata STRING,
            requestid STRING,
            eventid STRING,
            readonly STRING,
            resources ARRAY<STRUCT<
                arn: STRING,
                accountid: STRING,
                type: STRING>>,
            eventtype STRING,
            apiversion STRING,
            recipientaccountid STRING,
            serviceeventdetails STRING,
            sharedeventid STRING,
            vpcendpointid STRING,
            vpcendpointaccountid STRING,
            eventcategory STRING,
            addendum STRUCT<
                reason: STRING,
                updatedfields: STRING,
                originalrequestid: STRING,
                originaleventid: STRING>,
            sessioncredentialfromconsole STRING,
            edgedevicedetails STRING,
            tlsdetails STRUCT<
                tlsversion: STRING,
                ciphersuite: STRING,
                clientprovidedhostheader: STRING>
        )
        PARTITIONED BY (`timestamp` string)
        ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
        STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION '{trail_location}'
        TBLPROPERTIES (
            'projection.enabled'='true',
            'projection.timestamp.format'='yyyy/MM/dd',
            'projection.timestamp.interval'='1',
            'projection.timestamp.interval.unit'='DAYS',
            'projection.timestamp.range'='{start_date},NOW',
            'projection.timestamp.type'='date',
            'storage.location.template'='{trail_location}${{timestamp}}')
        """
        try:
            athena_client = boto3.client('athena', region_name=output_region)
            result = await run_athena_query(athena_client, database, output_bucket, query)
            return result
        except Exception as e:
            return f"Error creating Athena table: {str(e)}"
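The S3 location logic in the handler can be sketched as a standalone function (a minimal sketch; the bucket name, account ID, and organization ID below are invented):

```python
from typing import Optional

def build_trail_location(bucket: str, account_id: str, region: str,
                         org_id: Optional[str] = None) -> str:
    """Mirror the handler's path logic: org trails nest logs under the org ID."""
    if org_id:
        return f"s3://{bucket}/AWSLogs/{org_id}/{account_id}/CloudTrail/{region}/"
    return f"s3://{bucket}/AWSLogs/{account_id}/CloudTrail/{region}/"

# An account trail and an org trail differ only by the extra org-ID path segment.
plain = build_trail_location("my-logs", "123456789012", "us-east-1")
org = build_trail_location("my-logs", "123456789012", "us-east-1", org_id="o-abc123")
```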
  • Helper function used by the tool to execute an Athena DDL query (CREATE TABLE) and retrieve/parse results as JSON.
    async def run_athena_query(athena_client, database: str, output_bucket: str, query: str):
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': output_bucket}
        )
        query_execution_id = response['QueryExecutionId']

        # Poll until the query reaches a terminal state (up to ~30 seconds).
        max_attempts = 10
        attempts = 0
        while attempts < max_attempts:
            status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
            state = status['QueryExecution']['Status']['State']
            if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(3)
            attempts += 1
        if state != 'SUCCEEDED':
            raise Exception(f"Athena query {query_execution_id} did not succeed; state: {state}")

        # Page through the query results.
        results_paginator = athena_client.get_paginator('get_query_results')
        result_rows = []
        for page in results_paginator.paginate(QueryExecutionId=query_execution_id):
            result_rows.extend(page['ResultSet']['Rows'])

        # The first row is the header; fewer than two rows means no data.
        if len(result_rows) < 2:
            return "No results found."

        # Convert data rows to a list of dicts keyed by the header columns.
        header = [col['VarCharValue'] for col in result_rows[0]['Data']]
        data = []
        for row in result_rows[1:]:
            row_data = {}
            for idx, col in enumerate(row['Data']):
                row_data[header[idx]] = col.get('VarCharValue', '')
            data.append(row_data)
        return json.dumps(data, indent=2)
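The row-parsing step at the end of the helper can be exercised in isolation with mock data in the shape that `get_query_results` returns (the function name and sample values here are illustrative, not part of the server):

```python
import json

def rows_to_json(rows):
    """Convert Athena ResultSet rows (header row first) to a JSON array of
    objects, mirroring the parsing step inside run_athena_query."""
    header = [col["VarCharValue"] for col in rows[0]["Data"]]
    data = []
    for row in rows[1:]:
        data.append({header[i]: col.get("VarCharValue", "")
                     for i, col in enumerate(row["Data"])})
    return json.dumps(data, indent=2)

# Mock rows in the shape returned by the Athena get_query_results API.
mock_rows = [
    {"Data": [{"VarCharValue": "eventname"}, {"VarCharValue": "awsregion"}]},
    {"Data": [{"VarCharValue": "ConsoleLogin"}, {}]},  # missing value -> ""
]
parsed = json.loads(rows_to_json(mock_rows))
```

Missing `VarCharValue` entries (NULL columns in Athena) fall back to empty strings, matching the helper's `col.get('VarCharValue', '')`.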
  • Helper function called conditionally (if is_org_trail) to fetch organization ID for constructing the correct S3 trail location.
    def get_organization_details():
        """Return the Organization object from AWS Organizations."""
        client = boto3.client('organizations')
        resp = client.describe_organization()
        return resp['Organization']
  • server.py:147-147 (registration)
    The @mcp.tool() decorator registers the athena_create_cloudtrail_table function as an MCP tool.
    @mcp.tool()
  • Function signature with type hints defining the input schema for the tool (used by FastMCP for validation). Detailed parameters documented in the docstring.
    async def athena_create_cloudtrail_table(
        cloudtrail_bucket: str,
        is_org_trail: bool,
        account_id: str,
        output_bucket: str,
        output_region: str,
        partition_region: str,
        database: str = "default",
    ) -> str:
