Skip to main content
Glama

TimeLooker MCP Server

task_executor.py (7.08 kB)
""" Lambda function template for executing TimeLooker search tasks. This function reuses the existing TaskManager and SearchEngine from the main codebase. """ import json import os import sys from datetime import datetime from typing import Dict, Any # Add the packaged source code to the path sys.path.insert(0, '/opt/python') sys.path.insert(0, '/var/task') def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: """ AWS Lambda handler for executing a TimeLooker search task. Environment Variables Required: - TASK_ID: ID of the task to execute - DATABASE_SECRET_ARN: ARN of RDS credentials secret - ANTHROPIC_SECRET_ARN: ARN of Anthropic API key secret - EMAIL_TEMPLATES_BUCKET: S3 bucket containing email templates - REGION: AWS region """ try: # Import AWS SDK import boto3 # Import our existing TimeLooker components (packaged with Lambda) from src.core.search_engine_claude import SearchEngine from src.core.task_manager import TaskManager as CoreTaskManager from src.aws.lambda_database_manager import LambdaDatabaseManager from src.aws.ses_email_service import SESEmailService # Get environment variables task_id = int(os.environ['TASK_ID']) database_secret_arn = os.environ['DATABASE_SECRET_ARN'] anthropic_secret_arn = os.environ['ANTHROPIC_SECRET_ARN'] email_templates_bucket = os.environ['EMAIL_TEMPLATES_BUCKET'] # AWS_REGION is automatically available in Lambda runtime region = os.environ.get('AWS_REGION', os.environ.get('AWS_DEFAULT_REGION', 'eu-west-2')) print(f"Executing search task {task_id}") # Initialize AWS clients secrets_client = boto3.client('secretsmanager', region_name=region) s3_client = boto3.client('s3', region_name=region) ses_client = boto3.client('ses', region_name=region) # Get secrets db_secret = secrets_client.get_secret_value(SecretId=database_secret_arn) db_credentials = json.loads(db_secret['SecretString']) anthropic_secret = secrets_client.get_secret_value(SecretId=anthropic_secret_arn) anthropic_credentials = 
json.loads(anthropic_secret['SecretString']) # Setup database connection for Lambda database_url = f"postgresql://{db_credentials['username']}:{db_credentials['password']}@{db_credentials['host']}:{db_credentials['port']}/{db_credentials['dbname']}" os.environ['DATABASE_URL'] = database_url os.environ['ANTHROPIC_API_KEY'] = anthropic_credentials['api_key'] print(f"Database URL configured: {db_credentials['host']}:{db_credentials['port']}") print(f"Anthropic API key configured: {anthropic_credentials['api_key'][:10]}...") # Initialize existing components using core (direct database) task manager task_manager = CoreTaskManager() search_engine = SearchEngine() email_service = SESEmailService( ses_client=ses_client, s3_client=s3_client, templates_bucket=email_templates_bucket ) # Get task configuration task_config = task_manager.get_task(task_id) if not task_config: return { 'statusCode': 404, 'body': json.dumps({ 'error': f'Task {task_id} not found' }) } print(f"Task description: {task_config['task_description']}") # Check if task should run (frequency check) if not task_manager.should_task_run(task_id): return { 'statusCode': 200, 'body': json.dumps({ 'task_id': task_id, 'status': 'skipped', 'reason': 'Frequency not met' }) } # Create execution record execution_id = task_manager.create_execution_record(task_id) print(f"Created execution record: {execution_id}") try: # Get previous search results previous_results = task_manager.get_previous_search_results(task_id) print(f"Found {len(previous_results)} previous results") # Execute search and comparison using existing search engine current_items, new_items = search_engine.search_and_compare( task_config['task_description'], previous_results ) print(f"Search completed: {len(current_items)} total, {len(new_items)} new") # Save current results to database save_success = task_manager.save_search_results(task_id, current_items) if not save_success: print("Warning: Failed to save search results") # Send notification if new 
items found notification_sent = False if new_items: notification_sent = email_service.send_notification( task_config=task_config, new_items=new_items ) print(f"Email notification sent: {notification_sent}") # Update execution record task_manager.update_execution_record( execution_id=execution_id, status='completed', items_found=len(current_items), new_items_count=len(new_items), notification_sent=notification_sent ) return { 'statusCode': 200, 'body': json.dumps({ 'task_id': task_id, 'execution_id': execution_id, 'total_items': len(current_items), 'new_items': len(new_items), 'notification_sent': notification_sent, 'status': 'completed' }) } except Exception as e: # Update execution record with error error_message = str(e) print(f"Error during execution: {error_message}") task_manager.update_execution_record( execution_id=execution_id, status='failed', error_message=error_message ) return { 'statusCode': 500, 'body': json.dumps({ 'task_id': task_id, 'execution_id': execution_id, 'status': 'failed', 'error': error_message }) } except Exception as e: print(f"Fatal error in lambda handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'status': 'fatal_error', 'error': str(e) }) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'

If you have feedback or need assistance with the MCP directory API, please join our Discord server