"""
GitHub MCP Server
=================
A Model Context Protocol server that enables LLMs to interact with GitHub.
Features:
- Repository management (search, browse, read files)
- Issue tracking (list, create, update, comment)
- Pull request operations (list, create, merge, review)
- Branch and commit history
- GitHub Actions workflows
- Code search across repositories
Usage:
export GITHUB_TOKEN=your_personal_access_token
python github_mcp_server.py
Author: Hansen Nkefor
"""
import os
import base64
import logging
from datetime import datetime
from typing import Optional, Literal, Any
from enum import Enum
import httpx
from pydantic import BaseModel, Field
from mcp.server.fastmcp import FastMCP
# ============================================================================
# CONFIGURATION
# ============================================================================
API_BASE_URL = "https://api.github.com"
CHARACTER_LIMIT = 25000
DEFAULT_PER_PAGE = 30
MAX_PER_PAGE = 100
# Initialize MCP Server
mcp = FastMCP(
"GitHub MCP Server",
description="Interact with GitHub repositories, issues, PRs, and more"
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================================
# ENUMS
# ============================================================================
class ResponseFormat(str, Enum):
"""Response format options"""
JSON = "json"
MARKDOWN = "markdown"
class IssueState(str, Enum):
"""Issue/PR state filter"""
OPEN = "open"
CLOSED = "closed"
ALL = "all"
class PRMergeMethod(str, Enum):
"""Pull request merge methods"""
MERGE = "merge"
SQUASH = "squash"
REBASE = "rebase"
class SortDirection(str, Enum):
"""Sort direction"""
ASC = "asc"
DESC = "desc"
# ============================================================================
# PYDANTIC MODELS - INPUT VALIDATION
# ============================================================================
class RepositoryQuery(BaseModel):
"""Search query for repositories"""
query: str = Field(..., description="Search query (e.g., 'terraform aws language:python')")
sort: Optional[Literal["stars", "forks", "updated", "help-wanted-issues"]] = Field(
default="stars", description="Sort results by"
)
order: SortDirection = Field(default=SortDirection.DESC, description="Sort order")
per_page: int = Field(default=10, ge=1, le=50, description="Results per page (1-50)")
model_config = {"extra": "forbid"}
class IssueFilter(BaseModel):
"""Filter for listing issues"""
state: IssueState = Field(default=IssueState.OPEN, description="Issue state filter")
labels: Optional[str] = Field(default=None, description="Comma-separated label names")
assignee: Optional[str] = Field(default=None, description="Filter by assignee username")
sort: Literal["created", "updated", "comments"] = Field(default="created")
direction: SortDirection = Field(default=SortDirection.DESC)
per_page: int = Field(default=20, ge=1, le=100)
model_config = {"extra": "forbid"}
class CreateIssueRequest(BaseModel):
"""Request to create a new issue"""
title: str = Field(..., min_length=1, max_length=256, description="Issue title")
body: Optional[str] = Field(default=None, max_length=65536, description="Issue body (Markdown)")
labels: Optional[list[str]] = Field(default=None, description="Label names to apply")
assignees: Optional[list[str]] = Field(default=None, description="Usernames to assign")
milestone: Optional[int] = Field(default=None, description="Milestone number")
model_config = {"extra": "forbid"}
class UpdateIssueRequest(BaseModel):
"""Request to update an issue"""
title: Optional[str] = Field(default=None, max_length=256)
body: Optional[str] = Field(default=None, max_length=65536)
state: Optional[IssueState] = Field(default=None)
labels: Optional[list[str]] = Field(default=None)
assignees: Optional[list[str]] = Field(default=None)
model_config = {"extra": "forbid"}
class CreatePRRequest(BaseModel):
"""Request to create a pull request"""
title: str = Field(..., min_length=1, max_length=256, description="PR title")
head: str = Field(..., description="Branch containing changes (e.g., 'feature-branch' or 'user:branch')")
base: str = Field(..., description="Branch to merge into (e.g., 'main')")
body: Optional[str] = Field(default=None, max_length=65536, description="PR description (Markdown)")
draft: bool = Field(default=False, description="Create as draft PR")
model_config = {"extra": "forbid"}
class CodeSearchQuery(BaseModel):
"""Code search query parameters"""
query: str = Field(..., description="Search query (e.g., 'def main language:python')")
per_page: int = Field(default=20, ge=1, le=100)
model_config = {"extra": "forbid"}
# ============================================================================
# API CLIENT HELPERS
# ============================================================================
def get_github_token() -> str:
"""Get GitHub token from environment"""
token = os.environ.get("GITHUB_TOKEN")
if not token:
raise ValueError(
"GITHUB_TOKEN environment variable not set. "
"Create a personal access token at https://github.com/settings/tokens "
"with 'repo', 'read:org', and 'workflow' scopes."
)
return token
def get_headers() -> dict[str, str]:
"""Get API request headers"""
return {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {get_github_token()}",
"X-GitHub-Api-Version": "2022-11-28",
"User-Agent": "GitHub-MCP-Server/1.0"
}
async def github_request(
method: str,
endpoint: str,
params: Optional[dict] = None,
json_data: Optional[dict] = None
) -> dict[str, Any]:
"""
Make an authenticated request to the GitHub API.
Returns parsed JSON response or raises descriptive error.
"""
url = f"{API_BASE_URL}{endpoint}"
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.request(
method=method,
url=url,
headers=get_headers(),
params=params,
json=json_data
)
# Handle rate limiting
if response.status_code == 403:
remaining = response.headers.get("X-RateLimit-Remaining", "unknown")
reset_time = response.headers.get("X-RateLimit-Reset", "unknown")
if remaining == "0":
return {
"error": True,
"message": f"GitHub API rate limit exceeded. Resets at timestamp {reset_time}. "
"Try again later or use a token with higher limits."
}
# Handle not found
if response.status_code == 404:
return {
"error": True,
"message": f"Resource not found: {endpoint}. Verify the owner/repo/resource exists and you have access."
}
# Handle unauthorized
if response.status_code == 401:
return {
"error": True,
"message": "Authentication failed. Check that GITHUB_TOKEN is valid and not expired."
}
# Handle validation errors
if response.status_code == 422:
error_data = response.json()
errors = error_data.get("errors", [])
error_msgs = [f"{e.get('field', 'unknown')}: {e.get('message', 'validation error')}" for e in errors]
return {
"error": True,
"message": f"Validation error: {'; '.join(error_msgs)}"
}
response.raise_for_status()
# Handle empty responses (204 No Content)
if response.status_code == 204:
return {"success": True, "message": "Operation completed successfully"}
return response.json()
except httpx.TimeoutException:
return {
"error": True,
"message": "Request timed out. GitHub API may be slow. Try again."
}
except httpx.HTTPStatusError as e:
return {
"error": True,
"message": f"HTTP error {e.response.status_code}: {e.response.text[:500]}"
}
except Exception as e:
return {
"error": True,
"message": f"Request failed: {str(e)}"
}
def truncate_content(content: str, limit: int = CHARACTER_LIMIT) -> str:
"""Truncate content to character limit with indicator"""
if len(content) <= limit:
return content
return content[:limit] + f"\n\n... [Truncated - {len(content) - limit} more characters]"
def format_datetime(dt_str: Optional[str]) -> str:
"""Format ISO datetime string for display"""
if not dt_str:
return "N/A"
try:
dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M UTC")
except:
return dt_str
# ============================================================================
# REPOSITORY TOOLS
# ============================================================================
@mcp.tool()
async def search_repositories(
query: str,
sort: str = "stars",
order: str = "desc",
per_page: int = 10,
format: str = "markdown"
) -> str:
"""
Search GitHub repositories by query.
Use this to find repositories matching keywords, languages, topics, or other criteria.
Supports GitHub's search syntax (e.g., 'language:python stars:>1000').
Args:
query: Search query (e.g., 'terraform aws', 'language:python stars:>100')
sort: Sort by 'stars', 'forks', 'updated', or 'help-wanted-issues'
order: Sort order - 'asc' or 'desc'
per_page: Number of results (1-50)
format: Response format - 'json' or 'markdown'
Returns:
List of matching repositories with details
Examples:
- search_repositories("kubernetes helm") - Find k8s helm repos
- search_repositories("language:rust stars:>500") - Popular Rust repos
- search_repositories("topic:machine-learning") - ML repos by topic
"""
params = {
"q": query,
"sort": sort,
"order": order,
"per_page": min(per_page, 50)
}
result = await github_request("GET", "/search/repositories", params=params)
if result.get("error"):
return result["message"]
repos = result.get("items", [])
total = result.get("total_count", 0)
if format == "json":
return truncate_content(str({
"total_count": total,
"showing": len(repos),
"repositories": [
{
"full_name": r["full_name"],
"description": r.get("description", ""),
"stars": r["stargazers_count"],
"forks": r["forks_count"],
"language": r.get("language"),
"updated_at": r["updated_at"],
"url": r["html_url"],
"topics": r.get("topics", [])
}
for r in repos
]
}))
# Markdown format
lines = [f"# Repository Search Results\n", f"**Query:** `{query}`", f"**Total matches:** {total:,}\n"]
for r in repos:
lines.append(f"## [{r['full_name']}]({r['html_url']})")
if r.get("description"):
lines.append(f"> {r['description']}")
lines.append(f"- **Stars:** {r['stargazers_count']:,} | **Forks:** {r['forks_count']:,}")
lines.append(f"- **Language:** {r.get('language', 'N/A')} | **Updated:** {format_datetime(r['updated_at'])}")
if r.get("topics"):
lines.append(f"- **Topics:** {', '.join(r['topics'][:5])}")
lines.append("")
return truncate_content("\n".join(lines))
@mcp.tool()
async def get_repository(
owner: str,
repo: str,
format: str = "markdown"
) -> str:
"""
Get detailed information about a repository.
Args:
owner: Repository owner (username or organization)
repo: Repository name
format: Response format - 'json' or 'markdown'
Returns:
Repository details including description, stats, topics, and settings
Examples:
- get_repository("microsoft", "vscode")
- get_repository("kubernetes", "kubernetes")
"""
result = await github_request("GET", f"/repos/{owner}/{repo}")
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"full_name": result["full_name"],
"description": result.get("description"),
"homepage": result.get("homepage"),
"stars": result["stargazers_count"],
"forks": result["forks_count"],
"watchers": result["subscribers_count"],
"open_issues": result["open_issues_count"],
"language": result.get("language"),
"topics": result.get("topics", []),
"default_branch": result["default_branch"],
"created_at": result["created_at"],
"updated_at": result["updated_at"],
"license": result.get("license", {}).get("name"),
"visibility": result["visibility"],
"is_fork": result["fork"],
"clone_url": result["clone_url"],
"html_url": result["html_url"]
}))
# Markdown format
lines = [
f"# {result['full_name']}",
"",
f"> {result.get('description', 'No description')}"
]
if result.get("homepage"):
lines.append(f"\n**Homepage:** {result['homepage']}")
lines.extend([
"",
"## Statistics",
f"| Metric | Value |",
f"|--------|-------|",
f"| Stars | {result['stargazers_count']:,} |",
f"| Forks | {result['forks_count']:,} |",
f"| Watchers | {result['subscribers_count']:,} |",
f"| Open Issues | {result['open_issues_count']:,} |",
"",
"## Details",
f"- **Language:** {result.get('language', 'N/A')}",
f"- **License:** {result.get('license', {}).get('name', 'None')}",
f"- **Default Branch:** `{result['default_branch']}`",
f"- **Visibility:** {result['visibility']}",
f"- **Is Fork:** {'Yes' if result['fork'] else 'No'}",
f"- **Created:** {format_datetime(result['created_at'])}",
f"- **Updated:** {format_datetime(result['updated_at'])}",
])
if result.get("topics"):
lines.append(f"\n**Topics:** {', '.join(result['topics'])}")
lines.extend([
"",
"## Links",
f"- [Repository]({result['html_url']})",
f"- Clone: `{result['clone_url']}`"
])
return "\n".join(lines)
@mcp.tool()
async def list_repository_files(
owner: str,
repo: str,
path: str = "",
ref: Optional[str] = None,
format: str = "markdown"
) -> str:
"""
List files and directories in a repository path.
Args:
owner: Repository owner
repo: Repository name
path: Directory path (empty for root)
ref: Branch, tag, or commit SHA (defaults to default branch)
format: Response format - 'json' or 'markdown'
Returns:
List of files and directories with types and sizes
Examples:
- list_repository_files("owner", "repo") - List root
- list_repository_files("owner", "repo", "src") - List src directory
- list_repository_files("owner", "repo", "docs", ref="v1.0.0") - List docs at tag
"""
endpoint = f"/repos/{owner}/{repo}/contents/{path}"
params = {"ref": ref} if ref else {}
result = await github_request("GET", endpoint, params=params)
if result.get("error"):
return result["message"]
# Handle single file response
if isinstance(result, dict) and result.get("type") == "file":
return f"Path '{path}' is a file, not a directory. Use get_file_contents() to read it."
if not isinstance(result, list):
return "Unexpected response format"
# Sort: directories first, then files
items = sorted(result, key=lambda x: (x["type"] != "dir", x["name"].lower()))
if format == "json":
return truncate_content(str({
"path": path or "/",
"ref": ref or "default",
"items": [
{
"name": item["name"],
"type": item["type"],
"size": item.get("size", 0),
"path": item["path"]
}
for item in items
]
}))
# Markdown format
lines = [
f"# Contents of `{owner}/{repo}`",
f"**Path:** `/{path}`" if path else "**Path:** `/` (root)",
]
if ref:
lines.append(f"**Ref:** `{ref}`")
lines.append("")
lines.append("| Type | Name | Size |")
lines.append("|------|------|------|")
for item in items:
icon = "📁" if item["type"] == "dir" else "📄"
size = f"{item.get('size', 0):,} B" if item["type"] == "file" else "-"
lines.append(f"| {icon} {item['type']} | `{item['name']}` | {size} |")
return "\n".join(lines)
@mcp.tool()
async def get_file_contents(
owner: str,
repo: str,
path: str,
ref: Optional[str] = None
) -> str:
"""
Read the contents of a file from a repository.
Args:
owner: Repository owner
repo: Repository name
path: File path within repository
ref: Branch, tag, or commit SHA (optional)
Returns:
File contents as text (decoded from base64)
Examples:
- get_file_contents("owner", "repo", "README.md")
- get_file_contents("owner", "repo", "src/main.py", ref="develop")
"""
endpoint = f"/repos/{owner}/{repo}/contents/{path}"
params = {"ref": ref} if ref else {}
result = await github_request("GET", endpoint, params=params)
if result.get("error"):
return result["message"]
if result.get("type") != "file":
return f"Path '{path}' is not a file (type: {result.get('type')}). Use list_repository_files() for directories."
# Check size
size = result.get("size", 0)
if size > 1_000_000: # 1MB limit
return f"File is too large ({size:,} bytes). GitHub API only returns files up to 1MB via contents API."
# Decode content
try:
content = base64.b64decode(result.get("content", "")).decode("utf-8")
except Exception as e:
return f"Failed to decode file content: {e}. File may be binary."
# Format response
lines = [
f"# {path}",
f"**Size:** {size:,} bytes | **SHA:** `{result.get('sha', 'unknown')[:7]}`",
"",
"```" + (path.split(".")[-1] if "." in path else ""),
truncate_content(content, CHARACTER_LIMIT - 500),
"```"
]
return "\n".join(lines)
# ============================================================================
# ISSUE TOOLS
# ============================================================================
@mcp.tool()
async def list_issues(
owner: str,
repo: str,
state: str = "open",
labels: Optional[str] = None,
assignee: Optional[str] = None,
sort: str = "created",
direction: str = "desc",
per_page: int = 20,
format: str = "markdown"
) -> str:
"""
List issues in a repository with optional filters.
Args:
owner: Repository owner
repo: Repository name
state: Filter by state - 'open', 'closed', or 'all'
labels: Comma-separated label names (e.g., 'bug,help wanted')
assignee: Filter by assignee username
sort: Sort by 'created', 'updated', or 'comments'
direction: Sort direction - 'asc' or 'desc'
per_page: Number of results (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of issues with titles, states, labels, and assignees
"""
params = {
"state": state,
"sort": sort,
"direction": direction,
"per_page": min(per_page, 100)
}
if labels:
params["labels"] = labels
if assignee:
params["assignee"] = assignee
result = await github_request("GET", f"/repos/{owner}/{repo}/issues", params=params)
if result.get("error"):
return result["message"]
# Filter out pull requests (they also appear in issues endpoint)
issues = [i for i in result if "pull_request" not in i]
if format == "json":
return truncate_content(str({
"count": len(issues),
"issues": [
{
"number": i["number"],
"title": i["title"],
"state": i["state"],
"user": i["user"]["login"],
"labels": [l["name"] for l in i.get("labels", [])],
"assignees": [a["login"] for a in i.get("assignees", [])],
"comments": i["comments"],
"created_at": i["created_at"],
"updated_at": i["updated_at"],
"url": i["html_url"]
}
for i in issues
]
}))
# Markdown format
lines = [
f"# Issues: {owner}/{repo}",
f"**State:** {state} | **Count:** {len(issues)}",
""
]
for i in issues:
state_icon = "🟢" if i["state"] == "open" else "🔴"
labels_str = ", ".join([f"`{l['name']}`" for l in i.get("labels", [])[:3]])
assignees = ", ".join([a["login"] for a in i.get("assignees", [])[:2]])
lines.append(f"### {state_icon} #{i['number']}: {i['title']}")
lines.append(f"- **State:** {i['state']} | **Comments:** {i['comments']}")
lines.append(f"- **Author:** @{i['user']['login']} | **Created:** {format_datetime(i['created_at'])}")
if labels_str:
lines.append(f"- **Labels:** {labels_str}")
if assignees:
lines.append(f"- **Assignees:** {assignees}")
lines.append(f"- [View Issue]({i['html_url']})")
lines.append("")
return truncate_content("\n".join(lines))
@mcp.tool()
async def get_issue(
owner: str,
repo: str,
issue_number: int,
include_comments: bool = True,
format: str = "markdown"
) -> str:
"""
Get detailed information about a specific issue, optionally with comments.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number
include_comments: Include issue comments (default True)
format: Response format - 'json' or 'markdown'
Returns:
Issue details including body, labels, assignees, and optionally comments
"""
result = await github_request("GET", f"/repos/{owner}/{repo}/issues/{issue_number}")
if result.get("error"):
return result["message"]
comments = []
if include_comments and result.get("comments", 0) > 0:
comments_result = await github_request(
"GET",
f"/repos/{owner}/{repo}/issues/{issue_number}/comments",
params={"per_page": 20}
)
if not comments_result.get("error"):
comments = comments_result
if format == "json":
return truncate_content(str({
"number": result["number"],
"title": result["title"],
"state": result["state"],
"body": result.get("body", ""),
"user": result["user"]["login"],
"labels": [l["name"] for l in result.get("labels", [])],
"assignees": [a["login"] for a in result.get("assignees", [])],
"milestone": result.get("milestone", {}).get("title") if result.get("milestone") else None,
"created_at": result["created_at"],
"updated_at": result["updated_at"],
"closed_at": result.get("closed_at"),
"comments_count": result["comments"],
"comments": [
{
"user": c["user"]["login"],
"body": c["body"][:500],
"created_at": c["created_at"]
}
for c in comments[:10]
],
"url": result["html_url"]
}))
# Markdown format
state_icon = "🟢" if result["state"] == "open" else "🔴"
lines = [
f"# {state_icon} Issue #{result['number']}: {result['title']}",
"",
f"**State:** {result['state']} | **Author:** @{result['user']['login']}",
f"**Created:** {format_datetime(result['created_at'])} | **Updated:** {format_datetime(result['updated_at'])}"
]
if result.get("closed_at"):
lines.append(f"**Closed:** {format_datetime(result['closed_at'])}")
labels = [f"`{l['name']}`" for l in result.get("labels", [])]
if labels:
lines.append(f"**Labels:** {', '.join(labels)}")
assignees = [f"@{a['login']}" for a in result.get("assignees", [])]
if assignees:
lines.append(f"**Assignees:** {', '.join(assignees)}")
if result.get("milestone"):
lines.append(f"**Milestone:** {result['milestone']['title']}")
lines.extend([
"",
"## Description",
result.get("body") or "_No description provided_",
])
if comments:
lines.extend(["", "## Comments", ""])
for c in comments[:10]:
lines.append(f"### @{c['user']['login']} - {format_datetime(c['created_at'])}")
lines.append(truncate_content(c["body"], 1000))
lines.append("")
lines.extend(["", f"[View on GitHub]({result['html_url']})"])
return truncate_content("\n".join(lines))
@mcp.tool()
async def create_issue(
owner: str,
repo: str,
title: str,
body: Optional[str] = None,
labels: Optional[list[str]] = None,
assignees: Optional[list[str]] = None
) -> str:
"""
Create a new issue in a repository.
Args:
owner: Repository owner
repo: Repository name
title: Issue title (required)
body: Issue description in Markdown
labels: List of label names to apply
assignees: List of usernames to assign
Returns:
Created issue details with number and URL
Examples:
- create_issue("owner", "repo", "Bug: Login fails", body="Steps to reproduce...")
- create_issue("owner", "repo", "Feature request", labels=["enhancement"])
"""
data = {"title": title}
if body:
data["body"] = body
if labels:
data["labels"] = labels
if assignees:
data["assignees"] = assignees
result = await github_request("POST", f"/repos/{owner}/{repo}/issues", json_data=data)
if result.get("error"):
return result["message"]
return f"""# Issue Created Successfully
**Number:** #{result['number']}
**Title:** {result['title']}
**URL:** {result['html_url']}
The issue has been created and is now open."""
@mcp.tool()
async def update_issue(
owner: str,
repo: str,
issue_number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[list[str]] = None,
assignees: Optional[list[str]] = None
) -> str:
"""
Update an existing issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number to update
title: New title (optional)
body: New body content (optional)
state: New state - 'open' or 'closed' (optional)
labels: Replace all labels (optional)
assignees: Replace all assignees (optional)
Returns:
Updated issue details
"""
data = {}
if title is not None:
data["title"] = title
if body is not None:
data["body"] = body
if state is not None:
data["state"] = state
if labels is not None:
data["labels"] = labels
if assignees is not None:
data["assignees"] = assignees
if not data:
return "No updates specified. Provide at least one field to update."
result = await github_request("PATCH", f"/repos/{owner}/{repo}/issues/{issue_number}", json_data=data)
if result.get("error"):
return result["message"]
return f"""# Issue Updated
**Number:** #{result['number']}
**Title:** {result['title']}
**State:** {result['state']}
**URL:** {result['html_url']}
Issue has been updated successfully."""
@mcp.tool()
async def add_issue_comment(
owner: str,
repo: str,
issue_number: int,
body: str
) -> str:
"""
Add a comment to an issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number
body: Comment text (Markdown supported)
Returns:
Created comment details
"""
if not body.strip():
return "Comment body cannot be empty."
result = await github_request(
"POST",
f"/repos/{owner}/{repo}/issues/{issue_number}/comments",
json_data={"body": body}
)
if result.get("error"):
return result["message"]
return f"""# Comment Added
**Issue:** #{issue_number}
**Author:** @{result['user']['login']}
**Created:** {format_datetime(result['created_at'])}
**URL:** {result['html_url']}
Comment has been posted successfully."""
# ============================================================================
# PULL REQUEST TOOLS
# ============================================================================
@mcp.tool()
async def list_pull_requests(
owner: str,
repo: str,
state: str = "open",
sort: str = "created",
direction: str = "desc",
per_page: int = 20,
format: str = "markdown"
) -> str:
"""
List pull requests in a repository.
Args:
owner: Repository owner
repo: Repository name
state: Filter by state - 'open', 'closed', or 'all'
sort: Sort by 'created', 'updated', or 'popularity'
direction: Sort direction - 'asc' or 'desc'
per_page: Number of results (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of pull requests with status, reviewers, and merge info
"""
params = {
"state": state,
"sort": sort,
"direction": direction,
"per_page": min(per_page, 100)
}
result = await github_request("GET", f"/repos/{owner}/{repo}/pulls", params=params)
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"count": len(result),
"pull_requests": [
{
"number": pr["number"],
"title": pr["title"],
"state": pr["state"],
"draft": pr.get("draft", False),
"user": pr["user"]["login"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"mergeable_state": pr.get("mergeable_state"),
"created_at": pr["created_at"],
"updated_at": pr["updated_at"],
"url": pr["html_url"]
}
for pr in result
]
}))
# Markdown format
lines = [
f"# Pull Requests: {owner}/{repo}",
f"**State:** {state} | **Count:** {len(result)}",
""
]
for pr in result:
state_icon = "🟢" if pr["state"] == "open" else "🟣"
draft_badge = " [DRAFT]" if pr.get("draft") else ""
lines.append(f"### {state_icon} #{pr['number']}: {pr['title']}{draft_badge}")
lines.append(f"- **Author:** @{pr['user']['login']} | **State:** {pr['state']}")
lines.append(f"- **Branch:** `{pr['head']['ref']}` → `{pr['base']['ref']}`")
lines.append(f"- **Created:** {format_datetime(pr['created_at'])}")
lines.append(f"- [View PR]({pr['html_url']})")
lines.append("")
return truncate_content("\n".join(lines))
@mcp.tool()
async def get_pull_request(
owner: str,
repo: str,
pr_number: int,
format: str = "markdown"
) -> str:
"""
Get detailed information about a pull request.
Args:
owner: Repository owner
repo: Repository name
pr_number: Pull request number
format: Response format - 'json' or 'markdown'
Returns:
PR details including diff stats, reviewers, and merge status
"""
result = await github_request("GET", f"/repos/{owner}/{repo}/pulls/{pr_number}")
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"number": result["number"],
"title": result["title"],
"state": result["state"],
"draft": result.get("draft", False),
"merged": result.get("merged", False),
"mergeable": result.get("mergeable"),
"mergeable_state": result.get("mergeable_state"),
"user": result["user"]["login"],
"head": result["head"]["ref"],
"base": result["base"]["ref"],
"body": result.get("body", "")[:2000],
"additions": result.get("additions", 0),
"deletions": result.get("deletions", 0),
"changed_files": result.get("changed_files", 0),
"commits": result.get("commits", 0),
"created_at": result["created_at"],
"updated_at": result["updated_at"],
"merged_at": result.get("merged_at"),
"merged_by": result.get("merged_by", {}).get("login") if result.get("merged_by") else None,
"url": result["html_url"]
}))
# Markdown format
state_icon = "🟣" if result.get("merged") else ("🟢" if result["state"] == "open" else "🔴")
status = "Merged" if result.get("merged") else result["state"].title()
lines = [
f"# {state_icon} PR #{result['number']}: {result['title']}",
"",
f"**Status:** {status}",
f"**Author:** @{result['user']['login']}",
f"**Branch:** `{result['head']['ref']}` → `{result['base']['ref']}`",
""
]
if result.get("draft"):
lines.insert(2, "**⚠️ This is a draft pull request**")
lines.extend([
"## Changes",
f"| Metric | Value |",
f"|--------|-------|",
f"| Commits | {result.get('commits', 0)} |",
f"| Files Changed | {result.get('changed_files', 0)} |",
f"| Additions | +{result.get('additions', 0)} |",
f"| Deletions | -{result.get('deletions', 0)} |",
""
])
if result.get("mergeable") is not None:
mergeable_status = "✅ Yes" if result["mergeable"] else "❌ No"
lines.append(f"**Mergeable:** {mergeable_status} ({result.get('mergeable_state', 'unknown')})")
lines.append("")
lines.extend([
"## Description",
result.get("body") or "_No description provided_",
"",
"## Timeline",
f"- **Created:** {format_datetime(result['created_at'])}",
f"- **Updated:** {format_datetime(result['updated_at'])}"
])
if result.get("merged_at"):
lines.append(f"- **Merged:** {format_datetime(result['merged_at'])} by @{result['merged_by']['login']}")
lines.extend(["", f"[View on GitHub]({result['html_url']})"])
return truncate_content("\n".join(lines))
@mcp.tool()
async def create_pull_request(
owner: str,
repo: str,
title: str,
head: str,
base: str,
body: Optional[str] = None,
draft: bool = False
) -> str:
"""
Create a new pull request.
Args:
owner: Repository owner
repo: Repository name
title: PR title
head: Branch containing changes (e.g., 'feature-branch' or 'username:branch' for forks)
base: Branch to merge into (e.g., 'main')
body: PR description in Markdown
draft: Create as draft PR
Returns:
Created PR details with number and URL
Examples:
- create_pull_request("owner", "repo", "Add feature X", "feature-x", "main")
- create_pull_request("owner", "repo", "WIP: New feature", "wip-branch", "develop", draft=True)
"""
data = {
"title": title,
"head": head,
"base": base,
"draft": draft
}
if body:
data["body"] = body
result = await github_request("POST", f"/repos/{owner}/{repo}/pulls", json_data=data)
if result.get("error"):
return result["message"]
return f"""# Pull Request Created
**Number:** #{result['number']}
**Title:** {result['title']}
**Branch:** `{head}` → `{base}`
**Draft:** {'Yes' if draft else 'No'}
**URL:** {result['html_url']}
The pull request has been created successfully."""
@mcp.tool()
async def list_pr_files(
owner: str,
repo: str,
pr_number: int,
per_page: int = 50,
format: str = "markdown"
) -> str:
"""
List files changed in a pull request.
Args:
owner: Repository owner
repo: Repository name
pr_number: Pull request number
per_page: Number of files to return (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of changed files with additions, deletions, and status
"""
result = await github_request(
"GET",
f"/repos/{owner}/{repo}/pulls/{pr_number}/files",
params={"per_page": min(per_page, 100)}
)
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"count": len(result),
"files": [
{
"filename": f["filename"],
"status": f["status"],
"additions": f["additions"],
"deletions": f["deletions"],
"changes": f["changes"]
}
for f in result
]
}))
# Markdown format
total_add = sum(f["additions"] for f in result)
total_del = sum(f["deletions"] for f in result)
lines = [
f"# Changed Files in PR #{pr_number}",
f"**Total:** {len(result)} files | **+{total_add}** | **-{total_del}**",
"",
"| Status | File | Changes |",
"|--------|------|---------|"
]
status_icons = {
"added": "🟢",
"removed": "🔴",
"modified": "🟡",
"renamed": "🔵",
"copied": "⚪"
}
for f in result:
icon = status_icons.get(f["status"], "⚪")
changes = f"+{f['additions']}/-{f['deletions']}"
lines.append(f"| {icon} {f['status']} | `{f['filename']}` | {changes} |")
return "\n".join(lines)
@mcp.tool()
async def merge_pull_request(
owner: str,
repo: str,
pr_number: int,
merge_method: str = "merge",
commit_title: Optional[str] = None,
commit_message: Optional[str] = None
) -> str:
"""
Merge a pull request.
Args:
owner: Repository owner
repo: Repository name
pr_number: Pull request number
merge_method: Merge method - 'merge', 'squash', or 'rebase'
commit_title: Custom merge commit title (optional)
commit_message: Custom merge commit message (optional)
Returns:
Merge result with SHA and confirmation
Note: Requires write access to the repository and PR must be mergeable.
"""
data = {"merge_method": merge_method}
if commit_title:
data["commit_title"] = commit_title
if commit_message:
data["commit_message"] = commit_message
result = await github_request("PUT", f"/repos/{owner}/{repo}/pulls/{pr_number}/merge", json_data=data)
if result.get("error"):
return result["message"]
if result.get("merged"):
return f"""# Pull Request Merged
**PR:** #{pr_number}
**Method:** {merge_method}
**SHA:** `{result.get('sha', 'unknown')[:7]}`
**Message:** {result.get('message', 'Merged successfully')}
The pull request has been merged into the base branch."""
else:
return f"Merge failed: {result.get('message', 'Unknown error')}"
# ============================================================================
# BRANCH & COMMIT TOOLS
# ============================================================================
@mcp.tool()
async def list_branches(
owner: str,
repo: str,
per_page: int = 30,
format: str = "markdown"
) -> str:
"""
List branches in a repository.
Args:
owner: Repository owner
repo: Repository name
per_page: Number of branches to return (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of branches with protection status
"""
result = await github_request(
"GET",
f"/repos/{owner}/{repo}/branches",
params={"per_page": min(per_page, 100)}
)
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"count": len(result),
"branches": [
{
"name": b["name"],
"protected": b.get("protected", False),
"sha": b["commit"]["sha"][:7]
}
for b in result
]
}))
# Markdown format
lines = [
f"# Branches: {owner}/{repo}",
f"**Total:** {len(result)}",
"",
"| Branch | Protected | Latest Commit |",
"|--------|-----------|---------------|"
]
for b in result:
protected = "🔒 Yes" if b.get("protected") else "No"
lines.append(f"| `{b['name']}` | {protected} | `{b['commit']['sha'][:7]}` |")
return "\n".join(lines)
@mcp.tool()
async def get_commits(
owner: str,
repo: str,
sha: Optional[str] = None,
path: Optional[str] = None,
author: Optional[str] = None,
per_page: int = 20,
format: str = "markdown"
) -> str:
"""
Get commit history for a repository.
Args:
owner: Repository owner
repo: Repository name
sha: Branch name, tag, or commit SHA to start from
path: Filter commits touching this file path
author: Filter by author username or email
per_page: Number of commits (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of commits with messages, authors, and timestamps
"""
params = {"per_page": min(per_page, 100)}
if sha:
params["sha"] = sha
if path:
params["path"] = path
if author:
params["author"] = author
result = await github_request("GET", f"/repos/{owner}/{repo}/commits", params=params)
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"count": len(result),
"commits": [
{
"sha": c["sha"][:7],
"message": c["commit"]["message"].split("\n")[0][:100],
"author": c["commit"]["author"]["name"],
"date": c["commit"]["author"]["date"],
"url": c["html_url"]
}
for c in result
]
}))
# Markdown format
lines = [
f"# Commit History: {owner}/{repo}",
f"**Showing:** {len(result)} commits"
]
if sha:
lines.append(f"**Branch/Ref:** `{sha}`")
if path:
lines.append(f"**Path:** `{path}`")
lines.append("")
for c in result:
message = c["commit"]["message"].split("\n")[0][:80]
author = c["commit"]["author"]["name"]
date = format_datetime(c["commit"]["author"]["date"])
lines.append(f"- **`{c['sha'][:7]}`** {message}")
lines.append(f" - {author} • {date}")
lines.append("")
return truncate_content("\n".join(lines))
@mcp.tool()
async def compare_commits(
owner: str,
repo: str,
base: str,
head: str,
format: str = "markdown"
) -> str:
"""
Compare two commits, branches, or tags.
Args:
owner: Repository owner
repo: Repository name
base: Base branch/tag/SHA
head: Head branch/tag/SHA to compare
format: Response format - 'json' or 'markdown'
Returns:
Comparison showing commits ahead/behind and file changes
Examples:
- compare_commits("owner", "repo", "main", "feature-branch")
- compare_commits("owner", "repo", "v1.0.0", "v2.0.0")
"""
result = await github_request("GET", f"/repos/{owner}/{repo}/compare/{base}...{head}")
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"status": result.get("status"),
"ahead_by": result.get("ahead_by", 0),
"behind_by": result.get("behind_by", 0),
"total_commits": len(result.get("commits", [])),
"files_changed": len(result.get("files", [])),
"commits": [
{
"sha": c["sha"][:7],
"message": c["commit"]["message"].split("\n")[0][:100]
}
for c in result.get("commits", [])[:20]
],
"files": [
{
"filename": f["filename"],
"status": f["status"],
"additions": f["additions"],
"deletions": f["deletions"]
}
for f in result.get("files", [])[:30]
]
}))
# Markdown format
lines = [
f"# Comparing: `{base}` ← `{head}`",
"",
f"**Status:** {result.get('status', 'unknown')}",
f"**Ahead by:** {result.get('ahead_by', 0)} commits",
f"**Behind by:** {result.get('behind_by', 0)} commits",
f"**Files changed:** {len(result.get('files', []))}",
"",
"## Commits",
""
]
for c in result.get("commits", [])[:15]:
message = c["commit"]["message"].split("\n")[0][:70]
lines.append(f"- `{c['sha'][:7]}` {message}")
if len(result.get("commits", [])) > 15:
lines.append(f"- ... and {len(result['commits']) - 15} more commits")
lines.extend(["", "## Changed Files", ""])
for f in result.get("files", [])[:20]:
lines.append(f"- `{f['filename']}` ({f['status']}) +{f['additions']}/-{f['deletions']}")
if len(result.get("files", [])) > 20:
lines.append(f"- ... and {len(result['files']) - 20} more files")
lines.extend(["", f"[View comparison on GitHub]({result.get('html_url', '')})"])
return truncate_content("\n".join(lines))
# ============================================================================
# ACTIONS TOOLS
# ============================================================================
@mcp.tool()
async def list_workflows(
owner: str,
repo: str,
format: str = "markdown"
) -> str:
"""
List GitHub Actions workflows in a repository.
Args:
owner: Repository owner
repo: Repository name
format: Response format - 'json' or 'markdown'
Returns:
List of workflow files with their states
"""
result = await github_request("GET", f"/repos/{owner}/{repo}/actions/workflows")
if result.get("error"):
return result["message"]
workflows = result.get("workflows", [])
if format == "json":
return truncate_content(str({
"count": len(workflows),
"workflows": [
{
"id": w["id"],
"name": w["name"],
"path": w["path"],
"state": w["state"],
"url": w["html_url"]
}
for w in workflows
]
}))
# Markdown format
lines = [
f"# Workflows: {owner}/{repo}",
f"**Total:** {len(workflows)}",
"",
"| Name | Path | State |",
"|------|------|-------|"
]
state_icons = {"active": "🟢", "disabled_manually": "⚪", "disabled_inactivity": "🟡"}
for w in workflows:
icon = state_icons.get(w["state"], "⚪")
lines.append(f"| {w['name']} | `{w['path']}` | {icon} {w['state']} |")
return "\n".join(lines)
@mcp.tool()
async def get_workflow_runs(
owner: str,
repo: str,
workflow_id: Optional[int] = None,
branch: Optional[str] = None,
status: Optional[str] = None,
per_page: int = 10,
format: str = "markdown"
) -> str:
"""
Get recent workflow runs.
Args:
owner: Repository owner
repo: Repository name
workflow_id: Filter by specific workflow ID (optional)
branch: Filter by branch name (optional)
status: Filter by status - 'completed', 'in_progress', 'queued', etc.
per_page: Number of runs (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of workflow runs with status, timing, and conclusions
"""
if workflow_id:
endpoint = f"/repos/{owner}/{repo}/actions/workflows/{workflow_id}/runs"
else:
endpoint = f"/repos/{owner}/{repo}/actions/runs"
params = {"per_page": min(per_page, 100)}
if branch:
params["branch"] = branch
if status:
params["status"] = status
result = await github_request("GET", endpoint, params=params)
if result.get("error"):
return result["message"]
runs = result.get("workflow_runs", [])
if format == "json":
return truncate_content(str({
"count": len(runs),
"runs": [
{
"id": r["id"],
"name": r["name"],
"status": r["status"],
"conclusion": r.get("conclusion"),
"branch": r["head_branch"],
"event": r["event"],
"created_at": r["created_at"],
"url": r["html_url"]
}
for r in runs
]
}))
# Markdown format
lines = [
f"# Workflow Runs: {owner}/{repo}",
f"**Showing:** {len(runs)} runs",
""
]
conclusion_icons = {
"success": "✅",
"failure": "❌",
"cancelled": "⚪",
"skipped": "⏭️",
"timed_out": "⏱️",
None: "🔄"
}
for r in runs:
icon = conclusion_icons.get(r.get("conclusion"), "🔄")
conclusion = r.get("conclusion") or r["status"]
lines.append(f"### {icon} {r['name']}")
lines.append(f"- **Status:** {conclusion} | **Branch:** `{r['head_branch']}`")
lines.append(f"- **Trigger:** {r['event']} | **Started:** {format_datetime(r['created_at'])}")
lines.append(f"- [View Run]({r['html_url']})")
lines.append("")
return truncate_content("\n".join(lines))
@mcp.tool()
async def trigger_workflow(
owner: str,
repo: str,
workflow_id: int,
ref: str,
inputs: Optional[dict[str, str]] = None
) -> str:
"""
Trigger a workflow dispatch event.
Args:
owner: Repository owner
repo: Repository name
workflow_id: Workflow ID (from list_workflows)
ref: Branch or tag to run the workflow on
inputs: Workflow input parameters (if workflow accepts inputs)
Returns:
Confirmation of workflow dispatch
Note: Workflow must have 'workflow_dispatch' trigger configured.
"""
data = {"ref": ref}
if inputs:
data["inputs"] = inputs
result = await github_request(
"POST",
f"/repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches",
json_data=data
)
if result.get("error"):
return result["message"]
return f"""# Workflow Triggered
**Workflow ID:** {workflow_id}
**Branch/Tag:** `{ref}`
**Inputs:** {inputs if inputs else 'None'}
The workflow has been triggered. Check get_workflow_runs() for status."""
# ============================================================================
# CODE SEARCH TOOL
# ============================================================================
@mcp.tool()
async def search_code(
query: str,
per_page: int = 20,
format: str = "markdown"
) -> str:
"""
Search for code across GitHub repositories.
Args:
query: Search query with optional qualifiers
Examples: 'addClass repo:jquery/jquery'
'auth user:defunkt language:ruby'
'import requests language:python'
per_page: Number of results (1-100)
format: Response format - 'json' or 'markdown'
Returns:
Code matches with file paths and repository info
Note: Requires authentication. Only searches indexed code.
"""
params = {
"q": query,
"per_page": min(per_page, 100)
}
result = await github_request("GET", "/search/code", params=params)
if result.get("error"):
return result["message"]
items = result.get("items", [])
total = result.get("total_count", 0)
if format == "json":
return truncate_content(str({
"total_count": total,
"showing": len(items),
"results": [
{
"repository": item["repository"]["full_name"],
"path": item["path"],
"name": item["name"],
"url": item["html_url"]
}
for item in items
]
}))
# Markdown format
lines = [
f"# Code Search Results",
f"**Query:** `{query}`",
f"**Total matches:** {total:,}",
""
]
for item in items:
repo = item["repository"]["full_name"]
path = item["path"]
lines.append(f"- **{repo}**: [`{path}`]({item['html_url']})")
return truncate_content("\n".join(lines))
# ============================================================================
# USER TOOLS
# ============================================================================
@mcp.tool()
async def get_user(
username: str,
format: str = "markdown"
) -> str:
"""
Get information about a GitHub user or organization.
Args:
username: GitHub username or organization name
format: Response format - 'json' or 'markdown'
Returns:
User/org profile with stats and bio
"""
result = await github_request("GET", f"/users/{username}")
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"login": result["login"],
"name": result.get("name"),
"type": result["type"],
"bio": result.get("bio"),
"company": result.get("company"),
"location": result.get("location"),
"email": result.get("email"),
"blog": result.get("blog"),
"twitter_username": result.get("twitter_username"),
"public_repos": result["public_repos"],
"public_gists": result.get("public_gists", 0),
"followers": result["followers"],
"following": result["following"],
"created_at": result["created_at"],
"url": result["html_url"]
}))
# Markdown format
user_type = "Organization" if result["type"] == "Organization" else "User"
lines = [
f"# {result.get('name') or result['login']}",
f"**@{result['login']}** • {user_type}",
""
]
if result.get("bio"):
lines.extend([f"> {result['bio']}", ""])
lines.extend([
"## Stats",
f"| Metric | Value |",
f"|--------|-------|",
f"| Public Repos | {result['public_repos']} |",
f"| Followers | {result['followers']} |",
f"| Following | {result['following']} |",
""
])
details = []
if result.get("company"):
details.append(f"**Company:** {result['company']}")
if result.get("location"):
details.append(f"**Location:** {result['location']}")
if result.get("blog"):
details.append(f"**Blog:** {result['blog']}")
if result.get("twitter_username"):
details.append(f"**Twitter:** @{result['twitter_username']}")
if details:
lines.extend(["## Details"] + details + [""])
lines.append(f"**Member since:** {format_datetime(result['created_at'])}")
lines.append(f"\n[View Profile]({result['html_url']})")
return "\n".join(lines)
@mcp.tool()
async def list_user_repos(
username: str,
type: str = "owner",
sort: str = "updated",
direction: str = "desc",
per_page: int = 20,
format: str = "markdown"
) -> str:
"""
List repositories for a user or organization.
Args:
username: GitHub username or organization
type: Filter by type - 'owner', 'member', or 'all'
sort: Sort by 'created', 'updated', 'pushed', or 'full_name'
direction: Sort direction - 'asc' or 'desc'
per_page: Number of repos (1-100)
format: Response format - 'json' or 'markdown'
Returns:
List of repositories with stats
"""
params = {
"type": type,
"sort": sort,
"direction": direction,
"per_page": min(per_page, 100)
}
result = await github_request("GET", f"/users/{username}/repos", params=params)
if result.get("error"):
return result["message"]
if format == "json":
return truncate_content(str({
"count": len(result),
"repositories": [
{
"name": r["name"],
"full_name": r["full_name"],
"description": r.get("description"),
"stars": r["stargazers_count"],
"forks": r["forks_count"],
"language": r.get("language"),
"updated_at": r["updated_at"],
"url": r["html_url"]
}
for r in result
]
}))
# Markdown format
lines = [
f"# Repositories: {username}",
f"**Showing:** {len(result)} repos",
""
]
for r in result:
lines.append(f"## [{r['name']}]({r['html_url']})")
if r.get("description"):
lines.append(f"> {r['description'][:100]}")
lines.append(f"- ⭐ {r['stargazers_count']} | 🍴 {r['forks_count']} | {r.get('language', 'N/A')}")
lines.append(f"- Updated: {format_datetime(r['updated_at'])}")
lines.append("")
return truncate_content("\n".join(lines))
# ============================================================================
# SERVER ENTRY POINT
# ============================================================================
if __name__ == "__main__":
import sys
# Validate token on startup
try:
get_github_token()
print("GitHub MCP Server starting...", file=sys.stderr)
print("Token found. Ready to accept connections.", file=sys.stderr)
except ValueError as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
# Run the MCP server
mcp.run()