git_tools.py•9.69 kB
"""Local Git operations using subprocess."""
import subprocess
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import config
def _validate_repo() -> None:
"""Validate that REPO_ROOT is a Git repository."""
if not (config.REPO_ROOT / ".git").exists():
raise ValueError(f"No Git repository found at {config.REPO_ROOT}")
def _validate_path(file_path: str) -> Path:
"""Validate that a file path is within REPO_ROOT."""
_validate_repo()
full_path = (config.REPO_ROOT / file_path).resolve()
if not str(full_path).startswith(str(config.REPO_ROOT.resolve())):
raise ValueError(f"Path {file_path} is outside repository root")
return full_path
def _run_git_command(args: List[str], check: bool = True) -> subprocess.CompletedProcess:
"""Run a git command and return the result."""
_validate_repo()
try:
result = subprocess.run(
["git"] + args,
cwd=config.REPO_ROOT,
text=True,
capture_output=True,
check=check
)
return result
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Git command failed: {e.stderr.strip() or e.stdout.strip()}")
def git_status() -> Dict[str, Any]:
"""
Get the current Git status including branch, ahead/behind counts, and file lists.
Returns:
Dictionary with branch, ahead, behind, staged, modified, and untracked files.
"""
_validate_repo()
# Get status with porcelain format
result = _run_git_command(["status", "--porcelain=v1", "--branch"])
output = result.stdout
# Parse branch info
branch_match = re.search(r'## (.+?)(?:\.\.\.(.+?))?(?:\s+\[(ahead (\d+))?(?:, )?(behind (\d+))?\])?', output)
branch = "unknown"
ahead = 0
behind = 0
if branch_match:
branch = branch_match.group(1)
if branch_match.group(4):
ahead = int(branch_match.group(4))
if branch_match.group(6):
behind = int(branch_match.group(6))
# Parse file statuses
staged = []
modified = []
untracked = []
for line in output.split('\n'):
if not line.strip() or line.startswith('##'):
continue
status = line[:2]
file_path = line[3:].strip()
if status[0] in ['A', 'M', 'D', 'R', 'C']:
staged.append(file_path)
if status[1] in ['M', 'D']:
modified.append(file_path)
if status == '??':
untracked.append(file_path)
# Get current branch name
branch_result = _run_git_command(["rev-parse", "--abbrev-ref", "HEAD"], check=False)
if branch_result.returncode == 0:
branch = branch_result.stdout.strip()
result_dict = {
"branch": branch,
"ahead": ahead,
"behind": behind,
"staged": staged,
"modified": modified,
"untracked": untracked
}
# Create human-readable summary
content_parts = [f"Branch: {branch}"]
if ahead > 0:
content_parts.append(f"Ahead: {ahead}")
if behind > 0:
content_parts.append(f"Behind: {behind}")
if staged:
content_parts.append(f"\nStaged files ({len(staged)}):")
content_parts.extend([f" {f}" for f in staged])
if modified:
content_parts.append(f"\nModified files ({len(modified)}):")
content_parts.extend([f" {f}" for f in modified])
if untracked:
content_parts.append(f"\nUntracked files ({len(untracked)}):")
content_parts.extend([f" {f}" for f in untracked])
if not staged and not modified and not untracked:
content_parts.append("\nWorking tree clean")
result_dict["content"] = "\n".join(content_parts)
return result_dict
def git_log(limit: int = 10) -> Dict[str, Any]:
"""
Get Git commit log.
Args:
limit: Number of commits to return (1-50, default 10)
Returns:
Dictionary with list of commits.
"""
_validate_repo()
if not isinstance(limit, int) or limit < 1 or limit > 50:
raise ValueError("limit must be an integer between 1 and 50")
# Format: hash|author_name|author_email|date|message
result = _run_git_command([
"log",
f"-n{limit}",
"--pretty=format:%H|%an|%ae|%ai|%s",
"--date=iso"
])
commits = []
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split('|', 4)
if len(parts) >= 5:
hash_val, author_name, author_email, date_str, message = parts
# Parse date and convert to ISO format
try:
dt = datetime.strptime(date_str.split()[0] + " " + date_str.split()[1], "%Y-%m-%d %H:%M:%S")
date_iso = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
except:
date_iso = date_str
commits.append({
"hash": hash_val[:7], # Short hash
"author_name": author_name,
"author_email": author_email,
"date": date_iso,
"message": message
})
# Create human-readable summary
content_parts = [f"Recent commits ({len(commits)}):"]
for commit in commits:
content_parts.append(f"\n{commit['hash']} - {commit['message']}")
content_parts.append(f" Author: {commit['author_name']} <{commit['author_email']}>")
content_parts.append(f" Date: {commit['date']}")
return {
"commits": commits,
"content": "\n".join(content_parts)
}
def git_diff(file_path: str) -> Dict[str, Any]:
"""
Get the diff for a specific file.
Args:
file_path: Relative path to the file
Returns:
Dictionary with file path and diff content.
"""
_validate_path(file_path)
result = _run_git_command(["diff", "--", file_path], check=False)
diff_output = result.stdout if result.stdout else "(no changes)"
return {
"file_path": file_path,
"diff": diff_output,
"content": f"Diff for {file_path}:\n\n{diff_output}"
}
def git_list_branches() -> Dict[str, Any]:
"""
List all Git branches.
Returns:
Dictionary with current branch and list of all branches.
"""
_validate_repo()
result = _run_git_command(["branch", "--list"])
branches = [line.strip().lstrip('* ').strip() for line in result.stdout.split('\n') if line.strip()]
# Get current branch
current_result = _run_git_command(["rev-parse", "--abbrev-ref", "HEAD"])
current = current_result.stdout.strip()
return {
"current": current,
"branches": branches,
"content": f"Current branch: {current}\n\nAll branches:\n" + "\n".join([f" {'*' if b == current else ' '} {b}" for b in branches])
}
def git_stage_files(files: List[str]) -> Dict[str, Any]:
"""
Stage files for commit.
Args:
files: List of relative file paths to stage
Returns:
Dictionary with list of staged files and success status.
"""
if not files:
raise ValueError("files list cannot be empty")
# Validate all paths
validated_paths = []
for file_path in files:
_validate_path(file_path)
validated_paths.append(file_path)
_run_git_command(["add", "--"] + validated_paths)
return {
"staged": validated_paths,
"success": True,
"content": f"Successfully staged {len(validated_paths)} file(s):\n" + "\n".join([f" {f}" for f in validated_paths])
}
def git_commit(message: str) -> Dict[str, Any]:
"""
Create a commit with the given message.
Args:
message: Commit message (minimum 3 characters)
Returns:
Dictionary with success status, commit hash, and message.
"""
if not message or len(message.strip()) < 3:
raise ValueError("Commit message must be at least 3 characters long")
_validate_repo()
# Check if there are staged changes
status_result = _run_git_command(["diff", "--cached", "--quiet"], check=False)
if status_result.returncode == 0:
raise ValueError("No staged changes to commit. Please stage files first using git_stage_files.")
_run_git_command(["commit", "-m", message])
# Get the commit hash
hash_result = _run_git_command(["rev-parse", "HEAD"])
commit_hash = hash_result.stdout.strip()[:7]
return {
"success": True,
"commit_hash": commit_hash,
"message": message,
"content": f"Successfully committed: {commit_hash}\nMessage: {message}"
}
def git_checkout_branch(branch: str, create: bool = False) -> Dict[str, Any]:
"""
Checkout a branch, optionally creating it.
Args:
branch: Branch name to checkout
create: If True, create the branch if it doesn't exist
Returns:
Dictionary with success status and current branch.
"""
_validate_repo()
if not branch or not branch.strip():
raise ValueError("Branch name cannot be empty")
if create:
_run_git_command(["checkout", "-b", branch])
else:
_run_git_command(["checkout", branch])
# Verify current branch
current_result = _run_git_command(["rev-parse", "--abbrev-ref", "HEAD"])
current = current_result.stdout.strip()
return {
"success": True,
"current": current,
"content": f"Successfully checked out branch: {current}"
}