"""
TaskManager - Database operations for TimeLooker Lambda function.
"""
import os
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Any, Optional
from sqlalchemy import desc
from .models import Task, SearchResult, Execution, get_db_session, DatabaseSession
from ..utils.logging_config import get_logger
class TaskManager:
"""
Manages task configuration, search results, and database operations.
All methods work with database sessions and are stateless.
"""
def __init__(self):
"""Initialize TaskManager."""
self.logger = get_logger(__name__)
def create_task(self, task_description: str, frequency_minutes: int, runtime_minutes: int,
                    recipient_email: str, sender_email: Optional[str] = None) -> int:
"""
Create a new monitoring task.
Args:
task_description: Natural language description of what to search for
frequency_minutes: How often to check for updates (in minutes)
runtime_minutes: How long the task should run (in minutes)
recipient_email: Email address to send notifications to
            sender_email: Email address to send notifications from; defaults to the DEFAULT_SENDER_EMAIL environment variable when omitted
Returns:
Task ID of the created task
"""
with DatabaseSession() as session:
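            # DatabaseSession is assumed to close the session (and roll back on errors) when the block exits.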
task = Task(
task_description=task_description,
frequency_minutes=frequency_minutes,
runtime_minutes=runtime_minutes,
recipient_email=recipient_email,
sender_email=sender_email or os.getenv("DEFAULT_SENDER_EMAIL")
)
session.add(task)
session.commit()
task_id = task.id
self.logger.info(f"Created new task with ID: {task_id}")
return task_id
def get_task(self, task_id: int) -> Optional[Dict[str, Any]]:
"""
Get task configuration by ID.
Args:
task_id: ID of the task to retrieve
Returns:
Task configuration as dictionary, or None if not found
"""
with DatabaseSession() as session:
task = session.query(Task).filter(Task.id == task_id, Task.is_active == True).first()
if not task:
return None
return {
'id': task.id,
'task_description': task.task_description,
'frequency_minutes': task.frequency_minutes,
'runtime_minutes': task.runtime_minutes,
'recipient_email': task.recipient_email,
'sender_email': task.sender_email,
'created_at': task.created_at.isoformat() if task.created_at else None,
'updated_at': task.updated_at.isoformat() if task.updated_at else None
}
def get_previous_search_results(self, task_id: int, limit: int = 100) -> List[Dict[str, Any]]:
"""
Get previous search results for a task.
Args:
task_id: ID of the task
limit: Maximum number of results to return
Returns:
List of previous search results
"""
session = get_db_session()
try:
results = session.query(SearchResult)\
.filter(SearchResult.task_id == task_id)\
.order_by(desc(SearchResult.found_at))\
.limit(limit)\
.all()
return [{
'name': result.name,
'description': result.description,
'url': result.url,
'source': result.source,
'location': result.location,
'additional_info': result.additional_info,
'hash': result.content_hash,
                'timestamp': result.found_at.isoformat() if result.found_at else None
} for result in results]
except Exception as e:
self.logger.error(f"Error getting previous results for task {task_id}: {e}")
return []
finally:
session.close()
def save_search_results(self, task_id: int, search_results: List[Dict[str, Any]]) -> bool:
"""
        Save current search results to the database, replacing any previously stored results for the task.
        Args:
            task_id: ID of the task
            search_results: List of result dictionaries; expected keys include name, description, url, source, location, additional_info, and hash
Returns:
True if successful, False otherwise
"""
session = get_db_session()
try:
# Clear previous results to avoid accumulating too much data
session.query(SearchResult).filter(SearchResult.task_id == task_id).delete()
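            # The bulk delete above and the inserts below share one transaction, so a failure rolls back both.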
# Save new results
for result in search_results:
search_result = SearchResult(
task_id=task_id,
name=result.get('name', ''),
description=result.get('description', ''),
url=result.get('url', ''),
source=result.get('source', ''),
location=result.get('location', ''),
additional_info=result.get('additional_info', ''),
content_hash=result.get('hash', ''),
raw_data=result # Store complete original data
)
session.add(search_result)
session.commit()
self.logger.info(f"Saved {len(search_results)} search results for task {task_id}")
return True
except Exception as e:
session.rollback()
self.logger.error(f"Error saving search results for task {task_id}: {e}")
return False
finally:
session.close()
def create_execution_record(self, task_id: int) -> int:
"""
Create a new execution record.
Args:
task_id: ID of the task being executed
Returns:
Execution ID
"""
session = get_db_session()
try:
execution = Execution(
task_id=task_id,
status='running'
)
session.add(execution)
session.commit()
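            # Read the generated primary key while the session is still open; the instance may expire once it closes.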
execution_id = execution.id
self.logger.info(f"Created execution record {execution_id} for task {task_id}")
return execution_id
except Exception as e:
session.rollback()
self.logger.error(f"Error creating execution record for task {task_id}: {e}")
raise
finally:
session.close()
def update_execution_record(self, execution_id: int, status: str, items_found: int = 0,
new_items_count: int = 0, notification_sent: bool = False,
                                error_message: Optional[str] = None) -> bool:
"""
Update an execution record.
Args:
execution_id: ID of the execution record
status: New status (completed, failed, timeout)
items_found: Total number of items found
new_items_count: Number of new items found
notification_sent: Whether notification was sent
error_message: Error message if any
Returns:
True if successful, False otherwise
"""
session = get_db_session()
try:
execution = session.query(Execution).filter(Execution.id == execution_id).first()
if not execution:
self.logger.error(f"Execution record {execution_id} not found")
return False
execution.status = status
execution.items_found = items_found
execution.new_items_count = new_items_count
execution.notification_sent = notification_sent
execution.completed_at = datetime.now()
if error_message:
execution.error_message = error_message
session.commit()
return True
except Exception as e:
session.rollback()
self.logger.error(f"Error updating execution record {execution_id}: {e}")
return False
finally:
session.close()
def send_email_notification(self, task_config: Dict[str, Any], new_items: List[Dict[str, Any]]) -> bool:
"""
Send email notification about new items found.
Args:
task_config: Task configuration dictionary
new_items: List of new items to include in email
Returns:
True if email was sent successfully, False otherwise
"""
try:
msg = MIMEMultipart()
msg['From'] = task_config['sender_email']
msg['To'] = task_config['recipient_email']
            description = task_config['task_description']
            msg['Subject'] = f"New items found for task: {description[:50]}{'...' if len(description) > 50 else ''}"
body = f"""
New items have been detected for your monitoring task:
Task: {task_config['task_description']}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Number of new items: {len(new_items)}
New Items Found:
{'='*50}
"""
for i, item in enumerate(new_items, 1):
body += f"{i}. {item.get('name', 'Unknown Item')}\n"
body += f" Source: {item.get('source', 'Unknown')}\n"
body += f" Location: {item.get('location', 'Not specified')}\n"
body += f" Description: {item.get('description', 'No description')}\n"
body += f" URL: {item.get('url', 'No URL provided')}\n"
body += f" Additional Info: {item.get('additional_info', 'None')}\n\n"
body += f"""
{'='*50}
This is an automated message from TimeLooker.
"""
msg.attach(MIMEText(body, 'plain'))
            # NOTE: SMTP delivery is not configured yet; this placeholder only logs the
            # prepared message and reports success without actually sending anything.
            self.logger.info(f"Email notification prepared for {task_config['recipient_email']}")
            self.logger.info(f"Email would contain {len(new_items)} new items")
            # To enable real delivery, provide SMTP settings (for example via environment
            # variables) and uncomment:
            # with smtplib.SMTP(os.getenv("SMTP_HOST", "smtp.gmail.com"), int(os.getenv("SMTP_PORT", "587"))) as server:
            #     server.starttls()
            #     server.login(task_config['sender_email'], os.getenv("SMTP_APP_PASSWORD", ""))
            #     server.send_message(msg)
return True
except Exception as e:
self.logger.error(f"Error sending email notification: {e}")
return False
def should_task_run(self, task_id: int) -> bool:
"""
Check if a task should run based on its frequency and last execution.
Args:
task_id: ID of the task to check
Returns:
True if task should run, False otherwise
"""
session = get_db_session()
try:
task = session.query(Task).filter(Task.id == task_id, Task.is_active == True).first()
if not task:
return False
# Get the last execution
last_execution = session.query(Execution)\
.filter(Execution.task_id == task_id)\
.order_by(desc(Execution.started_at))\
.first()
if not last_execution:
# No previous execution, should run
return True
# Check if enough time has passed since last execution
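            # Assumes started_at and datetime.now() are both naive timestamps in the same timezone.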
time_since_last = datetime.now() - last_execution.started_at
frequency_delta = timedelta(minutes=task.frequency_minutes)
return time_since_last >= frequency_delta
except Exception as e:
self.logger.error(f"Error checking if task {task_id} should run: {e}")
return False
finally:
session.close()
def deactivate_task(self, task_id: int) -> bool:
"""
Deactivate a task (soft delete).
Args:
task_id: ID of the task to deactivate
Returns:
True if successful, False otherwise
"""
session = get_db_session()
try:
task = session.query(Task).filter(Task.id == task_id).first()
if not task:
return False
task.is_active = False
task.updated_at = datetime.now()
session.commit()
self.logger.info(f"Deactivated task {task_id}")
return True
except Exception as e:
session.rollback()
self.logger.error(f"Error deactivating task {task_id}: {e}")
return False
finally:
session.close()
def get_active_tasks(self) -> List[Dict[str, Any]]:
"""
Get all active tasks.
Returns:
List of active task configurations
"""
session = get_db_session()
try:
tasks = session.query(Task).filter(Task.is_active == True).all()
return [{
'id': task.id,
'task_description': task.task_description,
'frequency_minutes': task.frequency_minutes,
'runtime_minutes': task.runtime_minutes,
'recipient_email': task.recipient_email,
'sender_email': task.sender_email,
'created_at': task.created_at.isoformat() if task.created_at else None,
'updated_at': task.updated_at.isoformat() if task.updated_at else None
} for task in tasks]
except Exception as e:
self.logger.error(f"Error getting active tasks: {e}")
return []
finally:
session.close()
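# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption, not part of the Lambda entry
# point): one monitoring cycle wired together from the methods above. The
# task description, email addresses, and result dictionaries are hypothetical,
# and the database behind the models module is assumed to be reachable.
# Run as a module (python -m <package>.task_manager) so the relative imports
# above resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    manager = TaskManager()

    # Register a task that checks every 60 minutes for one week (10080 minutes).
    task_id = manager.create_task(
        task_description="New machine learning engineer roles in Berlin",
        frequency_minutes=60,
        runtime_minutes=10080,
        recipient_email="alerts@example.com",
        sender_email="noreply@example.com",
    )

    if manager.should_task_run(task_id):
        execution_id = manager.create_execution_record(task_id)

        # In the real flow these results would come from the search component.
        current_results = [{
            "name": "ML Engineer",
            "description": "Example posting",
            "url": "https://example.com/job/1",
            "source": "example.com",
            "location": "Berlin",
            "additional_info": "",
            "hash": "abc123",
        }]

        previous_hashes = {r["hash"] for r in manager.get_previous_search_results(task_id)}
        new_items = [r for r in current_results if r["hash"] not in previous_hashes]

        manager.save_search_results(task_id, current_results)
        notified = bool(new_items) and manager.send_email_notification(
            manager.get_task(task_id), new_items
        )
        manager.update_execution_record(
            execution_id,
            status="completed",
            items_found=len(current_results),
            new_items_count=len(new_items),
            notification_sent=notified,
        )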