Skip to main content
Glama
git_backup.py12.7 kB
"""Git integration for automated backups of STM storage. Provides automatic commits, snapshots, and restore functionality. """ import time from datetime import datetime from pathlib import Path from typing import Any from git import Repo from git.exc import InvalidGitRepositoryError class GitBackup: """Git-based backup manager for STM storage.""" def __init__(self, storage_dir: Path, auto_commit: bool = True): """ Initialize Git backup manager. Args: storage_dir: Directory containing JSONL storage files auto_commit: If True, enable automatic commits """ self.storage_dir = storage_dir self.auto_commit = auto_commit self.repo: Repo | None = None # Tracking for auto-commit self._last_commit_time = 0 self._commit_interval = 3600 # Default: 1 hour in seconds self._pending_changes = False def initialize(self) -> None: """ Initialize Git repository in storage directory. Creates a new repo if one doesn't exist. """ try: # Try to open existing repository self.repo = Repo(self.storage_dir) except InvalidGitRepositoryError: # Initialize new repository self.repo = Repo.init(self.storage_dir) # Create initial commit self._create_commit("Initial STM storage", initial=True) def set_commit_interval(self, seconds: int) -> None: """ Set the interval for automatic commits. Args: seconds: Interval in seconds between auto-commits """ self._commit_interval = seconds def mark_dirty(self) -> None: """Mark that changes have been made (for auto-commit tracking).""" self._pending_changes = True def _create_commit(self, message: str, initial: bool = False) -> str: """ Create a git commit with all changes. Args: message: Commit message initial: If True, this is the initial commit Returns: Commit SHA Raises: GitCommandError: If commit fails """ if self.repo is None: raise RuntimeError("Git repository not initialized") # Add all files if initial: # For initial commit, add .gitignore gitignore_path = self.storage_dir / ".gitignore" if not gitignore_path.exists(): with open(gitignore_path, "w") as f: f.write("# Python\n__pycache__/\n*.py[cod]\n*$py.class\n\n") f.write("# Temp files\n*.tmp\n*.swp\n.DS_Store\n") # Stage all changes self.repo.index.add("*") # Check if there are changes to commit if not self.repo.is_dirty() and not initial: # No changes to commit return self.repo.head.commit.hexsha # Create commit commit = self.repo.index.commit(message) # Update tracking self._last_commit_time = int(time.time()) self._pending_changes = False return commit.hexsha def create_snapshot(self, message: str | None = None) -> str: """ Create a snapshot (commit) of current storage state. Args: message: Custom commit message (default: auto-generated with timestamp) Returns: Commit SHA """ if self.repo is None: raise RuntimeError("Git repository not initialized") if message is None: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") message = f"STM snapshot - {timestamp}" return self._create_commit(message) def auto_commit_if_needed(self) -> str | None: """ Perform automatic commit if interval has elapsed and there are changes. Returns: Commit SHA if commit was created, None otherwise """ if not self.auto_commit: return None if not self._pending_changes: return None now = int(time.time()) if now - self._last_commit_time < self._commit_interval: return None # Time to commit timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") message = f"Auto-commit - {timestamp}" return self._create_commit(message) def get_history(self, limit: int = 20) -> list[dict[str, Any]]: """ Get commit history. Args: limit: Maximum number of commits to return Returns: List of commit information dictionaries """ if self.repo is None: raise RuntimeError("Git repository not initialized") commits = [] for commit in list(self.repo.iter_commits())[:limit]: commits.append( { "sha": commit.hexsha, "short_sha": commit.hexsha[:7], "message": commit.message.strip(), "author": str(commit.author), "timestamp": commit.committed_date, "datetime": datetime.fromtimestamp(commit.committed_date).isoformat(), } ) return commits def restore_snapshot(self, commit_sha: str) -> None: """ Restore storage to a specific commit. Args: commit_sha: SHA of commit to restore to Raises: GitCommandError: If restore fails """ if self.repo is None: raise RuntimeError("Git repository not initialized") # Check for uncommitted changes if self.repo.is_dirty(): raise RuntimeError("Repository has uncommitted changes. Commit or stash them first.") # Reset to the specified commit self.repo.git.reset("--hard", commit_sha) def get_status(self) -> dict[str, Any]: """ Get repository status. Returns: Dictionary with status information """ if self.repo is None: return { "initialized": False, "error": "Repository not initialized", } try: return { "initialized": True, "dirty": self.repo.is_dirty(), "untracked_files": self.repo.untracked_files, "current_branch": self.repo.active_branch.name, "total_commits": len(list(self.repo.iter_commits())), "last_commit": { "sha": self.repo.head.commit.hexsha[:7], "message": self.repo.head.commit.message.strip(), "timestamp": self.repo.head.commit.committed_date, } if self.repo.head.is_valid() else None, "auto_commit_enabled": self.auto_commit, "pending_changes": self._pending_changes, "time_until_next_commit": max( 0, self._commit_interval - (int(time.time()) - self._last_commit_time) ) if self.auto_commit else None, } except Exception as e: return { "initialized": True, "error": str(e), } def create_branch(self, branch_name: str) -> None: """ Create a new branch. Args: branch_name: Name of the branch to create """ if self.repo is None: raise RuntimeError("Git repository not initialized") self.repo.create_head(branch_name) def switch_branch(self, branch_name: str) -> None: """ Switch to a different branch. Args: branch_name: Name of the branch to switch to Raises: GitCommandError: If branch doesn't exist or switch fails """ if self.repo is None: raise RuntimeError("Git repository not initialized") self.repo.git.checkout(branch_name) def diff_commits(self, commit_sha_a: str, commit_sha_b: str) -> str: """ Get diff between two commits. Args: commit_sha_a: First commit SHA commit_sha_b: Second commit SHA Returns: Diff output as string """ if self.repo is None: raise RuntimeError("Git repository not initialized") # GitPython returns a Git object response; ensure string type for API contract return str(self.repo.git.diff(commit_sha_a, commit_sha_b)) def get_stats(self) -> dict[str, Any]: """ Get repository statistics. Returns: Dictionary with statistics """ if self.repo is None: return {"error": "Repository not initialized"} try: commits = list(self.repo.iter_commits()) # Calculate stats total_additions = 0 total_deletions = 0 for commit in commits: stats = commit.stats.total total_additions += stats.get("insertions", 0) total_deletions += stats.get("deletions", 0) return { "total_commits": len(commits), "total_additions": total_additions, "total_deletions": total_deletions, "branches": [head.name for head in self.repo.heads], "current_branch": self.repo.active_branch.name, "repo_size_mb": sum( f.stat().st_size for f in Path(self.repo.git_dir).rglob("*") if f.is_file() ) / (1024 * 1024), } except Exception as e: return {"error": str(e)} def main() -> int: """CLI entry point for Git backup operations.""" import argparse import json import sys parser = argparse.ArgumentParser(description="Manage Git backups for STM storage") parser.add_argument( "storage_dir", type=Path, help="Path to STM storage directory", ) subparsers = parser.add_subparsers(dest="command", help="Command to execute") # Initialize command subparsers.add_parser("init", help="Initialize Git repository") # Snapshot command snapshot_parser = subparsers.add_parser("snapshot", help="Create a snapshot") snapshot_parser.add_argument("-m", "--message", help="Commit message") # History command history_parser = subparsers.add_parser("history", help="Show commit history") history_parser.add_argument("-n", "--limit", type=int, default=20, help="Number of commits") # Status command subparsers.add_parser("status", help="Show repository status") # Restore command restore_parser = subparsers.add_parser("restore", help="Restore to a commit") restore_parser.add_argument("commit_sha", help="Commit SHA to restore to") # Stats command subparsers.add_parser("stats", help="Show repository statistics") args = parser.parse_args() if not args.command: parser.print_help() return 1 try: backup = GitBackup(storage_dir=args.storage_dir, auto_commit=False) if args.command == "init": backup.initialize() print("✓ Git repository initialized") elif args.command == "snapshot": backup.initialize() commit_sha = backup.create_snapshot(message=args.message) print(f"✓ Snapshot created: {commit_sha[:7]}") elif args.command == "history": backup.initialize() history = backup.get_history(limit=args.limit) print(f"\nCommit history ({len(history)} commits):\n") for commit in history: print(f" {commit['short_sha']} - {commit['datetime']}") print(f" {commit['message']}") print() elif args.command == "status": backup.initialize() status = backup.get_status() print("\nRepository status:\n") print(json.dumps(status, indent=2)) elif args.command == "restore": backup.initialize() backup.restore_snapshot(args.commit_sha) print(f"✓ Restored to commit {args.commit_sha[:7]}") elif args.command == "stats": backup.initialize() stats = backup.get_stats() print("\nRepository statistics:\n") print(json.dumps(stats, indent=2)) return 0 except Exception as e: print(f"Error: {e}", file=sys.stderr) return 1 if __name__ == "__main__": import sys sys.exit(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex'

If you have feedback or need assistance with the MCP directory API, please join our Discord server