# lambda_function.py (9.07 kB)
"""
AWS Lambda function for TimeLooker monitoring system.
"""
import json
import os
from typing import Dict, Any
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.core.search_engine_claude import SearchEngine
from src.api.task_manager_client import TaskManager
from src.core.models import db_manager
from src.utils.logging_config import setup_logging
# Configure logging
logger = setup_logging(__name__)
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Entry point invoked by AWS Lambda for TimeLooker monitoring.

    The event selects one of three actions; ``execute_search`` is assumed
    when no ``action`` key is present. Expected event structure:

        {
            "task_id": 123,
            "action": "execute_search" | "create_task" | "list_tasks",
            "task_config": {  # Only for create_task action
                "task_description": "...",
                "frequency_minutes": 30,
                "runtime_minutes": 120,
                "recipient_email": "user@example.com"
            }
        }

    Args:
        event: Lambda event containing task information
        context: Lambda context object (not read by this handler)

    Returns:
        Dictionary with ``statusCode`` and a JSON-encoded ``body``:
        the selected handler's response, 400 for an unknown action or a
        missing ``task_id`` on ``execute_search``, 500 on any exception.
    """
    def _bad_request(message: str) -> Dict[str, Any]:
        # Shared shape for every client-error response built in this function
        return {'statusCode': 400, 'body': json.dumps({'error': message})}

    try:
        # Initialize database tables if needed
        db_manager.create_tables()

        task_id = event.get('task_id')
        action = event.get('action', 'execute_search')
        logger.info(f"Lambda invoked with action: {action}, task_id: {task_id}")

        if action == 'execute_search':
            if task_id:
                return handle_execute_search(task_id)
            return _bad_request('task_id is required for execute_search action')
        if action == 'create_task':
            return handle_create_task(event)
        if action == 'list_tasks':
            return handle_list_tasks()
        return _bad_request(f'Unknown action: {action}')
    except Exception as exc:
        # Top-level boundary: log with traceback and report a 500 to the caller
        logger.error(f"Lambda execution failed: {exc}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(exc)})
        }
def handle_create_task(event: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle task creation.

    Args:
        event: Lambda event containing task configuration. Its
            ``task_config`` entry must be a dict with ``task_description``,
            ``frequency_minutes``, ``runtime_minutes`` and
            ``recipient_email``. ``sender_email`` is optional and falls back
            to the ``DEFAULT_SENDER_EMAIL`` environment variable.

    Returns:
        Response with created task ID (200), a 400 response when
        ``task_config`` is not a dict or lacks a required field, or a 500
        response when task creation raises.
    """
    required_fields = (
        'task_description',
        'frequency_minutes',
        'runtime_minutes',
        'recipient_email',
    )
    try:
        # A missing or explicit-null task_config is treated as empty so the
        # caller gets a 400 naming the first missing field, not a 500.
        task_config = event.get('task_config') or {}
        if not isinstance(task_config, dict):
            return {
                'statusCode': 400,
                'body': json.dumps({'error': 'task_config must be an object'})
            }

        for field in required_fields:
            if field not in task_config:
                return {
                    'statusCode': 400,
                    'body': json.dumps({'error': f'Missing required field: {field}'})
                }

        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_description=task_config['task_description'],
            frequency_minutes=task_config['frequency_minutes'],
            runtime_minutes=task_config['runtime_minutes'],
            recipient_email=task_config['recipient_email'],
            sender_email=task_config.get('sender_email') or os.getenv('DEFAULT_SENDER_EMAIL')
        )
        logger.info(f"Created task {task_id}")
        return {
            'statusCode': 200,
            'body': json.dumps({
                'task_id': task_id,
                'message': 'Task created successfully'
            })
        }
    except Exception as e:
        # Keep the traceback in the log, matching lambda_handler's error logging
        logger.error(f"Error creating task: {e}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
def handle_list_tasks() -> Dict[str, Any]:
    """
    Handle listing all active tasks.

    Returns:
        Response with list of active tasks and their count (200), or a 500
        response carrying the error message when fetching or serializing
        the tasks raises.
    """
    try:
        task_manager = TaskManager()
        tasks = task_manager.get_active_tasks()
        # NOTE(review): json.dumps requires every task entry to be JSON
        # serializable; anything else ends up in the 500 path below.
        return {
            'statusCode': 200,
            'body': json.dumps({
                'tasks': tasks,
                'count': len(tasks)
            })
        }
    except Exception as e:
        # Keep the traceback in the log, matching lambda_handler's error logging
        logger.error(f"Error listing tasks: {e}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
def handle_execute_search(task_id: int) -> Dict[str, Any]:
    """
    Handle search execution for a specific task.

    Looks up the task, skips the run when ``should_task_run`` says so, and
    otherwise creates an execution record, runs the search against the
    previous results, saves the current results, sends an email notification
    when new items were found, and marks the execution record completed.

    Args:
        task_id: ID of the task to execute

    Returns:
        Response with execution results (200), a 404 response when the task
        is not found, a 200 "skipped" response when the frequency check
        fails, or a 500 response with the error message, task ID and
        execution ID (``None`` if no record had been created yet).
    """
    task_manager = TaskManager()
    execution_id = None
    try:
        # Get task configuration
        task_config = task_manager.get_task(task_id)
        if not task_config:
            return {
                'statusCode': 404,
                'body': json.dumps({'error': f'Task {task_id} not found'})
            }

        # Check if task should run (frequency check)
        if not task_manager.should_task_run(task_id):
            logger.info(f"Task {task_id} skipped - frequency not met")
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Task execution skipped - frequency not met',
                    'task_id': task_id
                })
            }

        # Create execution record
        execution_id = task_manager.create_execution_record(task_id)

        # Initialize search engine
        search_engine = SearchEngine()

        # Get previous search results
        previous_results = task_manager.get_previous_search_results(task_id)
        logger.info(f"Executing search for task {task_id} with {len(previous_results)} previous results")

        # Execute search and comparison
        current_items, new_items = search_engine.search_and_compare(
            task_config['task_description'],
            previous_results
        )

        # Save current results to database
        # NOTE(review): results are saved before the notification is
        # attempted; if the send raises, these items presumably will not be
        # reported as new on the next run - confirm this is intended.
        task_manager.save_search_results(task_id, current_items)

        # Send notification if new items found
        notification_sent = False
        if new_items:
            notification_sent = task_manager.send_email_notification(task_config, new_items)
            logger.info(f"Found {len(new_items)} new items for task {task_id}, notification sent: {notification_sent}")

        # Update execution record
        task_manager.update_execution_record(
            execution_id=execution_id,
            status='completed',
            items_found=len(current_items),
            new_items_count=len(new_items),
            notification_sent=notification_sent
        )
        return {
            'statusCode': 200,
            'body': json.dumps({
                'task_id': task_id,
                'execution_id': execution_id,
                'items_found': len(current_items),
                'new_items_count': len(new_items),
                'notification_sent': notification_sent,
                'message': 'Search executed successfully'
            })
        }
    except Exception as e:
        error_message = str(e)
        # Keep the traceback in the log, matching lambda_handler's error logging
        logger.error(f"Error executing search for task {task_id}: {e}", exc_info=True)

        # Update execution record with error. Compare against None so a falsy
        # id (e.g. 0) is still marked failed.
        if execution_id is not None:
            try:
                task_manager.update_execution_record(
                    execution_id=execution_id,
                    status='failed',
                    error_message=error_message
                )
            except Exception as update_error:
                # Best effort: a failure while recording the failure must not
                # replace the original error reported to the caller.
                logger.error(
                    f"Failed to mark execution {execution_id} as failed: {update_error}",
                    exc_info=True
                )
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': error_message,
                'task_id': task_id,
                'execution_id': execution_id
            })
        }
# For local testing
if __name__ == "__main__":
    # Example events for testing, paired with the header printed before each
    # call and listed in the order they are run: create, list, execute.
    local_test_runs = [
        (
            "\n1. Testing create task...",
            {
                "action": "create_task",
                "task_config": {
                    "task_description": "AI Ethics and Safety openings fit for a PhD in Computer Science",
                    "frequency_minutes": 60,
                    "runtime_minutes": 300,
                    "recipient_email": "user@example.com"
                }
            },
        ),
        (
            "\n2. Testing list tasks...",
            {
                "action": "list_tasks"
            },
        ),
        (
            "\n3. Testing execute search...",
            {
                "action": "execute_search",
                "task_id": 1
            },
        ),
    ]

    print("Testing Lambda function locally...")
    for header, sample_event in local_test_runs:
        print(header)
        # No Lambda context is available locally, so None is passed instead
        response = lambda_handler(sample_event, None)
        print(f"Response: {response}")