# cloud_task_manager.py • 9.38 kB
"""
Cloud-enabled TaskManager that integrates with AWS Lambda deployment.
Extends the base TaskManager with Lambda function management.
"""
import os
from typing import Dict, Any, Optional, List
from ..core.task_manager import TaskManager as CoreTaskManager
from ..api.task_manager_client import TaskManager as HTTPTaskManager
from .lambda_deployer import LambdaDeployer
from ..utils.logging_config import setup_logging
from ..utils.aws_secrets import setup_environment_from_secrets, is_cloud_mode
# Module-level logger, obtained from the project's setup_logging helper and
# shared by every method of CloudTaskManager below.
logger = setup_logging(__name__)
class CloudTaskManager:
    """
    Enhanced TaskManager that deploys a Lambda function for each task.

    Task creation (including payment) and all record-keeping calls are
    delegated to the HTTP TaskManager client. When ``deploy_to_cloud`` is
    enabled, this class additionally deploys, schedules cleanup for, inspects
    and deletes the per-task Lambda function through ``LambdaDeployer``.
    """

    def __init__(self, deploy_to_cloud: bool = True):
        """
        Initialize CloudTaskManager.

        Args:
            deploy_to_cloud: Whether to deploy Lambda functions (False for local testing)
        """
        # Secrets are loaded only when deployment is requested AND the
        # process reports that it is running in cloud mode.
        if deploy_to_cloud and is_cloud_mode():
            logger.info("Setting up environment from AWS Secrets Manager...")
            setup_environment_from_secrets()

        # Use HTTP client for payment processing and core operations
        self.http_client = HTTPTaskManager()
        # Core TaskManager is kept available for direct operations; none of
        # the methods in this class currently call it.
        self.core_client = CoreTaskManager()

        self.deploy_to_cloud = deploy_to_cloud
        # Always define the attribute so that local-mode instances expose
        # None instead of raising AttributeError on access.
        self.lambda_deployer = None
        if self.deploy_to_cloud:
            self.lambda_deployer = LambdaDeployer()
            logger.info("CloudTaskManager initialized with Lambda deployment enabled")
        else:
            logger.info("CloudTaskManager initialized in local mode (no Lambda deployment)")

    async def create_task(self, task_description: str, frequency_minutes: int, runtime_minutes: int,
                          recipient_email: str, sender_email: Optional[str] = None) -> int:
        """
        Create a new monitoring task with x402 payment and deploy Lambda function.

        Uses HTTP client for payment processing, then handles cloud deployment.

        Args:
            task_description: Natural language description of what to search for
            frequency_minutes: How often to check for updates (in minutes)
            runtime_minutes: How long the task should run (in minutes)
            recipient_email: Email address to send notifications to
            sender_email: Email address to send notifications from. When not
                given, the deployed function's config falls back to the
                DEFAULT_SENDER_EMAIL environment variable.

        Returns:
            Task ID of the created task

        Raises:
            Exception: If deploying the Lambda function or scheduling its
                cleanup fails. The task itself already exists at that point;
                the underlying error is attached as ``__cause__``. Errors
                raised by the HTTP client propagate unchanged.
        """
        # Create task via HTTP client - this handles x402 payment automatically
        task_id = await self.http_client.create_task(
            task_description=task_description,
            frequency_minutes=frequency_minutes,
            runtime_minutes=runtime_minutes,
            recipient_email=recipient_email,
            sender_email=sender_email
        )
        logger.info(f"Task {task_id} created successfully with payment via HTTP API")

        # Local mode: nothing to deploy.
        if not self.deploy_to_cloud:
            return task_id

        task_config = {
            'task_description': task_description,
            'frequency_minutes': frequency_minutes,
            'runtime_minutes': runtime_minutes,
            'recipient_email': recipient_email,
            'sender_email': sender_email or os.getenv("DEFAULT_SENDER_EMAIL")
        }
        try:
            # NOTE(review): these deployer calls are synchronous and run
            # inline inside this coroutine.
            function_arn = self.lambda_deployer.deploy_task_function(task_id, task_config)
            logger.info(f"Deployed Lambda function for task {task_id}: {function_arn}")
            # Schedule cleanup after runtime expires
            self.lambda_deployer.schedule_cleanup(task_id, runtime_minutes)
        except Exception as e:
            logger.error(f"Failed to deploy Lambda function for task {task_id}: {e}")
            # Task was already created and paid for, so it is not deleted
            # here; the caller is told that deployment failed but the task
            # exists. Chain the original error so its traceback is preserved.
            raise Exception(
                f"Task {task_id} created successfully but Lambda deployment failed: {e}"
            ) from e

        return task_id

    async def deactivate_task(self, task_id: int) -> bool:
        """
        Deactivate a task and clean up Lambda function.

        Lambda cleanup is best-effort: a failed or erroring cleanup is logged
        but does not change the return value.

        Args:
            task_id: ID of the task to deactivate

        Returns:
            The result of the HTTP client's deactivation call (True if
            successful, False otherwise)
        """
        # Deactivate task via HTTP client
        success = await self.http_client.deactivate_task(task_id)

        if success and self.deploy_to_cloud:
            try:
                # Delete Lambda function and schedule
                cleanup_success = self.lambda_deployer.delete_task_function(task_id)
                if cleanup_success:
                    logger.info(f"Cleaned up Lambda function for task {task_id}")
                else:
                    logger.warning(f"Task {task_id} deactivated but Lambda cleanup failed")
            except Exception as e:
                # Deactivation already succeeded via the HTTP client; only
                # the Lambda cleanup failed, so log and carry on.
                logger.error(f"Error cleaning up Lambda function for task {task_id}: {e}")

        return success

    # Delegate methods to HTTP client to maintain interface compatibility

    async def get_task(self, task_id: int) -> Optional[Dict[str, Any]]:
        """Get task via HTTP client."""
        return await self.http_client.get_task(task_id)

    async def should_task_run(self, task_id: int) -> bool:
        """Check if task should run via HTTP client."""
        return await self.http_client.should_task_run(task_id)

    async def create_execution_record(self, task_id: int) -> int:
        """Create execution record via HTTP client."""
        return await self.http_client.create_execution_record(task_id)

    async def get_previous_search_results(self, task_id: int, limit: int = 100) -> List[Dict[str, Any]]:
        """Get previous search results via HTTP client."""
        return await self.http_client.get_previous_search_results(task_id, limit)

    async def save_search_results(self, task_id: int, search_results: List[Dict[str, Any]]) -> bool:
        """Save search results via HTTP client."""
        return await self.http_client.save_search_results(task_id, search_results)

    async def send_email_notification(self, task_config: Dict[str, Any], new_items: List[Dict[str, Any]]) -> bool:
        """Send email notification via HTTP client."""
        return await self.http_client.send_email_notification(task_config, new_items)

    async def update_execution_record(self, execution_id: int, status: str, items_found: int = 0,
                                      new_items_count: int = 0, notification_sent: bool = False,
                                      error_message: Optional[str] = None) -> bool:
        """Update execution record via HTTP client."""
        # Arguments are forwarded positionally, in the same order as this
        # method's own signature.
        return await self.http_client.update_execution_record(
            execution_id, status, items_found, new_items_count, notification_sent, error_message
        )

    async def get_active_tasks(self) -> List[Dict[str, Any]]:
        """Get active tasks via HTTP client."""
        return await self.http_client.get_active_tasks()

    async def get_task_cloud_status(self, task_id: int) -> Dict[str, Any]:
        """
        Get cloud deployment status for a task.

        Lookup failures are never raised; each one is recorded as a message
        in the ``errors`` list and the corresponding entry stays ``None``.

        Args:
            task_id: ID of the task

        Returns:
            Dictionary with the keys ``cloud_enabled``, ``lambda_function``,
            ``schedule`` and ``errors``. In local mode ``cloud_enabled`` is
            False, both resource entries are None and ``errors`` is empty.
        """
        if not self.deploy_to_cloud:
            # Same key set as the cloud-mode result, so callers can read
            # 'errors' without checking the mode first.
            return {
                'cloud_enabled': False,
                'lambda_function': None,
                'schedule': None,
                'errors': []
            }

        # Resource names are derived from the task id.
        # NOTE(review): presumably these match the names LambdaDeployer
        # creates - confirm against lambda_deployer.
        function_name = f"timelooker-task-{task_id}"
        schedule_name = f"timelooker-schedule-{task_id}"

        status = {
            'cloud_enabled': True,
            'lambda_function': None,
            'schedule': None,
            'errors': []
        }

        try:
            # Check Lambda function status
            response = self.lambda_deployer.lambda_client.get_function(FunctionName=function_name)
            configuration = response['Configuration']
            status['lambda_function'] = {
                'arn': configuration['FunctionArn'],
                'state': configuration['State'],
                'last_modified': configuration['LastModified']
            }
        except Exception as e:
            status['errors'].append(f"Lambda function not found or error: {e}")

        try:
            # Check EventBridge schedule status
            response = self.lambda_deployer.scheduler_client.get_schedule(Name=schedule_name)
            status['schedule'] = {
                'name': response['Name'],
                'state': response['State'],
                'schedule_expression': response['ScheduleExpression']
            }
        except Exception as e:
            status['errors'].append(f"EventBridge schedule not found or error: {e}")

        return status