api.py•64 kB
"""GitHub API operations for MCP Git Server"""
import logging
from contextlib import asynccontextmanager
from typing import Any
from .client import get_github_client
logger = logging.getLogger(__name__)
@asynccontextmanager
async def github_client_context():
    """Yield a GitHub client and close its session once the caller is done.

    Yields:
        The object returned by ``get_github_client()``.

    Raises:
        ValueError: If ``get_github_client()`` returns a falsy value; the
            message tells the user to set ``GITHUB_TOKEN``.
    """
    gh_client = None
    try:
        gh_client = get_github_client()
        if not gh_client:
            raise ValueError(
                "GitHub token not configured. Set GITHUB_TOKEN environment variable."
            )
        yield gh_client
    finally:
        # Cleanup is best-effort: a failing close() is logged, never re-raised,
        # so it cannot mask an exception coming from the caller's block.
        has_open_session = gh_client and gh_client.session
        if has_open_session:
            try:
                await gh_client.session.close()
            except Exception as cleanup_error:
                logger.warning(f"Error during client cleanup: {cleanup_error}")
class PatchMemoryManager:
    """Track a running size budget while formatting patch text for output.

    Every patch is capped at ``max_patch_size`` and the sum of all emitted
    patch text is capped at ``max_total_memory``. Sizes are measured with
    ``len()`` on the patch string (the log messages call these "bytes").
    """

    def __init__(self, max_patch_size: int = 1000, max_total_memory: int = 50000):
        """Create a manager with a per-patch cap and an overall cap."""
        self.max_patch_size = max_patch_size
        self.max_total_memory = max_total_memory
        # Running total of patch text emitted so far.
        self.current_memory_usage = 0
        # Count of process_patch() calls, including skipped patches.
        self.patches_processed = 0

    def can_include_patch(self, patch_size: int) -> bool:
        """Return True if ``patch_size`` more units still fit in the budget."""
        return (self.current_memory_usage + patch_size) <= self.max_total_memory

    def process_patch(self, patch_content: str) -> tuple[str, bool]:
        """Format one patch as a fenced diff, truncating or skipping as needed.

        Args:
            patch_content: Raw patch text.

        Returns:
            tuple[str, bool]: ``(processed_content, was_truncated)``. The flag
            is True both when the patch was cut to ``max_patch_size`` and when
            it was replaced by a "skipped" placeholder.
        """
        patch_size = len(patch_content)
        self.patches_processed += 1

        # Budget against what would actually be emitted: an oversize patch is
        # cut to max_patch_size, so only that much counts toward the total.
        retained_size = min(patch_size, self.max_patch_size)
        if not self.can_include_patch(retained_size):
            logger.warning(
                f"Patch #{self.patches_processed} skipped: exceeds memory budget ({patch_size} bytes, {self.current_memory_usage}/{self.max_total_memory} used)"
            )
            return (
                f"[Patch skipped - memory limit reached ({self.current_memory_usage}/{self.max_total_memory} bytes used)]",
                True,
            )

        self.current_memory_usage += retained_size

        # Apply the individual patch size limit.
        if patch_size > self.max_patch_size:
            truncated_patch = patch_content[: self.max_patch_size]
            logger.info(
                f"Patch #{self.patches_processed} truncated: {patch_size} -> {self.max_patch_size} bytes"
            )
            return (
                f"```diff\n{truncated_patch}\n... [truncated {patch_size - self.max_patch_size} chars]\n```",
                True,
            )
        return f"```diff\n{patch_content}\n```", False
