Git MCP
by kjozsa
- git_mcp
from dataclasses import dataclass
from typing import AsyncIterator, Dict, List, Optional, Tuple
from datetime import datetime
import subprocess
import os
import re
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
@dataclass
class GitContext:
    """Server-wide state yielded by the lifespan handler and read by the tools."""

    # Directory that tool functions join a repository name onto to obtain the
    # repository's path.
    git_repos_path: str
@asynccontextmanager
async def git_lifespan(server: FastMCP) -> AsyncIterator[GitContext]:
    """Provide the ``GitContext`` used while the server is running.

    Loads variables from a ``.env`` file through ``dotenv`` and then resolves
    the repositories directory from the ``GIT_REPOS_PATH`` environment
    variable, falling back to the current working directory when it is unset.

    Args:
        server: The FastMCP instance this lifespan belongs to (not used here).

    Yields:
        A ``GitContext`` holding the resolved repositories path.
    """
    # Imported at call time rather than at module import.
    from dotenv import load_dotenv

    load_dotenv()
    repos_root = os.environ.get("GIT_REPOS_PATH", os.getcwd())

    # Nothing is acquired above, so there is nothing to release after the yield.
    yield GitContext(git_repos_path=repos_root)
# Server instance that the @mcp.tool() decorators in this file register
# against; git_lifespan is passed as its lifespan handler.
mcp = FastMCP("git-mcp", lifespan=git_lifespan)
def _run_git_command(repo_path: str, command: List[str]) -> str:
    """Run a git command inside a repository directory and return its output.

    Args:
        repo_path: Directory used as the working directory for git.
        command: Arguments placed after the ``git`` executable.

    Returns:
        The command's standard output with surrounding whitespace stripped.

    Raises:
        ValueError: If ``repo_path`` does not exist or is not a directory, if
            the ``git`` process cannot be started, or if git exits with a
            non-zero status (the message then carries git's stderr).
    """
    if not os.path.exists(repo_path):
        raise ValueError(f"Repository path does not exist: {repo_path}")
    # A regular file passes the existence check but cannot be used as cwd;
    # report it as ValueError like every other failure of this helper.
    if not os.path.isdir(repo_path):
        raise ValueError(f"Repository path is not a directory: {repo_path}")

    try:
        result = subprocess.run(
            ["git", *command],
            cwd=repo_path,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        error_message = e.stderr.strip() if e.stderr else str(e)
        raise ValueError(f"Git command failed: {error_message}") from e
    except OSError as e:
        # Raised when the process cannot be spawned at all (for example the
        # git executable is missing). Callers only handle ValueError.
        raise ValueError(f"Git command failed: {e}") from e

    return result.stdout.strip()
@mcp.tool()
def get_last_git_tag(ctx: Context, repo_name: str) -> Dict[str, str]:
    """Find the last git tag in the repository

    The tag is whatever ``git describe --tags --abbrev=0`` reports for the
    repository's current HEAD.

    Args:
        repo_name: Name of the git repository

    Returns:
        Dictionary containing tag version and date. ``date`` is formatted as
        ``YYYY-MM-DD HH:MM:SS`` from git's ``%ai`` value for the tag, with
        the UTC offset dropped. When git reports that no tags exist the
        result is ``{"version": "No tags found", "date": ""}``.

    Raises:
        ValueError: For any other git failure, or if the date git prints
            cannot be parsed.
    """
    git_repos_path = ctx.request_context.lifespan_context.git_repos_path
    repo_path = os.path.join(git_repos_path, repo_name)

    try:
        # Get the most recent tag name
        tag_name = _run_git_command(repo_path, ["describe", "--tags", "--abbrev=0"])
    except ValueError as e:
        if "No names found" in str(e):
            return {"version": "No tags found", "date": ""}
        raise

    # The trailing "--" marks tag_name as a revision; without it git aborts
    # with "ambiguous argument" when a path of the same name exists.
    tag_date_str = _run_git_command(
        repo_path, ["log", "-1", "--format=%ai", tag_name, "--"]
    )

    # Parse the date string into a datetime object, then drop the offset
    tag_date = datetime.strptime(tag_date_str, "%Y-%m-%d %H:%M:%S %z")
    formatted_date = tag_date.strftime("%Y-%m-%d %H:%M:%S")
    return {"version": tag_name, "date": formatted_date}
@mcp.tool()
def list_commits_since_last_tag(
    ctx: Context, repo_name: str, max_count: Optional[int] = None
) -> List[Dict[str, str]]:
    """List the commits between the last git tag and HEAD

    When the repository has no tags, the log is not restricted to a range.

    Args:
        repo_name: Name of the git repository
        max_count: Maximum number of commits to return

    Returns:
        List of dictionaries containing commit hash, author, date, and message
        (keys ``hash``, ``author``, ``date``, ``message``), in the order git
        log prints them.

    Raises:
        ValueError: If a git command fails for a reason other than the
            repository having no tags.
    """
    git_repos_path = ctx.request_context.lifespan_context.git_repos_path
    repo_path = os.path.join(git_repos_path, repo_name)

    try:
        # Try to get the most recent tag
        last_tag = _run_git_command(repo_path, ["describe", "--tags", "--abbrev=0"])
    except ValueError as e:
        if "No names found" not in str(e):
            raise
        # If no tags found, return all commits
        last_tag = ""

    # Fields are separated by a NUL byte (%x00 in git's format language).
    # A printable separator such as "|" can legitimately occur in an author
    # name and would shift every following field.
    field_sep = "\x00"
    log_command = ["log", "--pretty=format:%H%x00%an%x00%ad%x00%s", "--date=iso"]
    if last_tag:
        log_command.append(f"{last_tag}..HEAD")
    if max_count is not None:
        log_command.extend(["-n", str(max_count)])

    commit_logs = _run_git_command(repo_path, log_command)

    commits = []
    for line in commit_logs.split("\n"):
        if not line.strip():
            continue
        parts = line.split(field_sep, 3)
        if len(parts) != 4:
            # Lines that do not have the expected shape are skipped.
            continue
        commit_hash, author, date, message = parts
        commits.append(
            {
                "hash": commit_hash,
                "author": author,
                "date": date,
                "message": message,
            }
        )
    return commits
@mcp.tool()
def list_repositories(ctx: Context) -> List[str]:
    """Return the names of the git repositories under the configured path.

    An entry counts as a repository when it is a directory that contains a
    ``.git`` entry. Only direct children of the configured path are examined.

    Returns:
        List of repository names, in the order ``os.listdir`` yields them.

    Raises:
        ValueError: If the configured repositories path does not exist.
    """
    git_repos_path = ctx.request_context.lifespan_context.git_repos_path
    if not os.path.exists(git_repos_path):
        raise ValueError(f"Git repositories path does not exist: {git_repos_path}")

    def _looks_like_repo(entry: str) -> bool:
        candidate = os.path.join(git_repos_path, entry)
        if not os.path.isdir(candidate):
            return False
        return os.path.exists(os.path.join(candidate, ".git"))

    return [entry for entry in os.listdir(git_repos_path) if _looks_like_repo(entry)]
@mcp.tool()
def create_git_tag(
    ctx: Context, repo_name: str, tag_name: str, message: Optional[str] = None
) -> Dict[str, str]:
    """Create a new git tag in the repository

    Args:
        repo_name: Name of the git repository
        tag_name: Name of the tag to create
        message: Optional message for annotated tag. A missing or empty
            message produces a lightweight tag.

    Returns:
        Dictionary containing status and tag information. On success the keys
        are ``status``, ``version``, ``date`` and ``type``; on failure they
        are ``status`` ("error") and ``error``.

    Raises:
        ValueError: If the repository directory or its ``.git`` entry is
            missing.
    """
    git_repos_path = ctx.request_context.lifespan_context.git_repos_path
    repo_path = os.path.join(git_repos_path, repo_name)

    # Validate repository exists
    if not os.path.exists(repo_path) or not os.path.exists(
        os.path.join(repo_path, ".git")
    ):
        raise ValueError(f"Repository not found: {repo_name}")

    # A name starting with "-" would be read by git as a command-line option
    # rather than a tag name, so refuse it before building the command.
    if tag_name.startswith("-"):
        return {
            "status": "error",
            "error": f"Invalid tag name (must not start with '-'): {tag_name}",
        }

    if message:
        # Create annotated tag with message
        tag_command = ["tag", "-a", tag_name, "-m", message]
    else:
        # Create lightweight tag
        tag_command = ["tag", tag_name]

    try:
        _run_git_command(repo_path, tag_command)

        # The trailing "--" marks tag_name as a revision so that a path with
        # the same name cannot make the lookup ambiguous.
        tag_date_str = _run_git_command(
            repo_path, ["log", "-1", "--format=%ai", tag_name, "--"]
        )

        # Parse the date string into a datetime object, then drop the offset
        tag_date = datetime.strptime(tag_date_str, "%Y-%m-%d %H:%M:%S %z")
        formatted_date = tag_date.strftime("%Y-%m-%d %H:%M:%S")
        return {
            "status": "success",
            "version": tag_name,
            "date": formatted_date,
            "type": "annotated" if message else "lightweight",
        }
    except ValueError as e:
        # Covers git failures and an unparseable date string alike.
        return {"status": "error", "error": str(e)}
@mcp.tool()
def push_git_tag(ctx: Context, repo_name: str, tag_name: str) -> Dict[str, str]:
    """Push a git tag to the default remote

    The "default" remote is the first name printed by ``git remote``.

    Args:
        repo_name: Name of the git repository
        tag_name: Name of the tag to push

    Returns:
        Dictionary containing status and information about the operation. On
        success the keys are ``status``, ``remote``, ``tag`` and ``message``;
        on failure they are ``status`` ("error") and ``error``.

    Raises:
        ValueError: If the repository directory or its ``.git`` entry is
            missing.
    """
    git_repos_path = ctx.request_context.lifespan_context.git_repos_path
    repo_path = os.path.join(git_repos_path, repo_name)

    # Validate repository exists
    if not os.path.exists(repo_path) or not os.path.exists(
        os.path.join(repo_path, ".git")
    ):
        raise ValueError(f"Repository not found: {repo_name}")

    # Validate tag exists. "git tag -l <name>" exits 0 even when nothing
    # matches, so it cannot detect a missing tag; rev-parse --verify exits
    # non-zero instead. The full ref name also keeps a branch of the same
    # name, or a name beginning with "-", from being picked up.
    tag_ref = f"refs/tags/{tag_name}"
    try:
        _run_git_command(repo_path, ["rev-parse", "--verify", "--quiet", tag_ref])
    except ValueError:
        return {
            "status": "error",
            "error": f"Tag {tag_name} not found in repository {repo_name}",
        }

    try:
        remote_names = _run_git_command(repo_path, ["remote"]).splitlines()
        if not remote_names:
            return {
                "status": "error",
                "error": f"No remote configured for repository {repo_name}",
            }

        # Use the first remote if multiple are available
        default_remote = remote_names[0].strip()

        # Push exactly the tag ref that was verified above.
        _run_git_command(repo_path, ["push", default_remote, tag_ref])
        return {
            "status": "success",
            "remote": default_remote,
            "tag": tag_name,
            "message": f"Successfully pushed tag {tag_name} to remote {default_remote}",
        }
    except ValueError as e:
        return {"status": "error", "error": str(e)}
@mcp.tool()
async def refresh_repository(ctx: Context, repo_name: str) -> Dict:
    """Refresh repository by checking out main branch and pulling all remotes

    ``main`` is tried first and ``master`` second. The branch that was
    actually checked out is the one pulled from every remote and the one
    reported in the result.

    Args:
        repo_name: Name of the git repository

    Returns:
        Dictionary containing status and information about the operation. On
        success the keys are ``status``, ``repository``, ``branch`` and
        ``pull_results`` (remote name -> "success" or "error: ..."). A failed
        pull from an individual remote is recorded in ``pull_results`` and
        does not change the overall status. On failure the keys are
        ``status`` ("error") and ``error``.

    Raises:
        ValueError: If the repository directory or its ``.git`` entry is
            missing.
    """
    # Read the repositories path the same way the other tools in this file do.
    git_repos_path = ctx.request_context.lifespan_context.git_repos_path
    repo_path = os.path.join(git_repos_path, repo_name)

    # Validate repository exists
    if not os.path.exists(repo_path) or not os.path.exists(
        os.path.join(repo_path, ".git")
    ):
        raise ValueError(f"Repository not found: {repo_name}")

    try:
        # Get all remotes, ignoring blank lines
        remotes = [
            name.strip()
            for name in _run_git_command(repo_path, ["remote"]).splitlines()
            if name.strip()
        ]
        if not remotes:
            return {
                "status": "error",
                "error": f"No remotes configured for repository {repo_name}",
            }

        # Checkout main, falling back to master; remember which one worked.
        branch = None
        first_error = None
        for candidate in ("main", "master"):
            try:
                _run_git_command(repo_path, ["checkout", candidate])
            except ValueError as e:
                if first_error is None:
                    first_error = e
            else:
                branch = candidate
                break
        if branch is None:
            return {
                "status": "error",
                "error": f"Failed to checkout main or master branch: {str(first_error)}",
            }

        # Pull the checked-out branch from every remote. Pulling a different
        # branch name here would merge it into the current branch.
        pull_results = {}
        for remote in remotes:
            try:
                _run_git_command(repo_path, ["pull", remote, branch])
            except ValueError as e:
                pull_results[remote] = f"error: {str(e)}"
            else:
                pull_results[remote] = "success"

        return {
            "status": "success",
            "repository": repo_name,
            "branch": branch,
            "pull_results": pull_results,
        }
    except Exception as e:
        # Tool boundary: report unexpected failures as an error result.
        return {"status": "error", "error": f"Failed to refresh repository: {str(e)}"}
import sys
from importlib.metadata import version

# Resolve the installed package version and announce startup. The banner goes
# to stderr: when an MCP server is served over the stdio transport, stdout
# carries protocol messages and must not receive free-form text.
try:
    __version__ = version("git-mcp")
except Exception:
    # Best effort: the package metadata may be unavailable (for example when
    # running from a source checkout). Keep __version__ defined either way.
    __version__ = "unknown"
    print("Starting MCP server (version unknown)", file=sys.stderr)
else:
    print(f"Starting MCP server version {__version__}", file=sys.stderr)