Skip to main content
Glama
scheduler.py (16.9 kB)
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Automatic backup scheduler for MCP Memory Service.

Provides scheduled database backups with configurable intervals and retention policies.
"""

import asyncio
import shutil
import sqlite3
import logging
import time
from contextlib import closing
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, Any, List, Optional

from ..config import (
    BACKUPS_PATH,
    BACKUP_ENABLED,
    BACKUP_INTERVAL,
    BACKUP_RETENTION,
    BACKUP_MAX_COUNT,
    SQLITE_VEC_PATH
)

logger = logging.getLogger(__name__)

# Seconds per recognised BACKUP_INTERVAL value. Shared by the status
# calculation and the scheduler so the two cannot drift apart.
_INTERVAL_SECONDS: Dict[str, int] = {
    'hourly': 3600,
    'daily': 86400,
    'weekly': 604800,
}

# Interval the scheduler falls back to when BACKUP_INTERVAL is unrecognised.
_DEFAULT_INTERVAL_SECONDS = 86400

# How often the scheduler wakes up to check whether a backup is due.
_CHECK_INTERVAL_SECONDS = 300

# Pause after an unexpected scheduler error before trying again.
_ERROR_RETRY_SECONDS = 60


class BackupService:
    """Service for creating and managing database backups."""

    def __init__(self, backups_dir: Optional[str] = None, db_path: Optional[str] = None):
        """Initialize backup service.

        Creates the backup directory if needed and loads the count and
        timestamp of backups already present in it.

        Args:
            backups_dir: Directory to store backups (defaults to BACKUPS_PATH)
            db_path: Path to database file (defaults to SQLITE_VEC_PATH)
        """
        self.backups_dir = Path(backups_dir or BACKUPS_PATH)

        # Determine database path with clear fallback logic
        db_path_str = db_path or SQLITE_VEC_PATH
        self.db_path = Path(db_path_str) if db_path_str else None

        self.last_backup_time: Optional[float] = None
        self.backup_count: int = 0

        # Serialises backup/restore coroutines. This is an asyncio lock, so it
        # orders coroutines on one event loop; it is not a thread lock.
        self._lock = asyncio.Lock()

        # Ensure backup directory exists
        self.backups_dir.mkdir(parents=True, exist_ok=True)

        # Load existing backup metadata
        self._load_backup_metadata()

        logger.info(f"BackupService initialized: backups_dir={self.backups_dir}, db_path={self.db_path}")

    def _load_backup_metadata(self) -> None:
        """Set backup_count and last_backup_time from the backups on disk."""
        backups = self.list_backups()
        self.backup_count = len(backups)
        if backups:
            # list_backups() returns newest first
            self.last_backup_time = backups[0].get('created_timestamp')

    def _generate_backup_filename(self) -> str:
        """Generate a timestamped backup filename (UTC, one-second resolution)."""
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        return f"memory_backup_{timestamp}.db"

    def _copy_database(self, backup_path: Path) -> None:
        """Copy the database to ``backup_path`` using SQLite's backup API.

        Runs synchronously; callers run it in a worker thread. Both
        connections are closed on every path, and a partially written
        destination file is removed when the copy fails so that it is never
        listed as a valid backup.

        Args:
            backup_path: Destination file for the backup.

        Raises:
            Exception: Whatever ``sqlite3`` raised while connecting or copying.
        """
        try:
            with closing(sqlite3.connect(str(self.db_path))) as source, \
                    closing(sqlite3.connect(str(backup_path))) as dest:
                source.backup(dest)
        except Exception:
            try:
                backup_path.unlink()
            except FileNotFoundError:
                pass  # nothing was written
            except OSError as cleanup_error:
                logger.warning(f"Could not remove incomplete backup {backup_path.name}: {cleanup_error}")
            raise

    async def create_backup(self, description: Optional[str] = None) -> Dict[str, Any]:
        """Create a new database backup.

        On success the retention policy is applied afterwards via
        cleanup_old_backups().

        Args:
            description: Optional description for the backup

        Returns:
            Dict with backup details. ``success`` is False and ``error`` is set
            when the database file is missing or the backup fails.
        """
        if not self.db_path or not self.db_path.exists():
            return {
                'success': False,
                'error': f'Database file not found: {self.db_path}',
                'timestamp': datetime.now(timezone.utc).isoformat()
            }

        async with self._lock:  # One backup/restore at a time
            try:
                start_time = time.time()
                created_at = datetime.now(timezone.utc)

                # Generate backup filename
                backup_filename = self._generate_backup_filename()
                backup_path = self.backups_dir / backup_filename

                # Use SQLite's native backup API rather than a raw file copy,
                # and run it in a thread so the event loop is not blocked.
                await asyncio.to_thread(self._copy_database, backup_path)

                # Calculate backup duration (just the backup operation)
                backup_duration = time.time() - start_time

                # Get backup size
                backup_size = backup_path.stat().st_size

                # Update metadata
                self.last_backup_time = created_at.timestamp()
                self.backup_count += 1

                logger.info(f"Created backup: {backup_filename} ({backup_size} bytes) in {backup_duration:.2f}s")

                # Cleanup old backups (outside of duration calculation)
                await self.cleanup_old_backups()

                return {
                    'success': True,
                    'filename': backup_filename,
                    'path': str(backup_path),
                    'size_bytes': backup_size,
                    'description': description,
                    'created_at': created_at.isoformat(),
                    'duration_seconds': round(backup_duration, 3)
                }

            except Exception as e:
                logger.error(f"Failed to create backup: {e}")
                return {
                    'success': False,
                    'error': str(e),
                    'timestamp': datetime.now(timezone.utc).isoformat()
                }

    def list_backups(self) -> List[Dict[str, Any]]:
        """List all available backups.

        Files that cannot be inspected (for example, removed while the
        directory is being scanned) are skipped with a warning instead of
        aborting the whole listing.

        Returns:
            List of backup info dicts, sorted by date (newest first)
        """
        backups: List[Dict[str, Any]] = []
        now = datetime.now(timezone.utc)

        try:
            candidates = list(self.backups_dir.glob('memory_backup_*.db'))
        except OSError as e:
            logger.error(f"Error listing backups: {e}")
            return backups

        for backup_file in candidates:
            try:
                stat = backup_file.stat()

                # Parse timestamp from filename; fall back to the file's
                # modification time when the name does not match the pattern.
                try:
                    timestamp_str = backup_file.stem.replace('memory_backup_', '')
                    created_dt = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
                    created_dt = created_dt.replace(tzinfo=timezone.utc)
                except ValueError:
                    created_dt = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)

                backups.append({
                    'filename': backup_file.name,
                    'path': str(backup_file),
                    'size_bytes': stat.st_size,
                    'created_at': created_dt.isoformat(),
                    'created_timestamp': created_dt.timestamp(),
                    'age_days': (now - created_dt).days
                })
            except (OSError, OverflowError, ValueError) as e:
                logger.warning(f"Skipping unreadable backup {backup_file.name}: {e}")

        # Sort by creation time, newest first
        backups.sort(key=lambda x: x['created_timestamp'], reverse=True)
        return backups

    async def cleanup_old_backups(self) -> Dict[str, Any]:
        """Remove old backups based on retention policy.

        Removes backups that are:
        - Older than BACKUP_RETENTION days
        - Exceed BACKUP_MAX_COUNT

        When both apply to a backup, the reported reason is its age.

        Returns:
            Dict with cleanup results: ``removed_count``, ``removed`` and ``errors``
        """
        removed = []
        errors = []

        try:
            backups = self.list_backups()
            retention_cutoff = (
                datetime.now(timezone.utc) - timedelta(days=BACKUP_RETENTION)
            ).timestamp()

            for i, backup in enumerate(backups):
                # backups is newest first, so index i is the backup's rank
                if backup['created_timestamp'] < retention_cutoff:
                    reason = f"older than {BACKUP_RETENTION} days"
                elif i >= BACKUP_MAX_COUNT:
                    reason = f"exceeds max count ({BACKUP_MAX_COUNT})"
                else:
                    continue

                try:
                    # Use asyncio.to_thread to avoid blocking the event loop
                    await asyncio.to_thread(Path(backup['path']).unlink)
                    removed.append({
                        'filename': backup['filename'],
                        'reason': reason
                    })
                    logger.info(f"Removed old backup: {backup['filename']} ({reason})")
                except Exception as e:
                    errors.append({
                        'filename': backup['filename'],
                        'error': str(e)
                    })
                    logger.error(f"Failed to remove backup {backup['filename']}: {e}")

            # Update count by subtracting what was actually removed
            self.backup_count = max(0, self.backup_count - len(removed))

        except Exception as e:
            logger.error(f"Error during backup cleanup: {e}")
            errors.append({'error': str(e)})

        return {
            'removed_count': len(removed),
            'removed': removed,
            'errors': errors
        }

    @staticmethod
    def _is_plain_filename(filename: str) -> bool:
        """Return True when ``filename`` is a bare file name with no directory part."""
        return bool(filename) and filename != '..' and Path(filename).name == filename

    async def restore_backup(self, filename: str) -> Dict[str, Any]:
        """Restore database from a backup.

        The current database file, if any, is first copied next to itself
        with a ``.db.pre_restore`` suffix.

        Args:
            filename: Name of backup file to restore. Must be a bare file name
                inside the backups directory; names containing directory
                components are rejected.

        Returns:
            Dict with restore results
        """
        # Reject names such as "../x" or absolute paths so that only files
        # inside the backups directory can be copied over the database.
        if not self._is_plain_filename(filename):
            return {
                'success': False,
                'error': f'Invalid backup filename: {filename}'
            }

        backup_path = self.backups_dir / filename

        if not backup_path.is_file():
            return {
                'success': False,
                'error': f'Backup file not found: {filename}'
            }

        if not self.db_path:
            return {
                'success': False,
                'error': 'Database path not configured'
            }

        # Hold the same lock as create_backup() so the database file is not
        # overwritten while a backup is reading it.
        async with self._lock:
            try:
                # Create a backup of current database first
                if self.db_path.exists():
                    current_backup = self.db_path.with_suffix('.db.pre_restore')
                    # Use asyncio.to_thread to avoid blocking the event loop
                    await asyncio.to_thread(shutil.copy2, str(self.db_path), str(current_backup))
                    logger.info(f"Created pre-restore backup: {current_backup}")

                # Restore from backup
                # NOTE(review): this is a plain file copy; it does not touch
                # any "-wal"/"-shm" sidecar files or open connections.
                # Confirm callers close the database before restoring.
                await asyncio.to_thread(shutil.copy2, str(backup_path), str(self.db_path))

                logger.info(f"Restored database from backup: {filename}")

                return {
                    'success': True,
                    'filename': filename,
                    'restored_at': datetime.now(timezone.utc).isoformat()
                }

            except Exception as e:
                logger.error(f"Failed to restore backup: {e}")
                return {
                    'success': False,
                    'error': str(e)
                }

    def get_status(self) -> Dict[str, Any]:
        """Get current backup service status.

        Returns:
            Dict with backup service status
        """
        backups = self.list_backups()
        total_size = sum(b['size_bytes'] for b in backups)

        # Calculate time since last backup
        time_since_last = None
        if self.last_backup_time is not None:
            time_since_last = time.time() - self.last_backup_time

        # Calculate next scheduled backup time
        next_backup = self._calculate_next_backup_time()

        return {
            'enabled': BACKUP_ENABLED,
            'interval': BACKUP_INTERVAL,
            'retention_days': BACKUP_RETENTION,
            'max_count': BACKUP_MAX_COUNT,
            'backup_count': len(backups),
            'total_size_bytes': total_size,
            'last_backup_time': self.last_backup_time,
            'time_since_last_seconds': time_since_last,
            'next_backup_at': next_backup.isoformat() if next_backup else None,
            'backups_dir': str(self.backups_dir),
            'db_path': str(self.db_path) if self.db_path else None
        }

    def _calculate_next_backup_time(self) -> Optional[datetime]:
        """Calculate the next scheduled backup time.

        Returns:
            Last backup time plus one interval, or None when backups are
            disabled, no backup has been made yet, or BACKUP_INTERVAL is not
            one of the recognised values.
        """
        if not BACKUP_ENABLED or self.last_backup_time is None:
            return None

        interval_seconds = _INTERVAL_SECONDS.get(BACKUP_INTERVAL)
        if interval_seconds is None:
            return None

        last_backup_dt = datetime.fromtimestamp(self.last_backup_time, tz=timezone.utc)
        return last_backup_dt + timedelta(seconds=interval_seconds)


class BackupScheduler:
    """Scheduler for automatic database backups."""

    def __init__(self, backup_service: Optional[BackupService] = None):
        """Initialize backup scheduler.

        Args:
            backup_service: BackupService instance (creates one if not provided)
        """
        self.backup_service = backup_service or BackupService()
        self.is_running = False
        self._task: Optional[asyncio.Task] = None

        logger.info("BackupScheduler initialized")

    def _get_interval_seconds(self) -> int:
        """Get backup interval in seconds (daily when BACKUP_INTERVAL is unrecognised)."""
        return _INTERVAL_SECONDS.get(BACKUP_INTERVAL, _DEFAULT_INTERVAL_SECONDS)

    async def start(self):
        """Start the backup scheduler.

        Does nothing when the scheduler is already running or when backups
        are disabled via BACKUP_ENABLED.
        """
        if self.is_running:
            logger.warning("BackupScheduler already running")
            return

        if not BACKUP_ENABLED:
            logger.info("Backups disabled, scheduler not started")
            return

        self.is_running = True
        self._task = asyncio.create_task(self._schedule_loop())
        logger.info(f"BackupScheduler started with {BACKUP_INTERVAL} interval")

    async def stop(self):
        """Stop the backup scheduler and wait for its task to finish."""
        if not self.is_running:
            return

        self.is_running = False

        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            # Drop the handle to the finished task
            self._task = None

        logger.info("BackupScheduler stopped")

    async def _schedule_loop(self):
        """Main scheduling loop.

        Wakes up periodically and creates a backup when none exists yet or
        when at least one interval has passed since the last one.
        """
        interval_seconds = self._get_interval_seconds()

        while self.is_running:
            try:
                # Check if it's time for a backup
                last_backup_time = self.backup_service.last_backup_time
                should_backup = (
                    last_backup_time is None
                    or time.time() - last_backup_time >= interval_seconds
                )

                if should_backup:
                    logger.info("Scheduled backup triggered")
                    result = await self.backup_service.create_backup(
                        description=f"Scheduled {BACKUP_INTERVAL} backup"
                    )

                    if result['success']:
                        logger.info(f"Scheduled backup completed: {result['filename']}")
                    else:
                        logger.error(f"Scheduled backup failed: {result.get('error')}")

                # Sleep for a check interval (every 5 minutes)
                await asyncio.sleep(_CHECK_INTERVAL_SECONDS)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in backup scheduler loop: {e}")
                await asyncio.sleep(_ERROR_RETRY_SECONDS)  # Wait before retrying

    def get_status(self) -> Dict[str, Any]:
        """Get scheduler status.

        Returns:
            Dict with scheduler status and backup service status
        """
        status = self.backup_service.get_status()
        status['scheduler_running'] = self.is_running
        return status


# Global backup service instance
_backup_service: Optional[BackupService] = None
_backup_scheduler: Optional[BackupScheduler] = None


def get_backup_service() -> BackupService:
    """Get or create the global backup service instance."""
    global _backup_service
    if _backup_service is None:
        _backup_service = BackupService()
    return _backup_service


def get_backup_scheduler() -> BackupScheduler:
    """Get or create the global backup scheduler instance."""
    global _backup_scheduler
    if _backup_scheduler is None:
        _backup_scheduler = BackupScheduler(get_backup_service())
    return _backup_scheduler

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.