async def github_get_pr_checks(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    status: str | None = None,
    conclusion: str | None = None,
) -> str:
    """List the check runs attached to a pull request's head commit.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        status: If given, sent as the ``status`` query parameter.
        conclusion: If given, only runs with this exact ``conclusion`` are kept
            (filtered locally after the response arrives).

    Returns:
        A formatted multi-line report, or a message describing why nothing
        could be shown. Failures are returned as text, never raised.
    """
    try:
        async with github_client_context() as client:
            repo_path = f"/repos/{repo_owner}/{repo_name}"

            # The check-runs endpoint is keyed by commit, so resolve the PR head first.
            pull = await client.get(f"{repo_path}/pulls/{pr_number}")
            if pull.status != 200:
                return f"❌ Failed to get PR #{pr_number}: {pull.status}"
            head_sha = (await pull.json())["head"]["sha"]

            query = {"status": status} if status else {}
            listing = await client.get(
                f"{repo_path}/commits/{head_sha}/check-runs",
                params=query,
            )
            if listing.status != 200:
                return f"❌ Failed to get check runs: {listing.status}"

            runs = (await listing.json()).get("check_runs", [])
            if conclusion:
                runs = [
                    entry for entry in runs if entry.get("conclusion") == conclusion
                ]
            if not runs:
                return f"No check runs found for PR #{pr_number}"

            lines = [f"Check runs for PR #{pr_number} (commit {head_sha[:8]}):\n"]
            for entry in runs:
                finished_icon = "✅" if entry.get("conclusion") == "success" else "❌"
                icon = {
                    "completed": finished_icon,
                    "in_progress": "🔄",
                    "queued": "⏳",
                }.get(entry["status"], "❓")
                lines.append(f"{icon} {entry['name']}")
                lines.append(f" Status: {entry['status']}")
                if entry.get("conclusion"):
                    lines.append(f" Conclusion: {entry['conclusion']}")
                lines.append(f" Started: {entry.get('started_at', 'N/A')}")
                if entry.get("completed_at"):
                    lines.append(f" Completed: {entry['completed_at']}")
                if entry.get("html_url"):
                    lines.append(f" URL: {entry['html_url']}")
                lines.append("")
            return "\n".join(lines)
    except ValueError as auth_error:
        # Raised by the client context when no client/token is available.
        logger.error(f"Authentication error getting PR checks: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error getting PR checks: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        # Anything else is logged with a traceback for debugging.
        logger.error(
            f"Unexpected error getting PR checks for PR #{pr_number}: {e}",
            exc_info=True,
        )
        return f"❌ Error getting PR checks: {str(e)}"
async def github_get_failing_jobs(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    include_logs: bool = True,
    include_annotations: bool = True,
) -> str:
    """Report completed check runs on a PR that failed, were cancelled or timed out.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        include_logs: When True, append each run's ``html_url`` as a
            "Details" line (no log content is downloaded).
        include_annotations: When True, fetch and show up to five annotations
            per failing run.

    Returns:
        A formatted multi-line report or a status/error message as text.
    """
    failure_conclusions = ("failure", "cancelled", "timed_out")
    try:
        async with github_client_context() as client:
            repo_path = f"/repos/{repo_owner}/{repo_name}"

            pull = await client.get(f"{repo_path}/pulls/{pr_number}")
            if pull.status != 200:
                return f"❌ Failed to get PR #{pr_number}: {pull.status}"
            head_sha = (await pull.json())["head"]["sha"]

            listing = await client.get(f"{repo_path}/commits/{head_sha}/check-runs")
            if listing.status != 200:
                return f"❌ Failed to get check runs: {listing.status}"

            all_runs = (await listing.json()).get("check_runs", [])
            failed = [
                job
                for job in all_runs
                if job["status"] == "completed"
                and job.get("conclusion") in failure_conclusions
            ]
            if not failed:
                return f"No failing jobs found for PR #{pr_number}"

            lines = [f"Failing jobs for PR #{pr_number}:\n"]
            for job in failed:
                lines.append(f"❌ {job['name']}")
                lines.append(f" Conclusion: {job['conclusion']}")
                lines.append(f" Started: {job.get('started_at', 'N/A')}")
                lines.append(f" Completed: {job.get('completed_at', 'N/A')}")

                if include_annotations and job.get("id"):
                    # Annotation lookups are optional: any failure here is
                    # logged and the report continues without them.
                    try:
                        ann_response = await client.get(
                            f"{repo_path}/check-runs/{job['id']}/annotations"
                        )
                        notes = None
                        if ann_response.status == 200:
                            notes = await ann_response.json()
                        if notes:
                            lines.append(" Annotations:")
                            for note in notes[:5]:  # first five only
                                lines.append(
                                    f" • {note.get('title', 'Error')}: {note.get('message', 'No message')}"
                                )
                                if note.get("path"):
                                    lines.append(
                                        f" File: {note['path']} (line {note.get('start_line', 'unknown')})"
                                    )
                    except (ConnectionError, ValueError) as annotation_error:
                        logger.warning(
                            f"Failed to get annotations for run {job.get('id')}: {annotation_error}"
                        )
                    except Exception as annotation_error:
                        logger.debug(
                            f"Annotations unavailable for run {job.get('id')}: {annotation_error}"
                        )

                # "Logs" are represented only by a link to the run page.
                if include_logs and job.get("html_url"):
                    lines.append(f" Details: {job['html_url']}")
                lines.append("")
            return "\n".join(lines)
    except ValueError as auth_error:
        logger.error(f"Authentication error getting failing jobs: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error getting failing jobs: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error getting failing jobs for PR #{pr_number}: {e}",
            exc_info=True,
        )
        return f"❌ Error getting failing jobs: {str(e)}"
async def github_get_workflow_run(
    repo_owner: str, repo_name: str, run_id: int, include_logs: bool = False
) -> str:
    """Describe a GitHub Actions workflow run and its jobs.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        run_id: Workflow run id.
        include_logs: Accepted for interface compatibility; not used by this
            function.

    Returns:
        A formatted multi-line summary, or an error message as text. Fields
        that are missing or null in the response are shown as ``N/A``.
    """

    def shown(record: dict, key: str) -> str:
        # `.get(key, default)` only covers a missing key; the API can also send
        # an explicit null (e.g. `conclusion` while a run is still in
        # progress), so fall back on any falsy value.
        return record.get(key) or "N/A"

    try:
        async with github_client_context() as client:
            # Get workflow run details
            run_response = await client.get(
                f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}"
            )
            if run_response.status != 200:
                return f"❌ Failed to get workflow run #{run_id}: {run_response.status}"
            run_data = await run_response.json()

            output = [f"Workflow Run #{run_id}:\n"]
            output.append(f"Name: {shown(run_data, 'name')}")
            output.append(f"Status: {shown(run_data, 'status')}")
            output.append(f"Conclusion: {shown(run_data, 'conclusion')}")
            output.append(f"Branch: {shown(run_data, 'head_branch')}")
            # Slicing happens after the fallback so a null SHA cannot raise.
            output.append(f"Commit: {shown(run_data, 'head_sha')[:8]}")
            output.append(f"Started: {shown(run_data, 'created_at')}")
            output.append(f"Updated: {shown(run_data, 'updated_at')}")
            if run_data.get("html_url"):
                output.append(f"URL: {run_data['html_url']}")

            # Jobs are optional: a non-200 response simply omits the section.
            jobs_response = await client.get(
                f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}/jobs"
            )
            if jobs_response.status == 200:
                jobs_data = await jobs_response.json()
                jobs = jobs_data.get("jobs", [])
                if jobs:
                    output.append("\nJobs:")
                    for job in jobs:
                        status_emoji = {
                            "completed": "✅"
                            if job.get("conclusion") == "success"
                            else "❌",
                            "in_progress": "🔄",
                            "queued": "⏳",
                        }.get(job["status"], "❓")
                        output.append(f" {status_emoji} {job['name']}")
                        output.append(f" Status: {job['status']}")
                        if job.get("conclusion"):
                            output.append(f" Conclusion: {job['conclusion']}")
            return "\n".join(output)
    except ValueError as auth_error:
        logger.error(f"Authentication error getting workflow run: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error getting workflow run: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error getting workflow run #{run_id}: {e}", exc_info=True
        )
        return f"❌ Error getting workflow run: {str(e)}"
async def github_get_pr_details(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    include_files: bool = False,
    include_reviews: bool = False,
) -> str:
    """Summarise a pull request, optionally with changed files and reviews.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        include_files: When True, list up to ten files from the files endpoint.
        include_reviews: When True, list the last five reviews returned.

    Returns:
        A formatted multi-line summary, or an error message as text.
    """
    try:
        async with github_client_context() as client:
            pr_path = f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}"

            pull = await client.get(pr_path)
            if pull.status != 200:
                return f"❌ Failed to get PR #{pr_number}: {pull.status}"
            pr_data = await pull.json()

            lines = [
                f"Pull Request #{pr_number}:\n",
                f"Title: {pr_data.get('title', 'N/A')}",
                f"State: {pr_data.get('state', 'N/A')}",
                f"Author: {pr_data.get('user', {}).get('login', 'N/A')}",
                f"Base: {pr_data.get('base', {}).get('ref', 'N/A')}",
                f"Head: {pr_data.get('head', {}).get('ref', 'N/A')}",
                f"Created: {pr_data.get('created_at', 'N/A')}",
                f"Updated: {pr_data.get('updated_at', 'N/A')}",
            ]

            if pr_data.get("body"):
                description = pr_data["body"]
                # Long descriptions are cut at 500 characters with an ellipsis.
                ellipsis = "..." if len(description) > 500 else ""
                lines.append(f"\nDescription:\n{description[:500]}{ellipsis}")
            if pr_data.get("html_url"):
                lines.append(f"\nURL: {pr_data['html_url']}")

            if include_files:
                try:
                    files_response = await client.get(f"{pr_path}/files")
                    if files_response.status == 200:
                        changed = await files_response.json()
                        if changed:
                            lines.append(f"\nFiles ({len(changed)}):")
                            for item in changed[:10]:  # first ten only
                                lines.append(
                                    f" {item['status'][0].upper()} {item['filename']} (+{item['additions']}, -{item['deletions']})"
                                )
                            if len(changed) > 10:
                                lines.append(
                                    f" ... and {len(changed) - 10} more files"
                                )
                except (ConnectionError, ValueError) as files_error:
                    # The file list is optional; note the gap and carry on.
                    logger.warning(
                        f"Failed to get files for PR #{pr_number}: {files_error}"
                    )
                    lines.append("\n⚠️ Could not retrieve files information")

            if include_reviews:
                try:
                    reviews_response = await client.get(f"{pr_path}/reviews")
                    if reviews_response.status == 200:
                        reviews = await reviews_response.json()
                        if reviews:
                            review_icons = {
                                "APPROVED": "✅",
                                "CHANGES_REQUESTED": "❌",
                                "COMMENTED": "💬",
                            }
                            lines.append(f"\nReviews ({len(reviews)}):")
                            for entry in reviews[-5:]:  # last five only
                                icon = review_icons.get(entry.get("state"), "❓")
                                lines.append(
                                    f" {icon} {entry.get('user', {}).get('login', 'N/A')}: {entry.get('state', 'N/A')}"
                                )
                except (ConnectionError, ValueError) as reviews_error:
                    logger.warning(
                        f"Failed to get reviews for PR #{pr_number}: {reviews_error}"
                    )
                    lines.append("\n⚠️ Could not retrieve reviews information")

            return "\n".join(lines)
    except ValueError as auth_error:
        logger.error(f"Authentication error getting PR details: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error getting PR details: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error getting PR details for PR #{pr_number}: {e}",
            exc_info=True,
        )
        return f"❌ Error getting PR details: {str(e)}"
