"""
Scheduling functionality for the MCP server.
"""
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from persistence import (
    load_scheduled_jobs,
    log_job_execution,
    remove_job_from_persistence,
)

# Initialize the scheduler (force UTC timezone)
scheduler = AsyncIOScheduler(
    jobstores={"default": MemoryJobStore()},
    executors={"default": AsyncIOExecutor()},
    job_defaults={"coalesce": False, "max_instances": 3},
    timezone=timezone.utc,
)
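
# Note: AsyncIOScheduler only runs jobs once an asyncio event loop is running,
# so scheduler.start() is expected to be called from the server's async startup
# path (see the illustrative sketch at the bottom of this module).

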
def parse_schedule_time(schedule_time: str) -> Tuple[Optional[datetime], str]:
    """
    Parse a schedule time string into a timezone-aware UTC datetime.

    Args:
        schedule_time: Time string to parse, either a relative time such as
            "in 5 minutes" or an ISO 8601 datetime.

    Returns:
        Tuple of (datetime, error_message). If error_message is non-empty,
        the datetime is None.
    """
    now = datetime.now(timezone.utc)
    if schedule_time.startswith("in "):
        # Relative time parsing, e.g. "in 5 minutes"
        time_str = schedule_time[3:]  # Remove "in "
        try:
            amount = int(time_str.split()[0])
        except (ValueError, IndexError):
            return (
                None,
                f"Error: Could not parse amount in relative time: {schedule_time}",
            )
        if "second" in time_str:
            run_time = now + timedelta(seconds=amount)
        elif "minute" in time_str:
            run_time = now + timedelta(minutes=amount)
        elif "hour" in time_str:
            run_time = now + timedelta(hours=amount)
        elif "day" in time_str:
            run_time = now + timedelta(days=amount)
        elif "week" in time_str:
            run_time = now + timedelta(weeks=amount)
        else:
            return (
                None,
                (
                    f"Error: Unsupported relative time format: {schedule_time}. "
                    "Supported units: seconds, minutes, hours, days, weeks"
                ),
            )
    else:
        # Try to parse as an ISO 8601 datetime
        try:
            # Accept both naive and aware datetimes, but always return aware UTC
            dt = datetime.fromisoformat(schedule_time.replace("Z", "+00:00"))
            if dt.tzinfo is None:
                # Assume naive datetimes are in UTC
                run_time = dt.replace(tzinfo=timezone.utc)
            else:
                run_time = dt.astimezone(timezone.utc)
        except ValueError:
            return (
                None,
                (
                    f"Error: Invalid datetime format: {schedule_time}. "
                    "Use ISO format or a relative time like 'in 5 minutes'"
                ),
            )
    return run_time, ""
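
# Example usage (illustrative; exact values depend on the current time):
#   parse_schedule_time("in 5 minutes")         -> (now + 5 minutes, aware UTC, "")
#   parse_schedule_time("2030-01-01T12:00:00Z") -> (2030-01-01 12:00 UTC, "")
#   parse_schedule_time("tomorrow")             -> (None, "Error: Invalid datetime format: ...")

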
def job_listener(event):
    """Handle job execution events."""
    # JobExecutionEvent only carries the job id, so look the job up to recover
    # its callable name; one-shot jobs may already have been removed by the
    # time the event fires, hence the fallback.
    job = scheduler.get_job(event.job_id)
    func_name = job.func.__name__ if job else "unknown"
    if event.exception:
        logging.error("Job %s failed: %s", event.job_id, event.exception)
        log_job_execution(event.job_id, func_name, False, error=str(event.exception))
    else:
        logging.info("Job %s executed successfully", event.job_id)
        log_job_execution(
            event.job_id,
            func_name,
            True,
            result="Job executed successfully",
        )


def restore_jobs_from_persistence(function_registry: Dict[str, Any]) -> int:
    """Restore jobs from persistent storage on startup, using UTC for all time comparisons."""
    try:
        jobs = load_scheduled_jobs()
        restored_count = 0
        for job_id, job_data in jobs.items():
            try:
                # Recurring jobs need special handling; skip them for now
                if job_data.get("type") == "recurring":
                    continue
                # Always parse as an aware UTC datetime
                dt = datetime.fromisoformat(job_data["run_time"].replace("Z", "+00:00"))
                if dt.tzinfo is None:
                    run_time = dt.replace(tzinfo=timezone.utc)
                else:
                    run_time = dt.astimezone(timezone.utc)
                # Only restore jobs that haven't run yet (compared in UTC)
                if run_time > datetime.now(timezone.utc):
                    if job_data["function_name"] in function_registry:
                        scheduler.add_job(
                            function_registry[job_data["function_name"]],
                            "date",
                            run_date=run_time,
                            args=job_data["args"],
                            kwargs=job_data["kwargs"],
                            id=job_id,
                            replace_existing=True,
                        )
                        restored_count += 1
                    else:
                        logging.warning(
                            "Function %s not found in registry, skipping job %s",
                            job_data["function_name"],
                            job_id,
                        )
                else:
                    # Remove jobs whose run time has already passed
                    remove_job_from_persistence(job_id)
            except (KeyError, ValueError, TypeError) as exc:
                logging.error("Error restoring job %s: %s", job_id, exc)
                remove_job_from_persistence(job_id)
        logging.info("Restored %d jobs from persistence", restored_count)
        return restored_count
    except (OSError, json.JSONDecodeError, KeyError, ValueError, TypeError) as exc:
        logging.error("Error restoring jobs from persistence: %s", exc)
        return 0
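
# Typical call at startup (the registry maps stored function names to the
# actual callables; "send_notification" here is a hypothetical example):
#   restore_jobs_from_persistence({"send_notification": send_notification})
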
# Add event listener
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
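

if __name__ == "__main__":
    # Minimal, self-contained sketch of how this module is expected to be wired
    # up at startup. The `demo_task` coroutine and the registry contents are
    # hypothetical; the real MCP server supplies its own function registry and
    # runs the scheduler inside its own event loop.
    import asyncio

    logging.basicConfig(level=logging.INFO)

    async def demo_task(message: str) -> None:
        logging.info("demo_task fired: %s", message)

    async def _demo() -> None:
        # Restore any persisted one-shot jobs whose functions we know about
        restore_jobs_from_persistence({"demo_task": demo_task})

        # Schedule a new one-shot job a couple of seconds from now
        run_time, error = parse_schedule_time("in 2 seconds")
        if error:
            raise SystemExit(error)
        scheduler.add_job(
            demo_task,
            "date",
            run_date=run_time,
            args=["hello from the demo"],
            id="demo-job",
        )

        scheduler.start()
        await asyncio.sleep(5)  # give the job time to fire
        scheduler.shutdown(wait=False)

    asyncio.run(_demo())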