Skip to main content
Glama

Simple MCP

by karar-hayder
scheduling_tools.py15.1 kB
""" Scheduling tools for the MCP server. """ import logging from datetime import datetime from typing import Any, Dict, List, Optional from persistence import ( load_job_execution_log, remove_job_from_persistence, save_job_to_persistence, save_recurring_job_to_persistence, ) from scheduler import parse_schedule_time, scheduler from . import mcp @mcp.tool() async def schedule_telegram_message( text: str, schedule_time: str, chat_id: Optional[str] = None, job_id: Optional[str] = None, ) -> str: """ Schedule a Telegram message to be sent at a specific time. Args: text: The message text to send. schedule_time: When to send the message. Can be: - ISO format datetime string (e.g., "2024-01-15T14:30:00") - Relative time like "in 5 minutes", "in 1 hour", "in 2 days" - Specific time like "tomorrow at 9:00", "next monday at 10:30" chat_id: The Telegram chat ID to send to. If not provided, uses ADMIN_ID from env. job_id: Optional custom job ID. If not provided, will be auto-generated. Returns: Success message with job ID or error message. """ import asyncio from . 
import telegram_tools logging.info( "schedule_telegram_message called with text=%r, schedule_time=%r", text, schedule_time, ) def sync_send_telegram_message(text, chat_id): # Run the async function in a new event loop (safe for scheduler) try: asyncio.run( telegram_tools.send_telegram_message(text=text, chat_id=chat_id) ) except RuntimeError: # If already in an event loop (shouldn't happen in scheduler), fallback loop = asyncio.get_event_loop() loop.run_until_complete( telegram_tools.send_telegram_message(text=text, chat_id=chat_id) ) try: # Parse schedule time run_time, error_msg = parse_schedule_time(schedule_time) if error_msg: return error_msg # Generate job ID if not provided if not job_id: job_id = f"telegram_msg_{int(datetime.now().timestamp())}" # Schedule the job using the sync wrapper scheduler.add_job( sync_send_telegram_message, "date", run_date=run_time, args=[text, chat_id], id=job_id, replace_existing=True, ) # Save job to persistent storage save_job_to_persistence( job_id, "send_telegram_message", run_time, [text, chat_id], {} ) # Start scheduler if not already running if not scheduler.running: scheduler.start() logging.info( "Scheduled Telegram message with job ID: %s for %s", job_id, run_time ) return f"Scheduled Telegram message with job ID: {job_id} for {run_time}" except Exception as e: logging.error("Error scheduling Telegram message: %s", e) return f"Error scheduling Telegram message: {e}" @mcp.tool() async def schedule_function_call( function_name: str, schedule_time: str, args: Optional[List[Any]] = None, kwargs: Optional[Dict[str, Any]] = None, job_id: Optional[str] = None, ) -> str: """ Schedule any registered function to be called at a specific time. Args: function_name: Name of the function to call (must be in function registry). schedule_time: When to call the function. 
Can be: - ISO format datetime string (e.g., "2024-01-15T14:30:00") - Relative time like "in 5 minutes", "in 1 hour", "in 2 days" args: List of positional arguments to pass to the function. kwargs: Dictionary of keyword arguments to pass to the function. job_id: Optional custom job ID. If not provided, will be auto-generated. Returns: Success message with job ID or error message. """ from . import function_registry logging.info( "schedule_function_call called with function=%r, schedule_time=%r", function_name, schedule_time, ) try: # Check if function exists in registry if function_name not in function_registry: available_functions = ", ".join(function_registry.keys()) return f"Error: Function '{function_name}' not found in registry. Available functions: {available_functions}" # Parse schedule time run_time, error_msg = parse_schedule_time(schedule_time) if error_msg: return error_msg # Generate job ID if not provided if not job_id: job_id = f"{function_name}_{int(datetime.now().timestamp())}" # Prepare arguments func_args = args or [] func_kwargs = kwargs or {} # Schedule the job scheduler.add_job( function_registry[function_name], "date", run_date=run_time, args=func_args, kwargs=func_kwargs, id=job_id, replace_existing=True, ) # Save job to persistent storage save_job_to_persistence(job_id, function_name, run_time, func_args, func_kwargs) # Start scheduler if not already running if not scheduler.running: scheduler.start() logging.info( "Scheduled function call '%s' with job ID: %s for %s", function_name, job_id, run_time, ) return f"Scheduled function call '{function_name}' with job ID: {job_id} for {run_time}" except Exception as e: logging.error("Error scheduling function call: %s", e) return f"Error scheduling function call: {e}" @mcp.tool() async def schedule_recurring_job( function_name: str, schedule_expression: str, args: Optional[List[Any]] = None, kwargs: Optional[Dict[str, Any]] = None, job_id: Optional[str] = None, start_date: Optional[str] = None, 
end_date: Optional[str] = None, ) -> str: """ Schedule a recurring job using cron-like expressions. Args: function_name: Name of the function to call (must be in function registry). schedule_expression: Cron-like expression or interval: - Cron: "0 9 * * 1-5" (9 AM weekdays) - Interval: "every 5 minutes", "every 1 hour", "every 2 days" args: List of positional arguments to pass to the function. kwargs: Dictionary of keyword arguments to pass to the function. job_id: Optional custom job ID. If not provided, will be auto-generated. start_date: Optional start date (ISO format). If not provided, starts immediately. end_date: Optional end date (ISO format). If not provided, runs indefinitely. Returns: Success message with job ID or error message. """ from . import function_registry logging.info( "schedule_recurring_job called with function=%r, schedule=%r", function_name, schedule_expression, ) try: # Check if function exists in registry if function_name not in function_registry: available_functions = ", ".join(function_registry.keys()) return f"Error: Function '{function_name}' not found in registry. Available functions: {available_functions}" # Generate job ID if not provided if not job_id: job_id = f"recurring_{function_name}_{int(datetime.now().timestamp())}" # Prepare arguments func_args = args or [] func_kwargs = kwargs or {} # Parse schedule expression if schedule_expression.startswith("every "): # Interval-based scheduling interval_str = schedule_expression[6:] # Remove "every " if "minute" in interval_str: minutes = int(interval_str.split()[0]) trigger = "interval" trigger_args = {"minutes": minutes} elif "hour" in interval_str: hours = int(interval_str.split()[0]) trigger = "interval" trigger_args = {"hours": hours} elif "day" in interval_str: days = int(interval_str.split()[0]) trigger = "interval" trigger_args = {"days": days} else: return f"Error: Unsupported interval format: {schedule_expression}. 
Use 'every X minutes/hours/days'" else: # Cron-based scheduling trigger = "cron" trigger_args = {} # Parse cron expression (minute hour day month day_of_week) cron_parts = schedule_expression.split() if len(cron_parts) == 5: trigger_args = { "minute": cron_parts[0], "hour": cron_parts[1], "day": cron_parts[2], "month": cron_parts[3], "day_of_week": cron_parts[4], } else: return f"Error: Invalid cron expression: {schedule_expression}. Use format: 'minute hour day month day_of_week'" # Parse start and end dates start_date_obj = None end_date_obj = None if start_date: try: start_date_obj = datetime.fromisoformat( start_date.replace("Z", "+00:00") ) except ValueError: return ( f"Error: Invalid start_date format: {start_date}. Use ISO format." ) if end_date: try: end_date_obj = datetime.fromisoformat(end_date.replace("Z", "+00:00")) except ValueError: return f"Error: Invalid end_date format: {end_date}. Use ISO format." # Schedule the recurring job job = scheduler.add_job( function_registry[function_name], trigger, **trigger_args, args=func_args, kwargs=func_kwargs, id=job_id, replace_existing=True, start_date=start_date_obj, end_date=end_date_obj, ) # Save recurring job info to persistent storage (different format) save_recurring_job_to_persistence( job_id, function_name, schedule_expression, func_args, func_kwargs, start_date, end_date, ) # Start scheduler if not already running if not scheduler.running: scheduler.start() next_run = job.next_run_time if job.next_run_time else "Not scheduled" logging.info( "Scheduled recurring job '%s' with job ID: %s, next run: %s", function_name, job_id, next_run, ) return f"Scheduled recurring job '{function_name}' with job ID: {job_id}, next run: {next_run}" except Exception as e: logging.error("Error scheduling recurring job: %s", e) return f"Error scheduling recurring job: {e}" @mcp.tool() async def list_scheduled_jobs() -> str: """ List all currently scheduled jobs. Returns: Formatted list of scheduled jobs or error message. 
""" logging.info("list_scheduled_jobs called") try: jobs = scheduler.get_jobs() if not jobs: return "No scheduled jobs found." output = f"Scheduled Jobs ({len(jobs)} total):\n\n" for job in jobs: output += f"Job ID: {job.id}\n" output += f"Function: {job.func.__name__}\n" output += f"Next Run: {job.next_run_time}\n" output += f"Args: {job.args}\n" output += f"Kwargs: {job.kwargs}\n" output += "---\n" return output except Exception as e: logging.error("Error listing scheduled jobs: %s", e) return f"Error listing scheduled jobs: {e}" @mcp.tool() async def cancel_scheduled_job(job_id: str) -> str: """ Cancel a scheduled job by its ID. Args: job_id: The ID of the job to cancel. Returns: Success or error message. """ logging.info("cancel_scheduled_job called with job_id=%r", job_id) try: # Check if job exists job = scheduler.get_job(job_id) if not job: return f"Error: Job with ID '{job_id}' not found." # Remove the job scheduler.remove_job(job_id) # Remove from persistent storage remove_job_from_persistence(job_id) logging.info("Cancelled job with ID: %s", job_id) return f"Successfully cancelled job with ID: {job_id}" except Exception as e: logging.error("Error cancelling job %s: %s", job_id, e) return f"Error cancelling job {job_id}: {e}" @mcp.tool() async def get_available_functions() -> str: """ Get a list of all available functions that can be scheduled. Returns: Formatted list of available functions. """ from . import function_registry logging.info("get_available_functions called") try: functions = list(function_registry.keys()) output = f"Available Functions for Scheduling ({len(functions)} total):\n\n" for func_name in sorted(functions): output += f"- {func_name}\n" return output except Exception as e: logging.error("Error getting available functions: %s", e) return f"Error getting available functions: {e}" @mcp.tool() async def get_job_execution_log(limit: Optional[int] = 50) -> str: """ Get the job execution log with recent job runs. 
Args: limit: Maximum number of log entries to return (default: 50). Returns: Formatted job execution log or error message. """ logging.info("get_job_execution_log called with limit=%r", limit) try: log_entries = load_job_execution_log() if not log_entries: return "No job execution log entries found." # Get the most recent entries recent_entries = log_entries[-limit:] if limit else log_entries output = f"Job Execution Log (showing {len(recent_entries)} of {len(log_entries)} entries):\n\n" for entry in reversed(recent_entries): # Show most recent first status = "✅ SUCCESS" if entry["success"] else "❌ FAILED" output += f"Job ID: {entry['job_id']}\n" output += f"Function: {entry['function_name']}\n" output += f"Time: {entry['timestamp']}\n" output += f"Status: {status}\n" if entry["result"]: output += f"Result: {entry['result']}\n" if entry["error"]: output += f"Error: {entry['error']}\n" output += "---\n" return output except Exception as e: logging.error("Error getting job execution log: %s", e) return f"Error getting job execution log: {e}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/karar-hayder/Simple-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.