async def github_list_pull_requests(
    repo_owner: str,
    repo_name: str,
    state: str = "open",
    head: str | None = None,
    base: str | None = None,
    sort: str = "created",
    direction: str = "desc",
    per_page: int = 30,
    page: int = 1,
) -> str:
    """List pull requests for a repository.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        state: Sent as the ``state`` query parameter.
        head: Optional ``head`` filter; omitted from the query when falsy.
        base: Optional ``base`` filter; omitted from the query when falsy.
        sort: Sent as the ``sort`` query parameter.
        direction: Sent as the ``direction`` query parameter.
        per_page: Page size sent to the API.
        page: Page number sent to the API.

    Returns:
        A formatted multi-line listing, or an error message as text.
    """
    logger.debug(f"🔍 Starting github_list_pull_requests for {repo_owner}/{repo_name}")
    try:
        async with github_client_context() as client:
            logger.debug("✅ GitHub client obtained successfully")
            # Never write any part of the token to the log; only record
            # whether one is configured.
            logger.debug(
                "🔗 Token configured: yes" if client.token else "No token"
            )
            params = {
                "state": state,
                "sort": sort,
                "direction": direction,
                "per_page": per_page,
                "page": page,
            }
            if head:
                params["head"] = head
            if base:
                params["base"] = base
            logger.debug(
                f"📡 Making API call to /repos/{repo_owner}/{repo_name}/pulls with params: {params}"
            )
            response = await client.get(
                f"/repos/{repo_owner}/{repo_name}/pulls", params=params
            )
            logger.debug(f"📨 GitHub API response status: {response.status}")
            if response.status == 401:
                response_text = await response.text()
                logger.error(
                    f"🔒 GitHub API authentication failed (401): {response_text}"
                )
                return f"❌ GitHub API error 401: {response_text}"
            elif response.status != 200:
                response_text = await response.text()
                logger.error(f"❌ GitHub API error {response.status}: {response_text}")
                return f"❌ Failed to list pull requests: {response.status} - {response_text}"
            prs = await response.json()
            if not prs:
                return f"No {state} pull requests found"

            state_icons = {"open": "🟢", "closed": "🔴", "merged": "🟣"}
            output = [f"{state.title()} Pull Requests for {repo_owner}/{repo_name}:\n"]
            for pr in prs:
                # The API reports merged PRs as state "closed" with a non-null
                # `merged_at`, so derive "merged" from that timestamp.
                display_state = "merged" if pr.get("merged_at") else pr.get("state")
                state_emoji = state_icons.get(display_state, "❓")
                output.append(f"{state_emoji} #{pr['number']}: {pr['title']}")
                output.append(f" Author: {pr.get('user', {}).get('login', 'N/A')}")
                base_ref = pr.get("base", {}).get("ref", "N/A")
                head_ref = pr.get("head", {}).get("ref", "N/A")
                output.append(f" Base: {base_ref} ← Head: {head_ref}")
                output.append(f" Created: {pr.get('created_at', 'N/A')}")
                output.append("")
            return "\n".join(output)
    except ValueError as auth_error:
        logger.error(f"Authentication error listing pull requests: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error listing pull requests: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error listing pull requests for {repo_owner}/{repo_name}: {e}",
            exc_info=True,
        )
        return f"❌ Error listing pull requests: {str(e)}"
async def github_get_pr_status(repo_owner: str, repo_name: str, pr_number: int) -> str:
    """Show a pull request's state, mergeability and head-commit check runs.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.

    Returns:
        A formatted multi-line summary, or an error message as text. The
        check-run section is left out when that request does not return 200.
    """
    try:
        async with github_client_context() as client:
            repo_path = f"/repos/{repo_owner}/{repo_name}"

            pull = await client.get(f"{repo_path}/pulls/{pr_number}")
            if pull.status != 200:
                return f"❌ Failed to get PR #{pr_number}: {pull.status}"
            pr_data = await pull.json()
            head_sha = pr_data["head"]["sha"]

            lines = [
                f"Status for PR #{pr_number}:\n",
                f"State: {pr_data.get('state', 'N/A')}",
                f"Mergeable: {pr_data.get('mergeable', 'N/A')}",
                f"Merge State: {pr_data.get('mergeable_state', 'N/A')}",
                "",
            ]

            listing = await client.get(f"{repo_path}/commits/{head_sha}/check-runs")
            if listing.status == 200:
                runs = (await listing.json()).get("check_runs", [])
                if runs:
                    lines.append("Check Runs:")
                    for entry in runs:
                        finished_icon = (
                            "✅" if entry.get("conclusion") == "success" else "❌"
                        )
                        icon = {
                            "completed": finished_icon,
                            "in_progress": "🔄",
                            "queued": "⏳",
                        }.get(entry["status"], "❓")
                        lines.append(f" {icon} {entry['name']}: {entry['status']}")
                        if entry.get("conclusion"):
                            lines.append(f" Conclusion: {entry['conclusion']}")
            return "\n".join(lines)
    except ValueError as auth_error:
        logger.error(f"Authentication error getting PR status: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error getting PR status: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error getting PR status for PR #{pr_number}: {e}",
            exc_info=True,
        )
        return f"❌ Error getting PR status: {str(e)}"
