"""GitHub API operations for MCP Git Server"""
import asyncio
import json
import logging
import time
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any
from .client import get_github_client
logger = logging.getLogger(__name__)
@asynccontextmanager
async def github_client_context():
"""Async context manager for GitHub client with guaranteed resource cleanup."""
client = None
try:
client = get_github_client()
if not client:
raise ValueError(
"GitHub token not configured. Set GITHUB_TOKEN environment variable."
)
yield client
finally:
if client and client.session:
try:
await client.session.close()
except Exception as cleanup_error:
logger.warning(f"Error during client cleanup: {cleanup_error}")
class PatchMemoryManager:
"""Memory-aware patch content manager with configurable limits and streaming support."""
def __init__(self, max_patch_size: int = 1000, max_total_memory: int = 50000):
self.max_patch_size = max_patch_size
self.max_total_memory = max_total_memory
self.current_memory_usage = 0
self.patches_processed = 0
def can_include_patch(self, patch_size: int) -> bool:
"""Check if patch can be included within memory constraints."""
return (self.current_memory_usage + patch_size) <= self.max_total_memory
def process_patch(self, patch_content: str) -> tuple[str, bool]:
"""Process patch content with memory management and truncation.
Returns:
tuple[str, bool]: (processed_content, was_truncated)
"""
patch_size = len(patch_content)
self.patches_processed += 1
# Check memory budget first
if not self.can_include_patch(patch_size):
logger.warning(
f"Patch #{self.patches_processed} skipped: exceeds memory budget ({patch_size} bytes, {self.current_memory_usage}/{self.max_total_memory} used)"
)
return (
f"[Patch skipped - memory limit reached ({self.current_memory_usage}/{self.max_total_memory} bytes used)]",
True,
)
# Apply individual patch size limit
if patch_size > self.max_patch_size:
truncated_patch = patch_content[: self.max_patch_size]
self.current_memory_usage += self.max_patch_size
logger.info(
f"Patch #{self.patches_processed} truncated: {patch_size} -> {self.max_patch_size} bytes"
)
return (
f"```diff\n{truncated_patch}\n... [truncated {patch_size - self.max_patch_size} chars]\n```",
True,
)
else:
self.current_memory_usage += patch_size
return f"```diff\n{patch_content}\n```", False
async def github_get_pr_checks(
repo_owner: str,
repo_name: str,
pr_number: int,
status: str | None = None,
conclusion: str | None = None,
) -> str:
"""Get check runs for a pull request"""
try:
async with github_client_context() as client:
# First get the PR to get the head SHA
pr_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}"
)
if pr_response.status != 200:
return f"❌ Failed to get PR #{pr_number}: {pr_response.status}"
pr_data = await pr_response.json()
head_sha = pr_data["head"]["sha"]
# Get check runs for the head commit
params = {}
if status:
params["status"] = status
checks_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/commits/{head_sha}/check-runs",
params=params,
)
if checks_response.status != 200:
return f"❌ Failed to get check runs: {checks_response.status}"
checks_data = await checks_response.json()
# Filter by conclusion if specified
check_runs = checks_data.get("check_runs", [])
if conclusion:
check_runs = [
run for run in check_runs if run.get("conclusion") == conclusion
]
# Format the output
if not check_runs:
return f"No check runs found for PR #{pr_number}"
output = [f"Check runs for PR #{pr_number} (commit {head_sha[:8]}):\n"]
for run in check_runs:
status_emoji = {
"completed": "✅" if run.get("conclusion") == "success" else "❌",
"in_progress": "🔄",
"queued": "⏳",
}.get(run["status"], "❓")
output.append(f"{status_emoji} {run['name']}")
output.append(f" Status: {run['status']}")
if run.get("conclusion"):
output.append(f" Conclusion: {run['conclusion']}")
output.append(f" Started: {run.get('started_at', 'N/A')}")
if run.get("completed_at"):
output.append(f" Completed: {run['completed_at']}")
if run.get("html_url"):
output.append(f" URL: {run['html_url']}")
output.append("")
return "\n".join(output)
except ValueError as auth_error:
# Handle authentication/configuration errors specifically
logger.error(f"Authentication error getting PR checks: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
# Handle network connectivity issues
logger.error(f"Connection error getting PR checks: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
# Log unexpected errors with full context for debugging
logger.error(
f"Unexpected error getting PR checks for PR #{pr_number}: {e}",
exc_info=True,
)
return f"❌ Error getting PR checks: {str(e)}"
async def github_get_failing_jobs(
repo_owner: str,
repo_name: str,
pr_number: int,
include_logs: bool = True,
include_annotations: bool = True,
) -> str:
"""Get detailed information about failing jobs in a PR"""
try:
async with github_client_context() as client:
# Get PR details
pr_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}"
)
if pr_response.status != 200:
return f"❌ Failed to get PR #{pr_number}: {pr_response.status}"
pr_data = await pr_response.json()
head_sha = pr_data["head"]["sha"]
# Get check runs and filter for failures
checks_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/commits/{head_sha}/check-runs"
)
if checks_response.status != 200:
return f"❌ Failed to get check runs: {checks_response.status}"
checks_data = await checks_response.json()
failing_runs = [
run
for run in checks_data.get("check_runs", [])
if run["status"] == "completed"
and run.get("conclusion") in ["failure", "cancelled", "timed_out"]
]
if not failing_runs:
return f"No failing jobs found for PR #{pr_number}"
output = [f"Failing jobs for PR #{pr_number}:\n"]
for run in failing_runs:
output.append(f"❌ {run['name']}")
output.append(f" Conclusion: {run['conclusion']}")
output.append(f" Started: {run.get('started_at', 'N/A')}")
output.append(f" Completed: {run.get('completed_at', 'N/A')}")
# Get annotations if requested
if include_annotations and run.get("id"):
try:
annotations_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/check-runs/{run['id']}/annotations"
)
if annotations_response.status == 200:
annotations_data = await annotations_response.json()
if annotations_data:
output.append(" Annotations:")
for annotation in annotations_data[
:5
]: # Limit to first 5
output.append(
f" • {annotation.get('title', 'Error')}: {annotation.get('message', 'No message')}"
)
if annotation.get("path"):
output.append(
f" File: {annotation['path']} (line {annotation.get('start_line', 'unknown')})"
)
except (ConnectionError, ValueError) as annotation_error:
# Log specific annotation errors but continue processing
logger.warning(
f"Failed to get annotations for run {run.get('id')}: {annotation_error}"
)
except Exception as annotation_error:
# Annotations might not be available - log but continue
logger.debug(
f"Annotations unavailable for run {run.get('id')}: {annotation_error}"
)
# Get logs if requested (simplified)
if include_logs and run.get("html_url"):
output.append(f" Details: {run['html_url']}")
output.append("")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error getting failing jobs: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error getting failing jobs: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error getting failing jobs for PR #{pr_number}: {e}",
exc_info=True,
)
return f"❌ Error getting failing jobs: {str(e)}"
async def github_get_workflow_run(
repo_owner: str, repo_name: str, run_id: int, include_logs: bool = False
) -> str:
"""Get detailed workflow run information"""
try:
async with github_client_context() as client:
# Get workflow run details
run_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}"
)
if run_response.status != 200:
return f"❌ Failed to get workflow run #{run_id}: {run_response.status}"
run_data = await run_response.json()
output = [f"Workflow Run #{run_id}:\n"]
output.append(f"Name: {run_data.get('name', 'N/A')}")
output.append(f"Status: {run_data.get('status', 'N/A')}")
output.append(f"Conclusion: {run_data.get('conclusion', 'N/A')}")
output.append(f"Branch: {run_data.get('head_branch', 'N/A')}")
output.append(f"Commit: {run_data.get('head_sha', 'N/A')[:8]}")
output.append(f"Started: {run_data.get('created_at', 'N/A')}")
output.append(f"Updated: {run_data.get('updated_at', 'N/A')}")
if run_data.get("html_url"):
output.append(f"URL: {run_data['html_url']}")
# Get jobs if available
jobs_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}/jobs"
)
if jobs_response.status == 200:
jobs_data = await jobs_response.json()
jobs = jobs_data.get("jobs", [])
if jobs:
output.append("\nJobs:")
for job in jobs:
status_emoji = {
"completed": "✅"
if job.get("conclusion") == "success"
else "❌",
"in_progress": "🔄",
"queued": "⏳",
}.get(job["status"], "❓")
output.append(f" {status_emoji} {job['name']}")
output.append(f" Status: {job['status']}")
if job.get("conclusion"):
output.append(f" Conclusion: {job['conclusion']}")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error getting workflow run: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error getting workflow run: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error getting workflow run #{run_id}: {e}", exc_info=True
)
return f"❌ Error getting workflow run: {str(e)}"
async def github_get_pr_details(
repo_owner: str,
repo_name: str,
pr_number: int,
include_files: bool = False,
include_reviews: bool = False,
) -> str:
"""Get comprehensive PR details"""
try:
async with github_client_context() as client:
# Get PR details
pr_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}"
)
if pr_response.status != 200:
return f"❌ Failed to get PR #{pr_number}: {pr_response.status}"
pr_data = await pr_response.json()
output = [f"Pull Request #{pr_number}:\n"]
output.append(f"Title: {pr_data.get('title', 'N/A')}")
output.append(f"State: {pr_data.get('state', 'N/A')}")
output.append(f"Author: {pr_data.get('user', {}).get('login', 'N/A')}")
output.append(f"Base: {pr_data.get('base', {}).get('ref', 'N/A')}")
output.append(f"Head: {pr_data.get('head', {}).get('ref', 'N/A')}")
output.append(f"Created: {pr_data.get('created_at', 'N/A')}")
output.append(f"Updated: {pr_data.get('updated_at', 'N/A')}")
if pr_data.get("body"):
output.append(
f"\nDescription:\n{pr_data['body'][:500]}{'...' if len(pr_data['body']) > 500 else ''}"
)
if pr_data.get("html_url"):
output.append(f"\nURL: {pr_data['html_url']}")
# Get files if requested
if include_files:
try:
files_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/files"
)
if files_response.status == 200:
files_data = await files_response.json()
if files_data:
output.append(f"\nFiles ({len(files_data)}):")
for file in files_data[:10]: # Limit to first 10
output.append(
f" {file['status'][0].upper()} {file['filename']} (+{file['additions']}, -{file['deletions']})"
)
if len(files_data) > 10:
output.append(
f" ... and {len(files_data) - 10} more files"
)
except (ConnectionError, ValueError) as files_error:
logger.warning(
f"Failed to get files for PR #{pr_number}: {files_error}"
)
output.append("\n⚠️ Could not retrieve files information")
# Get reviews if requested
if include_reviews:
try:
reviews_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/reviews"
)
if reviews_response.status == 200:
reviews_data = await reviews_response.json()
if reviews_data:
output.append(f"\nReviews ({len(reviews_data)}):")
for review in reviews_data[-5:]: # Show last 5
state_emoji = {
"APPROVED": "✅",
"CHANGES_REQUESTED": "❌",
"COMMENTED": "💬",
}.get(review.get("state"), "❓")
output.append(
f" {state_emoji} {review.get('user', {}).get('login', 'N/A')}: {review.get('state', 'N/A')}"
)
except (ConnectionError, ValueError) as reviews_error:
logger.warning(
f"Failed to get reviews for PR #{pr_number}: {reviews_error}"
)
output.append("\n⚠️ Could not retrieve reviews information")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error getting PR details: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error getting PR details: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error getting PR details for PR #{pr_number}: {e}",
exc_info=True,
)
return f"❌ Error getting PR details: {str(e)}"
async def github_list_pull_requests(
repo_owner: str,
repo_name: str,
state: str = "open",
head: str | None = None,
base: str | None = None,
sort: str = "created",
direction: str = "desc",
per_page: int = 30,
page: int = 1,
) -> str:
"""List pull requests for a repository"""
logger.debug(f"🔍 Starting github_list_pull_requests for {repo_owner}/{repo_name}")
try:
async with github_client_context() as client:
logger.debug("✅ GitHub client obtained successfully")
logger.debug(
f"🔗 Token prefix: {client.token[:8]}..."
if client.token
else "No token"
)
params = {
"state": state,
"sort": sort,
"direction": direction,
"per_page": per_page,
"page": page,
}
if head:
params["head"] = head
if base:
params["base"] = base
logger.debug(
f"📡 Making API call to /repos/{repo_owner}/{repo_name}/pulls with params: {params}"
)
response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls", params=params
)
logger.debug(f"📨 GitHub API response status: {response.status}")
if response.status == 401:
response_text = await response.text()
logger.error(
f"🔒 GitHub API authentication failed (401): {response_text}"
)
return f"❌ GitHub API error 401: {response_text}"
elif response.status != 200:
response_text = await response.text()
logger.error(f"❌ GitHub API error {response.status}: {response_text}")
return f"❌ Failed to list pull requests: {response.status} - {response_text}"
prs = await response.json()
if not prs:
return f"No {state} pull requests found"
output = [f"{state.title()} Pull Requests for {repo_owner}/{repo_name}:\n"]
for pr in prs:
state_emoji = {"open": "🟢", "closed": "🔴", "merged": "🟣"}.get(
pr.get("state"), "❓"
)
output.append(f"{state_emoji} #{pr['number']}: {pr['title']}")
output.append(f" Author: {pr.get('user', {}).get('login', 'N/A')}")
base_ref = pr.get("base", {}).get("ref", "N/A")
head_ref = pr.get("head", {}).get("ref", "N/A")
output.append(f" Base: {base_ref} ← Head: {head_ref}")
output.append(f" Created: {pr.get('created_at', 'N/A')}")
output.append("")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error listing pull requests: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error listing pull requests: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error listing pull requests for {repo_owner}/{repo_name}: {e}",
exc_info=True,
)
return f"❌ Error listing pull requests: {str(e)}"
async def github_get_pr_status(repo_owner: str, repo_name: str, pr_number: int) -> str:
"""Get the status and check runs for a pull request"""
try:
async with github_client_context() as client:
# Get PR details
pr_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}"
)
if pr_response.status != 200:
return f"❌ Failed to get PR #{pr_number}: {pr_response.status}"
pr_data = await pr_response.json()
head_sha = pr_data["head"]["sha"]
output = [f"Status for PR #{pr_number}:\n"]
output.append(f"State: {pr_data.get('state', 'N/A')}")
output.append(f"Mergeable: {pr_data.get('mergeable', 'N/A')}")
output.append(f"Merge State: {pr_data.get('mergeable_state', 'N/A')}")
output.append("")
# Get check runs
checks_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/commits/{head_sha}/check-runs"
)
if checks_response.status == 200:
checks_data = await checks_response.json()
check_runs = checks_data.get("check_runs", [])
if check_runs:
output.append("Check Runs:")
for run in check_runs:
status_emoji = {
"completed": "✅"
if run.get("conclusion") == "success"
else "❌",
"in_progress": "🔄",
"queued": "⏳",
}.get(run["status"], "❓")
output.append(
f" {status_emoji} {run['name']}: {run['status']}"
)
if run.get("conclusion"):
output.append(f" Conclusion: {run['conclusion']}")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error getting PR status: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error getting PR status: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error getting PR status for PR #{pr_number}: {e}",
exc_info=True,
)
return f"❌ Error getting PR status: {str(e)}"
async def github_get_pr_files(
repo_owner: str,
repo_name: str,
pr_number: int,
per_page: int = 30,
page: int = 1,
include_patch: bool = False,
) -> str:
"""Get files changed in a pull request with memory-aware patch handling"""
try:
async with github_client_context() as client:
params = {"per_page": per_page, "page": page}
response = await client.get(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/files",
params=params,
)
if response.status != 200:
return f"❌ Failed to get PR files: {response.status}"
files = await response.json()
if not files:
return f"No files found for PR #{pr_number}"
output = [f"Files changed in PR #{pr_number}:\n"]
total_additions = 0
total_deletions = 0
# Initialize memory manager for patch processing
patch_manager = PatchMemoryManager(
max_patch_size=1000, max_total_memory=50000
)
for file in files:
status_emoji = {
"added": "➕",
"modified": "📝",
"removed": "➖",
"renamed": "📝",
}.get(file.get("status"), "❓")
additions = file.get("additions", 0)
deletions = file.get("deletions", 0)
total_additions += additions
total_deletions += deletions
output.append(
f"{status_emoji} {file['filename']} (+{additions}, -{deletions})"
)
if include_patch and file.get("patch"):
# Use memory manager to safely process patch content
processed_patch, was_truncated = patch_manager.process_patch(
file["patch"]
)
output.append(processed_patch)
if was_truncated:
logger.info(
f"Patch for {file['filename']} was truncated or skipped for memory management"
)
output.append("")
output.append(f"Total: +{total_additions}, -{total_deletions}")
# Add memory usage summary if patches were included
if include_patch:
output.append(
f"\nMemory usage: {patch_manager.current_memory_usage}/{patch_manager.max_total_memory} bytes"
)
output.append(f"Patches processed: {patch_manager.patches_processed}")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error getting PR files: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error getting PR files: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error getting PR files for PR #{pr_number}: {e}", exc_info=True
)
return f"❌ Error getting PR files: {str(e)}"
async def github_update_pr(
repo_owner: str,
repo_name: str,
pr_number: int,
title: str | None = None,
body: str | None = None,
state: str | None = None,
) -> str:
"""Update a pull request's title, body, or state."""
logger.debug(f"🚀 Updating PR #{pr_number} in {repo_owner}/{repo_name}")
try:
async with github_client_context() as client:
payload: dict[str, Any] = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if state is not None:
if state not in ["open", "closed"]:
return "❌ State must be 'open' or 'closed'"
payload["state"] = state
if not payload:
return "⚠️ No update parameters provided. Please specify title, body, or state."
response = await client.patch(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}", json=payload
)
if response.status != 200:
error_text = await response.text()
return f"❌ Failed to update PR #{pr_number}: {response.status} - {error_text}"
result = await response.json()
logger.info(f"✅ Successfully updated PR #{pr_number}")
return (
f"✅ Successfully updated PR #{result['number']}: {result['html_url']}"
)
except ValueError as auth_error:
logger.error(f"Authentication error updating PR: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error updating PR: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error updating PR #{pr_number}: {e}", exc_info=True)
return f"❌ Error updating PR: {str(e)}"
async def github_create_pr(
repo_owner: str,
repo_name: str,
title: str,
head: str,
base: str,
body: str | None = None,
draft: bool = False,
) -> str:
"""Create a new pull request."""
logger.debug(f"🚀 Creating PR in {repo_owner}/{repo_name} from {head} to {base}")
try:
async with github_client_context() as client:
payload = {"title": title, "head": head, "base": base, "draft": draft}
if body is not None:
payload["body"] = body
response = await client.post(
f"/repos/{repo_owner}/{repo_name}/pulls", json=payload
)
if response.status != 201:
error_text = await response.text()
# Provide more helpful error for common cases
if (
"No commits between" in error_text
or "A pull request already exists" in error_text
):
return f"❌ Could not create PR. Reason: {error_text}"
return f"❌ Failed to create PR: {response.status} - {error_text}"
result = await response.json()
logger.info(f"✅ Successfully created PR #{result['number']}")
return (
f"✅ Successfully created PR #{result['number']}: {result['html_url']}"
)
except ValueError as auth_error:
logger.error(f"Authentication error creating PR: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error creating PR: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error creating PR: {e}", exc_info=True)
return f"❌ Error creating PR: {str(e)}"
async def github_merge_pr(
repo_owner: str,
repo_name: str,
pr_number: int,
commit_title: str | None = None,
commit_message: str | None = None,
merge_method: str = "merge",
) -> str:
"""Merge a pull request."""
logger.debug(
f"🚀 Merging PR #{pr_number} in {repo_owner}/{repo_name} using '{merge_method}' method"
)
try:
async with github_client_context() as client:
if merge_method not in ["merge", "squash", "rebase"]:
return "❌ merge_method must be one of 'merge', 'squash', or 'rebase'"
payload = {"merge_method": merge_method}
if commit_title:
payload["commit_title"] = commit_title
if commit_message:
payload["commit_message"] = commit_message
response = await client.put(
f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/merge", json=payload
)
if response.status != 200:
error_text = await response.text()
if response.status in [405, 409]:
return f"❌ Could not merge PR. Reason: {error_text}. This may be due to merge conflicts or failing status checks."
return f"❌ Failed to merge PR: {response.status} - {error_text}"
result = await response.json()
if result.get("merged"):
logger.info(f"✅ Successfully merged PR #{pr_number}")
return f"✅ {result['message']}"
else:
logger.warning(
f"⚠️ Merge attempt for PR #{pr_number} returned 200 OK but 'merged' is false: {result.get('message')}"
)
return f"⚠️ {result.get('message', 'Merge was not successful but API returned 200 OK. Check PR status.')}"
except ValueError as auth_error:
logger.error(f"Authentication error merging PR: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error merging PR: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error merging PR #{pr_number}: {e}", exc_info=True)
return f"❌ Error merging PR: {str(e)}"
async def github_add_pr_comment(
repo_owner: str, repo_name: str, pr_number: int, body: str
) -> str:
"""Add a comment to a pull request."""
logger.debug(f"🚀 Adding comment to PR #{pr_number} in {repo_owner}/{repo_name}")
try:
async with github_client_context() as client:
# Comments are added to the corresponding issue
payload = {"body": body}
response = await client.post(
f"/repos/{repo_owner}/{repo_name}/issues/{pr_number}/comments",
json=payload,
)
if response.status != 201:
error_text = await response.text()
return f"❌ Failed to add comment: {response.status} - {error_text}"
result = await response.json()
logger.info(f"✅ Successfully added comment to PR #{pr_number}")
return f"✅ Successfully added comment: {result['html_url']}"
except ValueError as auth_error:
logger.error(f"Authentication error adding PR comment: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error adding PR comment: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error adding comment to PR #{pr_number}: {e}", exc_info=True
)
return f"❌ Error adding comment: {str(e)}"
async def github_close_pr(repo_owner: str, repo_name: str, pr_number: int) -> str:
"""Close a pull request."""
logger.debug(f"🚀 Closing PR #{pr_number} in {repo_owner}/{repo_name}")
return await github_update_pr(repo_owner, repo_name, pr_number, state="closed")
async def github_reopen_pr(repo_owner: str, repo_name: str, pr_number: int) -> str:
"""Reopen a closed pull request."""
logger.debug(f"🚀 Reopening PR #{pr_number} in {repo_owner}/{repo_name}")
return await github_update_pr(repo_owner, repo_name, pr_number, state="open")
# GitHub Issues API Functions
async def github_create_issue(
repo_owner: str,
repo_name: str,
title: str,
body: str | None = None,
labels: list[str] | None = None,
assignees: list[str] | None = None,
milestone: int | None = None,
) -> str:
"""Create a new GitHub issue."""
logger.debug(f"🚀 Creating issue in {repo_owner}/{repo_name}: {title}")
try:
async with github_client_context() as client:
payload: dict[str, Any] = {"title": title}
if body is not None:
payload["body"] = body
if labels is not None:
payload["labels"] = labels
if assignees is not None:
payload["assignees"] = assignees
if milestone is not None:
payload["milestone"] = milestone
response = await client.post(
f"/repos/{repo_owner}/{repo_name}/issues", json=payload
)
if response.status != 201:
error_text = await response.text()
return f"❌ Failed to create issue: {response.status} - {error_text}"
result = await response.json()
logger.info(f"✅ Successfully created issue #{result['number']}")
return f"✅ Successfully created issue #{result['number']}: {result['html_url']}"
except ValueError as auth_error:
logger.error(f"Authentication error creating issue: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error creating issue: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error creating issue: {e}", exc_info=True)
return f"❌ Error creating issue: {str(e)}"
async def github_list_issues(
repo_owner: str,
repo_name: str,
state: str = "open",
labels: list[str] | None = None,
assignee: str | None = None,
creator: str | None = None,
mentioned: str | None = None,
milestone: str | None = None,
sort: str = "created",
direction: str = "desc",
since: str | None = None,
per_page: int = 30,
page: int = 1,
) -> str:
"""List issues for a repository."""
logger.debug(f"🔍 Listing issues for {repo_owner}/{repo_name}")
try:
async with github_client_context() as client:
params = {
"state": state,
"sort": sort,
"direction": direction,
"per_page": per_page,
"page": page,
}
if labels:
params["labels"] = ",".join(labels)
if assignee:
params["assignee"] = assignee
if creator:
params["creator"] = creator
if mentioned:
params["mentioned"] = mentioned
if milestone:
params["milestone"] = milestone
if since:
params["since"] = since
response = await client.get(
f"/repos/{repo_owner}/{repo_name}/issues", params=params
)
if response.status != 200:
error_text = await response.text()
return f"❌ Failed to list issues: {response.status} - {error_text}"
issues = await response.json()
if not issues:
return f"No {state} issues found"
output = [f"{state.title()} Issues for {repo_owner}/{repo_name}:\n"]
for issue in issues:
# Skip pull requests (they appear in issues API but have 'pull_request' key)
if issue.get("pull_request"):
continue
state_emoji = {"open": "🟢", "closed": "🔴"}.get(
issue.get("state"), "❓"
)
output.append(f"{state_emoji} #{issue['number']}: {issue['title']}")
output.append(f" Author: {issue.get('user', {}).get('login', 'N/A')}")
# Show labels if any
if issue.get("labels"):
label_names = [label["name"] for label in issue["labels"]]
output.append(f" Labels: {', '.join(label_names)}")
# Show assignees if any
if issue.get("assignees"):
assignee_names = [
assignee["login"] for assignee in issue["assignees"]
]
output.append(f" Assignees: {', '.join(assignee_names)}")
output.append(f" Created: {issue.get('created_at', 'N/A')}")
output.append("")
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error listing issues: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error listing issues: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error listing issues: {e}", exc_info=True)
return f"❌ Error listing issues: {str(e)}"
async def github_update_issue(
repo_owner: str,
repo_name: str,
issue_number: int,
title: str | None = None,
body: str | None = None,
state: str | None = None,
labels: list[str] | None = None,
assignees: list[str] | None = None,
milestone: int | None = None,
) -> str:
"""Update a GitHub issue."""
logger.debug(f"🚀 Updating issue #{issue_number} in {repo_owner}/{repo_name}")
try:
async with github_client_context() as client:
payload: dict[str, Any] = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if state is not None:
if state not in ["open", "closed"]:
return "❌ State must be 'open' or 'closed'"
payload["state"] = state
if labels is not None:
payload["labels"] = labels
if assignees is not None:
payload["assignees"] = assignees
if milestone is not None:
payload["milestone"] = milestone
if not payload:
return "⚠️ No update parameters provided. Please specify title, body, state, labels, assignees, or milestone."
response = await client.patch(
f"/repos/{repo_owner}/{repo_name}/issues/{issue_number}", json=payload
)
if response.status != 200:
error_text = await response.text()
return f"❌ Failed to update issue #{issue_number}: {response.status} - {error_text}"
result = await response.json()
logger.info(f"✅ Successfully updated issue #{issue_number}")
return f"✅ Successfully updated issue #{result['number']}: {result['html_url']}"
except ValueError as auth_error:
logger.error(f"Authentication error updating issue: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error updating issue: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error updating issue #{issue_number}: {e}", exc_info=True
)
return f"❌ Error updating issue: {str(e)}"
async def github_edit_pr_description(
repo_owner: str,
repo_name: str,
pr_number: int,
description: str,
) -> str:
"""Edit a pull request's description/body."""
logger.debug(f"🚀 Updating PR #{pr_number} description in {repo_owner}/{repo_name}")
# Use the existing github_update_pr function to update just the body
return await github_update_pr(
repo_owner=repo_owner,
repo_name=repo_name,
pr_number=pr_number,
body=description,
)
async def github_search_issues(
repo_owner: str,
repo_name: str,
query: str,
sort: str = "created",
order: str = "desc",
per_page: int = 30,
page: int = 1,
) -> str:
"""Search issues using GitHub's advanced search API.
Supports GitHub's search qualifiers like:
- is:issue is:open author:username
- label:bug label:"help wanted"
- created:2023-01-01..2023-12-31
- updated:>2023-06-01
- milestone:"v1.0" assignee:username
"""
logger.debug(f"🔍 Searching issues in {repo_owner}/{repo_name}: {query}")
try:
async with github_client_context() as client:
# Add repository scope to query
search_query = f"repo:{repo_owner}/{repo_name} is:issue {query}"
params = {
"q": search_query,
"sort": sort,
"order": order,
"per_page": per_page,
"page": page,
}
response = await client.get("/search/issues", params=params)
if response.status != 200:
error_text = await response.text()
return f"❌ Failed to search issues: {response.status} - {error_text}"
data = await response.json()
issues = data.get("items", [])
total_count = data.get("total_count", 0)
if not issues:
return f"No issues found matching query: {query}"
output = [f"Search Results for '{query}' in {repo_owner}/{repo_name}:\n"]
output.append(f"Found {total_count} total issues (showing page {page})\n")
for issue in issues:
# Skip pull requests
if issue.get("pull_request"):
continue
state_emoji = {"open": "🟢", "closed": "🔴"}.get(
issue.get("state"), "❓"
)
output.append(f"{state_emoji} #{issue['number']}: {issue['title']}")
output.append(f" Author: {issue.get('user', {}).get('login', 'N/A')}")
# Show labels
if issue.get("labels"):
label_names = [label["name"] for label in issue["labels"]]
output.append(f" Labels: {', '.join(label_names)}")
# Show assignees
if issue.get("assignees"):
assignee_names = [
assignee["login"] for assignee in issue["assignees"]
]
output.append(f" Assignees: {', '.join(assignee_names)}")
# Show milestone
if issue.get("milestone"):
output.append(f" Milestone: {issue['milestone']['title']}")
output.append(f" Created: {issue.get('created_at', 'N/A')}")
output.append(
f" Score: {issue.get('score', 'N/A')}"
) # Search relevance score
output.append("")
# Add pagination info
max_results = min(1000, total_count) # GitHub limits search to 1000 results
if total_count > len(issues):
max_page = (max_results + per_page - 1) // per_page
output.append(
f"📄 Page {page} of {max_page} (max {max_results} results from GitHub)"
)
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error searching issues: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error searching issues: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error searching issues: {e}", exc_info=True)
return f"❌ Error searching issues: {str(e)}"
async def github_create_issue_from_template(
repo_owner: str,
repo_name: str,
title: str,
template_name: str = "bug_report",
template_data: dict | None = None,
) -> str:
"""Create a GitHub issue using a predefined template.
Templates include:
- bug_report: Bug report with reproduction steps
- feature_request: Feature request with use cases
- question: Question or discussion starter
- custom: Use template_data to define custom format
"""
logger.debug(
f"🚀 Creating issue from template '{template_name}' in {repo_owner}/{repo_name}"
)
templates = {
"bug_report": {
"body": f"""## Bug Report
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Additional context**
Add any other context about the problem here.
**Environment**
- OS: [e.g. iOS]
- Browser [e.g. chrome, safari]
- Version [e.g. 22]
{template_data.get("additional_info", "") if template_data else ""}
""",
"labels": ["bug", "triage"],
},
"feature_request": {
"body": f"""## Feature Request
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
{template_data.get("additional_info", "") if template_data else ""}
""",
"labels": ["enhancement", "feature-request"],
},
"question": {
"body": f"""## Question
**What would you like to know?**
Please describe your question clearly.
**Context**
Provide any relevant context that might help answer your question.
**What have you tried?**
Let us know what research or attempts you've already made.
{template_data.get("additional_info", "") if template_data else ""}
""",
"labels": ["question"],
},
}
if template_name == "custom" and template_data:
template = {
"body": template_data.get("body", ""),
"labels": template_data.get("labels", []),
}
else:
template = templates.get(template_name)
if not template:
available = ", ".join(templates.keys()) + ", custom"
return f"❌ Unknown template '{template_name}'. Available templates: {available}"
# Apply template data customizations
if template_data:
if "labels" in template_data:
template["labels"] = template["labels"] + template_data["labels"]
if "assignees" in template_data:
template["assignees"] = template_data["assignees"]
if "milestone" in template_data:
template["milestone"] = template_data["milestone"]
# Create issue using template
return await github_create_issue(
repo_owner=repo_owner,
repo_name=repo_name,
title=title,
body=template["body"],
labels=template.get("labels"),
assignees=template.get("assignees"),
milestone=template.get("milestone"),
)
async def github_bulk_update_issues(
repo_owner: str,
repo_name: str,
issue_numbers: list[int],
labels: list[str] | None = None,
assignees: list[str] | None = None,
milestone: int | None = None,
state: str | None = None,
) -> str:
"""Bulk update multiple issues with common properties.
Useful for:
- Adding labels to multiple issues
- Assigning multiple issues to same milestone
- Bulk closing/reopening issues
- Mass assignment operations
"""
logger.debug(
f"🚀 Bulk updating {len(issue_numbers)} issues in {repo_owner}/{repo_name}"
)
if not issue_numbers:
return "⚠️ No issue numbers provided for bulk update"
if not any([labels, assignees, milestone is not None, state]):
return "⚠️ No update parameters provided. Specify labels, assignees, milestone, or state"
results = []
successful_updates = 0
failed_updates = 0
for issue_number in issue_numbers:
try:
result = await github_update_issue(
repo_owner=repo_owner,
repo_name=repo_name,
issue_number=issue_number,
labels=labels,
assignees=assignees,
milestone=milestone,
state=state,
)
if result.startswith("✅"):
successful_updates += 1
results.append(f"✅ Issue #{issue_number}: Updated")
else:
failed_updates += 1
results.append(f"❌ Issue #{issue_number}: {result}")
except Exception as e:
failed_updates += 1
results.append(f"❌ Issue #{issue_number}: Error - {str(e)}")
# Summary
summary = [
f"Bulk Update Results for {len(issue_numbers)} issues:",
f"✅ Successful: {successful_updates}",
f"❌ Failed: {failed_updates}",
"",
]
# Detailed results (limit to first 10 for readability)
summary.append("Details:")
for result in results[:10]:
summary.append(f" {result}")
if len(results) > 10:
summary.append(f" ... and {len(results) - 10} more results")
return "\n".join(summary)
async def github_await_workflow_completion(
repo_owner: str,
repo_name: str,
run_id: int | None = None,
timeout_minutes: int = 15,
poll_interval_seconds: int = 20,
) -> str:
"""Monitor a GitHub Actions workflow run until completion.
This tool allows Claude Code to wait for CI runs to complete, enabling
automated CI response workflows. When a workflow run fails, it automatically
fetches failure details.
Args:
repo_owner: Repository owner/organization
repo_name: Repository name
run_id: Specific workflow run ID to monitor. If None, monitors the latest run.
timeout_minutes: Maximum time to wait in minutes (default: 15)
poll_interval_seconds: Time between status checks in seconds (default: 20)
Returns:
JSON-formatted string with workflow run results including:
- status: "success", "failure", or "timeout"
- conclusion: GitHub's conclusion value
- run_id: The workflow run ID that was monitored
- run_url: Direct link to the workflow run
- duration_seconds: How long the run took
- failed_jobs: List of jobs that failed (if any)
- logs_note: URL to view detailed logs (for failed runs)
"""
logger.debug(
f"Awaiting workflow completion for {repo_owner}/{repo_name}, run_id={run_id}"
)
try:
async with github_client_context() as client:
# If no run_id provided, get the latest run
if run_id is None:
logger.debug("No run_id provided, fetching latest workflow run...")
response = await client.get(
f"/repos/{repo_owner}/{repo_name}/actions/runs", params={"per_page": 1}
)
if response.status != 200:
error_text = await response.text()
return f"Failed to get latest workflow run: {response.status} - {error_text}"
data = await response.json()
workflow_runs = data.get("workflow_runs", [])
if not workflow_runs:
return f"No workflow runs found for {repo_owner}/{repo_name}"
# Safely extract run_id
run_id = workflow_runs[0].get("id")
if run_id is None:
return "Latest workflow run has no ID"
logger.info(f"Using latest workflow run ID: {run_id}")
# Start polling
start_time = time.time()
timeout_seconds = timeout_minutes * 60
poll_count = 0
logger.info(
f"Starting to monitor run #{run_id} (timeout: {timeout_minutes}m, poll interval: {poll_interval_seconds}s)"
)
while True:
poll_count += 1
elapsed_time = time.time() - start_time
# Check for timeout
if elapsed_time >= timeout_seconds:
# Cleanup any pending operations before timeout
logger.info(f"Cleaning up resources after {elapsed_time:.1f}s of monitoring")
logger.warning(
f"Timeout reached after {elapsed_time:.1f}s ({poll_count} polls)"
)
timeout_result = {
"status": "timeout",
"run_id": run_id,
"run_url": f"https://github.com/{repo_owner}/{repo_name}/actions/runs/{run_id}",
"elapsed_seconds": elapsed_time,
"message": f"Workflow run did not complete within {timeout_minutes} minutes. Consider increasing timeout_minutes for very long-running workflows (max: 350 minutes).",
"polls_performed": poll_count
}
return json.dumps(timeout_result, indent=2)
# Get workflow run status
logger.debug(f"Poll #{poll_count}: Fetching run status...")
run_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}"
)
if run_response.status != 200:
error_text = await run_response.text()
return f"Failed to get workflow run #{run_id}: {run_response.status} - {error_text}"
run_data = await run_response.json()
run_status = run_data.get("status")
run_conclusion = run_data.get("conclusion")
logger.debug(
f"Poll #{poll_count}: status={run_status}, conclusion={run_conclusion}"
)
# Check if run is complete
if run_status == "completed":
logger.info(
f"Workflow run completed with conclusion: {run_conclusion}"
)
# Calculate duration
created_at = run_data.get("created_at")
updated_at = run_data.get("updated_at")
duration_seconds = 0
if created_at and updated_at:
try:
start_dt = datetime.fromisoformat(
created_at.replace("Z", "+00:00")
)
end_dt = datetime.fromisoformat(
updated_at.replace("Z", "+00:00")
)
duration_seconds = (end_dt - start_dt).total_seconds()
except Exception as e:
logger.debug(f"Could not calculate duration: {e}")
# Prepare basic response
result = {
"status": "success"
if run_conclusion == "success"
else "failure",
"conclusion": run_conclusion,
"run_id": run_id,
"run_url": run_data.get("html_url"),
"duration_seconds": duration_seconds,
"workflow_name": run_data.get("name"),
"head_branch": run_data.get("head_branch"),
"head_sha": run_data.get("head_sha", "")[:8],
}
# If run failed, get failed jobs and logs
if run_conclusion != "success":
logger.debug("Fetching failed jobs...")
jobs_response = await client.get(
f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}/jobs"
)
if jobs_response.status == 200:
jobs_data = await jobs_response.json()
failed_jobs = []
for job in jobs_data.get("jobs", []):
if (
job.get("status") == "completed"
and job.get("conclusion") != "success"
):
failed_job_info = {
"name": job.get("name"),
"conclusion": job.get("conclusion"),
"html_url": job.get("html_url"),
}
# Get failed steps
failed_steps = [
step["name"]
for step in job.get("steps", [])
if step.get("conclusion") == "failure"
]
if failed_steps:
failed_job_info["failed_steps"] = failed_steps
failed_jobs.append(failed_job_info)
result["failed_jobs"] = failed_jobs
# Try to get logs summary (truncated)
jobs_list = jobs_data.get("jobs", [])
if failed_jobs and len(jobs_list) > 0:
logger.debug("Fetching failure logs summary...")
# Get logs for first job in the list
first_job = jobs_list[0]
if first_job.get("id"):
try:
# Note: GitHub API doesn't provide direct log text access via REST API
# We'll include a note about where to find logs
result[
"logs_note"
] = f"View detailed logs at: {first_job.get('html_url')}"
except Exception as log_error:
logger.debug(
f"Could not fetch logs: {log_error}"
)
# Return JSON result
return json.dumps(result, indent=2)
# Not complete yet, wait before next poll
logger.debug(f"Workflow still {run_status}, waiting {poll_interval_seconds}s before next poll...")
await asyncio.sleep(poll_interval_seconds)
except ValueError as auth_error:
logger.error(f"Authentication error awaiting workflow completion: {auth_error}")
return f"Authentication error: {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error awaiting workflow completion: {conn_error}")
return f"Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(
f"Unexpected error awaiting workflow completion: {e}", exc_info=True
)
return f"Error awaiting workflow completion: {str(e)}"
async def github_list_workflow_runs(
repo_owner: str,
repo_name: str,
workflow_id: str | None = None,
actor: str | None = None,
branch: str | None = None,
event: str | None = None,
status: str | None = None,
conclusion: str | None = None,
per_page: int = 30,
page: int = 1,
created: str | None = None,
exclude_pull_requests: bool = False,
check_suite_id: int | None = None,
head_sha: str | None = None,
) -> str:
"""List workflow runs for a repository with comprehensive filtering options.
This provides essential CI/CD monitoring capabilities for GitHub Actions workflows.
Args:
repo_owner: Repository owner/organization
repo_name: Repository name
workflow_id: Filter by specific workflow ID or filename (e.g., "ci.yml")
actor: Filter by GitHub username who triggered the run
branch: Filter by branch name
event: Filter by event type (push, pull_request, schedule, etc.)
status: Filter by run status (queued, in_progress, completed)
conclusion: Filter by conclusion (success, failure, neutral, cancelled, timed_out, action_required, stale)
per_page: Number of results per page (1-100, default: 30)
page: Page number to retrieve (default: 1)
created: Filter by creation date (ISO 8601 format or relative like >2023-01-01)
exclude_pull_requests: If true, exclude workflow runs triggered by pull requests
check_suite_id: Filter by specific check suite ID
head_sha: Filter by specific commit SHA
Returns:
Formatted string with workflow run information including status, conclusion,
timing, and links for CI/CD monitoring and debugging.
"""
logger.debug(f"🔍 Listing workflow runs for {repo_owner}/{repo_name}")
try:
async with github_client_context() as client:
# Build query parameters with validation
params = {
"per_page": min(max(per_page, 1), 100), # Enforce GitHub API limits
"page": max(page, 1),
}
# Add optional filters
if actor:
params["actor"] = actor
if branch:
params["branch"] = branch
if event:
params["event"] = event
if status and status in ["queued", "in_progress", "completed"]:
params["status"] = status
if conclusion and conclusion in [
"success",
"failure",
"neutral",
"cancelled",
"timed_out",
"action_required",
"stale",
]:
params["conclusion"] = conclusion
if created:
params["created"] = created
if exclude_pull_requests:
params["exclude_pull_requests"] = "true"
if check_suite_id:
params["check_suite_id"] = check_suite_id
if head_sha:
params["head_sha"] = head_sha
# Determine API endpoint - workflow-specific or repository-wide
if workflow_id:
# Get runs for specific workflow
endpoint = f"/repos/{repo_owner}/{repo_name}/actions/workflows/{workflow_id}/runs"
logger.debug(f"📡 Fetching workflow-specific runs: {workflow_id}")
else:
# Get all workflow runs for repository
endpoint = f"/repos/{repo_owner}/{repo_name}/actions/runs"
logger.debug("📡 Fetching all repository workflow runs")
logger.debug(f"📡 Making API call to {endpoint} with params: {params}")
response = await client.get(endpoint, params=params)
logger.debug(f"📨 GitHub API response status: {response.status}")
if response.status == 401:
response_text = await response.text()
logger.error(
f"🔒 GitHub API authentication failed (401): {response_text}"
)
return "❌ GitHub API authentication failed: Verify your GITHUB_TOKEN has Actions read permissions"
elif response.status == 404:
if workflow_id:
return f"❌ Workflow '{workflow_id}' not found in {repo_owner}/{repo_name}. Check workflow file name or ID."
else:
return f"❌ Repository {repo_owner}/{repo_name} not found or Actions not enabled"
elif response.status != 200:
response_text = await response.text()
logger.error(f"❌ GitHub API error {response.status}: {response_text}")
return f"❌ Failed to list workflow runs: {response.status} - {response_text}"
data = await response.json()
workflow_runs = data.get("workflow_runs", [])
if not workflow_runs:
filter_desc = (
f" (filtered by: {', '.join(f'{k}={v}' for k, v in params.items() if k not in ['per_page', 'page'])})"
if len(params) > 2
else ""
)
return (
f"No workflow runs found for {repo_owner}/{repo_name}{filter_desc}"
)
# Build formatted output
filter_info = []
if workflow_id:
filter_info.append(f"workflow: {workflow_id}")
if actor:
filter_info.append(f"actor: {actor}")
if branch:
filter_info.append(f"branch: {branch}")
if event:
filter_info.append(f"event: {event}")
if status:
filter_info.append(f"status: {status}")
if conclusion:
filter_info.append(f"conclusion: {conclusion}")
header = f"Workflow Runs for {repo_owner}/{repo_name}"
if filter_info:
header += f" ({', '.join(filter_info)})"
output = [f"{header}:\n"]
# Add summary statistics
total_count = data.get("total_count", len(workflow_runs))
if total_count > len(workflow_runs):
output.append(
f"Showing {len(workflow_runs)} of {total_count} total runs (page {page})\n"
)
# Group runs by status for quick overview
status_counts = {}
for run in workflow_runs:
run_status = run.get("status", "unknown")
status_counts[run_status] = status_counts.get(run_status, 0) + 1
if len(status_counts) > 1:
status_summary = ", ".join(
[f"{status}: {count}" for status, count in status_counts.items()]
)
output.append(f"Status summary: {status_summary}\n")
# Format individual workflow runs
for run in workflow_runs:
# Status and conclusion emojis
status_emoji = {
"completed": "✅" if run.get("conclusion") == "success" else "❌",
"in_progress": "🔄",
"queued": "⏳",
"requested": "📋",
"waiting": "⏸️",
}.get(run.get("status"), "❓")
# Enhanced status display
status_text = run.get("status", "unknown")
if run.get("conclusion"):
status_text += f" ({run['conclusion']})"
# Workflow name and run number
workflow_name = run.get("name", "Unknown Workflow")
run_number = run.get("run_number", "?")
output.append(f"{status_emoji} {workflow_name} #{run_number}")
output.append(f" ID: {run.get('id', 'N/A')}")
output.append(f" Status: {status_text}")
output.append(f" Branch: {run.get('head_branch', 'N/A')}")
output.append(f" Commit: {run.get('head_sha', 'N/A')[:8]}...")
output.append(f" Actor: {run.get('actor', {}).get('login', 'N/A')}")
output.append(f" Event: {run.get('event', 'N/A')}")
# Timing information
created_at = run.get("created_at", "N/A")
updated_at = run.get("updated_at", "N/A")
if created_at != "N/A":
output.append(f" Started: {created_at}")
if updated_at != "N/A" and updated_at != created_at:
output.append(f" Updated: {updated_at}")
# Duration calculation for completed runs
if (
run.get("status") == "completed"
and run.get("created_at")
and run.get("updated_at")
):
try:
from datetime import datetime
start = datetime.fromisoformat(
run["created_at"].replace("Z", "+00:00")
)
end = datetime.fromisoformat(
run["updated_at"].replace("Z", "+00:00")
)
duration = end - start
output.append(f" Duration: {duration}")
except Exception:
pass # Skip duration calculation if parsing fails
# Links for further investigation
if run.get("html_url"):
output.append(f" URL: {run['html_url']}")
output.append("")
# Add pagination info if applicable
if total_count > len(workflow_runs):
max_page = (total_count + per_page - 1) // per_page
output.append(
f"📄 Page {page} of {max_page} (use page parameter to see more)"
)
return "\n".join(output)
except ValueError as auth_error:
logger.error(f"Authentication error listing workflow runs: {auth_error}")
return f"❌ {str(auth_error)}"
except ConnectionError as conn_error:
logger.error(f"Connection error listing workflow runs: {conn_error}")
return f"❌ Network connection failed: {str(conn_error)}"
except Exception as e:
logger.error(f"Unexpected error listing workflow runs: {e}", exc_info=True)
return f"❌ Error listing workflow runs: {str(e)}"