"""
Data sanitization for Athena query results.
This module redacts sensitive data from query results before returning to user.
Strategy: Replace values in sensitive columns with '***REDACTED***'
## Why sanitization is necessary
Query results may contain:
- Passwords (hashed or plain)
- API tokens and keys
- OAuth credentials
- Private keys
- Session tokens
- Auth tokens
Even with query validation, a legitimate SELECT could accidentally expose
sensitive data if user queries wrong table or joins with credentials table.
## Sanitization strategy (from user requirements)
Pattern matching on column names (case-insensitive):
- password, passwd, pwd
- token, auth_token, api_token, session_token
- secret, api_secret, client_secret
- key, api_key, private_key, secret_key (bare `key` is not matched on its own - only the compound names are)
- credential, credentials
- auth, authentication (too broad - excluded)
Replace values with: '***REDACTED***'
## Usage
from sanitizer import sanitize_results
results = [
{'user_id': 123, 'email': 'test@example.com', 'password': 'hashed123'},
{'user_id': 456, 'email': 'user@example.com', 'password': 'hashed456'},
]
safe_results = sanitize_results(results)
# [{'user_id': 123, 'email': 'test@example.com', 'password': '***REDACTED***'}, ...]
## Performance
- O(n * m) where n = rows, m = columns
- For 100K rows with 10 columns: ~1M operations (fast enough)
- No regex compilation per row (compiled once at module load)
"""
import re
from typing import Any
# Sensitive column name patterns (case-insensitive).
#
# `_` is a word character in regular expressions, so `\b` never fires between
# `user_` and `password`; patterns written with `\b` silently miss snake_case
# names such as user_password, hashed_pwd, password_hash or api_token.
# The lookarounds below instead treat anything that is not an ASCII letter or
# digit (including `_` and the start/end of the name) as a separator.
# Substrings of longer words (tokenizer, secretary) are still not matched.
# NOTE: camelCase names such as userPassword are not split into words.
_WORD_START = r'(?<![a-z0-9])'
_WORD_END = r'(?![a-z0-9])'

SENSITIVE_PATTERNS = [
    rf'{_WORD_START}password{_WORD_END}',
    rf'{_WORD_START}passwd{_WORD_END}',
    rf'{_WORD_START}pwd{_WORD_END}',
    rf'{_WORD_START}token{_WORD_END}',
    rf'{_WORD_START}secret{_WORD_END}',
    rf'{_WORD_START}api_key{_WORD_END}',
    rf'{_WORD_START}private_key{_WORD_END}',
    rf'{_WORD_START}secret_key{_WORD_END}',
    rf'{_WORD_START}credential',  # No end anchor: also matches credentials
    rf'{_WORD_START}auth_token{_WORD_END}',
    rf'{_WORD_START}session_token{_WORD_END}',
    rf'{_WORD_START}access_token{_WORD_END}',
    rf'{_WORD_START}refresh_token{_WORD_END}',
]

# Compile the combined alternation once at import time so per-row checks
# only pay for a search, not a compilation.
SENSITIVE_COLUMN_REGEX = re.compile('|'.join(SENSITIVE_PATTERNS), re.IGNORECASE)

# Placeholder written in place of every sensitive value.
REDACTED_VALUE = '***REDACTED***'
def is_sensitive_column(column_name: str) -> bool:
    """
    Report whether a column name matches one of the sensitive-data patterns.

    The name is searched with the module-level SENSITIVE_COLUMN_REGEX; any
    match anywhere in the name counts. Empty or None names are never
    considered sensitive.

    Args:
        column_name: Column name from query results

    Returns:
        True if the name matches a sensitive pattern, False otherwise

    Example:
        >>> is_sensitive_column('password')
        True
        >>> is_sensitive_column('auth_token')
        True
        >>> is_sensitive_column('user_id')
        False
        >>> is_sensitive_column('')
        False
    """
    has_name = bool(column_name)
    # Short-circuit keeps the regex away from empty/None names.
    return has_name and SENSITIVE_COLUMN_REGEX.search(column_name) is not None
def sanitize_row(row: dict[str, Any]) -> dict[str, Any]:
    """
    Build a redacted copy of a single result row.

    Every key is checked with is_sensitive_column(); values under sensitive
    keys are replaced by REDACTED_VALUE, all other values are carried over
    unchanged. The input dictionary is not modified and key order is kept.

    Args:
        row: Dictionary representing one query result row

    Returns:
        New dictionary with sensitive values redacted

    Example:
        >>> row = {'id': 1, 'email': 'test@example.com', 'password': 'secret123'}
        >>> sanitize_row(row)
        {'id': 1, 'email': 'test@example.com', 'password': '***REDACTED***'}
    """
    return {
        name: REDACTED_VALUE if is_sensitive_column(name) else cell
        for name, cell in row.items()
    }
def sanitize_results(results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Redact sensitive values in every row of a result set.

    Each row is passed through sanitize_row(); the input list and its rows
    are left untouched. An empty or None input yields an empty list.

    Args:
        results: List of dictionaries from Athena query
            (format returned by AthenaHelper.get_query_results())

    Returns:
        New list with sensitive values redacted

    Example:
        >>> results = [
        ...     {'id': 1, 'password': 'hash1'},
        ...     {'id': 2, 'password': 'hash2'},
        ... ]
        >>> sanitize_results(results)
        [{'id': 1, 'password': '***REDACTED***'}, {'id': 2, 'password': '***REDACTED***'}]
    """
    return list(map(sanitize_row, results)) if results else []
def get_sensitive_columns(results: list[dict[str, Any]]) -> list[str]:
    """
    List the column names in a result set that would be redacted.

    Intended for logging/debugging. Only the first row is inspected, on the
    assumption that every row has the same columns; columns that appear only
    in later rows are not reported.

    Args:
        results: List of dictionaries from Athena query

    Returns:
        Sensitive column names, in the order they appear in the first row;
        an empty list when there are no results

    Example:
        >>> results = [{'id': 1, 'password': 'x', 'auth_token': 'y'}]
        >>> get_sensitive_columns(results)
        ['password', 'auth_token']
    """
    if results:
        header_row = results[0]
        return [name for name in header_row.keys() if is_sensitive_column(name)]
    return []