#!/usr/bin/env python3
"""
Backup and Recovery Module for AnyDocs MCP Server
Comprehensive backup strategies and disaster recovery procedures.
"""
import asyncio
import json
import os
import shutil
import tarfile
import gzip
import glob
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
import tempfile
try:
import boto3
AWS_AVAILABLE = True
except ImportError:
AWS_AVAILABLE = False
boto3 = None
try:
import aiofiles
AIOFILES_AVAILABLE = True
except ImportError:
AIOFILES_AVAILABLE = False
aiofiles = None
from ..database.manager import DatabaseManager
from ..utils.logging import get_logger
logger = get_logger(__name__)
class BackupManager:
"""Comprehensive backup and recovery management."""
def __init__(self, config: Dict[str, Any]):
"""Initialize backup manager."""
self.config = config
self.backup_dir = Path(config.get('backup_dir', './backups'))
self.backup_dir.mkdir(parents=True, exist_ok=True)
# Storage backends
self.local_enabled = config.get('local_enabled', True)
self.s3_enabled = config.get('s3_enabled', False) and AWS_AVAILABLE
self.retention_days = config.get('retention_days', 30)
# AWS S3 configuration
if self.s3_enabled:
self.s3_bucket = config.get('s3_bucket')
self.s3_prefix = config.get('s3_prefix', 'anydocs-backups/')
self.s3_client = boto3.client('s3')
# Backup schedule
self.daily_enabled = config.get('daily_enabled', True)
self.weekly_enabled = config.get('weekly_enabled', True)
self.monthly_enabled = config.get('monthly_enabled', True)
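
    # Example configuration (a sketch; every key is optional and falls back to the
    # defaults read above, and the S3 values are illustrative placeholders):
    #
    #     config = {
    #         'backup_dir': './backups',
    #         'database_path': 'anydocs.db',   # SQLite file dumped by _backup_database
    #         'local_enabled': True,
    #         's3_enabled': True,              # only honoured when boto3 is importable
    #         's3_bucket': 'my-backup-bucket',
    #         's3_prefix': 'anydocs-backups/',
    #         'retention_days': 30,
    #         'daily_enabled': True,
    #         'weekly_enabled': True,
    #         'monthly_enabled': True,
    #     }
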
async def create_full_backup(self, backup_name: Optional[str] = None) -> Dict[str, Any]:
"""Create a full system backup."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = backup_name or f"full_backup_{timestamp}"
        # Build the metadata dict before the try block so the exception handler
        # can always reference it
        backup_info = {
            'name': backup_name,
            'type': 'full',
            'timestamp': timestamp,
            'created_at': datetime.now().isoformat(),
            'components': [],
            'size_bytes': 0,
            'status': 'in_progress'
        }
        try:
            logger.info(f"Starting full backup: {backup_name}")
# Create backup directory
backup_path = self.backup_dir / backup_name
backup_path.mkdir(parents=True, exist_ok=True)
# Backup database
db_backup = await self._backup_database(backup_path)
backup_info['components'].append(db_backup)
# Backup configuration
config_backup = await self._backup_configuration(backup_path)
backup_info['components'].append(config_backup)
# Backup logs
logs_backup = await self._backup_logs(backup_path)
backup_info['components'].append(logs_backup)
# Backup cached content
cache_backup = await self._backup_cache(backup_path)
backup_info['components'].append(cache_backup)
            # A failed database backup is fatal; other component failures are tolerated
            failed_components = [c for c in backup_info['components'] if c.get('status') == 'failed']
            if db_backup.get('status') == 'failed':
                backup_info['status'] = 'failed'
                backup_info['error'] = 'Critical component (database) backup failed'
                # Remove the partial backup directory before bailing out
                shutil.rmtree(backup_path, ignore_errors=True)
                return backup_info
            if failed_components:
                logger.warning(f"Non-critical components failed: {[c['component'] for c in failed_components]}")
# Create backup archive
archive_path = await self._create_archive(backup_path, backup_name)
backup_info['archive_path'] = str(archive_path)
# Calculate total size
backup_info['size_bytes'] = archive_path.stat().st_size
# Upload to remote storage if enabled
if self.s3_enabled:
s3_key = await self._upload_to_s3(archive_path, backup_name)
backup_info['s3_key'] = s3_key
# Save backup metadata
await self._save_backup_metadata(backup_info)
# Clean up temporary directory
shutil.rmtree(backup_path)
backup_info['status'] = 'completed'
logger.info(f"Full backup completed: {backup_name}")
return backup_info
        except Exception as e:
            logger.error(f"Full backup failed: {e}")
            backup_info['status'] = 'failed'
            backup_info['error'] = str(e)
            # Best-effort cleanup of the temporary backup directory
            shutil.rmtree(self.backup_dir / backup_name, ignore_errors=True)
            return backup_info
async def _backup_database(self, backup_path: Path) -> Dict[str, Any]:
"""Backup database."""
try:
logger.info("Backing up database...")
db_backup_path = backup_path / "database"
db_backup_path.mkdir(exist_ok=True)
# Export database schema and data
# This would typically use pg_dump for PostgreSQL or sqlite3 .dump for SQLite
db_dump_file = db_backup_path / "database_dump.sql"
            # For SQLite; for PostgreSQL run pg_dump via subprocess instead
            import sqlite3
            # Read the database path from configuration, falling back to the default file name
            db_path = self.config.get('database_path', 'anydocs.db')
            if not os.path.exists(db_path):
                return {
                    'component': 'database',
                    'status': 'skipped',
                    'message': f'Database file not found: {db_path}'
                }
            # Dump schema and data as SQL statements
            conn = sqlite3.connect(db_path)
            try:
                with open(db_dump_file, 'w') as f:
                    for line in conn.iterdump():
                        f.write(f"{line}\n")
            finally:
                conn.close()
# Compress the dump
with open(db_dump_file, 'rb') as f_in:
with gzip.open(f"{db_dump_file}.gz", 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(db_dump_file)
return {
'component': 'database',
'status': 'success',
'file': f"database/database_dump.sql.gz",
'size_bytes': (db_backup_path / "database_dump.sql.gz").stat().st_size
}
except Exception as e:
logger.error(f"Database backup failed: {e}")
return {
'component': 'database',
'status': 'failed',
'error': str(e)
}
async def _backup_configuration(self, backup_path: Path) -> Dict[str, Any]:
"""Backup configuration files."""
try:
logger.info("Backing up configuration...")
config_backup_path = backup_path / "config"
config_backup_path.mkdir(exist_ok=True)
# Backup config files
config_files = [
"config.yaml",
"config_proper.yaml",
"config_corrected.yaml",
".env"
]
backed_up_files = []
total_size = 0
# Use current working directory as base
base_dir = os.getcwd()
for config_file in config_files:
config_file_path = os.path.join(base_dir, config_file)
if os.path.exists(config_file_path):
dest_path = config_backup_path / config_file
shutil.copy2(config_file_path, dest_path)
backed_up_files.append(config_file)
total_size += dest_path.stat().st_size
return {
'component': 'configuration',
'status': 'success',
'files': backed_up_files,
'size_bytes': total_size
}
except Exception as e:
logger.error(f"Configuration backup failed: {e}")
return {
'component': 'configuration',
'status': 'failed',
'error': str(e)
}
async def _backup_logs(self, backup_path: Path) -> Dict[str, Any]:
"""Backup log files."""
try:
logger.info("Backing up logs...")
logs_backup_path = backup_path / "logs"
logs_backup_path.mkdir(exist_ok=True)
# Backup log files (last 7 days)
log_dirs = ["logs", ".", "/var/log/anydocs"]
log_patterns = ["*.log", "anydocs*.log"]
backed_up_files = []
total_size = 0
cutoff_date = datetime.now() - timedelta(days=7)
            seen = set()
            for log_dir in log_dirs:
                if not os.path.exists(log_dir):
                    continue
                for pattern in log_patterns:
                    for log_file in glob.glob(os.path.join(log_dir, pattern)):
                        file_path = Path(log_file).resolve()
                        if file_path in seen:
                            # The patterns overlap, so skip files that were already copied
                            continue
                        seen.add(file_path)
                        try:
                            if file_path.stat().st_mtime > cutoff_date.timestamp():
                                dest_path = logs_backup_path / file_path.name
                                shutil.copy2(file_path, dest_path)
                                backed_up_files.append(str(file_path))
                                total_size += dest_path.stat().st_size
                        except Exception as e:
                            logger.warning(f"Failed to backup log file {log_file}: {e}")
return {
'component': 'logs',
'status': 'success',
'files': backed_up_files,
'size_bytes': total_size
}
except Exception as e:
logger.error(f"Logs backup failed: {e}")
return {
'component': 'logs',
'status': 'failed',
'error': str(e)
}
async def _backup_cache(self, backup_path: Path) -> Dict[str, Any]:
"""Backup cache data."""
try:
logger.info("Backing up cache...")
cache_backup_path = backup_path / "cache"
cache_backup_path.mkdir(exist_ok=True)
# Backup cache directories
cache_dirs = ["cache", ".cache", "/tmp/anydocs_cache"]
backed_up_dirs = []
total_size = 0
            for cache_dir in cache_dirs:
                if os.path.exists(cache_dir):
                    dest_dir = cache_backup_path / Path(cache_dir).name
                    # shutil.copytree has no ignore_errors parameter; tolerate per-file
                    # failures (shutil.Error) so a partially copied cache still counts
                    try:
                        shutil.copytree(cache_dir, dest_dir, dirs_exist_ok=True)
                    except shutil.Error as copy_err:
                        logger.warning(f"Some cache files could not be copied from {cache_dir}: {copy_err}")
                    backed_up_dirs.append(cache_dir)
# Calculate size
for root, dirs, files in os.walk(dest_dir):
for file in files:
file_path = os.path.join(root, file)
try:
total_size += os.path.getsize(file_path)
except (OSError, IOError):
pass
return {
'component': 'cache',
'status': 'success',
'directories': backed_up_dirs,
'size_bytes': total_size
}
except Exception as e:
logger.error(f"Cache backup failed: {e}")
return {
'component': 'cache',
'status': 'failed',
'error': str(e)
}
async def _create_archive(self, backup_path: Path, backup_name: str) -> Path:
"""Create compressed archive of backup."""
archive_path = self.backup_dir / f"{backup_name}.tar.gz"
with tarfile.open(archive_path, "w:gz") as tar:
tar.add(backup_path, arcname=backup_name)
return archive_path
    async def _upload_to_s3(self, archive_path: Path, backup_name: str) -> Optional[str]:
"""Upload backup to S3."""
if not self.s3_enabled:
return None
s3_key = f"{self.s3_prefix}{backup_name}.tar.gz"
try:
self.s3_client.upload_file(str(archive_path), self.s3_bucket, s3_key)
logger.info(f"Backup uploaded to S3: s3://{self.s3_bucket}/{s3_key}")
return s3_key
except Exception as e:
logger.error(f"S3 upload failed: {e}")
raise
async def _save_backup_metadata(self, backup_info: Dict[str, Any]):
"""Save backup metadata."""
metadata_file = self.backup_dir / f"{backup_info['name']}_metadata.json"
with open(metadata_file, 'w') as f:
json.dump(backup_info, f, indent=2)
async def restore_from_backup(self, backup_name: str, components: Optional[List[str]] = None) -> Dict[str, Any]:
"""Restore from backup."""
try:
logger.info(f"Starting restore from backup: {backup_name}")
# Find backup
backup_archive = self.backup_dir / f"{backup_name}.tar.gz"
metadata_file = self.backup_dir / f"{backup_name}_metadata.json"
if not backup_archive.exists():
# Try downloading from S3
if self.s3_enabled:
s3_key = f"{self.s3_prefix}{backup_name}.tar.gz"
self.s3_client.download_file(self.s3_bucket, s3_key, str(backup_archive))
else:
raise FileNotFoundError(f"Backup not found: {backup_name}")
# Load metadata
backup_info = {}
if metadata_file.exists():
with open(metadata_file, 'r') as f:
backup_info = json.load(f)
# Extract backup
restore_path = self.backup_dir / f"restore_{backup_name}"
restore_path.mkdir(exist_ok=True)
with tarfile.open(backup_archive, "r:gz") as tar:
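                # On Python 3.12+ (and patched older releases), passing filter='data' to
                # extractall guards against path-traversal entries in untrusted archives.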
tar.extractall(restore_path)
extracted_path = restore_path / backup_name
# Restore components
restore_results = []
if not components or 'database' in components:
result = await self._restore_database(extracted_path)
restore_results.append(result)
if not components or 'configuration' in components:
result = await self._restore_configuration(extracted_path)
restore_results.append(result)
if not components or 'cache' in components:
result = await self._restore_cache(extracted_path)
restore_results.append(result)
# Clean up
shutil.rmtree(restore_path)
logger.info(f"Restore completed: {backup_name}")
return {
'backup_name': backup_name,
'restore_timestamp': datetime.now().isoformat(),
'components_restored': restore_results,
'status': 'completed'
}
except Exception as e:
logger.error(f"Restore failed: {e}")
return {
'backup_name': backup_name,
'status': 'failed',
'error': str(e)
}
async def _restore_database(self, backup_path: Path) -> Dict[str, Any]:
"""Restore database from backup."""
try:
logger.info("Restoring database...")
db_backup_path = backup_path / "database" / "database_dump.sql.gz"
if db_backup_path.exists():
# Decompress and restore
with gzip.open(db_backup_path, 'rt') as f:
sql_content = f.read()
# Execute SQL (this is simplified - adjust for your database)
import sqlite3
conn = sqlite3.connect("anydocs_restored.db")
conn.executescript(sql_content)
conn.close()
return {
'component': 'database',
'status': 'success',
'message': 'Database restored successfully'
}
else:
return {
'component': 'database',
'status': 'skipped',
'message': 'No database backup found'
}
except Exception as e:
logger.error(f"Database restore failed: {e}")
return {
'component': 'database',
'status': 'failed',
'error': str(e)
}
async def _restore_configuration(self, backup_path: Path) -> Dict[str, Any]:
"""Restore configuration from backup."""
try:
logger.info("Restoring configuration...")
config_backup_path = backup_path / "config"
if config_backup_path.exists():
restored_files = []
for config_file in config_backup_path.iterdir():
if config_file.is_file():
dest_path = Path(config_file.name)
shutil.copy2(config_file, dest_path)
restored_files.append(config_file.name)
return {
'component': 'configuration',
'status': 'success',
'restored_files': restored_files
}
else:
return {
'component': 'configuration',
'status': 'skipped',
'message': 'No configuration backup found'
}
except Exception as e:
logger.error(f"Configuration restore failed: {e}")
return {
'component': 'configuration',
'status': 'failed',
'error': str(e)
}
async def _restore_cache(self, backup_path: Path) -> Dict[str, Any]:
"""Restore cache from backup."""
try:
logger.info("Restoring cache...")
cache_backup_path = backup_path / "cache"
if cache_backup_path.exists():
restored_dirs = []
for cache_dir in cache_backup_path.iterdir():
if cache_dir.is_dir():
dest_path = Path("cache") / cache_dir.name
dest_path.parent.mkdir(exist_ok=True)
if dest_path.exists():
shutil.rmtree(dest_path)
shutil.copytree(cache_dir, dest_path)
restored_dirs.append(cache_dir.name)
return {
'component': 'cache',
'status': 'success',
'restored_directories': restored_dirs
}
else:
return {
'component': 'cache',
'status': 'skipped',
'message': 'No cache backup found'
}
except Exception as e:
logger.error(f"Cache restore failed: {e}")
return {
'component': 'cache',
'status': 'failed',
'error': str(e)
}
async def list_backups(self) -> List[Dict[str, Any]]:
"""List available backups."""
backups = []
# Local backups
for backup_file in self.backup_dir.glob("*.tar.gz"):
backup_name = backup_file.stem.replace('.tar', '')
metadata_file = self.backup_dir / f"{backup_name}_metadata.json"
backup_info = {
'name': backup_name,
'location': 'local',
'size_bytes': backup_file.stat().st_size,
                'created_at': datetime.fromtimestamp(backup_file.stat().st_mtime).isoformat()
}
if metadata_file.exists():
with open(metadata_file, 'r') as f:
metadata = json.load(f)
backup_info.update(metadata)
backups.append(backup_info)
# S3 backups (if enabled)
if self.s3_enabled:
try:
response = self.s3_client.list_objects_v2(
Bucket=self.s3_bucket,
Prefix=self.s3_prefix
)
for obj in response.get('Contents', []):
if obj['Key'].endswith('.tar.gz'):
backup_name = Path(obj['Key']).stem.replace('.tar', '')
backups.append({
'name': backup_name,
'location': 's3',
'size_bytes': obj['Size'],
'created_at': obj['LastModified'].isoformat(),
's3_key': obj['Key']
})
except Exception as e:
logger.error(f"Failed to list S3 backups: {e}")
return sorted(backups, key=lambda x: x['created_at'], reverse=True)
async def cleanup_old_backups(self):
"""Clean up old backups based on retention policy."""
try:
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
removed_count = 0
# Clean up local backups
for backup_file in self.backup_dir.glob("*.tar.gz"):
                # Use mtime: on Linux st_ctime is inode-change time, not creation time
                file_date = datetime.fromtimestamp(backup_file.stat().st_mtime)
if file_date < cutoff_date:
backup_name = backup_file.stem.replace('.tar', '')
metadata_file = self.backup_dir / f"{backup_name}_metadata.json"
backup_file.unlink()
if metadata_file.exists():
metadata_file.unlink()
removed_count += 1
logger.info(f"Removed old backup: {backup_name}")
# Clean up S3 backups (if enabled)
if self.s3_enabled:
try:
response = self.s3_client.list_objects_v2(
Bucket=self.s3_bucket,
Prefix=self.s3_prefix
)
for obj in response.get('Contents', []):
if obj['LastModified'].replace(tzinfo=None) < cutoff_date:
self.s3_client.delete_object(
Bucket=self.s3_bucket,
Key=obj['Key']
)
removed_count += 1
logger.info(f"Removed old S3 backup: {obj['Key']}")
except Exception as e:
logger.error(f"Failed to cleanup S3 backups: {e}")
logger.info(f"Cleanup completed. Removed {removed_count} old backups.")
except Exception as e:
logger.error(f"Backup cleanup failed: {e}")
# Global backup manager instance
_backup_manager: Optional[BackupManager] = None
def get_backup_manager(config: Optional[Dict[str, Any]] = None) -> BackupManager:
"""Get global backup manager instance."""
global _backup_manager
if _backup_manager is None:
default_config = {
'backup_dir': './backups',
'retention_days': 30,
'local_enabled': True,
's3_enabled': False
}
_backup_manager = BackupManager(config or default_config)
return _backup_manager
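

# Minimal usage sketch. This is illustrative only: it assumes the default local-only
# configuration and a SQLite database reachable via the 'database_path' setting, and it
# must be awaited from your own asyncio entry point.
async def example_backup_cycle() -> None:
    """Run one illustrative backup cycle: create, list, and prune old backups."""
    manager = get_backup_manager({
        'backup_dir': './backups',
        'retention_days': 30,
        'local_enabled': True,
        's3_enabled': False,
    })
    info = await manager.create_full_backup()
    logger.info(f"Backup {info['name']} finished with status {info['status']}")
    for backup in await manager.list_backups():
        logger.info(f"Available backup: {backup['name']} ({backup['location']})")
    await manager.cleanup_old_backups()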