task_executor.py•7.08 kB
"""
Lambda function template for executing TimeLooker search tasks.
This function reuses the existing TaskManager and SearchEngine from the main codebase.
"""
import json
import os
import sys
from datetime import datetime
from typing import Dict, Any
# Add the packaged source code to the path
sys.path.insert(0, '/opt/python')
sys.path.insert(0, '/var/task')
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
AWS Lambda handler for executing a TimeLooker search task.
Environment Variables Required:
- TASK_ID: ID of the task to execute
- DATABASE_SECRET_ARN: ARN of RDS credentials secret
- ANTHROPIC_SECRET_ARN: ARN of Anthropic API key secret
- EMAIL_TEMPLATES_BUCKET: S3 bucket containing email templates
- REGION: AWS region
"""
try:
# Import AWS SDK
import boto3
# Import our existing TimeLooker components (packaged with Lambda)
from src.core.search_engine_claude import SearchEngine
from src.core.task_manager import TaskManager as CoreTaskManager
from src.aws.lambda_database_manager import LambdaDatabaseManager
from src.aws.ses_email_service import SESEmailService
# Get environment variables
task_id = int(os.environ['TASK_ID'])
database_secret_arn = os.environ['DATABASE_SECRET_ARN']
anthropic_secret_arn = os.environ['ANTHROPIC_SECRET_ARN']
email_templates_bucket = os.environ['EMAIL_TEMPLATES_BUCKET']
# AWS_REGION is automatically available in Lambda runtime
region = os.environ.get('AWS_REGION', os.environ.get('AWS_DEFAULT_REGION', 'eu-west-2'))
print(f"Executing search task {task_id}")
# Initialize AWS clients
secrets_client = boto3.client('secretsmanager', region_name=region)
s3_client = boto3.client('s3', region_name=region)
ses_client = boto3.client('ses', region_name=region)
# Get secrets
db_secret = secrets_client.get_secret_value(SecretId=database_secret_arn)
db_credentials = json.loads(db_secret['SecretString'])
anthropic_secret = secrets_client.get_secret_value(SecretId=anthropic_secret_arn)
anthropic_credentials = json.loads(anthropic_secret['SecretString'])
# Setup database connection for Lambda
database_url = f"postgresql://{db_credentials['username']}:{db_credentials['password']}@{db_credentials['host']}:{db_credentials['port']}/{db_credentials['dbname']}"
os.environ['DATABASE_URL'] = database_url
os.environ['ANTHROPIC_API_KEY'] = anthropic_credentials['api_key']
print(f"Database URL configured: {db_credentials['host']}:{db_credentials['port']}")
print(f"Anthropic API key configured: {anthropic_credentials['api_key'][:10]}...")
# Initialize existing components using core (direct database) task manager
task_manager = CoreTaskManager()
search_engine = SearchEngine()
email_service = SESEmailService(
ses_client=ses_client,
s3_client=s3_client,
templates_bucket=email_templates_bucket
)
# Get task configuration
task_config = task_manager.get_task(task_id)
if not task_config:
return {
'statusCode': 404,
'body': json.dumps({
'error': f'Task {task_id} not found'
})
}
print(f"Task description: {task_config['task_description']}")
# Check if task should run (frequency check)
if not task_manager.should_task_run(task_id):
return {
'statusCode': 200,
'body': json.dumps({
'task_id': task_id,
'status': 'skipped',
'reason': 'Frequency not met'
})
}
# Create execution record
execution_id = task_manager.create_execution_record(task_id)
print(f"Created execution record: {execution_id}")
try:
# Get previous search results
previous_results = task_manager.get_previous_search_results(task_id)
print(f"Found {len(previous_results)} previous results")
# Execute search and comparison using existing search engine
current_items, new_items = search_engine.search_and_compare(
task_config['task_description'],
previous_results
)
print(f"Search completed: {len(current_items)} total, {len(new_items)} new")
# Save current results to database
save_success = task_manager.save_search_results(task_id, current_items)
if not save_success:
print("Warning: Failed to save search results")
# Send notification if new items found
notification_sent = False
if new_items:
notification_sent = email_service.send_notification(
task_config=task_config,
new_items=new_items
)
print(f"Email notification sent: {notification_sent}")
# Update execution record
task_manager.update_execution_record(
execution_id=execution_id,
status='completed',
items_found=len(current_items),
new_items_count=len(new_items),
notification_sent=notification_sent
)
return {
'statusCode': 200,
'body': json.dumps({
'task_id': task_id,
'execution_id': execution_id,
'total_items': len(current_items),
'new_items': len(new_items),
'notification_sent': notification_sent,
'status': 'completed'
})
}
except Exception as e:
# Update execution record with error
error_message = str(e)
print(f"Error during execution: {error_message}")
task_manager.update_execution_record(
execution_id=execution_id,
status='failed',
error_message=error_message
)
return {
'statusCode': 500,
'body': json.dumps({
'task_id': task_id,
'execution_id': execution_id,
'status': 'failed',
'error': error_message
})
}
except Exception as e:
print(f"Fatal error in lambda handler: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({
'status': 'fatal_error',
'error': str(e)
})
}