athena_query_events
Query AWS CloudTrail logs with Athena to search for specific events, users, IPs, or resources using granular filters for security analysis.
Instructions
Query Athena for granular granular searches on CloudTrail logs.
<IMPORTANT>
Before calling this tool, you must call the athena_create_cloudtrail_table tool to create the table.
If the user asks for a different region, you must call the athena_create_cloudtrail_table tool to create the table in that region.
</IMPORTANT>
Parameters:
aws_region: The AWS region - use 'us-east-1' if not specified.
start_time: ISO string of the start time
end_time: ISO string of the end time
database: Athena database name to be used - use 'default' if not specified.
output_bucket: S3 bucket URI (e.g. 's3://my-athena-query-results/') for query results - different from cloudtrail_bucket.
event_names: List of event names to filter on (e.g. ["GetObject", "DeleteObject"])
principal_id: Optional principal ID to filter on. Use the percent sign (%) as a wildcard character.
user_arn: Optional user ARN to filter on. Use the percent sign (%) as a wildcard character. This is the ARN of the user performing the action.
user_name: Optional user name to filter on. This is the name of the user performing the action.
role_name: Optional role name to filter on. This is the name of the role assumed by the user performing the action.
ip_address: Optional IP address to filter on. Use the percent sign (%) as a wildcard character. This is the IP address of the user performing the action.
bucket_name: Optional bucket name to filter on. Use the percent sign (%) as a wildcard character.
limit: Maximum number of results to return (default is 50).
Returns:
str: JSON-formatted result of the Athena query.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| aws_region | Yes | ||
| start_time | Yes | ||
| end_time | Yes | ||
| database | Yes | ||
| output_bucket | Yes | ||
| event_names | Yes | ||
| principal_id | No | ||
| user_arn | No | ||
| user_name | No | ||
| role_name | No | ||
| ip_address | No | ||
| bucket_name | No | ||
| limit | No |
Implementation Reference
- server.py:286-385 (handler)The primary handler function for the 'athena_query_events' tool. It is decorated with @mcp.tool() which serves as both the handler implementation and registration in FastMCP. Constructs a dynamic SQL query for Athena on CloudTrail logs, filtering by time range, event names, and optional fields like principal_id, user_arn, etc. Executes via helper run_athena_query and returns JSON results. The function signature and docstring define the input schema.@mcp.tool() async def athena_query_events( aws_region: str, start_time: str, end_time: str, database: str, output_bucket: str, event_names: list, principal_id: str = None, user_arn: str = None, user_name: str = None, role_name: str = None, ip_address: str = None, bucket_name: str = None, limit: int = 50, ) -> str: """ Query Athena for granular granular searches on CloudTrail logs. <IMPORTANT> Before calling this tool, you must call the athena_create_cloudtrail_table tool to create the table. If the user asks for a different region, you must call the athena_create_cloudtrail_table tool to create the table in that region. </IMPORTANT> Parameters: aws_region: The AWS region - use 'us-east-1' if not specified. start_time: ISO string of the start time end_time: ISO string of the end time database: Athena database name to be used - use 'default' if not specified. output_bucket: S3 bucket URI (e.g. 's3://my-athena-query-results/') for query results - different from cloudtrail_bucket. event_names: List of event names to filter on (e.g. ["GetObject", "DeleteObject"]) principal_id: Optional principal ID to filter on. Use the percent sign (%) as a wildcard character. user_arn: Optional user ARN to filter on. Use the percent sign (%) as a wildcard character. This is the ARN of the user performing the action. user_name: Optional user name to filter on. This is the name of the user performing the action. role_name: Optional role name to filter on. This is the name of the role assumed by the user performing the action. ip_address: Optional IP address to filter on. Use the percent sign (%) as a wildcard character. This is the IP address of the user performing the action. bucket_name: Optional bucket name to filter on. Use the percent sign (%) as a wildcard character. limit: Maximum number of results to return (default is 50). Returns: str: JSON-formatted result of the Athena query. """ # Construct an SQL query event_filter = ", ".join([f"'{name}'" for name in event_names]) events_comment = session_comment = principal_comment = ip_comment = bucket_comment = user_comment = arn_comment = "--" if event_names: events_comment = "" if role_name: session_comment = "" if principal_id: principal_comment = "" if user_arn: arn_comment = "" if user_name: user_comment = "" if ip_address: ip_comment = "" if bucket_name: bucket_comment = "" try: # Convert ISO format timestamps to Athena-compatible format start_dt = datetime.datetime.fromisoformat(start_time.replace('T', ' ').replace('Z', '')) end_dt = datetime.datetime.fromisoformat(end_time.replace('T', ' ').replace('Z', '')) query = f""" WITH flat_logs AS ( SELECT eventTime, eventName, userIdentity.principalId, userIdentity.arn, userIdentity.userName, userIdentity.sessionContext.sessionIssuer.userName as sessionUserName, sourceIPAddress, eventSource, json_extract_scalar(requestParameters, '$.bucketName') as bucketName, json_extract_scalar(requestParameters, '$.key') as object FROM {athena_table} ) SELECT * FROM flat_logs WHERE date(from_iso8601_timestamp(eventTime)) BETWEEN timestamp '{start_dt}' AND timestamp '{end_dt}' {events_comment}AND eventName IN ({event_filter}) {user_comment}AND userName = '{user_name}' {session_comment}AND sessionUserName = '{role_name}' {principal_comment}AND principalId LIKE '{principal_id}' {arn_comment}AND arn LIKE '{user_arn}' {ip_comment}AND sourceIPAddress LIKE '{ip_address}' {bucket_comment}AND bucketName LIKE '{bucket_name}' LIMIT {limit}; """ except ValueError as e: return f"Error processing timestamp formats: {str(e)}" try: athena_client = boto3.client('athena', region_name=aws_region) result = await run_athena_query(athena_client, database, output_bucket, query) except Exception as e: return f"Error querying Athena: {str(e)}" return result
- server.py:99-138 (helper)Supporting helper function 'run_athena_query' called by athena_query_events to start, poll, and retrieve results from an Athena query execution, parsing results into JSON format.async def run_athena_query(athena_client, database: str, output_bucket: str, query: str): response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': output_bucket} ) query_execution_id = response['QueryExecutionId'] # Wait for the query to complete max_attempts = 10 attempts = 0 while attempts < max_attempts: status = athena_client.get_query_execution(QueryExecutionId=query_execution_id) state = status['QueryExecution']['Status']['State'] if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']: break time.sleep(3) attempts += 1 if state != 'SUCCEEDED': raise Exception(f"Athena query {query_execution_id} did not succeed; state: {state}") # Retrieve query results results_paginator = athena_client.get_paginator('get_query_results') result_rows = [] for page in results_paginator.paginate(QueryExecutionId=query_execution_id): result_rows.extend(page['ResultSet']['Rows']) # Check if there are results if len(result_rows) < 2: return "No results found." # Extract the header and data # Process rows (skipping header row) and convert to JSON header = [col['VarCharValue'] for col in result_rows[0]['Data']] data = [] for row in result_rows[1:]: row_data = {} for idx, col in enumerate(row['Data']): row_data[header[idx]] = col.get('VarCharValue', '') data.append(row_data) return json.dumps(data, indent=2)
- server.py:16-16 (helper)Global variable 'athena_table' used by athena_query_events to reference the CloudTrail logs table name (set in athena_create_cloudtrail_table tool).athena_table = None