async def github_get_pr_files(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    per_page: int = 30,
    page: int = 1,
    include_patch: bool = False,
) -> str:
    """List the files changed in a pull request, optionally with patch text.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        per_page: Page size sent to the API.
        page: Page number sent to the API.
        include_patch: When True, each file's ``patch`` is run through a
            ``PatchMemoryManager`` (1000 per patch, 50000 overall) and
            appended, followed by a usage summary.

    Returns:
        A formatted multi-line listing with totals, or an error message.
    """
    status_icons = {
        "added": "➕",
        "modified": "📝",
        "removed": "➖",
        "renamed": "📝",
    }
    try:
        async with github_client_context() as client:
            response = await client.get(
                f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/files",
                params={"per_page": per_page, "page": page},
            )
            if response.status != 200:
                return f"❌ Failed to get PR files: {response.status}"
            changed_files = await response.json()
            if not changed_files:
                return f"No files found for PR #{pr_number}"

            lines = [f"Files changed in PR #{pr_number}:\n"]
            added_sum = 0
            removed_sum = 0
            # One budget tracker is shared by every patch in this response.
            budget = PatchMemoryManager(max_patch_size=1000, max_total_memory=50000)

            for entry in changed_files:
                icon = status_icons.get(entry.get("status"), "❓")
                added = entry.get("additions", 0)
                removed = entry.get("deletions", 0)
                added_sum += added
                removed_sum += removed
                lines.append(f"{icon} {entry['filename']} (+{added}, -{removed})")

                if include_patch and entry.get("patch"):
                    rendered, shortened = budget.process_patch(entry["patch"])
                    lines.append(rendered)
                    if shortened:
                        logger.info(
                            f"Patch for {entry['filename']} was truncated or skipped for memory management"
                        )
                    lines.append("")

            lines.append(f"Total: +{added_sum}, -{removed_sum}")
            if include_patch:
                lines.append(
                    f"\nMemory usage: {budget.current_memory_usage}/{budget.max_total_memory} bytes"
                )
                lines.append(f"Patches processed: {budget.patches_processed}")
            return "\n".join(lines)
    except ValueError as auth_error:
        logger.error(f"Authentication error getting PR files: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error getting PR files: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error getting PR files for PR #{pr_number}: {e}", exc_info=True
        )
        return f"❌ Error getting PR files: {str(e)}"
async def github_update_pr(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    title: str | None = None,
    body: str | None = None,
    state: str | None = None,
) -> str:
    """Patch a pull request's title, body and/or state.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        title: New title; left unchanged when None.
        body: New body; left unchanged when None.
        state: ``"open"`` or ``"closed"``; left unchanged when None.

    Returns:
        A success message with the PR URL, a validation message when the
        input is unusable, or an error message. Nothing is raised.
    """
    logger.debug(f"🚀 Updating PR #{pr_number} in {repo_owner}/{repo_name}")
    try:
        async with github_client_context() as client:
            if state is not None and state not in ["open", "closed"]:
                return "❌ State must be 'open' or 'closed'"

            # Only fields the caller actually supplied are sent.
            candidates = (("title", title), ("body", body), ("state", state))
            payload: dict[str, Any] = {
                field: value for field, value in candidates if value is not None
            }
            if not payload:
                return "⚠️ No update parameters provided. Please specify title, body, or state."

            response = await client.patch(
                f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}", json=payload
            )
            if response.status != 200:
                error_text = await response.text()
                return f"❌ Failed to update PR #{pr_number}: {response.status} - {error_text}"

            updated = await response.json()
            logger.info(f"✅ Successfully updated PR #{pr_number}")
            return (
                f"✅ Successfully updated PR #{updated['number']}: {updated['html_url']}"
            )
    except ValueError as auth_error:
        logger.error(f"Authentication error updating PR: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error updating PR: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error updating PR #{pr_number}: {e}", exc_info=True)
        return f"❌ Error updating PR: {str(e)}"
async def github_create_pr(
    repo_owner: str,
    repo_name: str,
    title: str,
    head: str,
    base: str,
    body: str | None = None,
    draft: bool = False,
) -> str:
    """Open a new pull request from ``head`` into ``base``.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        title: Pull request title.
        head: Sent as the ``head`` field of the request.
        base: Sent as the ``base`` field of the request.
        body: Optional description; omitted from the request when None.
        draft: Sent as the ``draft`` field of the request.

    Returns:
        A success message with the new PR's number and URL, or an error
        message. Nothing is raised.
    """
    logger.debug(f"🚀 Creating PR in {repo_owner}/{repo_name} from {head} to {base}")
    # Error texts that get the friendlier "Reason:" wording.
    known_reasons = ("No commits between", "A pull request already exists")
    try:
        async with github_client_context() as client:
            request_body = {"title": title, "head": head, "base": base, "draft": draft}
            if body is not None:
                request_body["body"] = body

            response = await client.post(
                f"/repos/{repo_owner}/{repo_name}/pulls", json=request_body
            )
            if response.status != 201:
                error_text = await response.text()
                if any(reason in error_text for reason in known_reasons):
                    return f"❌ Could not create PR. Reason: {error_text}"
                return f"❌ Failed to create PR: {response.status} - {error_text}"

            created = await response.json()
            logger.info(f"✅ Successfully created PR #{created['number']}")
            return (
                f"✅ Successfully created PR #{created['number']}: {created['html_url']}"
            )
    except ValueError as auth_error:
        logger.error(f"Authentication error creating PR: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error creating PR: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error creating PR: {e}", exc_info=True)
        return f"❌ Error creating PR: {str(e)}"
async def github_merge_pr(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    commit_title: str | None = None,
    commit_message: str | None = None,
    merge_method: str = "merge",
) -> str:
    """Merge a pull request with the chosen merge method.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        commit_title: Optional commit title; omitted from the request when falsy.
        commit_message: Optional commit message; omitted from the request
            when falsy.
        merge_method: One of ``"merge"``, ``"squash"`` or ``"rebase"``.

    Returns:
        The API's message on success, a warning when the response is 200 but
        ``merged`` is falsy, or an error message. Nothing is raised.
    """
    logger.debug(
        f"🚀 Merging PR #{pr_number} in {repo_owner}/{repo_name} using '{merge_method}' method"
    )
    try:
        async with github_client_context() as client:
            if merge_method not in ["merge", "squash", "rebase"]:
                return "❌ merge_method must be one of 'merge', 'squash', or 'rebase'"

            request_body = {"merge_method": merge_method}
            if commit_title:
                request_body["commit_title"] = commit_title
            if commit_message:
                request_body["commit_message"] = commit_message

            response = await client.put(
                f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/merge",
                json=request_body,
            )
            if response.status != 200:
                error_text = await response.text()
                if response.status in [405, 409]:
                    return f"❌ Could not merge PR. Reason: {error_text}. This may be due to merge conflicts or failing status checks."
                return f"❌ Failed to merge PR: {response.status} - {error_text}"

            outcome = await response.json()
            if not outcome.get("merged"):
                # A 200 response alone does not prove the merge happened.
                logger.warning(
                    f"⚠️ Merge attempt for PR #{pr_number} returned 200 OK but 'merged' is false: {outcome.get('message')}"
                )
                return f"⚠️ {outcome.get('message', 'Merge was not successful but API returned 200 OK. Check PR status.')}"

            logger.info(f"✅ Successfully merged PR #{pr_number}")
            return f"✅ {outcome['message']}"
    except ValueError as auth_error:
        logger.error(f"Authentication error merging PR: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error merging PR: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error merging PR #{pr_number}: {e}", exc_info=True)
        return f"❌ Error merging PR: {str(e)}"
async def github_add_pr_comment(
    repo_owner: str, repo_name: str, pr_number: int, body: str
) -> str:
    """Post a comment on a pull request.

    The request goes to the issues comments endpoint, using the PR number as
    the issue number.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        pr_number: Pull request number.
        body: Comment text.

    Returns:
        A success message with the comment URL, or an error message.
    """
    logger.debug(f"🚀 Adding comment to PR #{pr_number} in {repo_owner}/{repo_name}")
    try:
        async with github_client_context() as client:
            response = await client.post(
                f"/repos/{repo_owner}/{repo_name}/issues/{pr_number}/comments",
                json={"body": body},
            )
            if response.status != 201:
                error_text = await response.text()
                return f"❌ Failed to add comment: {response.status} - {error_text}"

            created = await response.json()
            logger.info(f"✅ Successfully added comment to PR #{pr_number}")
            return f"✅ Successfully added comment: {created['html_url']}"
    except ValueError as auth_error:
        logger.error(f"Authentication error adding PR comment: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error adding PR comment: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error adding comment to PR #{pr_number}: {e}", exc_info=True
        )
        return f"❌ Error adding comment: {str(e)}"
