scheduling_tools.py•15.1 kB
"""
Scheduling tools for the MCP server.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from persistence import (
load_job_execution_log,
remove_job_from_persistence,
save_job_to_persistence,
save_recurring_job_to_persistence,
)
from scheduler import parse_schedule_time, scheduler
from . import mcp
@mcp.tool()
async def schedule_telegram_message(
text: str,
schedule_time: str,
chat_id: Optional[str] = None,
job_id: Optional[str] = None,
) -> str:
"""
Schedule a Telegram message to be sent at a specific time.
Args:
text: The message text to send.
schedule_time: When to send the message. Can be:
- ISO format datetime string (e.g., "2024-01-15T14:30:00")
- Relative time like "in 5 minutes", "in 1 hour", "in 2 days"
- Specific time like "tomorrow at 9:00", "next monday at 10:30"
chat_id: The Telegram chat ID to send to. If not provided, uses ADMIN_ID from env.
job_id: Optional custom job ID. If not provided, will be auto-generated.
Returns:
Success message with job ID or error message.
"""
import asyncio
from . import telegram_tools
logging.info(
"schedule_telegram_message called with text=%r, schedule_time=%r",
text,
schedule_time,
)
def sync_send_telegram_message(text, chat_id):
# Run the async function in a new event loop (safe for scheduler)
try:
asyncio.run(
telegram_tools.send_telegram_message(text=text, chat_id=chat_id)
)
except RuntimeError:
# If already in an event loop (shouldn't happen in scheduler), fallback
loop = asyncio.get_event_loop()
loop.run_until_complete(
telegram_tools.send_telegram_message(text=text, chat_id=chat_id)
)
try:
# Parse schedule time
run_time, error_msg = parse_schedule_time(schedule_time)
if error_msg:
return error_msg
# Generate job ID if not provided
if not job_id:
job_id = f"telegram_msg_{int(datetime.now().timestamp())}"
# Schedule the job using the sync wrapper
scheduler.add_job(
sync_send_telegram_message,
"date",
run_date=run_time,
args=[text, chat_id],
id=job_id,
replace_existing=True,
)
# Save job to persistent storage
save_job_to_persistence(
job_id, "send_telegram_message", run_time, [text, chat_id], {}
)
# Start scheduler if not already running
if not scheduler.running:
scheduler.start()
logging.info(
"Scheduled Telegram message with job ID: %s for %s", job_id, run_time
)
return f"Scheduled Telegram message with job ID: {job_id} for {run_time}"
except Exception as e:
logging.error("Error scheduling Telegram message: %s", e)
return f"Error scheduling Telegram message: {e}"
@mcp.tool()
async def schedule_function_call(
function_name: str,
schedule_time: str,
args: Optional[List[Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
job_id: Optional[str] = None,
) -> str:
"""
Schedule any registered function to be called at a specific time.
Args:
function_name: Name of the function to call (must be in function registry).
schedule_time: When to call the function. Can be:
- ISO format datetime string (e.g., "2024-01-15T14:30:00")
- Relative time like "in 5 minutes", "in 1 hour", "in 2 days"
args: List of positional arguments to pass to the function.
kwargs: Dictionary of keyword arguments to pass to the function.
job_id: Optional custom job ID. If not provided, will be auto-generated.
Returns:
Success message with job ID or error message.
"""
from . import function_registry
logging.info(
"schedule_function_call called with function=%r, schedule_time=%r",
function_name,
schedule_time,
)
try:
# Check if function exists in registry
if function_name not in function_registry:
available_functions = ", ".join(function_registry.keys())
return f"Error: Function '{function_name}' not found in registry. Available functions: {available_functions}"
# Parse schedule time
run_time, error_msg = parse_schedule_time(schedule_time)
if error_msg:
return error_msg
# Generate job ID if not provided
if not job_id:
job_id = f"{function_name}_{int(datetime.now().timestamp())}"
# Prepare arguments
func_args = args or []
func_kwargs = kwargs or {}
# Schedule the job
scheduler.add_job(
function_registry[function_name],
"date",
run_date=run_time,
args=func_args,
kwargs=func_kwargs,
id=job_id,
replace_existing=True,
)
# Save job to persistent storage
save_job_to_persistence(job_id, function_name, run_time, func_args, func_kwargs)
# Start scheduler if not already running
if not scheduler.running:
scheduler.start()
logging.info(
"Scheduled function call '%s' with job ID: %s for %s",
function_name,
job_id,
run_time,
)
return f"Scheduled function call '{function_name}' with job ID: {job_id} for {run_time}"
except Exception as e:
logging.error("Error scheduling function call: %s", e)
return f"Error scheduling function call: {e}"
@mcp.tool()
async def schedule_recurring_job(
function_name: str,
schedule_expression: str,
args: Optional[List[Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
job_id: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> str:
"""
Schedule a recurring job using cron-like expressions.
Args:
function_name: Name of the function to call (must be in function registry).
schedule_expression: Cron-like expression or interval:
- Cron: "0 9 * * 1-5" (9 AM weekdays)
- Interval: "every 5 minutes", "every 1 hour", "every 2 days"
args: List of positional arguments to pass to the function.
kwargs: Dictionary of keyword arguments to pass to the function.
job_id: Optional custom job ID. If not provided, will be auto-generated.
start_date: Optional start date (ISO format). If not provided, starts immediately.
end_date: Optional end date (ISO format). If not provided, runs indefinitely.
Returns:
Success message with job ID or error message.
"""
from . import function_registry
logging.info(
"schedule_recurring_job called with function=%r, schedule=%r",
function_name,
schedule_expression,
)
try:
# Check if function exists in registry
if function_name not in function_registry:
available_functions = ", ".join(function_registry.keys())
return f"Error: Function '{function_name}' not found in registry. Available functions: {available_functions}"
# Generate job ID if not provided
if not job_id:
job_id = f"recurring_{function_name}_{int(datetime.now().timestamp())}"
# Prepare arguments
func_args = args or []
func_kwargs = kwargs or {}
# Parse schedule expression
if schedule_expression.startswith("every "):
# Interval-based scheduling
interval_str = schedule_expression[6:] # Remove "every "
if "minute" in interval_str:
minutes = int(interval_str.split()[0])
trigger = "interval"
trigger_args = {"minutes": minutes}
elif "hour" in interval_str:
hours = int(interval_str.split()[0])
trigger = "interval"
trigger_args = {"hours": hours}
elif "day" in interval_str:
days = int(interval_str.split()[0])
trigger = "interval"
trigger_args = {"days": days}
else:
return f"Error: Unsupported interval format: {schedule_expression}. Use 'every X minutes/hours/days'"
else:
# Cron-based scheduling
trigger = "cron"
trigger_args = {}
# Parse cron expression (minute hour day month day_of_week)
cron_parts = schedule_expression.split()
if len(cron_parts) == 5:
trigger_args = {
"minute": cron_parts[0],
"hour": cron_parts[1],
"day": cron_parts[2],
"month": cron_parts[3],
"day_of_week": cron_parts[4],
}
else:
return f"Error: Invalid cron expression: {schedule_expression}. Use format: 'minute hour day month day_of_week'"
# Parse start and end dates
start_date_obj = None
end_date_obj = None
if start_date:
try:
start_date_obj = datetime.fromisoformat(
start_date.replace("Z", "+00:00")
)
except ValueError:
return (
f"Error: Invalid start_date format: {start_date}. Use ISO format."
)
if end_date:
try:
end_date_obj = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
except ValueError:
return f"Error: Invalid end_date format: {end_date}. Use ISO format."
# Schedule the recurring job
job = scheduler.add_job(
function_registry[function_name],
trigger,
**trigger_args,
args=func_args,
kwargs=func_kwargs,
id=job_id,
replace_existing=True,
start_date=start_date_obj,
end_date=end_date_obj,
)
# Save recurring job info to persistent storage (different format)
save_recurring_job_to_persistence(
job_id,
function_name,
schedule_expression,
func_args,
func_kwargs,
start_date,
end_date,
)
# Start scheduler if not already running
if not scheduler.running:
scheduler.start()
next_run = job.next_run_time if job.next_run_time else "Not scheduled"
logging.info(
"Scheduled recurring job '%s' with job ID: %s, next run: %s",
function_name,
job_id,
next_run,
)
return f"Scheduled recurring job '{function_name}' with job ID: {job_id}, next run: {next_run}"
except Exception as e:
logging.error("Error scheduling recurring job: %s", e)
return f"Error scheduling recurring job: {e}"
@mcp.tool()
async def list_scheduled_jobs() -> str:
"""
List all currently scheduled jobs.
Returns:
Formatted list of scheduled jobs or error message.
"""
logging.info("list_scheduled_jobs called")
try:
jobs = scheduler.get_jobs()
if not jobs:
return "No scheduled jobs found."
output = f"Scheduled Jobs ({len(jobs)} total):\n\n"
for job in jobs:
output += f"Job ID: {job.id}\n"
output += f"Function: {job.func.__name__}\n"
output += f"Next Run: {job.next_run_time}\n"
output += f"Args: {job.args}\n"
output += f"Kwargs: {job.kwargs}\n"
output += "---\n"
return output
except Exception as e:
logging.error("Error listing scheduled jobs: %s", e)
return f"Error listing scheduled jobs: {e}"
@mcp.tool()
async def cancel_scheduled_job(job_id: str) -> str:
"""
Cancel a scheduled job by its ID.
Args:
job_id: The ID of the job to cancel.
Returns:
Success or error message.
"""
logging.info("cancel_scheduled_job called with job_id=%r", job_id)
try:
# Check if job exists
job = scheduler.get_job(job_id)
if not job:
return f"Error: Job with ID '{job_id}' not found."
# Remove the job
scheduler.remove_job(job_id)
# Remove from persistent storage
remove_job_from_persistence(job_id)
logging.info("Cancelled job with ID: %s", job_id)
return f"Successfully cancelled job with ID: {job_id}"
except Exception as e:
logging.error("Error cancelling job %s: %s", job_id, e)
return f"Error cancelling job {job_id}: {e}"
@mcp.tool()
async def get_available_functions() -> str:
"""
Get a list of all available functions that can be scheduled.
Returns:
Formatted list of available functions.
"""
from . import function_registry
logging.info("get_available_functions called")
try:
functions = list(function_registry.keys())
output = f"Available Functions for Scheduling ({len(functions)} total):\n\n"
for func_name in sorted(functions):
output += f"- {func_name}\n"
return output
except Exception as e:
logging.error("Error getting available functions: %s", e)
return f"Error getting available functions: {e}"
@mcp.tool()
async def get_job_execution_log(limit: Optional[int] = 50) -> str:
"""
Get the job execution log with recent job runs.
Args:
limit: Maximum number of log entries to return (default: 50).
Returns:
Formatted job execution log or error message.
"""
logging.info("get_job_execution_log called with limit=%r", limit)
try:
log_entries = load_job_execution_log()
if not log_entries:
return "No job execution log entries found."
# Get the most recent entries
recent_entries = log_entries[-limit:] if limit else log_entries
output = f"Job Execution Log (showing {len(recent_entries)} of {len(log_entries)} entries):\n\n"
for entry in reversed(recent_entries): # Show most recent first
status = "✅ SUCCESS" if entry["success"] else "❌ FAILED"
output += f"Job ID: {entry['job_id']}\n"
output += f"Function: {entry['function_name']}\n"
output += f"Time: {entry['timestamp']}\n"
output += f"Status: {status}\n"
if entry["result"]:
output += f"Result: {entry['result']}\n"
if entry["error"]:
output += f"Error: {entry['error']}\n"
output += "---\n"
return output
except Exception as e:
logging.error("Error getting job execution log: %s", e)
return f"Error getting job execution log: {e}"