
AnyDocs MCP Server

by funky1688
backup.py • 24.6 kB
#!/usr/bin/env python3
"""
Backup and Recovery Module for AnyDocs MCP Server

Comprehensive backup strategies and disaster recovery procedures.
"""

import asyncio
import json
import os
import shutil
import tarfile
import gzip
import glob
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
import tempfile

try:
    import boto3
    AWS_AVAILABLE = True
except ImportError:
    AWS_AVAILABLE = False
    boto3 = None

try:
    import aiofiles
    AIOFILES_AVAILABLE = True
except ImportError:
    AIOFILES_AVAILABLE = False
    aiofiles = None

from ..database.manager import DatabaseManager
from ..utils.logging import get_logger

logger = get_logger(__name__)


class BackupManager:
    """Comprehensive backup and recovery management."""

    def __init__(self, config: Dict[str, Any]):
        """Initialize backup manager."""
        self.config = config
        self.backup_dir = Path(config.get('backup_dir', './backups'))
        self.backup_dir.mkdir(parents=True, exist_ok=True)

        # Storage backends
        self.local_enabled = config.get('local_enabled', True)
        self.s3_enabled = config.get('s3_enabled', False) and AWS_AVAILABLE
        self.retention_days = config.get('retention_days', 30)

        # AWS S3 configuration
        if self.s3_enabled:
            self.s3_bucket = config.get('s3_bucket')
            self.s3_prefix = config.get('s3_prefix', 'anydocs-backups/')
            self.s3_client = boto3.client('s3')

        # Backup schedule
        self.daily_enabled = config.get('daily_enabled', True)
        self.weekly_enabled = config.get('weekly_enabled', True)
        self.monthly_enabled = config.get('monthly_enabled', True)

    async def create_full_backup(self, backup_name: Optional[str] = None) -> Dict[str, Any]:
        """Create a full system backup."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = backup_name or f"full_backup_{timestamp}"

        # Initialized before the try block so the except handler can always update it.
        backup_info = {
            'name': backup_name,
            'type': 'full',
            'timestamp': timestamp,
            'created_at': datetime.now().isoformat(),
            'components': [],
            'size_bytes': 0,
            'status': 'in_progress'
        }

        try:
            logger.info(f"Starting full backup: {backup_name}")

            # Create backup directory
            backup_path = self.backup_dir / backup_name
            backup_path.mkdir(parents=True, exist_ok=True)

            # Backup database
            db_backup = await self._backup_database(backup_path)
            backup_info['components'].append(db_backup)

            # Backup configuration
            config_backup = await self._backup_configuration(backup_path)
            backup_info['components'].append(config_backup)

            # Backup logs
            logs_backup = await self._backup_logs(backup_path)
            backup_info['components'].append(logs_backup)

            # Backup cached content
            cache_backup = await self._backup_cache(backup_path)
            backup_info['components'].append(cache_backup)

            # Check if any critical component failed
            failed_components = [c for c in backup_info['components'] if c.get('status') == 'failed']
            if failed_components and db_backup.get('status') == 'failed':
                # If database backup fails, consider the whole backup failed
                backup_info['status'] = 'failed'
                backup_info['error'] = 'Critical component (database) backup failed'
                return backup_info

            # Create backup archive
            archive_path = await self._create_archive(backup_path, backup_name)
            backup_info['archive_path'] = str(archive_path)

            # Calculate total size
            backup_info['size_bytes'] = archive_path.stat().st_size

            # Upload to remote storage if enabled
            if self.s3_enabled:
                s3_key = await self._upload_to_s3(archive_path, backup_name)
                backup_info['s3_key'] = s3_key

            # Save backup metadata
            await self._save_backup_metadata(backup_info)

            # Clean up temporary directory
            shutil.rmtree(backup_path)

            backup_info['status'] = 'completed'
            logger.info(f"Full backup completed: {backup_name}")
            return backup_info

        except Exception as e:
            logger.error(f"Full backup failed: {e}")
            backup_info['status'] = 'failed'
            backup_info['error'] = str(e)
            return backup_info

    async def _backup_database(self, backup_path: Path) -> Dict[str, Any]:
        """Backup database."""
        try:
            logger.info("Backing up database...")

            db_backup_path = backup_path / "database"
            db_backup_path.mkdir(exist_ok=True)

            # Export database schema and data.
            # This would typically use pg_dump for PostgreSQL or sqlite3 .dump for SQLite.
            db_dump_file = db_backup_path / "database_dump.sql"

            # For SQLite (adjust for your database type)
            import sqlite3

            # This is a simplified example - adjust based on your database.
            # You might need to use the actual database URL from configuration.
            db_path = "anydocs.db"  # Adjust this

            if not os.path.exists(db_path):
                # Nothing to dump; report the component as skipped rather than failing later.
                return {
                    'component': 'database',
                    'status': 'skipped',
                    'message': f'Database file not found: {db_path}'
                }

            with open(db_dump_file, 'w') as f:
                conn = sqlite3.connect(db_path)
                for line in conn.iterdump():
                    f.write('%s\n' % line)
                conn.close()

            # Compress the dump
            with open(db_dump_file, 'rb') as f_in:
                with gzip.open(f"{db_dump_file}.gz", 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

            os.remove(db_dump_file)

            return {
                'component': 'database',
                'status': 'success',
                'file': "database/database_dump.sql.gz",
                'size_bytes': (db_backup_path / "database_dump.sql.gz").stat().st_size
            }

        except Exception as e:
            logger.error(f"Database backup failed: {e}")
            return {
                'component': 'database',
                'status': 'failed',
                'error': str(e)
            }

    async def _backup_configuration(self, backup_path: Path) -> Dict[str, Any]:
        """Backup configuration files."""
        try:
            logger.info("Backing up configuration...")

            config_backup_path = backup_path / "config"
            config_backup_path.mkdir(exist_ok=True)

            # Backup config files
            config_files = [
                "config.yaml",
                "config_proper.yaml",
                "config_corrected.yaml",
                ".env"
            ]

            backed_up_files = []
            total_size = 0

            # Use current working directory as base
            base_dir = os.getcwd()

            for config_file in config_files:
                config_file_path = os.path.join(base_dir, config_file)
                if os.path.exists(config_file_path):
                    dest_path = config_backup_path / config_file
                    shutil.copy2(config_file_path, dest_path)
                    backed_up_files.append(config_file)
                    total_size += dest_path.stat().st_size

            return {
                'component': 'configuration',
                'status': 'success',
                'files': backed_up_files,
                'size_bytes': total_size
            }

        except Exception as e:
            logger.error(f"Configuration backup failed: {e}")
            return {
                'component': 'configuration',
                'status': 'failed',
                'error': str(e)
            }

    async def _backup_logs(self, backup_path: Path) -> Dict[str, Any]:
        """Backup log files."""
        try:
            logger.info("Backing up logs...")

            logs_backup_path = backup_path / "logs"
            logs_backup_path.mkdir(exist_ok=True)

            # Backup log files (last 7 days)
            log_dirs = ["logs", ".", "/var/log/anydocs"]
            log_patterns = ["*.log", "anydocs*.log"]

            backed_up_files = []
            total_size = 0
            cutoff_date = datetime.now() - timedelta(days=7)

            for log_dir in log_dirs:
                if os.path.exists(log_dir):
                    for pattern in log_patterns:
                        pattern_path = os.path.join(log_dir, pattern)
                        for log_file in glob.glob(pattern_path):
                            try:
                                file_path = Path(log_file)
                                if file_path.stat().st_mtime > cutoff_date.timestamp():
                                    dest_path = logs_backup_path / file_path.name
                                    shutil.copy2(file_path, dest_path)
                                    backed_up_files.append(str(file_path))
                                    total_size += dest_path.stat().st_size
                            except Exception as e:
                                logger.warning(f"Failed to backup log file {log_file}: {e}")

            return {
                'component': 'logs',
                'status': 'success',
                'files': backed_up_files,
                'size_bytes': total_size
            }

        except Exception as e:
            logger.error(f"Logs backup failed: {e}")
            return {
                'component': 'logs',
                'status': 'failed',
                'error': str(e)
            }

    async def _backup_cache(self, backup_path: Path) -> Dict[str, Any]:
        """Backup cache data."""
        try:
            logger.info("Backing up cache...")

            cache_backup_path = backup_path / "cache"
            cache_backup_path.mkdir(exist_ok=True)

            # Backup cache directories
            cache_dirs = ["cache", ".cache", "/tmp/anydocs_cache"]

            backed_up_dirs = []
            total_size = 0

            for cache_dir in cache_dirs:
                if os.path.exists(cache_dir):
                    dest_dir = cache_backup_path / Path(cache_dir).name
                    # shutil.copytree has no ignore_errors argument;
                    # dirs_exist_ok avoids failing when the target already exists.
                    shutil.copytree(cache_dir, dest_dir, dirs_exist_ok=True)
                    backed_up_dirs.append(cache_dir)

                    # Calculate size
                    for root, dirs, files in os.walk(dest_dir):
                        for file in files:
                            file_path = os.path.join(root, file)
                            try:
                                total_size += os.path.getsize(file_path)
                            except (OSError, IOError):
                                pass

            return {
                'component': 'cache',
                'status': 'success',
                'directories': backed_up_dirs,
                'size_bytes': total_size
            }

        except Exception as e:
            logger.error(f"Cache backup failed: {e}")
            return {
                'component': 'cache',
                'status': 'failed',
                'error': str(e)
            }

    async def _create_archive(self, backup_path: Path, backup_name: str) -> Path:
        """Create compressed archive of backup."""
        archive_path = self.backup_dir / f"{backup_name}.tar.gz"

        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(backup_path, arcname=backup_name)

        return archive_path

    async def _upload_to_s3(self, archive_path: Path, backup_name: str) -> Optional[str]:
        """Upload backup to S3."""
        if not self.s3_enabled:
            return None

        s3_key = f"{self.s3_prefix}{backup_name}.tar.gz"

        try:
            self.s3_client.upload_file(str(archive_path), self.s3_bucket, s3_key)
            logger.info(f"Backup uploaded to S3: s3://{self.s3_bucket}/{s3_key}")
            return s3_key
        except Exception as e:
            logger.error(f"S3 upload failed: {e}")
            raise

    async def _save_backup_metadata(self, backup_info: Dict[str, Any]):
        """Save backup metadata."""
        metadata_file = self.backup_dir / f"{backup_info['name']}_metadata.json"

        with open(metadata_file, 'w') as f:
            json.dump(backup_info, f, indent=2)

    async def restore_from_backup(self, backup_name: str, components: Optional[List[str]] = None) -> Dict[str, Any]:
        """Restore from backup."""
        try:
            logger.info(f"Starting restore from backup: {backup_name}")

            # Find backup
            backup_archive = self.backup_dir / f"{backup_name}.tar.gz"
            metadata_file = self.backup_dir / f"{backup_name}_metadata.json"

            if not backup_archive.exists():
                # Try downloading from S3
                if self.s3_enabled:
                    s3_key = f"{self.s3_prefix}{backup_name}.tar.gz"
                    self.s3_client.download_file(self.s3_bucket, s3_key, str(backup_archive))
                else:
                    raise FileNotFoundError(f"Backup not found: {backup_name}")

            # Load metadata
            backup_info = {}
            if metadata_file.exists():
                with open(metadata_file, 'r') as f:
                    backup_info = json.load(f)

            # Extract backup
            restore_path = self.backup_dir / f"restore_{backup_name}"
            restore_path.mkdir(exist_ok=True)

            with tarfile.open(backup_archive, "r:gz") as tar:
                tar.extractall(restore_path)

            extracted_path = restore_path / backup_name

            # Restore components
            restore_results = []

            if not components or 'database' in components:
                result = await self._restore_database(extracted_path)
                restore_results.append(result)

            if not components or 'configuration' in components:
                result = await self._restore_configuration(extracted_path)
                restore_results.append(result)

            if not components or 'cache' in components:
                result = await self._restore_cache(extracted_path)
                restore_results.append(result)

            # Clean up
            shutil.rmtree(restore_path)

            logger.info(f"Restore completed: {backup_name}")

            return {
                'backup_name': backup_name,
                'restore_timestamp': datetime.now().isoformat(),
                'components_restored': restore_results,
                'status': 'completed'
            }

        except Exception as e:
            logger.error(f"Restore failed: {e}")
            return {
                'backup_name': backup_name,
                'status': 'failed',
                'error': str(e)
            }

    async def _restore_database(self, backup_path: Path) -> Dict[str, Any]:
        """Restore database from backup."""
        try:
            logger.info("Restoring database...")

            db_backup_path = backup_path / "database" / "database_dump.sql.gz"

            if db_backup_path.exists():
                # Decompress and restore
                with gzip.open(db_backup_path, 'rt') as f:
                    sql_content = f.read()

                # Execute SQL (this is simplified - adjust for your database)
                import sqlite3
                conn = sqlite3.connect("anydocs_restored.db")
                conn.executescript(sql_content)
                conn.close()

                return {
                    'component': 'database',
                    'status': 'success',
                    'message': 'Database restored successfully'
                }
            else:
                return {
                    'component': 'database',
                    'status': 'skipped',
                    'message': 'No database backup found'
                }

        except Exception as e:
            logger.error(f"Database restore failed: {e}")
            return {
                'component': 'database',
                'status': 'failed',
                'error': str(e)
            }

    async def _restore_configuration(self, backup_path: Path) -> Dict[str, Any]:
        """Restore configuration from backup."""
        try:
            logger.info("Restoring configuration...")

            config_backup_path = backup_path / "config"

            if config_backup_path.exists():
                restored_files = []

                for config_file in config_backup_path.iterdir():
                    if config_file.is_file():
                        dest_path = Path(config_file.name)
                        shutil.copy2(config_file, dest_path)
                        restored_files.append(config_file.name)

                return {
                    'component': 'configuration',
                    'status': 'success',
                    'restored_files': restored_files
                }
            else:
                return {
                    'component': 'configuration',
                    'status': 'skipped',
                    'message': 'No configuration backup found'
                }

        except Exception as e:
            logger.error(f"Configuration restore failed: {e}")
            return {
                'component': 'configuration',
                'status': 'failed',
                'error': str(e)
            }

    async def _restore_cache(self, backup_path: Path) -> Dict[str, Any]:
        """Restore cache from backup."""
        try:
            logger.info("Restoring cache...")

            cache_backup_path = backup_path / "cache"

            if cache_backup_path.exists():
                restored_dirs = []

                for cache_dir in cache_backup_path.iterdir():
                    if cache_dir.is_dir():
                        dest_path = Path("cache") / cache_dir.name
                        dest_path.parent.mkdir(exist_ok=True)

                        if dest_path.exists():
                            shutil.rmtree(dest_path)

                        shutil.copytree(cache_dir, dest_path)
                        restored_dirs.append(cache_dir.name)

                return {
                    'component': 'cache',
                    'status': 'success',
                    'restored_directories': restored_dirs
                }
            else:
                return {
                    'component': 'cache',
                    'status': 'skipped',
                    'message': 'No cache backup found'
                }

        except Exception as e:
            logger.error(f"Cache restore failed: {e}")
            return {
                'component': 'cache',
                'status': 'failed',
                'error': str(e)
            }

    async def list_backups(self) -> List[Dict[str, Any]]:
        """List available backups."""
        backups = []

        # Local backups
        for backup_file in self.backup_dir.glob("*.tar.gz"):
            backup_name = backup_file.stem.replace('.tar', '')
            metadata_file = self.backup_dir / f"{backup_name}_metadata.json"

            backup_info = {
                'name': backup_name,
                'location': 'local',
                'size_bytes': backup_file.stat().st_size,
                'created_at': datetime.fromtimestamp(backup_file.stat().st_ctime).isoformat()
            }

            if metadata_file.exists():
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)
                backup_info.update(metadata)

            backups.append(backup_info)

        # S3 backups (if enabled)
        if self.s3_enabled:
            try:
                response = self.s3_client.list_objects_v2(
                    Bucket=self.s3_bucket,
                    Prefix=self.s3_prefix
                )

                for obj in response.get('Contents', []):
                    if obj['Key'].endswith('.tar.gz'):
                        backup_name = Path(obj['Key']).stem.replace('.tar', '')
                        backups.append({
                            'name': backup_name,
                            'location': 's3',
                            'size_bytes': obj['Size'],
                            'created_at': obj['LastModified'].isoformat(),
                            's3_key': obj['Key']
                        })

            except Exception as e:
                logger.error(f"Failed to list S3 backups: {e}")

        return sorted(backups, key=lambda x: x['created_at'], reverse=True)

    async def cleanup_old_backups(self):
        """Clean up old backups based on retention policy."""
        try:
            cutoff_date = datetime.now() - timedelta(days=self.retention_days)
            removed_count = 0

            # Clean up local backups
            for backup_file in self.backup_dir.glob("*.tar.gz"):
                file_date = datetime.fromtimestamp(backup_file.stat().st_ctime)

                if file_date < cutoff_date:
                    backup_name = backup_file.stem.replace('.tar', '')
                    metadata_file = self.backup_dir / f"{backup_name}_metadata.json"

                    backup_file.unlink()
                    if metadata_file.exists():
                        metadata_file.unlink()

                    removed_count += 1
                    logger.info(f"Removed old backup: {backup_name}")

            # Clean up S3 backups (if enabled)
            if self.s3_enabled:
                try:
                    response = self.s3_client.list_objects_v2(
                        Bucket=self.s3_bucket,
                        Prefix=self.s3_prefix
                    )

                    for obj in response.get('Contents', []):
                        if obj['LastModified'].replace(tzinfo=None) < cutoff_date:
                            self.s3_client.delete_object(
                                Bucket=self.s3_bucket,
                                Key=obj['Key']
                            )
                            removed_count += 1
                            logger.info(f"Removed old S3 backup: {obj['Key']}")

                except Exception as e:
                    logger.error(f"Failed to cleanup S3 backups: {e}")

            logger.info(f"Cleanup completed. Removed {removed_count} old backups.")

        except Exception as e:
            logger.error(f"Backup cleanup failed: {e}")


# Global backup manager instance
_backup_manager: Optional[BackupManager] = None


def get_backup_manager(config: Optional[Dict[str, Any]] = None) -> BackupManager:
    """Get global backup manager instance."""
    global _backup_manager

    if _backup_manager is None:
        default_config = {
            'backup_dir': './backups',
            'retention_days': 30,
            'local_enabled': True,
            's3_enabled': False
        }
        _backup_manager = BackupManager(config or default_config)

    return _backup_manager
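The sketch below shows one way this module might be driven from application code, based only on the API visible above (get_backup_manager, create_full_backup, list_backups, cleanup_old_backups). The import path anydocs_mcp.backup is an assumption; the package layout is not shown on this page, so adjust it to wherever backup.py lives in your tree.

# Minimal usage sketch for BackupManager, assuming the module is importable
# as anydocs_mcp.backup (hypothetical path -- adjust to your package layout).
import asyncio

from anydocs_mcp.backup import get_backup_manager  # assumed import path


async def main():
    # Configuration keys mirror those read in BackupManager.__init__.
    manager = get_backup_manager({
        'backup_dir': './backups',
        'retention_days': 30,
        'local_enabled': True,
        's3_enabled': False,
    })

    # Create a full backup and report the outcome.
    result = await manager.create_full_backup()
    print(result['status'], result.get('archive_path'))

    # Show what is available, newest first.
    for backup in await manager.list_backups():
        print(backup['name'], backup['location'], backup['size_bytes'])

    # Prune anything older than the configured retention window.
    await manager.cleanup_old_backups()


if __name__ == '__main__':
    asyncio.run(main())

A restore would follow the same pattern: await manager.restore_from_backup('full_backup_20240101_120000', components=['database']) restores only the selected components from a named archive.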