async def github_close_pr(repo_owner: str, repo_name: str, pr_number: int) -> str:
    """Close a pull request by delegating to ``github_update_pr``.

    Returns:
        Whatever ``github_update_pr`` returns for ``state="closed"``.
    """
    logger.debug(f"🚀 Closing PR #{pr_number} in {repo_owner}/{repo_name}")
    outcome = await github_update_pr(
        repo_owner=repo_owner,
        repo_name=repo_name,
        pr_number=pr_number,
        state="closed",
    )
    return outcome
async def github_reopen_pr(repo_owner: str, repo_name: str, pr_number: int) -> str:
    """Reopen a pull request by delegating to ``github_update_pr``.

    Returns:
        Whatever ``github_update_pr`` returns for ``state="open"``.
    """
    logger.debug(f"🚀 Reopening PR #{pr_number} in {repo_owner}/{repo_name}")
    outcome = await github_update_pr(
        repo_owner=repo_owner,
        repo_name=repo_name,
        pr_number=pr_number,
        state="open",
    )
    return outcome
# GitHub Issues API Functions
async def github_create_issue(
    repo_owner: str,
    repo_name: str,
    title: str,
    body: str | None = None,
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
    milestone: int | None = None,
) -> str:
    """Create a new issue in a repository.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        title: Issue title.
        body: Optional issue text.
        labels: Optional list of label names.
        assignees: Optional list of logins to assign.
        milestone: Optional milestone value sent as-is.

    Only ``title`` is always sent; each optional field is included when it is
    not None.

    Returns:
        A success message with the issue number and URL, or an error message.
    """
    logger.debug(f"🚀 Creating issue in {repo_owner}/{repo_name}: {title}")
    try:
        async with github_client_context() as client:
            request_body: dict[str, Any] = {"title": title}
            optional_fields = (
                ("body", body),
                ("labels", labels),
                ("assignees", assignees),
                ("milestone", milestone),
            )
            for field, value in optional_fields:
                if value is not None:
                    request_body[field] = value

            response = await client.post(
                f"/repos/{repo_owner}/{repo_name}/issues", json=request_body
            )
            if response.status != 201:
                error_text = await response.text()
                return f"❌ Failed to create issue: {response.status} - {error_text}"

            created = await response.json()
            logger.info(f"✅ Successfully created issue #{created['number']}")
            return f"✅ Successfully created issue #{created['number']}: {created['html_url']}"
    except ValueError as auth_error:
        logger.error(f"Authentication error creating issue: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error creating issue: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error creating issue: {e}", exc_info=True)
        return f"❌ Error creating issue: {str(e)}"
async def github_list_issues(
    repo_owner: str,
    repo_name: str,
    state: str = "open",
    labels: list[str] | None = None,
    assignee: str | None = None,
    creator: str | None = None,
    mentioned: str | None = None,
    milestone: str | None = None,
    sort: str = "created",
    direction: str = "desc",
    since: str | None = None,
    per_page: int = 30,
    page: int = 1,
) -> str:
    """List issues for a repository, excluding pull requests.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        state: Sent as the ``state`` query parameter.
        labels: Optional label names, sent comma-joined.
        assignee: Optional ``assignee`` filter.
        creator: Optional ``creator`` filter.
        mentioned: Optional ``mentioned`` filter.
        milestone: Optional ``milestone`` filter.
        sort: Sent as the ``sort`` query parameter.
        direction: Sent as the ``direction`` query parameter.
        since: Optional ``since`` filter.
        per_page: Page size sent to the API.
        page: Page number sent to the API.

    Optional filters are left out of the query when falsy.

    Returns:
        A formatted multi-line listing, a "no issues" message when the page
        holds no real issues, or an error message. Nothing is raised.
    """
    logger.debug(f"🔍 Listing issues for {repo_owner}/{repo_name}")
    try:
        async with github_client_context() as client:
            params = {
                "state": state,
                "sort": sort,
                "direction": direction,
                "per_page": per_page,
                "page": page,
            }
            if labels:
                params["labels"] = ",".join(labels)
            if assignee:
                params["assignee"] = assignee
            if creator:
                params["creator"] = creator
            if mentioned:
                params["mentioned"] = mentioned
            if milestone:
                params["milestone"] = milestone
            if since:
                params["since"] = since
            response = await client.get(
                f"/repos/{repo_owner}/{repo_name}/issues", params=params
            )
            if response.status != 200:
                error_text = await response.text()
                return f"❌ Failed to list issues: {response.status} - {error_text}"

            # Pull requests also come back from the issues endpoint, marked by
            # a 'pull_request' key. Drop them before the emptiness check so a
            # page made up only of PRs reports "no issues" instead of a bare
            # header.
            issues = [
                issue
                for issue in await response.json()
                if not issue.get("pull_request")
            ]
            if not issues:
                return f"No {state} issues found"

            state_icons = {"open": "🟢", "closed": "🔴"}
            output = [f"{state.title()} Issues for {repo_owner}/{repo_name}:\n"]
            for issue in issues:
                state_emoji = state_icons.get(issue.get("state"), "❓")
                output.append(f"{state_emoji} #{issue['number']}: {issue['title']}")
                output.append(f" Author: {issue.get('user', {}).get('login', 'N/A')}")
                # Show labels if any
                if issue.get("labels"):
                    label_names = [label["name"] for label in issue["labels"]]
                    output.append(f" Labels: {', '.join(label_names)}")
                # Show assignees if any
                if issue.get("assignees"):
                    assignee_names = [person["login"] for person in issue["assignees"]]
                    output.append(f" Assignees: {', '.join(assignee_names)}")
                output.append(f" Created: {issue.get('created_at', 'N/A')}")
                output.append("")
            return "\n".join(output)
    except ValueError as auth_error:
        logger.error(f"Authentication error listing issues: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error listing issues: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error listing issues: {e}", exc_info=True)
        return f"❌ Error listing issues: {str(e)}"
