Skip to main content
Glama

Simple MCP

by karar-hayder
scheduler.py6.01 kB
""" Scheduling functionality for the MCP server. """ import json import logging from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional, Tuple from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from apscheduler.executors.asyncio import AsyncIOExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.asyncio import AsyncIOScheduler from persistence import ( # save_job_to_persistence, # Unused import removed; save_recurring_job_to_persistence, # Unused import removed load_scheduled_jobs, log_job_execution, remove_job_from_persistence, ) # Initialize the scheduler (force UTC timezone) scheduler = AsyncIOScheduler( jobstores={"default": MemoryJobStore()}, executors={"default": AsyncIOExecutor()}, job_defaults={"coalesce": False, "max_instances": 3}, timezone=timezone.utc, ) def parse_schedule_time(schedule_time: str) -> Tuple[Optional[datetime], str]: """ Parse schedule time string and return timezone-aware UTC datetime and error message. Args: schedule_time: Time string to parse Returns: Tuple of (datetime, error_message). If error_message is not empty, datetime is None. """ now = datetime.now(timezone.utc) if schedule_time.startswith("in "): # Relative time parsing time_str = schedule_time[3:] # Remove "in " if "second" in time_str: seconds = int(time_str.split()[0]) run_time = now + timedelta(seconds=seconds) elif "minute" in time_str: minutes = int(time_str.split()[0]) run_time = now + timedelta(minutes=minutes) elif "hour" in time_str: hours = int(time_str.split()[0]) run_time = now + timedelta(hours=hours) elif "day" in time_str: days = int(time_str.split()[0]) run_time = now + timedelta(days=days) elif "week" in time_str: weeks = int(time_str.split()[0]) run_time = now + timedelta(weeks=weeks) else: return ( None, ( f"Error: Unsupported relative time format: {schedule_time}. Supported: seconds, minutes, hours, days, weeks" ), ) else: # Try to parse as ISO datetime try: # Accepts both naive and aware, but always convert to UTC and make aware dt = datetime.fromisoformat(schedule_time.replace("Z", "+00:00")) if dt.tzinfo is None: # Assume naive datetimes are in UTC run_time = dt.replace(tzinfo=timezone.utc) else: run_time = dt.astimezone(timezone.utc) except ValueError: return ( None, ( f"Error: Invalid datetime format: {schedule_time}. Use ISO format or relative time like 'in 5 minutes'" ), ) return run_time, "" def job_listener(event): """Handle job execution events""" if event.exception: logging.error("Job %s failed: %s", event.job_id, event.exception) log_job_execution( event.job_id, event.job.func.__name__, False, error=str(event.exception) ) else: logging.info("Job %s executed successfully", event.job_id) log_job_execution( event.job_id, event.job.func.__name__, True, result="Job executed successfully", ) def restore_jobs_from_persistence(function_registry: Dict[str, Any]) -> int: """Restore jobs from persistent storage on startup, using UTC for all time comparisons.""" try: jobs = load_scheduled_jobs() restored_count = 0 for job_id, job_data in jobs.items(): try: # Handle recurring jobs differently if job_data.get("type") == "recurring": # Skip recurring jobs for now - they need special handling continue # Always parse as aware UTC datetime dt = datetime.fromisoformat(job_data["run_time"].replace("Z", "+00:00")) if dt.tzinfo is None: run_time = dt.replace(tzinfo=timezone.utc) else: run_time = dt.astimezone(timezone.utc) # Only restore jobs that haven't run yet (in UTC) if run_time > datetime.now(timezone.utc): if job_data["function_name"] in function_registry: scheduler.add_job( function_registry[job_data["function_name"]], "date", run_date=run_time, args=job_data["args"], kwargs=job_data["kwargs"], id=job_id, replace_existing=True, ) restored_count += 1 else: logging.warning( "Function %s not found in registry, skipping job %s", job_data["function_name"], job_id, ) else: # Remove expired jobs remove_job_from_persistence(job_id) except (KeyError, ValueError, TypeError) as exc: logging.error("Error restoring job %s: %s", job_id, exc) remove_job_from_persistence(job_id) logging.info("Restored %d jobs from persistence", restored_count) return restored_count except (OSError, json.JSONDecodeError, KeyError, ValueError, TypeError) as exc: logging.error("Error restoring jobs from persistence: %s", exc) return 0 # Add event listener scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/karar-hayder/Simple-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server