Skip to main content
Glama

TimeLooker MCP Server

cloud_task_manager.py (9.38 kB)
""" Cloud-enabled TaskManager that integrates with AWS Lambda deployment. Extends the base TaskManager with Lambda function management. """ import os from typing import Dict, Any, Optional, List from ..core.task_manager import TaskManager as CoreTaskManager from ..api.task_manager_client import TaskManager as HTTPTaskManager from .lambda_deployer import LambdaDeployer from ..utils.logging_config import setup_logging from ..utils.aws_secrets import setup_environment_from_secrets, is_cloud_mode logger = setup_logging(__name__) class CloudTaskManager: """ Enhanced TaskManager that deploys Lambda functions for each task. Uses HTTP TaskManager for payment processing, then handles cloud deployment. """ def __init__(self, deploy_to_cloud: bool = True): """ Initialize CloudTaskManager. Args: deploy_to_cloud: Whether to deploy Lambda functions (False for local testing) """ # Setup AWS secrets if in cloud mode if deploy_to_cloud and is_cloud_mode(): logger.info("Setting up environment from AWS Secrets Manager...") setup_environment_from_secrets() # Use HTTP client for payment processing and core operations self.http_client = HTTPTaskManager() # Use core TaskManager for any direct database operations if needed self.core_client = CoreTaskManager() self.deploy_to_cloud = deploy_to_cloud if self.deploy_to_cloud: self.lambda_deployer = LambdaDeployer() logger.info("CloudTaskManager initialized with Lambda deployment enabled") else: logger.info("CloudTaskManager initialized in local mode (no Lambda deployment)") async def create_task(self, task_description: str, frequency_minutes: int, runtime_minutes: int, recipient_email: str, sender_email: str = None) -> int: """ Create a new monitoring task with x402 payment and deploy Lambda function. Uses HTTP client for payment processing, then handles cloud deployment. 
Args: task_description: Natural language description of what to search for frequency_minutes: How often to check for updates (in minutes) runtime_minutes: How long the task should run (in minutes) recipient_email: Email address to send notifications to sender_email: Email address to send notifications from Returns: Task ID of the created task Raises: Exception: If payment validation fails or deployment fails """ # Create task via HTTP client - this handles x402 payment automatically task_id = await self.http_client.create_task( task_description=task_description, frequency_minutes=frequency_minutes, runtime_minutes=runtime_minutes, recipient_email=recipient_email, sender_email=sender_email ) logger.info(f"Task {task_id} created successfully with payment via HTTP API") # Deploy Lambda function if cloud deployment is enabled if self.deploy_to_cloud: try: task_config = { 'task_description': task_description, 'frequency_minutes': frequency_minutes, 'runtime_minutes': runtime_minutes, 'recipient_email': recipient_email, 'sender_email': sender_email or os.getenv("DEFAULT_SENDER_EMAIL") } function_arn = self.lambda_deployer.deploy_task_function(task_id, task_config) logger.info(f"Deployed Lambda function for task {task_id}: {function_arn}") # Schedule cleanup after runtime expires self.lambda_deployer.schedule_cleanup(task_id, runtime_minutes) except Exception as e: logger.error(f"Failed to deploy Lambda function for task {task_id}: {e}") # Task was already created and paid for, so we shouldn't delete it # Let user know deployment failed but task exists raise Exception(f"Task {task_id} created successfully but Lambda deployment failed: {e}") return task_id async def deactivate_task(self, task_id: int) -> bool: """ Deactivate a task and clean up Lambda function. 
Args: task_id: ID of the task to deactivate Returns: True if successful, False otherwise """ # Deactivate task via HTTP client success = await self.http_client.deactivate_task(task_id) if success and self.deploy_to_cloud: try: # Delete Lambda function and schedule cleanup_success = self.lambda_deployer.delete_task_function(task_id) if cleanup_success: logger.info(f"Cleaned up Lambda function for task {task_id}") else: logger.warning(f"Task {task_id} deactivated but Lambda cleanup failed") except Exception as e: logger.error(f"Error cleaning up Lambda function for task {task_id}: {e}") # Task is still deactivated in database, but Lambda cleanup failed return success # Delegate methods to HTTP client to maintain interface compatibility async def get_task(self, task_id: int) -> Optional[Dict[str, Any]]: """Get task via HTTP client.""" return await self.http_client.get_task(task_id) async def should_task_run(self, task_id: int) -> bool: """Check if task should run via HTTP client.""" return await self.http_client.should_task_run(task_id) async def create_execution_record(self, task_id: int) -> int: """Create execution record via HTTP client.""" return await self.http_client.create_execution_record(task_id) async def get_previous_search_results(self, task_id: int, limit: int = 100) -> List[Dict[str, Any]]: """Get previous search results via HTTP client.""" return await self.http_client.get_previous_search_results(task_id, limit) async def save_search_results(self, task_id: int, search_results: List[Dict[str, Any]]) -> bool: """Save search results via HTTP client.""" return await self.http_client.save_search_results(task_id, search_results) async def send_email_notification(self, task_config: Dict[str, Any], new_items: List[Dict[str, Any]]) -> bool: """Send email notification via HTTP client.""" return await self.http_client.send_email_notification(task_config, new_items) async def update_execution_record(self, execution_id: int, status: str, items_found: int = 0, 
new_items_count: int = 0, notification_sent: bool = False, error_message: str = None) -> bool: """Update execution record via HTTP client.""" return await self.http_client.update_execution_record(execution_id, status, items_found, new_items_count, notification_sent, error_message) async def get_active_tasks(self) -> List[Dict[str, Any]]: """Get active tasks via HTTP client.""" return await self.http_client.get_active_tasks() async def get_task_cloud_status(self, task_id: int) -> Dict[str, Any]: """ Get cloud deployment status for a task. Args: task_id: ID of the task Returns: Dictionary with cloud deployment status """ if not self.deploy_to_cloud: return { 'cloud_enabled': False, 'lambda_function': None, 'schedule': None } function_name = f"timelooker-task-{task_id}" schedule_name = f"timelooker-schedule-{task_id}" status = { 'cloud_enabled': True, 'lambda_function': None, 'schedule': None, 'errors': [] } try: # Check Lambda function status response = self.lambda_deployer.lambda_client.get_function(FunctionName=function_name) status['lambda_function'] = { 'arn': response['Configuration']['FunctionArn'], 'state': response['Configuration']['State'], 'last_modified': response['Configuration']['LastModified'] } except Exception as e: status['errors'].append(f"Lambda function not found or error: {e}") try: # Check EventBridge schedule status response = self.lambda_deployer.scheduler_client.get_schedule(Name=schedule_name) status['schedule'] = { 'name': response['Name'], 'state': response['State'], 'schedule_expression': response['ScheduleExpression'] } except Exception as e: status['errors'].append(f"EventBridge schedule not found or error: {e}") return status

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.