async def github_update_issue(
    repo_owner: str,
    repo_name: str,
    issue_number: int,
    title: str | None = None,
    body: str | None = None,
    state: str | None = None,
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
    milestone: int | None = None,
) -> str:
    """Patch an existing issue with whichever fields were supplied.

    Args:
        repo_owner: Repository owner used in the API path.
        repo_name: Repository name used in the API path.
        issue_number: Issue number.
        title: New title; left unchanged when None.
        body: New body; left unchanged when None.
        state: ``"open"`` or ``"closed"``; left unchanged when None.
        labels: New label list; left unchanged when None.
        assignees: New assignee list; left unchanged when None.
        milestone: New milestone value; left unchanged when None.

    Returns:
        A success message with the issue URL, a validation message when the
        input is unusable, or an error message. Nothing is raised.
    """
    logger.debug(f"🚀 Updating issue #{issue_number} in {repo_owner}/{repo_name}")
    try:
        async with github_client_context() as client:
            if state is not None and state not in ["open", "closed"]:
                return "❌ State must be 'open' or 'closed'"

            # None means "leave unchanged"; an empty list is still sent.
            candidates = (
                ("title", title),
                ("body", body),
                ("state", state),
                ("labels", labels),
                ("assignees", assignees),
                ("milestone", milestone),
            )
            request_body: dict[str, Any] = {
                field: value for field, value in candidates if value is not None
            }
            if not request_body:
                return "⚠️ No update parameters provided. Please specify title, body, state, labels, assignees, or milestone."

            response = await client.patch(
                f"/repos/{repo_owner}/{repo_name}/issues/{issue_number}",
                json=request_body,
            )
            if response.status != 200:
                error_text = await response.text()
                return f"❌ Failed to update issue #{issue_number}: {response.status} - {error_text}"

            updated = await response.json()
            logger.info(f"✅ Successfully updated issue #{issue_number}")
            return f"✅ Successfully updated issue #{updated['number']}: {updated['html_url']}"
    except ValueError as auth_error:
        logger.error(f"Authentication error updating issue: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error updating issue: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(
            f"Unexpected error updating issue #{issue_number}: {e}", exc_info=True
        )
        return f"❌ Error updating issue: {str(e)}"
async def github_edit_pr_description(
    repo_owner: str,
    repo_name: str,
    pr_number: int,
    description: str,
) -> str:
    """Replace a pull request's body by delegating to ``github_update_pr``.

    Args:
        repo_owner: Repository owner, passed through unchanged.
        repo_name: Repository name, passed through unchanged.
        pr_number: Pull request number, passed through unchanged.
        description: Text passed as the ``body`` argument.

    Returns:
        Whatever ``github_update_pr`` returns.
    """
    logger.debug(f"🚀 Updating PR #{pr_number} description in {repo_owner}/{repo_name}")
    # Only the body is supplied, so title and state stay untouched.
    outcome = await github_update_pr(repo_owner, repo_name, pr_number, body=description)
    return outcome
async def github_search_issues(
    repo_owner: str,
    repo_name: str,
    query: str,
    sort: str = "created",
    order: str = "desc",
    per_page: int = 30,
    page: int = 1,
) -> str:
    """Search issues in one repository using GitHub's search API.

    The query is automatically scoped with ``repo:<owner>/<name> is:issue``.
    Supports GitHub's search qualifiers like:
    - is:issue is:open author:username
    - label:bug label:"help wanted"
    - created:2023-01-01..2023-12-31
    - updated:>2023-06-01
    - milestone:"v1.0" assignee:username

    Args:
        repo_owner: Repository owner/organization.
        repo_name: Repository name.
        query: Additional search qualifiers/terms appended to the repo scope.
        sort: Field to sort by (passed through to the API).
        order: Sort direction (passed through to the API).
        per_page: Results per page; clamped to 1-100 before use.
        page: Page number; values below 1 are treated as 1.

    Returns:
        A formatted, human-readable result listing, a "no issues found"
        message, or a string starting with "❌" describing the failure.
        Errors are reported in the return value rather than raised.
    """
    logger.debug(f"🔍 Searching issues in {repo_owner}/{repo_name}: {query}")
    # Clamp paging once so the request and the page arithmetic below agree;
    # an unclamped per_page of 0 would otherwise divide by zero after a
    # successful search.
    per_page = min(max(per_page, 1), 100)
    page = max(page, 1)
    try:
        async with github_client_context() as client:
            # Add repository scope to query
            search_query = f"repo:{repo_owner}/{repo_name} is:issue {query}"
            params = {
                "q": search_query,
                "sort": sort,
                "order": order,
                "per_page": per_page,
                "page": page,
            }
            response = await client.get("/search/issues", params=params)
            if response.status != 200:
                error_text = await response.text()
                return f"❌ Failed to search issues: {response.status} - {error_text}"

            data = await response.json()
            issues = data.get("items", [])
            total_count = data.get("total_count", 0)
            if not issues:
                return f"No issues found matching query: {query}"

            output = [f"Search Results for '{query}' in {repo_owner}/{repo_name}:\n"]
            output.append(f"Found {total_count} total issues (showing page {page})\n")
            state_emojis = {"open": "🟢", "closed": "🔴"}
            for issue in issues:
                # Skip pull requests
                if issue.get("pull_request"):
                    continue
                state_emoji = state_emojis.get(issue.get("state"), "❓")
                output.append(f"{state_emoji} #{issue['number']}: {issue['title']}")
                # "user" may be present but null, so guard before reading login.
                author = (issue.get("user") or {}).get("login", "N/A")
                output.append(f" Author: {author}")
                # Show labels
                if issue.get("labels"):
                    label_names = [label["name"] for label in issue["labels"]]
                    output.append(f" Labels: {', '.join(label_names)}")
                # Show assignees
                if issue.get("assignees"):
                    assignee_names = [
                        assignee["login"] for assignee in issue["assignees"]
                    ]
                    output.append(f" Assignees: {', '.join(assignee_names)}")
                # Show milestone
                if issue.get("milestone"):
                    output.append(f" Milestone: {issue['milestone']['title']}")
                output.append(f" Created: {issue.get('created_at', 'N/A')}")
                # Search relevance score
                output.append(f" Score: {issue.get('score', 'N/A')}")
                output.append("")

            # Add pagination info
            max_results = min(1000, total_count)  # GitHub limits search to 1000 results
            if total_count > len(issues):
                max_page = (max_results + per_page - 1) // per_page
                output.append(
                    f"📄 Page {page} of {max_page} (max {max_results} results from GitHub)"
                )
            return "\n".join(output)
    except ValueError as auth_error:
        # github_client_context raises ValueError when no token is configured.
        logger.error(f"Authentication error searching issues: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error searching issues: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error searching issues: {e}", exc_info=True)
        return f"❌ Error searching issues: {str(e)}"
