Skip to main content
Glama

TimeLooker MCP Server

lambda_function.py9.07 kB
""" AWS Lambda function for TimeLooker monitoring system. """ import json import os from typing import Dict, Any import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.core.search_engine_claude import SearchEngine from src.api.task_manager_client import TaskManager from src.core.models import db_manager from src.utils.logging_config import setup_logging # Configure logging logger = setup_logging(__name__) def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: """ AWS Lambda handler function for TimeLooker monitoring. Expected event structure: { "task_id": 123, "action": "execute_search" | "create_task" | "list_tasks", "task_config": { # Only for create_task action "task_description": "...", "frequency_minutes": 30, "runtime_minutes": 120, "recipient_email": "user@example.com" } } Args: event: Lambda event containing task information context: Lambda context object Returns: Dictionary with execution results """ try: # Initialize database tables if needed db_manager.create_tables() # Parse event action = event.get('action', 'execute_search') task_id = event.get('task_id') logger.info(f"Lambda invoked with action: {action}, task_id: {task_id}") if action == 'create_task': return handle_create_task(event) elif action == 'list_tasks': return handle_list_tasks() elif action == 'execute_search': if not task_id: return { 'statusCode': 400, 'body': json.dumps({'error': 'task_id is required for execute_search action'}) } return handle_execute_search(task_id) else: return { 'statusCode': 400, 'body': json.dumps({'error': f'Unknown action: {action}'}) } except Exception as e: logger.error(f"Lambda execution failed: {e}", exc_info=True) return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def handle_create_task(event: Dict[str, Any]) -> Dict[str, Any]: """ Handle task creation. Args: event: Lambda event containing task configuration Returns: Response with created task ID """ try: task_config = event.get('task_config', {}) required_fields = ['task_description', 'frequency_minutes', 'runtime_minutes', 'recipient_email'] for field in required_fields: if field not in task_config: return { 'statusCode': 400, 'body': json.dumps({'error': f'Missing required field: {field}'}) } task_manager = TaskManager() task_id = task_manager.create_task( task_description=task_config['task_description'], frequency_minutes=task_config['frequency_minutes'], runtime_minutes=task_config['runtime_minutes'], recipient_email=task_config['recipient_email'], sender_email=task_config.get('sender_email') or os.getenv('DEFAULT_SENDER_EMAIL') ) logger.info(f"Created task {task_id}") return { 'statusCode': 200, 'body': json.dumps({ 'task_id': task_id, 'message': 'Task created successfully' }) } except Exception as e: logger.error(f"Error creating task: {e}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def handle_list_tasks() -> Dict[str, Any]: """ Handle listing all active tasks. Returns: Response with list of active tasks """ try: task_manager = TaskManager() tasks = task_manager.get_active_tasks() return { 'statusCode': 200, 'body': json.dumps({ 'tasks': tasks, 'count': len(tasks) }) } except Exception as e: logger.error(f"Error listing tasks: {e}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def handle_execute_search(task_id: int) -> Dict[str, Any]: """ Handle search execution for a specific task. Args: task_id: ID of the task to execute Returns: Response with execution results """ task_manager = TaskManager() execution_id = None try: # Get task configuration task_config = task_manager.get_task(task_id) if not task_config: return { 'statusCode': 404, 'body': json.dumps({'error': f'Task {task_id} not found'}) } # Check if task should run (frequency check) if not task_manager.should_task_run(task_id): logger.info(f"Task {task_id} skipped - frequency not met") return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Task execution skipped - frequency not met', 'task_id': task_id }) } # Create execution record execution_id = task_manager.create_execution_record(task_id) # Initialize search engine search_engine = SearchEngine() # Get previous search results previous_results = task_manager.get_previous_search_results(task_id) logger.info(f"Executing search for task {task_id} with {len(previous_results)} previous results") # Execute search and comparison current_items, new_items = search_engine.search_and_compare( task_config['task_description'], previous_results ) # Save current results to database task_manager.save_search_results(task_id, current_items) # Send notification if new items found notification_sent = False if new_items: notification_sent = task_manager.send_email_notification(task_config, new_items) logger.info(f"Found {len(new_items)} new items for task {task_id}, notification sent: {notification_sent}") # Update execution record task_manager.update_execution_record( execution_id=execution_id, status='completed', items_found=len(current_items), new_items_count=len(new_items), notification_sent=notification_sent ) return { 'statusCode': 200, 'body': json.dumps({ 'task_id': task_id, 'execution_id': execution_id, 'items_found': len(current_items), 'new_items_count': len(new_items), 'notification_sent': notification_sent, 'message': 'Search executed successfully' }) } except Exception as e: logger.error(f"Error executing search for task {task_id}: {e}") # Update execution record with error if execution_id: task_manager.update_execution_record( execution_id=execution_id, status='failed', error_message=str(e) ) return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e), 'task_id': task_id, 'execution_id': execution_id }) } # For local testing if __name__ == "__main__": # Example events for testing # Test create task create_task_event = { "action": "create_task", "task_config": { "task_description": "AI Ethics and Safety openings fit for a PhD in Computer Science", "frequency_minutes": 60, "runtime_minutes": 300, "recipient_email": "user@example.com" } } # Test execute search execute_search_event = { "action": "execute_search", "task_id": 1 } # Test list tasks list_tasks_event = { "action": "list_tasks" } print("Testing Lambda function locally...") # Test create task print("\n1. Testing create task...") response = lambda_handler(create_task_event, None) print(f"Response: {response}") # Test list tasks print("\n2. Testing list tasks...") response = lambda_handler(list_tasks_event, None) print(f"Response: {response}") # Test execute search print("\n3. Testing execute search...") response = lambda_handler(execute_search_event, None) print(f"Response: {response}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'

If you have feedback or need assistance with the MCP directory API, please join our Discord server