Skip to main content
Glama

E-commerce Local MCP Server

scheduler.py9.25 kB
""" Dynamic sync scheduler with configurable intervals. Handles background sync operations with runtime control. """ import asyncio import logging from typing import Optional, Callable, Awaitable from datetime import datetime, timedelta from enum import Enum from ..config.settings import settings logger = logging.getLogger(__name__) class SchedulerStatus(Enum): """Scheduler status enum.""" STOPPED = "stopped" RUNNING = "running" PAUSED = "paused" class SyncScheduler: """ Background scheduler for automated sync operations. Supports dynamic interval changes and runtime control. """ def __init__(self, sync_function: Callable[[], Awaitable]): """ Initialize scheduler with sync function. Args: sync_function: Async function to call for sync operations """ self.sync_function = sync_function self.status = SchedulerStatus.STOPPED self.interval_minutes = getattr(settings, 'SYNC_INTERVAL_MINUTES', 60) self.last_sync_time: Optional[datetime] = None self.next_sync_time: Optional[datetime] = None self._task: Optional[asyncio.Task] = None self._stop_event = asyncio.Event() self._pause_event = asyncio.Event() self._interval_changed = asyncio.Event() async def start(self): """Start the scheduler.""" if self.status == SchedulerStatus.RUNNING: logger.warning("Scheduler is already running") return if not getattr(settings, 'SYNC_ENABLED', False): logger.warning("Sync is disabled in configuration") return self.status = SchedulerStatus.RUNNING self._stop_event.clear() self._pause_event.set() # Start unpaused # Start the background task self._task = asyncio.create_task(self._scheduler_loop()) logger.info(f"Sync scheduler started with {self.interval_minutes}-minute intervals") async def stop(self): """Stop the scheduler.""" if self.status == SchedulerStatus.STOPPED: return self.status = SchedulerStatus.STOPPED self._stop_event.set() if self._task: try: await asyncio.wait_for(self._task, timeout=5.0) except asyncio.TimeoutError: logger.warning("Scheduler task did not stop gracefully, cancelling") self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None self.next_sync_time = None logger.info("Sync scheduler stopped") async def pause(self): """Pause the scheduler without stopping it.""" if self.status == SchedulerStatus.RUNNING: self.status = SchedulerStatus.PAUSED self._pause_event.clear() logger.info("Sync scheduler paused") async def resume(self): """Resume the scheduler if paused.""" if self.status == SchedulerStatus.PAUSED: self.status = SchedulerStatus.RUNNING self._pause_event.set() logger.info("Sync scheduler resumed") async def update_interval(self, minutes: int): """ Update sync interval dynamically. Args: minutes: New interval in minutes """ if minutes < 1: raise ValueError("Interval must be at least 1 minute") old_interval = self.interval_minutes self.interval_minutes = minutes # Signal that interval changed self._interval_changed.set() # Update next sync time if self.last_sync_time: self.next_sync_time = self.last_sync_time + timedelta(minutes=minutes) else: self.next_sync_time = datetime.now() + timedelta(minutes=minutes) logger.info(f"Sync interval updated from {old_interval} to {minutes} minutes") async def trigger_immediate_sync(self): """Trigger an immediate sync operation.""" if self.status == SchedulerStatus.STOPPED: logger.warning("Cannot trigger sync: scheduler is stopped") return False try: logger.info("Triggering immediate sync") await self._execute_sync() return True except Exception as e: logger.error(f"Immediate sync failed: {e}") return False def get_status(self) -> dict: """Get current scheduler status and timing information.""" return { "status": self.status.value, "interval_minutes": self.interval_minutes, "last_sync_time": self.last_sync_time.isoformat() if self.last_sync_time else None, "next_sync_time": self.next_sync_time.isoformat() if self.next_sync_time else None, "sync_enabled": getattr(settings, 'SYNC_ENABLED', False), "time_until_next_sync": self._get_time_until_next_sync() } def _get_time_until_next_sync(self) -> Optional[str]: """Get human-readable time until next sync.""" if not self.next_sync_time or self.status != SchedulerStatus.RUNNING: return None time_diff = self.next_sync_time - datetime.now() if time_diff.total_seconds() <= 0: return "Due now" total_seconds = int(time_diff.total_seconds()) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) if hours > 0: return f"{hours}h {minutes}m" elif minutes > 0: return f"{minutes}m {seconds}s" else: return f"{seconds}s" async def _scheduler_loop(self): """Main scheduler loop.""" logger.info("Scheduler loop started") # Calculate initial next sync time self.next_sync_time = datetime.now() + timedelta(minutes=self.interval_minutes) try: while not self._stop_event.is_set(): # Wait for pause to be cleared (resume) await self._pause_event.wait() # Check if we should stop if self._stop_event.is_set(): break # Calculate time to wait until next sync now = datetime.now() if self.next_sync_time and now >= self.next_sync_time: # Time for sync await self._execute_sync() # Schedule next sync self.next_sync_time = datetime.now() + timedelta(minutes=self.interval_minutes) # Wait for either: # 1. Time for next sync (max 60 seconds) # 2. Stop signal # 3. Pause signal # 4. Interval change signal wait_time = min(60, max(1, (self.next_sync_time - datetime.now()).total_seconds())) try: await asyncio.wait_for( asyncio.gather( self._stop_event.wait(), self._interval_changed.wait(), return_when=asyncio.FIRST_COMPLETED ), timeout=wait_time ) except asyncio.TimeoutError: # Normal timeout, continue loop pass # Clear interval changed event if self._interval_changed.is_set(): self._interval_changed.clear() except Exception as e: logger.error(f"Scheduler loop error: {e}") self.status = SchedulerStatus.STOPPED logger.info("Scheduler loop ended") async def _execute_sync(self): """Execute a sync operation.""" try: logger.info("Executing scheduled sync") sync_start_time = datetime.now() # Execute the sync function await self.sync_function() # Update timing self.last_sync_time = sync_start_time sync_duration = (datetime.now() - sync_start_time).total_seconds() logger.info(f"Scheduled sync completed in {sync_duration:.2f} seconds") except Exception as e: logger.error(f"Scheduled sync failed: {e}") # Don't stop the scheduler on sync failure # Just log and continue with next scheduled sync def is_running(self) -> bool: """Check if scheduler is running.""" return self.status == SchedulerStatus.RUNNING def is_paused(self) -> bool: """Check if scheduler is paused.""" return self.status == SchedulerStatus.PAUSED def is_stopped(self) -> bool: """Check if scheduler is stopped.""" return self.status == SchedulerStatus.STOPPED

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AnisurRahman06046/mcptestwithmodel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server