Skip to main content
Glama

MCP Git Server

by MementoRC
git_primitives.py16.1 kB
""" Git primitive operations for MCP Git Server. This module provides atomic, indivisible Git operations that serve as the foundation for higher-level Git functionality. These primitives handle basic Git commands, repository validation, status parsing, and error handling. Design Principles: - Single responsibility: Each function does exactly one thing - No side effects: Functions are pure where possible - Clear error handling: Consistent exception patterns - Type safety: Comprehensive type hints and validation - Performance: Fast execution with minimal overhead Critical for TDD Compliance: This module implements the interface defined by test specifications. DO NOT modify tests to match this implementation - this implementation must satisfy the test requirements to prevent LLM compliance issues. """ import re import subprocess from dataclasses import dataclass from pathlib import Path from typing import Any # Exception classes for error handling class GitCommandError(Exception): """Raised when a git command fails or is invalid.""" def __init__( self, message: str, command: list[str] | None = None, repo_path: str | None = None, return_code: int | None = None, stderr: str | None = None, ): super().__init__(message) self.command = command self.repo_path = repo_path self.return_code = return_code self.stderr = stderr class GitRepositoryError(Exception): """Raised when repository operations fail due to repository state.""" def __init__( self, message: str, repo_path: str | None = None, suggested_action: str | None = None, ): super().__init__(message) self.repo_path = repo_path self.suggested_action = suggested_action class GitValidationError(Exception): """Raised when validation of git-related data fails.""" def __init__( self, message: str, field: str | None = None, value: Any | None = None, validation_rule: str | None = None, ): super().__init__(message) self.field = field self.value = value self.validation_rule = validation_rule # Result classes for structured return values @dataclass class GitCommandResult: """Result of executing a git command.""" success: bool output: str error: str | None = None return_code: int = 0 @dataclass class GitStatusParsed: """Parsed git status output.""" modified_files: list[str] added_files: list[str] deleted_files: list[str] untracked_files: list[str] renamed_files: list[str] copied_files: list[str] @dataclass class GitCommitParsed: """Parsed git commit information.""" hash: str author_name: str author_email: str message: str date: str @dataclass class GitRepositoryStatus: """Complete repository status information.""" is_clean: bool modified_files: list[str] untracked_files: list[str] staged_files: list[str] @dataclass class GitValidationResult: """Result of repository path validation.""" is_valid: bool absolute_path: Path @dataclass class GitFormattedError: """Formatted git error with context.""" message: str context: str command: list[str] suggestion: str | None = None # Core primitive operations def execute_git_command( repo_path: str, command: list[str], timeout: int = 30 ) -> GitCommandResult: """ Execute a git command in the specified repository. Args: repo_path: Path to the git repository command: Git command arguments (e.g., ['status', '--porcelain']) timeout: Command timeout in seconds Returns: GitCommandResult with success status and output Raises: GitRepositoryError: If repository path doesn't exist GitCommandError: If command is invalid or fails """ # Validate command is not empty if not command: raise GitCommandError( "Invalid git command: command list cannot be empty", command=command, repo_path=repo_path, ) # Prepare full command full_command = ["git"] + command try: # Execute command result = subprocess.run( full_command, cwd=repo_path, capture_output=True, text=True, timeout=timeout ) if result.returncode == 0: return GitCommandResult( success=True, output=result.stdout.strip(), return_code=result.returncode, ) else: return GitCommandResult( success=False, output=result.stdout.strip(), error=result.stderr.strip(), return_code=result.returncode, ) except subprocess.TimeoutExpired: return GitCommandResult( success=False, output="", error=f"timeout: Command timed out after {timeout} seconds", return_code=-1, ) except FileNotFoundError as e: # Repository path doesn't exist raise GitRepositoryError( f"Repository path does not exist: {repo_path}", repo_path=repo_path, suggested_action="Verify the repository path is correct", ) from e except Exception as e: return GitCommandResult( success=False, output="", error=f"Command execution failed: {str(e)}", return_code=-1, ) def is_git_repository(repo_path: str) -> bool: """ Check if the given path is a git repository. Args: repo_path: Path to check Returns: True if path is a git repository, False otherwise """ repo_path_obj = Path(repo_path) # Check if path exists and is a directory if not repo_path_obj.exists() or not repo_path_obj.is_dir(): return False # Check for .git directory git_dirs = list(repo_path_obj.glob(".git")) return len(git_dirs) > 0 def validate_repository_path(repo_path: str) -> GitValidationResult: """ Validate a repository path and return validation result. Args: repo_path: Path to validate Returns: GitValidationResult with validation status and normalized path Raises: GitValidationError: If path is invalid """ repo_path_obj = Path(repo_path) # Check if path exists if not repo_path_obj.exists(): raise GitValidationError( f"Invalid repository path: path does not exist - {repo_path}", field="repo_path", value=repo_path, validation_rule="path_must_exist", ) # Check if it's a git repository if not is_git_repository(repo_path): raise GitValidationError( f"Invalid repository path: not a git repository - {repo_path}", field="repo_path", value=repo_path, validation_rule="must_be_git_repository", ) return GitValidationResult(is_valid=True, absolute_path=repo_path_obj.resolve()) def get_repository_status(repo_path: str) -> GitRepositoryStatus: """ Get the complete status of a git repository. Args: repo_path: Path to the git repository Returns: GitRepositoryStatus with file status information """ result = execute_git_command(repo_path, ["status", "--porcelain"]) if not result.success: raise GitRepositoryError( f"Failed to get repository status: {result.error}", repo_path=repo_path ) # Parse status output parsed = parse_git_status_output(result.output) # Determine if repository is clean is_clean = ( len(parsed.modified_files) == 0 and len(parsed.added_files) == 0 and len(parsed.deleted_files) == 0 and len(parsed.untracked_files) == 0 ) return GitRepositoryStatus( is_clean=is_clean, modified_files=parsed.modified_files, untracked_files=parsed.untracked_files, staged_files=parsed.added_files, ) def get_staged_files(repo_path: str) -> list[str]: """ Get list of staged files in the repository. Args: repo_path: Path to the git repository Returns: List of staged file paths """ result = execute_git_command(repo_path, ["status", "--porcelain"]) if not result.success: raise GitRepositoryError( f"Failed to get staged files: {result.error}", repo_path=repo_path ) parsed = parse_git_status_output(result.output) return parsed.added_files def get_unstaged_files(repo_path: str) -> list[str]: """ Get list of unstaged modified files in the repository. Args: repo_path: Path to the git repository Returns: List of unstaged modified file paths """ result = execute_git_command(repo_path, ["status", "--porcelain"]) if not result.success: raise GitRepositoryError( f"Failed to get unstaged files: {result.error}", repo_path=repo_path ) parsed = parse_git_status_output(result.output) return parsed.modified_files + parsed.deleted_files def get_untracked_files(repo_path: str) -> list[str]: """ Get list of untracked files in the repository. Args: repo_path: Path to the git repository Returns: List of untracked file paths """ result = execute_git_command(repo_path, ["status", "--porcelain"]) if not result.success: raise GitRepositoryError( f"Failed to get untracked files: {result.error}", repo_path=repo_path ) parsed = parse_git_status_output(result.output) return parsed.untracked_files def get_current_branch(repo_path: str) -> str | None: """ Get the current branch name. Args: repo_path: Path to the git repository Returns: Current branch name, or None if in detached HEAD state """ result = execute_git_command(repo_path, ["branch", "--show-current"]) if not result.success: raise GitRepositoryError( f"Failed to get current branch: {result.error}", repo_path=repo_path ) branch_name = result.output.strip() # Check for detached HEAD if not branch_name or "detached" in branch_name.lower(): return None return branch_name def get_commit_hash(repo_path: str, short: bool = False) -> str: """ Get the current commit hash. Args: repo_path: Path to the git repository short: If True, return short hash format Returns: Current commit hash """ command = ["rev-parse"] if short: command.append("--short") command.append("HEAD") result = execute_git_command(repo_path, command) if not result.success: raise GitRepositoryError( f"Failed to get commit hash: {result.error}", repo_path=repo_path ) return result.output.strip() def parse_git_status_output(status_output: str) -> GitStatusParsed: """ Parse git status --porcelain output into categorized file lists. Args: status_output: Raw git status --porcelain output Returns: GitStatusParsed with categorized file lists """ modified_files = [] added_files = [] deleted_files = [] untracked_files = [] renamed_files = [] copied_files = [] for line in status_output.split("\n"): if not line.strip(): continue # Parse status codes (first two characters) status_codes = line[:2] filename = line[3:].strip() # Handle different status combinations if status_codes == "??": untracked_files.append(filename) elif status_codes[0] == "A": added_files.append(filename) elif status_codes[0] == "M" or status_codes[1] == "M": if status_codes[0] == " ": # Unstaged modification modified_files.append(filename) else: # Staged modification added_files.append(filename) elif status_codes[0] == "D" or status_codes[1] == "D": deleted_files.append(filename) elif status_codes[0] == "R": renamed_files.append(filename) elif status_codes[0] == "C": copied_files.append(filename) return GitStatusParsed( modified_files=modified_files, added_files=added_files, deleted_files=deleted_files, untracked_files=untracked_files, renamed_files=renamed_files, copied_files=copied_files, ) def parse_git_log_output(log_output: str) -> list[GitCommitParsed]: """ Parse git log output into commit information. Args: log_output: Raw git log output Returns: List of GitCommitParsed objects """ commits = [] current_commit = None current_message_lines = [] for line in log_output.split("\n"): if line.startswith("commit "): # Save previous commit if exists if current_commit: current_commit.message = "\n".join(current_message_lines).strip() commits.append(current_commit) current_message_lines = [] # Start new commit commit_hash = line.split(" ")[1] current_commit = GitCommitParsed( hash=commit_hash, author_name="", author_email="", message="", date="" ) elif line.startswith("Author: ") and current_commit: # Parse author line: "Author: Name <email>" author_info = line[8:] # Remove "Author: " match = re.match(r"(.+) <(.+)>", author_info) if match: current_commit.author_name = match.group(1) current_commit.author_email = match.group(2) elif line.startswith("Date: ") and current_commit: current_commit.date = line[6:].strip() # Remove "Date: " elif line.startswith(" ") and current_commit: # Commit message line (indented) current_message_lines.append(line[4:]) # Remove indentation elif line.strip() == "" and current_commit and current_message_lines: # Empty line in message current_message_lines.append("") # Add the last commit if current_commit: current_commit.message = "\n".join(current_message_lines).strip() commits.append(current_commit) return commits def format_git_error( raw_error: str, command: list[str], repo_path: str ) -> GitFormattedError: """ Format a raw git error into a human-readable error with context. Args: raw_error: Raw error message from git command: The git command that failed repo_path: Repository path where command was executed Returns: GitFormattedError with formatted message and suggestions """ # Create human-readable message if "not a git repository" in raw_error.lower(): message = f"The directory '{repo_path}' is not a git repository." suggestion = "Initialize a git repository with 'git init' or navigate to an existing repository." elif "not found" in raw_error.lower(): message = f"Git repository or file not found in '{repo_path}'." suggestion = "Check that the path exists and contains a valid git repository." elif "permission denied" in raw_error.lower(): message = f"Permission denied accessing git repository at '{repo_path}'." suggestion = ( "Check file permissions and ensure you have access to the repository." ) else: message = f"Git operation failed: {raw_error}" suggestion = "Check the git command syntax and repository state." return GitFormattedError( message=message, context=f"Command: git {' '.join(command)} (in {repo_path})", command=command, suggestion=suggestion, )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server