Skip to main content
Glama

Ultimate MCP Coding Platform

backup.py (9.58 kB)
"""Automated Neo4j backup strategy.

Backups are produced by running ``neo4j-admin database dump`` inside a Docker
container and copying the resulting archive to a directory on the host.
"""

from __future__ import annotations

import asyncio
import logging
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Directory inside the container where dump archives are staged.
_CONTAINER_BACKUP_DIR = "/backups"
# Name of the database that is dumped and loaded.
_DATABASE_NAME = "neo4j"
# Archive that ``neo4j-admin database dump/load`` writes to / reads from.
_CONTAINER_DUMP_FILE = f"{_CONTAINER_BACKUP_DIR}/{_DATABASE_NAME}.dump"
# Glob matching the backup archives this module writes on the host.
_BACKUP_GLOB = "neo4j_backup_*.dump"


class Neo4jBackupManager:
    """Automated Neo4j backup management.

    Creates, restores, lists and prunes dump archives of the ``neo4j``
    database running in a Docker container.
    """

    def __init__(
        self,
        backup_dir: str = "/backups",
        container_name: str = "ultimate_mcp_neo4j",
        retention_days: int = 7,
        max_backups: int = 10,
    ):
        """Configure the manager and make sure the host backup directory exists.

        Args:
            backup_dir: Host directory that holds the backup archives.
            container_name: Name of the Docker container running Neo4j.
            retention_days: Age in days after which a backup counts as old.
            max_backups: Number of most recent backups that is kept.
        """
        self.backup_dir = Path(backup_dir)
        self.container_name = container_name
        self.retention_days = retention_days
        self.max_backups = max_backups

        # Ensure backup directory exists
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    async def _run_command(cmd: list[str]) -> tuple[int, str]:
        """Run *cmd* to completion and return ``(exit code, decoded stderr)``."""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        # errors="replace": tool output must never abort the caller just
        # because it is not valid UTF-8.
        message = stderr.decode(errors="replace") if stderr else ""
        return process.returncode, message

    async def create_backup(self) -> Optional[Path]:
        """Create a new backup of the Neo4j database.

        Dumps the database inside the container, then copies the archive to
        ``backup_dir`` under a timestamped name.

        Returns:
            Path of the new archive on the host, or ``None`` if any step
            failed (the failure is logged).
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self.backup_dir / f"neo4j_backup_{timestamp}.dump"

        try:
            # Create backup using neo4j-admin dump
            returncode, stderr = await self._run_command([
                "docker", "exec", self.container_name,
                "neo4j-admin", "database", "dump", _DATABASE_NAME,
                f"--to-path={_CONTAINER_BACKUP_DIR}",
                "--overwrite-destination=true",
            ])
            if returncode != 0:
                logger.error("Backup creation failed", extra={
                    "stderr": stderr or "No error output"
                })
                return None

            # Copy backup from container to host
            returncode, stderr = await self._run_command([
                "docker", "cp",
                f"{self.container_name}:{_CONTAINER_DUMP_FILE}",
                str(backup_path),
            ])
            if returncode != 0:
                logger.error("Failed to copy backup from container", extra={
                    "stderr": stderr or "No error output"
                })
                return None

            logger.info("Backup created successfully", extra={
                "backup_path": str(backup_path),
                "size_mb": backup_path.stat().st_size / (1024 * 1024)
            })
            return backup_path

        except Exception as e:
            logger.error("Backup creation exception", extra={"error": str(e)})

        return None

    async def restore_backup(self, backup_path: Path) -> bool:
        """Restore database from backup.

        The archive is copied into the container first, so a failed copy
        leaves the running service untouched. The service is then stopped,
        the archive is loaded, and the service is started again whether or
        not the load succeeded.

        Args:
            backup_path: Archive on the host to restore from.

        Returns:
            ``True`` if the load command succeeded, ``False`` otherwise.
        """
        backup_path = Path(backup_path)
        if not backup_path.exists():
            logger.error("Backup file not found", extra={"path": str(backup_path)})
            return False

        try:
            # Copy backup to container. It must be staged as
            # "<database>.dump" because that is the archive name the load
            # command looks for inside --from-path.
            returncode, stderr = await self._run_command([
                "docker", "cp",
                str(backup_path),
                f"{self.container_name}:{_CONTAINER_DUMP_FILE}",
            ])
            if returncode != 0:
                logger.error("Failed to copy backup to container", extra={
                    "stderr": stderr or "No error output"
                })
                return False

            # Stop Neo4j service
            returncode, stderr = await self._run_command(
                ["docker", "exec", self.container_name, "neo4j", "stop"]
            )
            if returncode != 0:
                # Not fatal: the load below fails on its own if the database
                # is still in use.
                logger.warning("Neo4j stop command failed", extra={
                    "stderr": stderr or "No error output"
                })

            # Wait for service to stop
            await asyncio.sleep(5)

            try:
                # Restore database
                returncode, stderr = await self._run_command([
                    "docker", "exec", self.container_name,
                    "neo4j-admin", "database", "load", _DATABASE_NAME,
                    f"--from-path={_CONTAINER_BACKUP_DIR}",
                    "--overwrite-destination=true",
                ])
            finally:
                # Start Neo4j service, also after a failed load, so that a bad
                # archive does not leave the database offline.
                start_code, start_stderr = await self._run_command(
                    ["docker", "exec", self.container_name, "neo4j", "start"]
                )
                if start_code != 0:
                    logger.error("Failed to start Neo4j service", extra={
                        "stderr": start_stderr or "No error output"
                    })

            if returncode == 0:
                logger.info("Database restored successfully", extra={
                    "backup_path": str(backup_path)
                })
                return True

            logger.error("Database restore failed", extra={
                "stderr": stderr or "No error output"
            })

        except Exception as e:
            logger.error("Restore exception", extra={"error": str(e)})

        return False

    async def cleanup_old_backups(self) -> None:
        """Remove old backups based on retention policy.

        The ``max_backups`` most recent archives are always kept and anything
        beyond that count is removed, oldest first. ``retention_days`` only
        decides whether a removal is logged as an old or an excess backup.
        Errors are logged, not raised.
        """
        try:
            # Oldest first, so pruning can never take a newer archive while an
            # older one survives.
            backups = sorted(
                self.backup_dir.glob(_BACKUP_GLOB),
                key=lambda path: path.stat().st_mtime,
            )
            cutoff_time = datetime.now() - timedelta(days=self.retention_days)
            surplus = max(len(backups) - self.max_backups, 0)
            removed_count = 0

            for backup in backups[:surplus]:
                backup_time = datetime.fromtimestamp(backup.stat().st_mtime)
                expired = backup_time < cutoff_time
                backup.unlink()
                removed_count += 1
                logger.info(
                    "Removed old backup" if expired else "Removed excess backup",
                    extra={"path": str(backup)},
                )

            if removed_count > 0:
                logger.info("Backup cleanup completed", extra={"removed": removed_count})

        except Exception as e:
            logger.error("Backup cleanup failed", extra={"error": str(e)})

    async def list_backups(self) -> list[dict]:
        """List available backups with metadata.

        Returns:
            One dict per archive with the keys ``name``, ``path``,
            ``size_mb``, ``created_at`` (ISO timestamp of the file's
            modification time) and ``age_days``, newest first. Entries
            gathered before an error are still returned; the error is logged.
        """
        backups: list[dict] = []

        try:
            now = datetime.now()
            found = [
                (path, path.stat())
                for path in self.backup_dir.glob(_BACKUP_GLOB)
            ]
            # Sort by creation time, newest first. The raw mtime is used
            # instead of the formatted string so ordering does not depend on
            # how the timestamp happens to render.
            found.sort(key=lambda item: item[1].st_mtime, reverse=True)

            for path, stat in found:
                modified = datetime.fromtimestamp(stat.st_mtime)
                backups.append({
                    "name": path.name,
                    "path": str(path),
                    "size_mb": round(stat.st_size / (1024 * 1024), 2),
                    "created_at": modified.isoformat(),
                    "age_days": (now - modified).days,
                })

        except Exception as e:
            logger.error("Failed to list backups", extra={"error": str(e)})

        return backups

    async def schedule_backups(self, interval_hours: int = 24) -> None:
        """Schedule automatic backups.

        Loops forever: creates a backup, prunes old archives when the backup
        succeeded, then sleeps for ``interval_hours``. It stops only when the
        task is cancelled.

        Args:
            interval_hours: Hours to wait between backup attempts.
        """
        logger.info("Starting backup scheduler", extra={"interval_hours": interval_hours})

        while True:
            try:
                # Create backup
                backup_path = await self.create_backup()

                if backup_path:
                    # Cleanup old backups
                    await self.cleanup_old_backups()

                # Wait for next backup
                await asyncio.sleep(interval_hours * 3600)

            except Exception as e:
                logger.error("Backup scheduler error", extra={"error": str(e)})
                # Wait 1 hour before retrying on error
                await asyncio.sleep(3600)


# Global backup manager
_backup_manager: Optional[Neo4jBackupManager] = None


def get_backup_manager() -> Neo4jBackupManager:
    """Get global backup manager, creating it with defaults on first use."""
    global _backup_manager
    if _backup_manager is None:
        _backup_manager = Neo4jBackupManager()
    return _backup_manager


def init_backup_manager(**kwargs) -> Neo4jBackupManager:
    """Initialize backup manager.

    Replaces the global manager with a new one built from ``kwargs``, which
    are passed to ``Neo4jBackupManager`` unchanged.
    """
    global _backup_manager
    _backup_manager = Neo4jBackupManager(**kwargs)
    return _backup_manager

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Senpai-Sama7/Ultimate_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.