def _build_cloudtrail_table_ddl(table_name: str, trail_prefix: str, start_date: str) -> str:
    """Build the CREATE EXTERNAL TABLE statement for a CloudTrail Athena table.

    Args:
        table_name: Name of the Athena table to create.
        trail_prefix: S3 URI of the per-region CloudTrail log prefix, WITHOUT a
            trailing slash (e.g. 's3://bucket/AWSLogs/123456789012/CloudTrail/us-east-1').
        start_date: First day covered by partition projection, formatted
            yyyy/MM/dd to match 'projection.timestamp.format'.

    Returns:
        The DDL string. The table is partitioned on a `timestamp` string column
        whose values are projected daily from `start_date` to NOW.
    """
    # `${{timestamp}}` renders as the literal `${timestamp}` placeholder that
    # Athena substitutes; the prefix has no trailing slash, so exactly one '/'
    # separates it from the yyyy/MM/dd partition path.
    return f"""
CREATE EXTERNAL TABLE {table_name}(
    eventversion STRING,
    useridentity STRUCT<
        type: STRING,
        principalid: STRING,
        arn: STRING,
        accountid: STRING,
        invokedby: STRING,
        accesskeyid: STRING,
        username: STRING,
        onbehalfof: STRUCT<
            userid: STRING,
            identitystorearn: STRING>,
        sessioncontext: STRUCT<
            attributes: STRUCT<
                mfaauthenticated: STRING,
                creationdate: STRING>,
            sessionissuer: STRUCT<
                type: STRING,
                principalid: STRING,
                arn: STRING,
                accountid: STRING,
                username: STRING>,
            ec2roledelivery:STRING,
            webidfederationdata: STRUCT<
                federatedprovider: STRING,
                attributes: map<string,string>>
        >
    >,
    eventtime STRING,
    eventsource STRING,
    eventname STRING,
    awsregion STRING,
    sourceipaddress STRING,
    useragent STRING,
    errorcode STRING,
    errormessage STRING,
    requestparameters STRING,
    responseelements STRING,
    additionaleventdata STRING,
    requestid STRING,
    eventid STRING,
    readonly STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountid: STRING,
        type: STRING>>,
    eventtype STRING,
    apiversion STRING,
    recipientaccountid STRING,
    serviceeventdetails STRING,
    sharedeventid STRING,
    vpcendpointid STRING,
    vpcendpointaccountid STRING,
    eventcategory STRING,
    addendum STRUCT<
        reason:STRING,
        updatedfields:STRING,
        originalrequestid:STRING,
        originaleventid:STRING>,
    sessioncredentialfromconsole STRING,
    edgedevicedetails STRING,
    tlsdetails STRUCT<
        tlsversion:STRING,
        ciphersuite:STRING,
        clientprovidedhostheader:STRING>
)
PARTITIONED BY (
    `timestamp` string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{trail_prefix}/'
TBLPROPERTIES (
    'projection.enabled'='true',
    'projection.timestamp.format'='yyyy/MM/dd',
    'projection.timestamp.interval'='1',
    'projection.timestamp.interval.unit'='DAYS',
    'projection.timestamp.range'='{start_date},NOW',
    'projection.timestamp.type'='date',
    'storage.location.template'='{trail_prefix}/${{timestamp}}')
"""


@mcp.tool()
async def athena_create_cloudtrail_table(
    cloudtrail_bucket: str,
    is_org_trail: bool,
    account_id: str,
    output_bucket: str,
    output_region: str,
    partition_region: str,
    database: str = "default",
) -> str:
    """
    Create an Athena table for CloudTrail logs with partition projection.
    <IMPORTANT>
    Before using this tool ask the user for OUTPUT bucket, unless it is provided.
    This is necessary to create the table correctly. If the API fails, interrupt the process and ask the user for the OUTPUT BUCKET.
    </IMPORTANT>
    The table is named cloudtrail_logs_pp_<YYYYMMDD_HHMM> (UTC creation time) and
    projects daily partitions from 10 days ago up to now.
    Parameters:
        cloudtrail_bucket (str): The S3 bucket for CloudTrail logs - you can retrieve it using the 'cloudtrail_describe_trails' tool.
        is_org_trail (bool): Indicates if the trail is for the organization.
        account_id (str): The AWS account ID whose CloudTrail logs will be queried (for an organization trail, the member account's ID).
        output_bucket (str): Ask the user if not specified, S3 bucket URI (e.g. 's3://my-athena-query-results/') for query results - different from cloudtrail_bucket.
        output_region (str): The AWS region for the output bucket - use 'us-east-1' if not specified.
        partition_region (str): The region of the events to be queried. It is used to create the S3 path for the Athena table.
        database (str): Athena database name to be used.
    Returns:
        str: The result reported by the Athena query on success, or an error message if there was an issue.
    """
    # One UTC timestamp drives both the table-name suffix and the projection
    # start date; CloudTrail's yyyy/MM/dd S3 prefixes are UTC dates.
    now_utc = datetime.datetime.now(datetime.timezone.utc)

    # The table name is published through the module-level `athena_table`
    # global. As before, it is assigned before the DDL runs, even if creation fails.
    global athena_table
    athena_table = f"cloudtrail_logs_pp_{now_utc.strftime('%Y%m%d_%H%M')}"

    # Partition projection starts 10 days back; the format must match
    # 'projection.timestamp.format' (yyyy/MM/dd).
    start_date = (now_utc - datetime.timedelta(days=10)).strftime("%Y/%m/%d")

    try:
        if is_org_trail:
            # Organization trails insert the org ID level: AWSLogs/<org-id>/<account-id>/...
            org_id = get_organization_details().get('Id')
            if not org_id:
                # Without this check the path would silently become AWSLogs/None/...
                return (
                    "Error creating Athena table: could not determine the AWS "
                    "Organization ID for the organization trail."
                )
            trail_prefix = f"s3://{cloudtrail_bucket}/AWSLogs/{org_id}/{account_id}/CloudTrail/{partition_region}"
        else:
            trail_prefix = f"s3://{cloudtrail_bucket}/AWSLogs/{account_id}/CloudTrail/{partition_region}"

        query = _build_cloudtrail_table_ddl(athena_table, trail_prefix, start_date)
        athena_client = boto3.client('athena', region_name=output_region)
        return await run_athena_query(athena_client, database, output_bucket, query)
    except Exception as e:
        # Tool boundary: report failures to the caller as text rather than raising.
        return f"Error creating Athena table: {str(e)}"