Skip to main content
Glama
scheduler.py (5.87 kB)
""" Log cleanup scheduler using APScheduler """ import asyncio from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from datetime import datetime from .database import cleanup_old_logs, get_setting class LogCleanupScheduler: """Log cleanup scheduler manager""" def __init__(self): """Initialize the scheduler""" self.scheduler: Optional[AsyncIOScheduler] = None self.job_id = "log_cleanup_job" self._initialized = False def _create_scheduler(self) -> AsyncIOScheduler: """Create and configure the scheduler""" scheduler = AsyncIOScheduler() # Optional: Add event listeners for logging scheduler.add_listener(self._job_executed, EVENT_JOB_EXECUTED) scheduler.add_listener(self._job_error, EVENT_JOB_ERROR) return scheduler def _job_executed(self, event): """Handle job execution success""" print(f"Log cleanup job executed successfully at {datetime.now()}") def _job_error(self, event): """Handle job execution error""" print(f"Log cleanup job failed at {datetime.now()}: {event.exception}") async def _cleanup_logs_async(self): """Async wrapper for cleanup_old_logs function""" try: # Run the synchronous cleanup function in a thread pool loop = asyncio.get_event_loop() await loop.run_in_executor(None, cleanup_old_logs) except Exception as e: print(f"Error during async log cleanup: {e}") raise def initialize(self): """Initialize the scheduler and add the cleanup job""" if self._initialized: return try: # Create scheduler self.scheduler = self._create_scheduler() # Get current settings setting = get_setting() clean_hour = setting.clean_logs_at_hour # Add the cleanup job with cron trigger # Run daily at the specified hour trigger = CronTrigger(hour=clean_hour, minute=0, second=0) self.scheduler.add_job( func=self._cleanup_logs_async, trigger=trigger, id=self.job_id, name="Daily Log Cleanup", replace_existing=True, # Task-level 
configuration (overrides defaults) coalesce=True, # Merge duplicate tasks max_instances=1, # Maximum 1 instance running concurrently misfire_grace_time=3600 # Grace time for missed tasks (1 hour) ) print(f"Log cleanup job scheduled to run daily at {clean_hour:02d}:00") self._initialized = True except Exception as e: print(f"Error initializing log cleanup scheduler: {e}") raise def start(self): """Start the scheduler""" if not self._initialized: self.initialize() if self.scheduler and not self.scheduler.running: self.scheduler.start() print("Log cleanup scheduler started") def stop(self): """Stop the scheduler""" if self.scheduler and self.scheduler.running: self.scheduler.shutdown(wait=False) print("Log cleanup scheduler stopped") def reschedule_cleanup_job(self, new_hour: int): """Reschedule the cleanup job to run at a different hour Args: new_hour: Hour of the day (0-23) when the job should run """ if not self.scheduler: print("Scheduler not initialized, cannot reschedule job") return try: # Validate hour if not (0 <= new_hour <= 23): raise ValueError(f"Hour must be between 0 and 23, got {new_hour}") # Create new trigger new_trigger = CronTrigger(hour=new_hour, minute=0, second=0) # Reschedule the job self.scheduler.reschedule_job( job_id=self.job_id, trigger=new_trigger ) print(f"Log cleanup job rescheduled to run daily at {new_hour:02d}:00") except Exception as e: print(f"Error rescheduling log cleanup job: {e}") raise def get_next_run_time(self) -> Optional[datetime]: """Get the next scheduled run time for the cleanup job""" if not self.scheduler: return None try: job = self.scheduler.get_job(self.job_id) if job: return job.next_run_time except Exception as e: print(f"Error getting next run time: {e}") return None def trigger_cleanup_now(self): """Trigger log cleanup immediately (for testing purposes)""" if not self.scheduler: print("Scheduler not initialized, cannot trigger cleanup") return try: # Add a one-time job to run immediately self.scheduler.add_job( 
func=self._cleanup_logs_async, trigger='date', run_date=datetime.now(), id=f"{self.job_id}_manual", name="Manual Log Cleanup", replace_existing=True ) print("Manual log cleanup triggered") except Exception as e: print(f"Error triggering manual cleanup: {e}") raise # Global scheduler instance log_cleanup_scheduler = LogCleanupScheduler()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/itcook/graphiti-mcp-pro'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.