async def github_create_issue_from_template(
    repo_owner: str,
    repo_name: str,
    title: str,
    template_name: str = "bug_report",
    template_data: dict | None = None,
) -> str:
    """Create a GitHub issue using a predefined template.

    Templates include:
    - bug_report: Bug report with reproduction steps
    - feature_request: Feature request with use cases
    - question: Question or discussion starter
    - custom: Use template_data to define custom format

    Args:
        repo_owner: Repository owner/organization.
        repo_name: Repository name.
        title: Title of the issue to create.
        template_name: One of the template names listed above.
        template_data: Optional customizations. Keys read here:
            ``additional_info`` (appended to predefined bodies),
            ``body`` (custom template only), ``labels`` (merged with the
            template's labels, duplicates dropped), ``assignees`` and
            ``milestone`` (forwarded as-is).

    Returns:
        The result string of ``github_create_issue``, or a string starting
        with "❌" when the template cannot be resolved.
    """
    logger.debug(
        f"🚀 Creating issue from template '{template_name}' in {repo_owner}/{repo_name}"
    )
    # Evaluated once; every predefined body ends with this optional text.
    additional_info = template_data.get("additional_info", "") if template_data else ""
    templates = {
        "bug_report": {
            "body": f"""## Bug Report
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Additional context**
Add any other context about the problem here.
**Environment**
- OS: [e.g. iOS]
- Browser [e.g. chrome, safari]
- Version [e.g. 22]
{additional_info}
""",
            "labels": ["bug", "triage"],
        },
        "feature_request": {
            "body": f"""## Feature Request
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
{additional_info}
""",
            "labels": ["enhancement", "feature-request"],
        },
        "question": {
            "body": f"""## Question
**What would you like to know?**
Please describe your question clearly.
**Context**
Provide any relevant context that might help answer your question.
**What have you tried?**
Let us know what research or attempts you've already made.
{additional_info}
""",
            "labels": ["question"],
        },
    }

    if template_name == "custom":
        if not template_data:
            # Previously this fell through to the "Unknown template" message,
            # which then listed 'custom' as available.
            return "❌ Template 'custom' requires non-empty template_data (e.g. a 'body' entry)"
        # Labels start empty: the customization step below adds
        # template_data["labels"] exactly once (they used to be added twice).
        template = {"body": template_data.get("body", ""), "labels": []}
    else:
        template = templates.get(template_name)
        if template is None:
            available = ", ".join(templates.keys()) + ", custom"
            return f"❌ Unknown template '{template_name}'. Available templates: {available}"

    # Apply template data customizations
    if template_data:
        if "labels" in template_data:
            merged_labels = list(template["labels"])
            # Membership test instead of a set so unhashable label values
            # cannot raise; order of first appearance is preserved.
            for label in template_data["labels"] or []:
                if label not in merged_labels:
                    merged_labels.append(label)
            template["labels"] = merged_labels
        if "assignees" in template_data:
            template["assignees"] = template_data["assignees"]
        if "milestone" in template_data:
            template["milestone"] = template_data["milestone"]

    # Create issue using template
    return await github_create_issue(
        repo_owner=repo_owner,
        repo_name=repo_name,
        title=title,
        body=template["body"],
        labels=template.get("labels"),
        assignees=template.get("assignees"),
        milestone=template.get("milestone"),
    )
async def github_bulk_update_issues(
    repo_owner: str,
    repo_name: str,
    issue_numbers: list[int],
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
    milestone: int | None = None,
    state: str | None = None,
) -> str:
    """Bulk update multiple issues with common properties.

    Useful for:
    - Adding labels to multiple issues
    - Assigning multiple issues to same milestone
    - Bulk closing/reopening issues
    - Mass assignment operations

    Issues are updated one after another through ``github_update_issue``;
    a failure on one issue does not stop the remaining updates.

    Args:
        repo_owner: Repository owner/organization.
        repo_name: Repository name.
        issue_numbers: Issue numbers to update.
        labels: Labels to set; an empty list is forwarded (not treated as
            "not provided"), matching ``github_update_issue``.
        assignees: Assignees to set; an empty list is forwarded as well.
        milestone: Milestone number to set.
        state: New state, forwarded to ``github_update_issue``.

    Returns:
        A summary with success/failure counts and per-issue details
        (first 10 only), or a "⚠️" message when nothing can be done.
    """
    # Guard first so an empty or missing list never reaches len() below.
    if not issue_numbers:
        return "⚠️ No issue numbers provided for bulk update"
    # Use identity checks for the list parameters: github_update_issue sends
    # any non-None labels/assignees, so [] must count as a real update here.
    nothing_to_update = (
        labels is None and assignees is None and milestone is None and not state
    )
    if nothing_to_update:
        return "⚠️ No update parameters provided. Specify labels, assignees, milestone, or state"

    logger.debug(
        f"🚀 Bulk updating {len(issue_numbers)} issues in {repo_owner}/{repo_name}"
    )
    results = []
    successful_updates = 0
    failed_updates = 0
    for issue_number in issue_numbers:
        try:
            result = await github_update_issue(
                repo_owner=repo_owner,
                repo_name=repo_name,
                issue_number=issue_number,
                labels=labels,
                assignees=assignees,
                milestone=milestone,
                state=state,
            )
        except Exception as e:
            # Best-effort: record the failure and continue with the next issue.
            failed_updates += 1
            results.append(f"❌ Issue #{issue_number}: Error - {str(e)}")
            continue
        # github_update_issue signals success with a leading check mark.
        if result.startswith("✅"):
            successful_updates += 1
            results.append(f"✅ Issue #{issue_number}: Updated")
        else:
            failed_updates += 1
            results.append(f"❌ Issue #{issue_number}: {result}")

    # Summary
    summary = [
        f"Bulk Update Results for {len(issue_numbers)} issues:",
        f"✅ Successful: {successful_updates}",
        f"❌ Failed: {failed_updates}",
        "",
    ]
    # Detailed results (limit to first 10 for readability)
    summary.append("Details:")
    summary.extend(f" {result}" for result in results[:10])
    if len(results) > 10:
        summary.append(f" ... and {len(results) - 10} more results")
    return "\n".join(summary)
