Skip to main content
Glama

TimeLooker MCP Server

task_manager.py (14.4 kB)
"""
TaskManager - Database operations for TimeLooker Lambda function.
"""

import os
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Any, Optional

from sqlalchemy.orm import Session
from sqlalchemy import desc

from .models import Task, SearchResult, Execution, get_db_session, DatabaseSession
from ..utils.logging_config import get_logger


class TaskManager:
    """
    Manages task configuration, search results, and database operations.

    All methods open their own database session and keep no per-task state
    on the instance (only a logger is stored).
    """

    # Maximum number of task-description characters shown in the email subject.
    _SUBJECT_DESCRIPTION_LIMIT = 50

    def __init__(self):
        """Initialize TaskManager."""
        self.logger = get_logger(__name__)

    @staticmethod
    def _task_to_dict(task) -> Dict[str, Any]:
        """Serialize a Task row into the plain dictionary returned to callers."""
        return {
            'id': task.id,
            'task_description': task.task_description,
            'frequency_minutes': task.frequency_minutes,
            'runtime_minutes': task.runtime_minutes,
            'recipient_email': task.recipient_email,
            'sender_email': task.sender_email,
            # Timestamps may be unset; emit None rather than failing on .isoformat().
            'created_at': task.created_at.isoformat() if task.created_at else None,
            'updated_at': task.updated_at.isoformat() if task.updated_at else None,
        }

    def create_task(self, task_description: str, frequency_minutes: int,
                    runtime_minutes: int, recipient_email: str,
                    sender_email: Optional[str] = None) -> int:
        """
        Create a new monitoring task.

        Args:
            task_description: Natural language description of what to search for
            frequency_minutes: How often to check for updates (in minutes)
            runtime_minutes: How long the task should run (in minutes)
            recipient_email: Email address to send notifications to
            sender_email: Email address to send notifications from; when falsy,
                the DEFAULT_SENDER_EMAIL environment variable is used instead

        Returns:
            Task ID of the created task
        """
        with DatabaseSession() as session:
            task = Task(
                task_description=task_description,
                frequency_minutes=frequency_minutes,
                runtime_minutes=runtime_minutes,
                recipient_email=recipient_email,
                sender_email=sender_email or os.getenv("DEFAULT_SENDER_EMAIL")
            )
            session.add(task)
            session.commit()

            # Read the generated primary key while the session is still open.
            task_id = task.id
            self.logger.info(f"Created new task with ID: {task_id}")
            return task_id

    def get_task(self, task_id: int) -> Optional[Dict[str, Any]]:
        """
        Get task configuration by ID.

        Only active tasks are returned; a deactivated task is reported as
        not found.

        Args:
            task_id: ID of the task to retrieve

        Returns:
            Task configuration as dictionary, or None if not found
        """
        with DatabaseSession() as session:
            # "== True" is intentional: SQLAlchemy turns it into a SQL expression.
            task = session.query(Task).filter(
                Task.id == task_id, Task.is_active == True  # noqa: E712
            ).first()
            if not task:
                return None
            return self._task_to_dict(task)

    def get_previous_search_results(self, task_id: int, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get previous search results for a task, newest first.

        Args:
            task_id: ID of the task
            limit: Maximum number of results to return

        Returns:
            List of previous search results; an empty list if the query fails
            (the error is logged).
        """
        session = get_db_session()
        try:
            results = session.query(SearchResult)\
                .filter(SearchResult.task_id == task_id)\
                .order_by(desc(SearchResult.found_at))\
                .limit(limit)\
                .all()

            return [{
                'name': result.name,
                'description': result.description,
                'url': result.url,
                'source': result.source,
                'location': result.location,
                'additional_info': result.additional_info,
                'hash': result.content_hash,
                # Guard against a missing timestamp so one bad row does not
                # discard the whole result list via the except branch below.
                'timestamp': result.found_at.isoformat() if result.found_at else None,
            } for result in results]

        except Exception as e:
            self.logger.error(f"Error getting previous results for task {task_id}: {e}")
            return []
        finally:
            session.close()

    def save_search_results(self, task_id: int, search_results: List[Dict[str, Any]]) -> bool:
        """
        Save current search results to database.

        Existing results for the task are deleted and replaced by
        ``search_results`` within a single transaction.

        Args:
            task_id: ID of the task
            search_results: List of search results to save

        Returns:
            True if successful, False otherwise
        """
        session = get_db_session()
        try:
            # Clear previous results to avoid accumulating too much data
            session.query(SearchResult).filter(SearchResult.task_id == task_id).delete()

            # Save new results
            session.add_all([
                SearchResult(
                    task_id=task_id,
                    name=result.get('name', ''),
                    description=result.get('description', ''),
                    url=result.get('url', ''),
                    source=result.get('source', ''),
                    location=result.get('location', ''),
                    additional_info=result.get('additional_info', ''),
                    content_hash=result.get('hash', ''),
                    raw_data=result  # Store complete original data
                )
                for result in search_results
            ])

            session.commit()
            self.logger.info(f"Saved {len(search_results)} search results for task {task_id}")
            return True

        except Exception as e:
            session.rollback()
            self.logger.error(f"Error saving search results for task {task_id}: {e}")
            return False
        finally:
            session.close()

    def create_execution_record(self, task_id: int) -> int:
        """
        Create a new execution record with status 'running'.

        Args:
            task_id: ID of the task being executed

        Returns:
            Execution ID

        Raises:
            Exception: any error raised while inserting the record is logged,
                the transaction is rolled back, and the error is re-raised.
        """
        session = get_db_session()
        try:
            execution = Execution(
                task_id=task_id,
                status='running'
            )
            session.add(execution)
            session.commit()

            execution_id = execution.id
            self.logger.info(f"Created execution record {execution_id} for task {task_id}")
            return execution_id

        except Exception as e:
            session.rollback()
            self.logger.error(f"Error creating execution record for task {task_id}: {e}")
            raise
        finally:
            session.close()

    def update_execution_record(self, execution_id: int, status: str,
                                items_found: int = 0, new_items_count: int = 0,
                                notification_sent: bool = False,
                                error_message: Optional[str] = None) -> bool:
        """
        Update an execution record and stamp its completion time.

        Args:
            execution_id: ID of the execution record
            status: New status (completed, failed, timeout)
            items_found: Total number of items found
            new_items_count: Number of new items found
            notification_sent: Whether notification was sent
            error_message: Error message if any; an empty/None value leaves
                the stored message untouched

        Returns:
            True if successful, False otherwise (including when the record
            does not exist)
        """
        session = get_db_session()
        try:
            execution = session.query(Execution).filter(Execution.id == execution_id).first()
            if not execution:
                self.logger.error(f"Execution record {execution_id} not found")
                return False

            execution.status = status
            execution.items_found = items_found
            execution.new_items_count = new_items_count
            execution.notification_sent = notification_sent
            execution.completed_at = datetime.now()
            if error_message:
                execution.error_message = error_message

            session.commit()
            return True

        except Exception as e:
            session.rollback()
            self.logger.error(f"Error updating execution record {execution_id}: {e}")
            return False
        finally:
            session.close()

    def send_email_notification(self, task_config: Dict[str, Any],
                                new_items: List[Dict[str, Any]]) -> bool:
        """
        Prepare an email notification about new items found.

        NOTE: this is currently a placeholder. The message is built and its
        preparation is logged, but no SMTP connection is made, so nothing is
        actually delivered.

        Args:
            task_config: Task configuration dictionary; must contain
                'sender_email', 'recipient_email' and 'task_description'
            new_items: List of new items to include in email

        Returns:
            True if the message was prepared without error, False otherwise
        """
        try:
            description = task_config['task_description']
            limit = self._SUBJECT_DESCRIPTION_LIMIT
            # Only add an ellipsis when the description really was truncated.
            if len(description) > limit:
                subject_description = f"{description[:limit]}..."
            else:
                subject_description = description

            msg = MIMEMultipart()
            msg['From'] = task_config['sender_email']
            msg['To'] = task_config['recipient_email']
            msg['Subject'] = f"New items found for task: {subject_description}"

            separator = '=' * 50
            header = (
                "\n"
                "New items have been detected for your monitoring task:\n"
                "\n"
                f"Task: {description}\n"
                f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Number of new items: {len(new_items)}\n"
                "\n"
                "New Items Found:\n"
                f"{separator}\n"
                "\n"
            )

            # Collect the per-item sections and join once instead of
            # repeatedly concatenating onto a growing string.
            item_sections = [
                f"{i}. {item.get('name', 'Unknown Item')}\n"
                f" Source: {item.get('source', 'Unknown')}\n"
                f" Location: {item.get('location', 'Not specified')}\n"
                f" Description: {item.get('description', 'No description')}\n"
                f" URL: {item.get('url', 'No URL provided')}\n"
                f" Additional Info: {item.get('additional_info', 'None')}\n\n"
                for i, item in enumerate(new_items, 1)
            ]

            footer = (
                "\n"
                f"{separator}\n"
                "This is an automated message from TimeLooker.\n"
            )

            body = header + "".join(item_sections) + footer
            msg.attach(MIMEText(body, 'plain'))

            # Note: In a real implementation, you would need to set up SMTP credentials
            # For now, this is a placeholder that logs the email content
            self.logger.info(f"Email notification prepared for {task_config['recipient_email']}")
            self.logger.info(f"Email would contain {len(new_items)} new items")

            # Uncomment and configure for actual email sending:
            # server = smtplib.SMTP('smtp.gmail.com', 587)
            # server.starttls()
            # server.login(task_config['sender_email'], "your_app_password")
            # server.send_message(msg)
            # server.quit()

            return True

        except Exception as e:
            self.logger.error(f"Error sending email notification: {e}")
            return False

    def should_task_run(self, task_id: int) -> bool:
        """
        Check if a task should run based on its frequency and last execution.

        Args:
            task_id: ID of the task to check

        Returns:
            True if the task is active and either has never run or its last
            execution started at least ``frequency_minutes`` ago; False
            otherwise (including on any error, which is logged).
        """
        session = get_db_session()
        try:
            # "== True" is intentional: SQLAlchemy turns it into a SQL expression.
            task = session.query(Task).filter(
                Task.id == task_id, Task.is_active == True  # noqa: E712
            ).first()
            if not task:
                return False

            # Get the last execution
            last_execution = session.query(Execution)\
                .filter(Execution.task_id == task_id)\
                .order_by(desc(Execution.started_at))\
                .first()

            if not last_execution:
                # No previous execution, should run
                return True

            # Check if enough time has passed since last execution
            time_since_last = datetime.now() - last_execution.started_at
            frequency_delta = timedelta(minutes=task.frequency_minutes)

            return time_since_last >= frequency_delta

        except Exception as e:
            self.logger.error(f"Error checking if task {task_id} should run: {e}")
            return False
        finally:
            session.close()

    def deactivate_task(self, task_id: int) -> bool:
        """
        Deactivate a task (soft delete).

        The row is kept; only ``is_active`` is cleared and ``updated_at``
        refreshed.

        Args:
            task_id: ID of the task to deactivate

        Returns:
            True if successful, False otherwise (including when the task
            does not exist)
        """
        session = get_db_session()
        try:
            task = session.query(Task).filter(Task.id == task_id).first()
            if not task:
                return False

            task.is_active = False
            task.updated_at = datetime.now()
            session.commit()

            self.logger.info(f"Deactivated task {task_id}")
            return True

        except Exception as e:
            session.rollback()
            self.logger.error(f"Error deactivating task {task_id}: {e}")
            return False
        finally:
            session.close()

    def get_active_tasks(self) -> List[Dict[str, Any]]:
        """
        Get all active tasks.

        Returns:
            List of active task configurations; an empty list if the query
            fails (the error is logged).
        """
        session = get_db_session()
        try:
            # "== True" is intentional: SQLAlchemy turns it into a SQL expression.
            tasks = session.query(Task).filter(Task.is_active == True).all()  # noqa: E712
            return [self._task_to_dict(task) for task in tasks]

        except Exception as e:
            self.logger.error(f"Error getting active tasks: {e}")
            return []
        finally:
            session.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.