"""GraphQL Resolver for API Key Operations
This Lambda handles GraphQL queries and mutations for API key management:
- getApiKey: Returns the current API key (admin only)
- regenerateApiKey: Creates a new API key and deletes old ones (admin only)
"""
import logging
import os
import time
from datetime import UTC, datetime
import boto3
from botocore.exceptions import ClientError
# Root logger (getLogger() with no name); its level comes from the LOG_LEVEL
# environment variable and defaults to "INFO" when that variable is unset.
logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
# Module-level AppSync client. Starts as None and is populated on the first
# call to _initialize_client(), so importing this module creates no client.
appsync = None
def _initialize_client():
    """Create the module-level AppSync client on first use.

    Later calls are no-ops once ``appsync`` has been set, which also lets
    tests install their own client before the handler runs.
    """
    global appsync
    if appsync is not None:
        return
    appsync = boto3.client("appsync")
def lambda_handler(event, context):
    """
    Entry point for the API key GraphQL resolver.

    Reads the GraphQL field name from ``event["info"]["fieldName"]`` and
    routes ``getApiKey`` / ``regenerateApiKey`` to their handlers.

    Args:
        event: AppSync event; must contain ``info.fieldName``
        context: Lambda context (unused)

    Returns:
        Dict with ``apiKey``, ``id``, ``expires`` and ``error`` keys. On any
        failure the first three are empty strings and ``error`` carries the
        message.
    """
    _initialize_client()

    field_name = event["info"]["fieldName"]
    logger.info(f"Processing operation: {field_name}")

    # Template for every failure response; only the error text varies.
    blank = {"apiKey": "", "id": "", "expires": ""}

    api_id = os.environ.get("APPSYNC_API_ID")
    if not api_id:
        return dict(blank, error="APPSYNC_API_ID not configured")

    try:
        if field_name == "getApiKey":
            result = handle_get_api_key(api_id)
        elif field_name == "regenerateApiKey":
            result = handle_regenerate_api_key(api_id)
        else:
            raise ValueError(f"Unsupported operation: {field_name}")
    except Exception as exc:
        # Top-level boundary: log the traceback and report the message to the
        # caller instead of letting the Lambda invocation fail.
        logger.exception(f"Error processing {field_name}")
        result = dict(blank, error=str(exc))
    return result
def handle_get_api_key(api_id: str) -> dict:
    """
    Look up the current API key for the given AppSync API.

    Of all keys returned by ``list_api_keys``, the one with the greatest
    ``expires`` value is reported (the first such key if several tie).

    Returns:
        Response dict with ``apiKey``, ``id``, ``expires`` (ISO-8601, UTC) and
        ``error``. ``error`` is ``None`` on success; if no key exists or the
        AppSync call fails, the other fields are empty strings.
    """
    try:
        candidates = list_api_keys(api_id)
        if candidates:
            # Pick the key that expires last; a missing "expires" counts as 0.
            newest = max(candidates, key=lambda item: item.get("expires", 0))
            key_id = newest.get("id", "")
            expiry = datetime.fromtimestamp(newest.get("expires", 0), tz=UTC)
            return {
                # The key's id is reported as the apiKey value as well.
                "apiKey": key_id,
                "id": key_id,
                "expires": expiry.isoformat(),
                "error": None,
            }
        return {"apiKey": "", "id": "", "expires": "", "error": "No API key found"}
    except ClientError as err:
        logger.exception("Failed to get API key")
        return {"apiKey": "", "id": "", "expires": "", "error": str(err)}
def handle_regenerate_api_key(api_id: str) -> dict:
    """
    Regenerate the API key.

    Creates a new key requested to expire 365 days from now, then deletes
    every key that existed before the call. The new key is created first so
    the API is never left without a key. Deleting old keys is best-effort:
    a failed delete is logged as a warning and does not fail the operation.

    Args:
        api_id: ID of the AppSync API whose keys are rotated

    Returns:
        Response dict with ``apiKey``, ``id``, ``expires`` (ISO-8601, UTC) and
        ``error``. ``expires`` reflects the expiry the service reports for the
        new key. If listing or creating keys raises ``ClientError``, the first
        three fields are empty strings and ``error`` carries the message.
    """
    try:
        # Snapshot the keys that exist before the new one is created, so the
        # new key is never part of the deletion list.
        existing_keys = list_api_keys(api_id)
        logger.info(f"Found {len(existing_keys)} existing API key(s)")
        # Create new API key with 365-day expiration
        new_expiry = int(time.time()) + (365 * 24 * 60 * 60)
        new_key_response = appsync.create_api_key(
            apiId=api_id,
            description="Public API key for search and theme access (regenerated by admin)",
            expires=new_expiry,
        )
        new_key = new_key_response["apiKey"]
        new_key_id = new_key["id"]
        logger.info(f"Created new API key: {new_key_id}")
        # Delete old keys
        for key in existing_keys:
            old_key_id = key["id"]
            try:
                appsync.delete_api_key(apiId=api_id, id=old_key_id)
                logger.info(f"Deleted old API key: {old_key_id}")
            except ClientError as e:
                logger.warning(f"Failed to delete key {old_key_id}: {e}")
        # AppSync stores the expiry rounded down to the hour, so report the
        # value the service returned rather than the one we requested. This
        # keeps the result consistent with what getApiKey reports later.
        # Fall back to the requested value if the response omits it.
        effective_expiry = new_key.get("expires") or new_expiry
        expires_str = datetime.fromtimestamp(effective_expiry, tz=UTC).isoformat()
        return {
            "apiKey": new_key_id,
            "id": new_key_id,
            "expires": expires_str,
            "error": None,
        }
    except ClientError as e:
        logger.exception("Failed to regenerate API key")
        return {"apiKey": "", "id": "", "expires": "", "error": str(e)}
def list_api_keys(api_id: str) -> list:
    """Return every API key of the AppSync API, following pagination."""
    pages = appsync.get_paginator("list_api_keys").paginate(apiId=api_id)
    # Flatten the per-page "apiKeys" lists; a page without that field adds nothing.
    return [api_key for page in pages for api_key in page.get("apiKeys", [])]