async def github_list_workflow_runs(
    repo_owner: str,
    repo_name: str,
    workflow_id: str | None = None,
    actor: str | None = None,
    branch: str | None = None,
    event: str | None = None,
    status: str | None = None,
    conclusion: str | None = None,
    per_page: int = 30,
    page: int = 1,
    created: str | None = None,
    exclude_pull_requests: bool = False,
    check_suite_id: int | None = None,
    head_sha: str | None = None,
) -> str:
    """List workflow runs for a repository with comprehensive filtering options.

    This provides essential CI/CD monitoring capabilities for GitHub Actions workflows.

    Args:
        repo_owner: Repository owner/organization
        repo_name: Repository name
        workflow_id: Filter by specific workflow ID or filename (e.g., "ci.yml")
        actor: Filter by GitHub username who triggered the run
        branch: Filter by branch name
        event: Filter by event type (push, pull_request, schedule, etc.)
        status: Filter by run status (queued, in_progress, completed);
            any other value is ignored
        conclusion: Filter by conclusion (success, failure, neutral, cancelled,
            timed_out, action_required, stale); any other value is ignored
        per_page: Number of results per page (clamped to 1-100, default: 30)
        page: Page number to retrieve (values below 1 become 1, default: 1)
        created: Filter by creation date (ISO 8601 format or relative like >2023-01-01)
        exclude_pull_requests: If true, exclude workflow runs triggered by pull requests
        check_suite_id: Filter by specific check suite ID
        head_sha: Filter by specific commit SHA

    Returns:
        Formatted string with workflow run information including status, conclusion,
        timing, and links for CI/CD monitoring and debugging. Failures are
        reported as a string starting with "❌" instead of being raised.
    """
    # Imported once per call rather than once per run inside the loop.
    from datetime import datetime

    logger.debug(f"🔍 Listing workflow runs for {repo_owner}/{repo_name}")
    # The clamped values are used for the request AND for every page
    # computation/message below, so the footer always matches what was fetched
    # (the raw per_page could be 0 and divide by zero).
    effective_per_page = min(max(per_page, 1), 100)  # Enforce GitHub API limits
    effective_page = max(page, 1)
    try:
        async with github_client_context() as client:
            # Build query parameters with validation
            params: dict[str, Any] = {
                "per_page": effective_per_page,
                "page": effective_page,
            }
            # Add optional filters
            if actor:
                params["actor"] = actor
            if branch:
                params["branch"] = branch
            if event:
                params["event"] = event
            if status and status in ["queued", "in_progress", "completed"]:
                params["status"] = status
            # NOTE(review): GitHub's REST reference appears to accept conclusion
            # values through the `status` parameter; confirm a separate
            # `conclusion` query parameter is actually honoured.
            if conclusion and conclusion in [
                "success",
                "failure",
                "neutral",
                "cancelled",
                "timed_out",
                "action_required",
                "stale",
            ]:
                params["conclusion"] = conclusion
            if created:
                params["created"] = created
            if exclude_pull_requests:
                params["exclude_pull_requests"] = "true"
            if check_suite_id:
                params["check_suite_id"] = check_suite_id
            if head_sha:
                params["head_sha"] = head_sha

            # Determine API endpoint - workflow-specific or repository-wide
            if workflow_id:
                # Get runs for specific workflow
                endpoint = f"/repos/{repo_owner}/{repo_name}/actions/workflows/{workflow_id}/runs"
                logger.debug(f"📡 Fetching workflow-specific runs: {workflow_id}")
            else:
                # Get all workflow runs for repository
                endpoint = f"/repos/{repo_owner}/{repo_name}/actions/runs"
                logger.debug("📡 Fetching all repository workflow runs")

            logger.debug(f"📡 Making API call to {endpoint} with params: {params}")
            response = await client.get(endpoint, params=params)
            logger.debug(f"📨 GitHub API response status: {response.status}")

            if response.status == 401:
                response_text = await response.text()
                logger.error(
                    f"🔒 GitHub API authentication failed (401): {response_text}"
                )
                return "❌ GitHub API authentication failed: Verify your GITHUB_TOKEN has Actions read permissions"
            elif response.status == 404:
                if workflow_id:
                    return f"❌ Workflow '{workflow_id}' not found in {repo_owner}/{repo_name}. Check workflow file name or ID."
                else:
                    return f"❌ Repository {repo_owner}/{repo_name} not found or Actions not enabled"
            elif response.status != 200:
                response_text = await response.text()
                logger.error(f"❌ GitHub API error {response.status}: {response_text}")
                return f"❌ Failed to list workflow runs: {response.status} - {response_text}"

            data = await response.json()
            workflow_runs = data.get("workflow_runs", [])
            if not workflow_runs:
                # params always holds per_page and page; anything beyond is a filter.
                filter_desc = (
                    f" (filtered by: {', '.join(f'{k}={v}' for k, v in params.items() if k not in ['per_page', 'page'])})"
                    if len(params) > 2
                    else ""
                )
                return (
                    f"No workflow runs found for {repo_owner}/{repo_name}{filter_desc}"
                )

            # Build formatted output
            filter_info = []
            if workflow_id:
                filter_info.append(f"workflow: {workflow_id}")
            if actor:
                filter_info.append(f"actor: {actor}")
            if branch:
                filter_info.append(f"branch: {branch}")
            if event:
                filter_info.append(f"event: {event}")
            # Report status/conclusion only when they passed validation and were
            # really sent; a rejected value must not appear as an active filter.
            if "status" in params:
                filter_info.append(f"status: {params['status']}")
            if "conclusion" in params:
                filter_info.append(f"conclusion: {params['conclusion']}")

            header = f"Workflow Runs for {repo_owner}/{repo_name}"
            if filter_info:
                header += f" ({', '.join(filter_info)})"
            output = [f"{header}:\n"]

            # Add summary statistics
            total_count = data.get("total_count", len(workflow_runs))
            if total_count > len(workflow_runs):
                output.append(
                    f"Showing {len(workflow_runs)} of {total_count} total runs (page {effective_page})\n"
                )

            # Group runs by status for quick overview
            status_counts: dict[str, int] = {}
            for run in workflow_runs:
                run_status = run.get("status", "unknown")
                status_counts[run_status] = status_counts.get(run_status, 0) + 1
            if len(status_counts) > 1:
                status_summary = ", ".join(
                    f"{name}: {count}" for name, count in status_counts.items()
                )
                output.append(f"Status summary: {status_summary}\n")

            # Format individual workflow runs
            for run in workflow_runs:
                # Status and conclusion emojis
                status_emoji = {
                    "completed": "✅" if run.get("conclusion") == "success" else "❌",
                    "in_progress": "🔄",
                    "queued": "⏳",
                    "requested": "📋",
                    "waiting": "⏸️",
                }.get(run.get("status"), "❓")

                # Enhanced status display
                status_text = run.get("status", "unknown")
                if run.get("conclusion"):
                    status_text += f" ({run['conclusion']})"

                # Workflow name and run number
                workflow_name = run.get("name", "Unknown Workflow")
                run_number = run.get("run_number", "?")
                # Fields may be present but null; fall back instead of raising
                # and losing the whole listing.
                short_sha = (run.get("head_sha") or "N/A")[:8]
                actor_login = (run.get("actor") or {}).get("login", "N/A")

                output.append(f"{status_emoji} {workflow_name} #{run_number}")
                output.append(f" ID: {run.get('id', 'N/A')}")
                output.append(f" Status: {status_text}")
                output.append(f" Branch: {run.get('head_branch', 'N/A')}")
                output.append(f" Commit: {short_sha}...")
                output.append(f" Actor: {actor_login}")
                output.append(f" Event: {run.get('event', 'N/A')}")

                # Timing information
                created_at = run.get("created_at", "N/A")
                updated_at = run.get("updated_at", "N/A")
                if created_at != "N/A":
                    output.append(f" Started: {created_at}")
                if updated_at != "N/A" and updated_at != created_at:
                    output.append(f" Updated: {updated_at}")

                # Duration calculation for completed runs
                if (
                    run.get("status") == "completed"
                    and run.get("created_at")
                    and run.get("updated_at")
                ):
                    try:
                        # fromisoformat on older Pythons rejects a trailing "Z".
                        start = datetime.fromisoformat(
                            run["created_at"].replace("Z", "+00:00")
                        )
                        end = datetime.fromisoformat(
                            run["updated_at"].replace("Z", "+00:00")
                        )
                    except (ValueError, TypeError, AttributeError):
                        # Deliberate best-effort: omit the duration line when
                        # the timestamps cannot be parsed.
                        pass
                    else:
                        duration = end - start
                        output.append(f" Duration: {duration}")

                # Links for further investigation
                if run.get("html_url"):
                    output.append(f" URL: {run['html_url']}")
                output.append("")

            # Add pagination info if applicable
            if total_count > len(workflow_runs):
                max_page = (total_count + effective_per_page - 1) // effective_per_page
                output.append(
                    f"📄 Page {effective_page} of {max_page} (use page parameter to see more)"
                )
            return "\n".join(output)
    except ValueError as auth_error:
        # github_client_context raises ValueError when no token is configured.
        logger.error(f"Authentication error listing workflow runs: {auth_error}")
        return f"❌ {str(auth_error)}"
    except ConnectionError as conn_error:
        logger.error(f"Connection error listing workflow runs: {conn_error}")
        return f"❌ Network connection failed: {str(conn_error)}"
    except Exception as e:
        logger.error(f"Unexpected error listing workflow runs: {e}", exc_info=True)
        return f"❌ Error listing workflow runs: {str(e)}"