"""
AWS Lambda deployment system for TimeLooker tasks.
Based on the CodeHook repository pattern for dynamic Lambda function creation.
"""
import io
import json
import os
import shutil
import tempfile
import zipfile
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional

import boto3
from botocore.exceptions import ClientError

from ..utils.logging_config import setup_logging
# Module-level logger, configured through the project's central logging setup.
logger = setup_logging(__name__)
class LambdaDeployer:
    """
    Manages deployment and lifecycle of Lambda functions for TimeLooker tasks.

    Each task gets a dedicated Lambda function plus an EventBridge Scheduler
    schedule that invokes it periodically. Based on the CodeHook repository
    pattern.
    """

    def __init__(self, region: str = 'eu-west-2'):
        """
        Initialize Lambda deployer.

        Args:
            region: AWS region for deployment
        """
        self.region = region
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.scheduler_client = boto3.client('scheduler', region_name=region)
        self.iam_client = boto3.client('iam', region_name=region)
        # Configuration injected by the infrastructure stack. Any of these may
        # be None when the environment is not fully configured; deployment
        # then fails with a parameter-validation error from AWS.
        self.lambda_execution_role_arn = os.getenv('LAMBDA_EXECUTION_ROLE_ARN')
        self.database_secret_arn = os.getenv('DATABASE_SECRET_ARN')
        self.anthropic_secret_arn = os.getenv('ANTHROPIC_SECRET_ARN')
        self.email_templates_bucket = os.getenv('EMAIL_TEMPLATES_BUCKET')

    def deploy_task_function(self, task_id: int, task_config: Dict[str, Any]) -> str:
        """
        Deploy a Lambda function for a specific task.

        Creates the function if it does not exist, otherwise updates its code
        and configuration, then wires up the EventBridge schedule and the
        invoke permission.

        Args:
            task_id: ID of the task
            task_config: Task configuration dictionary; must contain
                'task_description' and 'frequency_minutes'.

        Returns:
            ARN of the deployed Lambda function

        Raises:
            Exception: re-raised from any underlying AWS call failure.
        """
        function_name = f"timelooker-task-{task_id}"
        try:
            # Create deployment package
            deployment_package = self._create_deployment_package()

            # Environment variables for the Lambda function.
            # Note: AWS_REGION is automatically set by the Lambda runtime,
            # so it must not be included here.
            environment = {
                'TASK_ID': str(task_id),
                'DATABASE_SECRET_ARN': self.database_secret_arn,
                'ANTHROPIC_SECRET_ARN': self.anthropic_secret_arn,
                'EMAIL_TEMPLATES_BUCKET': self.email_templates_bucket
            }

            # Check if function already exists
            try:
                self.lambda_client.get_function(FunctionName=function_name)
                # Function exists, update it. Architectures is a parameter of
                # UpdateFunctionCode, not UpdateFunctionConfiguration (the
                # latter rejects it with a parameter validation error).
                logger.info(f"Updating existing Lambda function: {function_name}")
                response = self.lambda_client.update_function_code(
                    FunctionName=function_name,
                    ZipFile=deployment_package,
                    Architectures=['arm64']  # Ensure existing functions use ARM64
                )
                # Wait for the code update to complete before changing the
                # configuration; Lambda raises ResourceConflictException if a
                # configuration update is attempted while the function is
                # still in the "InProgress" state.
                self.lambda_client.get_waiter('function_updated').wait(
                    FunctionName=function_name
                )
                # Update environment variables
                self.lambda_client.update_function_configuration(
                    FunctionName=function_name,
                    Environment={'Variables': environment}
                )
            except ClientError as e:
                if e.response['Error']['Code'] == 'ResourceNotFoundException':
                    # Function doesn't exist, create it
                    logger.info(f"Creating new Lambda function: {function_name}")
                    response = self.lambda_client.create_function(
                        FunctionName=function_name,
                        Runtime='python3.11',
                        Role=self.lambda_execution_role_arn,
                        Handler='task_executor.lambda_handler',
                        Code={'ZipFile': deployment_package},
                        Description=f'TimeLooker task executor for: {task_config["task_description"][:100]}...',
                        Timeout=300,  # 5 minute timeout
                        MemorySize=256,  # 256 MB memory
                        Architectures=['arm64'],  # Use ARM64 architecture
                        Environment={'Variables': environment},
                        Tags={
                            'Project': 'TimeLooker',
                            'TaskId': str(task_id),
                            'CreatedBy': 'TimeLookerMCP'
                        }
                    )
                else:
                    raise

            function_arn = response['FunctionArn']
            logger.info(f"Lambda function deployed: {function_arn}")

            # Create EventBridge schedule
            self._create_schedule(task_id, task_config, function_arn)

            # Add permission for EventBridge to invoke Lambda
            self._add_eventbridge_permission(function_name, task_id)

            return function_arn
        except Exception as e:
            logger.error(f"Error deploying Lambda function for task {task_id}: {e}")
            raise

    def delete_task_function(self, task_id: int) -> bool:
        """
        Delete Lambda function and associated schedule for a task.

        Missing resources are treated as already deleted; other AWS errors
        are logged as warnings (best-effort cleanup).

        Args:
            task_id: ID of the task

        Returns:
            True if successful, False otherwise
        """
        function_name = f"timelooker-task-{task_id}"
        schedule_name = f"timelooker-schedule-{task_id}"
        try:
            # Delete EventBridge schedule first so nothing keeps invoking the
            # function while it is being removed.
            try:
                self.scheduler_client.delete_schedule(Name=schedule_name)
                logger.info(f"Deleted EventBridge schedule: {schedule_name}")
            except ClientError as e:
                if e.response['Error']['Code'] != 'ResourceNotFoundException':
                    logger.warning(f"Error deleting schedule {schedule_name}: {e}")

            # Delete Lambda function
            try:
                self.lambda_client.delete_function(FunctionName=function_name)
                logger.info(f"Deleted Lambda function: {function_name}")
            except ClientError as e:
                if e.response['Error']['Code'] != 'ResourceNotFoundException':
                    logger.warning(f"Error deleting function {function_name}: {e}")

            return True
        except Exception as e:
            logger.error(f"Error deleting Lambda function for task {task_id}: {e}")
            return False

    def schedule_cleanup(self, task_id: int, runtime_minutes: int) -> None:
        """
        Schedule automatic cleanup of Lambda function after runtime expires.

        Creates a one-time EventBridge Scheduler `at()` schedule targeting the
        shared cleanup function. Failures are logged but not raised
        (best-effort).

        Args:
            task_id: ID of the task
            runtime_minutes: Runtime duration in minutes
        """
        # Use an aware UTC datetime (datetime.utcnow() is deprecated).
        # at() expressions are evaluated in UTC by default, and must not
        # carry a timezone offset, hence the offset-free strftime format.
        cleanup_time = datetime.now(timezone.utc) + timedelta(minutes=runtime_minutes)
        cleanup_schedule_name = f"timelooker-cleanup-{task_id}"
        # Build the cleanup-function ARN from the configured region and the
        # caller's account instead of hard-coding them.
        cleanup_function_arn = (
            f'arn:aws:lambda:{self.region}:{self._get_account_id()}'
            f':function:timelooker-cleanup'
        )
        try:
            # Create one-time schedule for cleanup
            self.scheduler_client.create_schedule(
                Name=cleanup_schedule_name,
                ScheduleExpression=f"at({cleanup_time.strftime('%Y-%m-%dT%H:%M:%S')})",
                Target={
                    'Arn': cleanup_function_arn,
                    'RoleArn': self.lambda_execution_role_arn,
                    'Input': json.dumps({'task_id': task_id, 'action': 'cleanup'})
                },
                FlexibleTimeWindow={'Mode': 'OFF'},
                Description=f'Cleanup TimeLooker task {task_id} after runtime expires'
            )
            logger.info(f"Scheduled cleanup for task {task_id} at {cleanup_time}")
        except Exception as e:
            logger.warning(f"Failed to schedule cleanup for task {task_id}: {e}")

    def _create_deployment_package(self) -> bytes:
        """
        Create a deployment package containing the Lambda function and dependencies.

        Following AWS best practices:
        https://docs.aws.amazon.com/lambda/latest/dg/python-package.html

        Returns:
            Zip file contents as bytes
        """
        # Create temporary directory for packaging
        with tempfile.TemporaryDirectory() as temp_dir:
            logger.info(f"Creating deployment package in {temp_dir}")

            # Step 1: Install dependencies first in root of package
            logger.info("Installing dependencies...")
            self._install_dependencies(temp_dir)

            # Step 2: Copy Lambda function template to root.
            # Both paths are resolved relative to this module: three levels up
            # is the project root.
            project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
            lambda_template_path = os.path.join(
                project_root, 'lambda_templates', 'task_executor.py'
            )
            shutil.copy2(lambda_template_path, temp_dir)
            logger.info("Copied Lambda handler")

            # Step 3: Copy source code (without bytecode caches)
            src_dir = os.path.join(project_root, 'src')
            dest_src_dir = os.path.join(temp_dir, 'src')
            shutil.copytree(src_dir, dest_src_dir,
                            ignore=shutil.ignore_patterns('__pycache__', '*.pyc'))
            logger.info("Copied source code")

            # Step 4: List contents for debugging
            contents = []
            for root, dirs, files in os.walk(temp_dir):
                level = root.replace(temp_dir, '').count(os.sep)
                indent = ' ' * 2 * level
                contents.append(f"{indent}{os.path.basename(root)}/")
                subindent = ' ' * 2 * (level + 1)
                for file in files[:5]:  # Show first 5 files per directory
                    contents.append(f"{subindent}{file}")
                if len(files) > 5:
                    contents.append(f"{subindent}... and {len(files) - 5} more files")
            logger.info("Package contents:\n" + "\n".join(contents[:20]))  # First 20 lines

            # Step 5: Create zip file entirely in memory (no need for a
            # named temporary file just to buffer the archive).
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
                for root, dirs, files in os.walk(temp_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        arc_name = os.path.relpath(file_path, temp_dir)
                        zip_file.write(file_path, arc_name)
            zip_contents = zip_buffer.getvalue()

            logger.info(f"Created deployment package ({len(zip_contents):,} bytes)")
            return zip_contents

    @staticmethod
    def _find_pip() -> Optional[list]:
        """
        Locate a working pip invocation.

        Tries the running interpreter's own pip module first, then common
        system executables.

        Returns:
            The pip command as an argv list, or None if no pip works.
        """
        import subprocess
        import sys

        candidates = [
            [sys.executable, '-m', 'pip'],  # pip of the running interpreter
            ['pip'],                        # System pip
            ['pip3'],                       # System pip3
            ['python3', '-m', 'pip'],       # System python3 with pip module
            ['python', '-m', 'pip'],        # System python with pip module
        ]
        for candidate in candidates:
            try:
                result = subprocess.run(candidate + ['--version'],
                                        capture_output=True, text=True, timeout=10)
                if result.returncode == 0:
                    logger.info(f"Using pip: {' '.join(candidate)}")
                    return candidate
            except Exception:
                continue
        return None

    def _install_dependencies(self, package_dir: str) -> None:
        """
        Install Python dependencies for Lambda function.

        Following the AWS Lambda Python packaging guide: binary wheels are
        fetched for the Lambda runtime's platform (ARM64 Linux, CPython 3.11)
        rather than for the machine running the deployment.

        Args:
            package_dir: Directory to install dependencies to (should be
                package root)
        """
        import subprocess
        import sys
        import platform

        # Key packages needed (exclude boto3/botocore as they're included in
        # the Lambda runtime). Using versions that work locally from
        # pyproject.toml.
        packages = [
            'psycopg2-binary>=2.9.0',  # Database adapter
            'sqlalchemy>=2.0.41',      # Use same version as local
            'anthropic>=0.55.0',       # Anthropic Claude API client
            'httpx>=0.28.1',           # HTTP client (required by anthropic)
            'requests>=2.31.0',        # HTTP client
            'pystache>=0.6.5',         # Template engine for emails
            'python-dotenv>=1.0.0'     # Environment loading
        ]
        logger.info(f"Installing {len(packages)} packages to {package_dir}")
        logger.info(f"Python version: {sys.version}")
        logger.info(f"Platform: {platform.platform()}")

        # Resolve pip once instead of re-probing for every package.
        pip_cmd = self._find_pip()
        if not pip_cmd:
            logger.error("No working pip installation found")
            return

        for package in packages:
            try:
                logger.info(f"Installing {package}...")
                # Install packages for ARM64 Linux (Lambda Graviton2) architecture
                cmd = pip_cmd + [
                    'install',
                    package,
                    '--target', package_dir,
                    '--only-binary=:all:',          # Use binary wheels only
                    '--platform', 'linux_aarch64',  # Target ARM64 Linux platform
                    '--implementation', 'cp',
                    '--python-version', '3.11',
                    '--abi', 'cp311'
                ]
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                if result.returncode == 0:
                    logger.info(f"✓ Successfully installed {package}")
                else:
                    logger.error(f"✗ Failed to install {package}")
                    logger.error(f"Command: {' '.join(cmd)}")
                    logger.error(f"Stdout: {result.stdout}")
                    logger.error(f"Stderr: {result.stderr}")
            except subprocess.TimeoutExpired:
                logger.error(f"Timeout installing {package}")
            except Exception as e:
                logger.error(f"Exception installing {package}: {e}")

        self._verify_installation(package_dir)

    def _verify_installation(self, package_dir: str) -> None:
        """
        Log a best-effort summary of the installed packages; never raises.

        Args:
            package_dir: Directory the dependencies were installed to.
        """
        try:
            installed_items = os.listdir(package_dir)
            logger.info(f"Package directory contains {len(installed_items)} items")
            # Check for key packages (must match the install list above).
            key_packages = ['anthropic', 'sqlalchemy', 'psycopg2']
            found_packages = []
            for item in installed_items:
                for pkg in key_packages:
                    if pkg in item.lower():
                        found_packages.append(item)
                        break
            logger.info(f"Found package directories: {found_packages}")

            # Calculate total size
            total_size = 0
            for root, dirs, files in os.walk(package_dir):
                for file in files:
                    try:
                        total_size += os.path.getsize(os.path.join(root, file))
                    except OSError:
                        pass  # File vanished or unreadable; size is best-effort
            logger.info(f"Total package size: {total_size:,} bytes "
                        f"({total_size/1024/1024:.1f} MB)")
        except Exception as e:
            logger.warning(f"Error verifying installation: {e}")

    @staticmethod
    def _rate_expression(frequency_minutes: int) -> str:
        """
        Build an EventBridge Scheduler rate() expression.

        EventBridge requires the singular unit name when the value is 1
        ("rate(1 hour)", "rate(1 minute)") and the plural otherwise; the
        plural-only form is rejected as invalid.

        Args:
            frequency_minutes: Execution interval in minutes.

        Returns:
            A valid rate() expression, using hours when the interval is a
            whole number of hours.
        """
        if frequency_minutes >= 60 and frequency_minutes % 60 == 0:
            value, unit = frequency_minutes // 60, 'hour'
        else:
            value, unit = frequency_minutes, 'minute'
        suffix = '' if value == 1 else 's'
        return f"rate({value} {unit}{suffix})"

    def _create_schedule(self, task_id: int, task_config: Dict[str, Any], function_arn: str) -> None:
        """
        Create EventBridge schedule for periodic task execution.

        Any pre-existing schedule with the same name is replaced.

        Args:
            task_id: ID of the task
            task_config: Task configuration; must contain 'frequency_minutes'.
            function_arn: ARN of the Lambda function

        Raises:
            Exception: re-raised from schedule creation failures.
        """
        schedule_name = f"timelooker-schedule-{task_id}"
        frequency_minutes = task_config['frequency_minutes']
        rate_expression = self._rate_expression(frequency_minutes)

        try:
            # Delete existing schedule if it exists
            try:
                self.scheduler_client.delete_schedule(Name=schedule_name)
            except ClientError:
                pass  # Schedule doesn't exist, which is fine

            # Create new schedule
            self.scheduler_client.create_schedule(
                Name=schedule_name,
                ScheduleExpression=rate_expression,
                Target={
                    'Arn': function_arn,
                    'RoleArn': self.lambda_execution_role_arn,
                    'Input': json.dumps({'task_id': task_id})
                },
                FlexibleTimeWindow={'Mode': 'OFF'},
                Description=f'Execute TimeLooker task {task_id} every {frequency_minutes} minutes'
            )
            logger.info(f"Created EventBridge schedule: {schedule_name} ({rate_expression})")
        except Exception as e:
            logger.error(f"Error creating schedule for task {task_id}: {e}")
            raise

    def _add_eventbridge_permission(self, function_name: str, task_id: int) -> None:
        """
        Add permission for EventBridge Scheduler to invoke Lambda function.

        Idempotent: an existing statement with the same ID is removed first.

        Args:
            function_name: Name of the Lambda function
            task_id: ID of the task

        Raises:
            Exception: re-raised from permission changes (other than a
                missing existing statement).
        """
        statement_id = f"timelooker-eventbridge-{task_id}"
        schedule_name = f"timelooker-schedule-{task_id}"
        try:
            # Remove existing permission if it exists
            try:
                self.lambda_client.remove_permission(
                    FunctionName=function_name,
                    StatementId=statement_id
                )
            except ClientError:
                pass  # Permission doesn't exist, which is fine

            # Add permission for EventBridge Scheduler to invoke Lambda,
            # scoped to this task's schedule ARN.
            self.lambda_client.add_permission(
                FunctionName=function_name,
                StatementId=statement_id,
                Action='lambda:InvokeFunction',
                Principal='scheduler.amazonaws.com',
                SourceArn=f'arn:aws:scheduler:{self.region}:{self._get_account_id()}:schedule/{schedule_name}'
            )
            logger.info(f"Added EventBridge permission for function {function_name}")
        except Exception as e:
            logger.error(f"Error adding EventBridge permission: {e}")
            raise

    def _get_account_id(self) -> str:
        """
        Get the AWS account ID of the current credentials via STS.

        Raises:
            Exception: re-raised if the STS call fails.
        """
        try:
            sts_client = boto3.client('sts', region_name=self.region)
            return sts_client.get_caller_identity()['Account']
        except Exception as e:
            logger.error(f"Error getting account ID: {e}")
            raise