Skip to main content
Glama

MCP Git Server

by MementoRC
api.py64 kB
"""GitHub API operations for MCP Git Server""" import logging from contextlib import asynccontextmanager from typing import Any from .client import get_github_client logger = logging.getLogger(__name__) @asynccontextmanager async def github_client_context(): """Async context manager for GitHub client with guaranteed resource cleanup.""" client = None try: client = get_github_client() if not client: raise ValueError( "GitHub token not configured. Set GITHUB_TOKEN environment variable." ) yield client finally: if client and client.session: try: await client.session.close() except Exception as cleanup_error: logger.warning(f"Error during client cleanup: {cleanup_error}") class PatchMemoryManager: """Memory-aware patch content manager with configurable limits and streaming support.""" def __init__(self, max_patch_size: int = 1000, max_total_memory: int = 50000): self.max_patch_size = max_patch_size self.max_total_memory = max_total_memory self.current_memory_usage = 0 self.patches_processed = 0 def can_include_patch(self, patch_size: int) -> bool: """Check if patch can be included within memory constraints.""" return (self.current_memory_usage + patch_size) <= self.max_total_memory def process_patch(self, patch_content: str) -> tuple[str, bool]: """Process patch content with memory management and truncation. Returns: tuple[str, bool]: (processed_content, was_truncated) """ patch_size = len(patch_content) self.patches_processed += 1 # Check memory budget first if not self.can_include_patch(patch_size): logger.warning( f"Patch #{self.patches_processed} skipped: exceeds memory budget ({patch_size} bytes, {self.current_memory_usage}/{self.max_total_memory} used)" ) return ( f"[Patch skipped - memory limit reached ({self.current_memory_usage}/{self.max_total_memory} bytes used)]", True, ) # Apply individual patch size limit if patch_size > self.max_patch_size: truncated_patch = patch_content[: self.max_patch_size] self.current_memory_usage += self.max_patch_size logger.info( f"Patch #{self.patches_processed} truncated: {patch_size} -> {self.max_patch_size} bytes" ) return ( f"```diff\n{truncated_patch}\n... [truncated {patch_size - self.max_patch_size} chars]\n```", True, ) else: self.current_memory_usage += patch_size return f"```diff\n{patch_content}\n```", False async def github_get_pr_checks( repo_owner: str, repo_name: str, pr_number: int, status: str | None = None, conclusion: str | None = None, ) -> str: """Get check runs for a pull request""" try: async with github_client_context() as client: # First get the PR to get the head SHA pr_response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}" ) if pr_response.status != 200: return f"❌ Failed to get PR #{pr_number}: {pr_response.status}" pr_data = await pr_response.json() head_sha = pr_data["head"]["sha"] # Get check runs for the head commit params = {} if status: params["status"] = status checks_response = await client.get( f"/repos/{repo_owner}/{repo_name}/commits/{head_sha}/check-runs", params=params, ) if checks_response.status != 200: return f"❌ Failed to get check runs: {checks_response.status}" checks_data = await checks_response.json() # Filter by conclusion if specified check_runs = checks_data.get("check_runs", []) if conclusion: check_runs = [ run for run in check_runs if run.get("conclusion") == conclusion ] # Format the output if not check_runs: return f"No check runs found for PR #{pr_number}" output = [f"Check runs for PR #{pr_number} (commit {head_sha[:8]}):\n"] for run in check_runs: status_emoji = { "completed": "✅" if run.get("conclusion") == "success" else "❌", "in_progress": "🔄", "queued": "⏳", }.get(run["status"], "❓") output.append(f"{status_emoji} {run['name']}") output.append(f" Status: {run['status']}") if run.get("conclusion"): output.append(f" Conclusion: {run['conclusion']}") output.append(f" Started: {run.get('started_at', 'N/A')}") if run.get("completed_at"): output.append(f" Completed: {run['completed_at']}") if run.get("html_url"): output.append(f" URL: {run['html_url']}") output.append("") return "\n".join(output) except ValueError as auth_error: # Handle authentication/configuration errors specifically logger.error(f"Authentication error getting PR checks: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: # Handle network connectivity issues logger.error(f"Connection error getting PR checks: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: # Log unexpected errors with full context for debugging logger.error( f"Unexpected error getting PR checks for PR #{pr_number}: {e}", exc_info=True, ) return f"❌ Error getting PR checks: {str(e)}" async def github_get_failing_jobs( repo_owner: str, repo_name: str, pr_number: int, include_logs: bool = True, include_annotations: bool = True, ) -> str: """Get detailed information about failing jobs in a PR""" try: async with github_client_context() as client: # Get PR details pr_response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}" ) if pr_response.status != 200: return f"❌ Failed to get PR #{pr_number}: {pr_response.status}" pr_data = await pr_response.json() head_sha = pr_data["head"]["sha"] # Get check runs and filter for failures checks_response = await client.get( f"/repos/{repo_owner}/{repo_name}/commits/{head_sha}/check-runs" ) if checks_response.status != 200: return f"❌ Failed to get check runs: {checks_response.status}" checks_data = await checks_response.json() failing_runs = [ run for run in checks_data.get("check_runs", []) if run["status"] == "completed" and run.get("conclusion") in ["failure", "cancelled", "timed_out"] ] if not failing_runs: return f"No failing jobs found for PR #{pr_number}" output = [f"Failing jobs for PR #{pr_number}:\n"] for run in failing_runs: output.append(f"❌ {run['name']}") output.append(f" Conclusion: {run['conclusion']}") output.append(f" Started: {run.get('started_at', 'N/A')}") output.append(f" Completed: {run.get('completed_at', 'N/A')}") # Get annotations if requested if include_annotations and run.get("id"): try: annotations_response = await client.get( f"/repos/{repo_owner}/{repo_name}/check-runs/{run['id']}/annotations" ) if annotations_response.status == 200: annotations_data = await annotations_response.json() if annotations_data: output.append(" Annotations:") for annotation in annotations_data[ :5 ]: # Limit to first 5 output.append( f" • {annotation.get('title', 'Error')}: {annotation.get('message', 'No message')}" ) if annotation.get("path"): output.append( f" File: {annotation['path']} (line {annotation.get('start_line', 'unknown')})" ) except (ConnectionError, ValueError) as annotation_error: # Log specific annotation errors but continue processing logger.warning( f"Failed to get annotations for run {run.get('id')}: {annotation_error}" ) except Exception as annotation_error: # Annotations might not be available - log but continue logger.debug( f"Annotations unavailable for run {run.get('id')}: {annotation_error}" ) # Get logs if requested (simplified) if include_logs and run.get("html_url"): output.append(f" Details: {run['html_url']}") output.append("") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error getting failing jobs: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error getting failing jobs: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error getting failing jobs for PR #{pr_number}: {e}", exc_info=True, ) return f"❌ Error getting failing jobs: {str(e)}" async def github_get_workflow_run( repo_owner: str, repo_name: str, run_id: int, include_logs: bool = False ) -> str: """Get detailed workflow run information""" try: async with github_client_context() as client: # Get workflow run details run_response = await client.get( f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}" ) if run_response.status != 200: return f"❌ Failed to get workflow run #{run_id}: {run_response.status}" run_data = await run_response.json() output = [f"Workflow Run #{run_id}:\n"] output.append(f"Name: {run_data.get('name', 'N/A')}") output.append(f"Status: {run_data.get('status', 'N/A')}") output.append(f"Conclusion: {run_data.get('conclusion', 'N/A')}") output.append(f"Branch: {run_data.get('head_branch', 'N/A')}") output.append(f"Commit: {run_data.get('head_sha', 'N/A')[:8]}") output.append(f"Started: {run_data.get('created_at', 'N/A')}") output.append(f"Updated: {run_data.get('updated_at', 'N/A')}") if run_data.get("html_url"): output.append(f"URL: {run_data['html_url']}") # Get jobs if available jobs_response = await client.get( f"/repos/{repo_owner}/{repo_name}/actions/runs/{run_id}/jobs" ) if jobs_response.status == 200: jobs_data = await jobs_response.json() jobs = jobs_data.get("jobs", []) if jobs: output.append("\nJobs:") for job in jobs: status_emoji = { "completed": "✅" if job.get("conclusion") == "success" else "❌", "in_progress": "🔄", "queued": "⏳", }.get(job["status"], "❓") output.append(f" {status_emoji} {job['name']}") output.append(f" Status: {job['status']}") if job.get("conclusion"): output.append(f" Conclusion: {job['conclusion']}") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error getting workflow run: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error getting workflow run: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error getting workflow run #{run_id}: {e}", exc_info=True ) return f"❌ Error getting workflow run: {str(e)}" async def github_get_pr_details( repo_owner: str, repo_name: str, pr_number: int, include_files: bool = False, include_reviews: bool = False, ) -> str: """Get comprehensive PR details""" try: async with github_client_context() as client: # Get PR details pr_response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}" ) if pr_response.status != 200: return f"❌ Failed to get PR #{pr_number}: {pr_response.status}" pr_data = await pr_response.json() output = [f"Pull Request #{pr_number}:\n"] output.append(f"Title: {pr_data.get('title', 'N/A')}") output.append(f"State: {pr_data.get('state', 'N/A')}") output.append(f"Author: {pr_data.get('user', {}).get('login', 'N/A')}") output.append(f"Base: {pr_data.get('base', {}).get('ref', 'N/A')}") output.append(f"Head: {pr_data.get('head', {}).get('ref', 'N/A')}") output.append(f"Created: {pr_data.get('created_at', 'N/A')}") output.append(f"Updated: {pr_data.get('updated_at', 'N/A')}") if pr_data.get("body"): output.append( f"\nDescription:\n{pr_data['body'][:500]}{'...' if len(pr_data['body']) > 500 else ''}" ) if pr_data.get("html_url"): output.append(f"\nURL: {pr_data['html_url']}") # Get files if requested if include_files: try: files_response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/files" ) if files_response.status == 200: files_data = await files_response.json() if files_data: output.append(f"\nFiles ({len(files_data)}):") for file in files_data[:10]: # Limit to first 10 output.append( f" {file['status'][0].upper()} {file['filename']} (+{file['additions']}, -{file['deletions']})" ) if len(files_data) > 10: output.append( f" ... and {len(files_data) - 10} more files" ) except (ConnectionError, ValueError) as files_error: logger.warning( f"Failed to get files for PR #{pr_number}: {files_error}" ) output.append("\n⚠️ Could not retrieve files information") # Get reviews if requested if include_reviews: try: reviews_response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/reviews" ) if reviews_response.status == 200: reviews_data = await reviews_response.json() if reviews_data: output.append(f"\nReviews ({len(reviews_data)}):") for review in reviews_data[-5:]: # Show last 5 state_emoji = { "APPROVED": "✅", "CHANGES_REQUESTED": "❌", "COMMENTED": "💬", }.get(review.get("state"), "❓") output.append( f" {state_emoji} {review.get('user', {}).get('login', 'N/A')}: {review.get('state', 'N/A')}" ) except (ConnectionError, ValueError) as reviews_error: logger.warning( f"Failed to get reviews for PR #{pr_number}: {reviews_error}" ) output.append("\n⚠️ Could not retrieve reviews information") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error getting PR details: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error getting PR details: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error getting PR details for PR #{pr_number}: {e}", exc_info=True, ) return f"❌ Error getting PR details: {str(e)}" async def github_list_pull_requests( repo_owner: str, repo_name: str, state: str = "open", head: str | None = None, base: str | None = None, sort: str = "created", direction: str = "desc", per_page: int = 30, page: int = 1, ) -> str: """List pull requests for a repository""" logger.debug(f"🔍 Starting github_list_pull_requests for {repo_owner}/{repo_name}") try: async with github_client_context() as client: logger.debug("✅ GitHub client obtained successfully") logger.debug( f"🔗 Token prefix: {client.token[:8]}..." if client.token else "No token" ) params = { "state": state, "sort": sort, "direction": direction, "per_page": per_page, "page": page, } if head: params["head"] = head if base: params["base"] = base logger.debug( f"📡 Making API call to /repos/{repo_owner}/{repo_name}/pulls with params: {params}" ) response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls", params=params ) logger.debug(f"📨 GitHub API response status: {response.status}") if response.status == 401: response_text = await response.text() logger.error( f"🔒 GitHub API authentication failed (401): {response_text}" ) return f"❌ GitHub API error 401: {response_text}" elif response.status != 200: response_text = await response.text() logger.error(f"❌ GitHub API error {response.status}: {response_text}") return f"❌ Failed to list pull requests: {response.status} - {response_text}" prs = await response.json() if not prs: return f"No {state} pull requests found" output = [f"{state.title()} Pull Requests for {repo_owner}/{repo_name}:\n"] for pr in prs: state_emoji = {"open": "🟢", "closed": "🔴", "merged": "🟣"}.get( pr.get("state"), "❓" ) output.append(f"{state_emoji} #{pr['number']}: {pr['title']}") output.append(f" Author: {pr.get('user', {}).get('login', 'N/A')}") base_ref = pr.get("base", {}).get("ref", "N/A") head_ref = pr.get("head", {}).get("ref", "N/A") output.append(f" Base: {base_ref} ← Head: {head_ref}") output.append(f" Created: {pr.get('created_at', 'N/A')}") output.append("") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error listing pull requests: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error listing pull requests: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error listing pull requests for {repo_owner}/{repo_name}: {e}", exc_info=True, ) return f"❌ Error listing pull requests: {str(e)}" async def github_get_pr_status(repo_owner: str, repo_name: str, pr_number: int) -> str: """Get the status and check runs for a pull request""" try: async with github_client_context() as client: # Get PR details pr_response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}" ) if pr_response.status != 200: return f"❌ Failed to get PR #{pr_number}: {pr_response.status}" pr_data = await pr_response.json() head_sha = pr_data["head"]["sha"] output = [f"Status for PR #{pr_number}:\n"] output.append(f"State: {pr_data.get('state', 'N/A')}") output.append(f"Mergeable: {pr_data.get('mergeable', 'N/A')}") output.append(f"Merge State: {pr_data.get('mergeable_state', 'N/A')}") output.append("") # Get check runs checks_response = await client.get( f"/repos/{repo_owner}/{repo_name}/commits/{head_sha}/check-runs" ) if checks_response.status == 200: checks_data = await checks_response.json() check_runs = checks_data.get("check_runs", []) if check_runs: output.append("Check Runs:") for run in check_runs: status_emoji = { "completed": "✅" if run.get("conclusion") == "success" else "❌", "in_progress": "🔄", "queued": "⏳", }.get(run["status"], "❓") output.append( f" {status_emoji} {run['name']}: {run['status']}" ) if run.get("conclusion"): output.append(f" Conclusion: {run['conclusion']}") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error getting PR status: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error getting PR status: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error getting PR status for PR #{pr_number}: {e}", exc_info=True, ) return f"❌ Error getting PR status: {str(e)}" async def github_get_pr_files( repo_owner: str, repo_name: str, pr_number: int, per_page: int = 30, page: int = 1, include_patch: bool = False, ) -> str: """Get files changed in a pull request with memory-aware patch handling""" try: async with github_client_context() as client: params = {"per_page": per_page, "page": page} response = await client.get( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/files", params=params, ) if response.status != 200: return f"❌ Failed to get PR files: {response.status}" files = await response.json() if not files: return f"No files found for PR #{pr_number}" output = [f"Files changed in PR #{pr_number}:\n"] total_additions = 0 total_deletions = 0 # Initialize memory manager for patch processing patch_manager = PatchMemoryManager( max_patch_size=1000, max_total_memory=50000 ) for file in files: status_emoji = { "added": "➕", "modified": "📝", "removed": "➖", "renamed": "📝", }.get(file.get("status"), "❓") additions = file.get("additions", 0) deletions = file.get("deletions", 0) total_additions += additions total_deletions += deletions output.append( f"{status_emoji} {file['filename']} (+{additions}, -{deletions})" ) if include_patch and file.get("patch"): # Use memory manager to safely process patch content processed_patch, was_truncated = patch_manager.process_patch( file["patch"] ) output.append(processed_patch) if was_truncated: logger.info( f"Patch for {file['filename']} was truncated or skipped for memory management" ) output.append("") output.append(f"Total: +{total_additions}, -{total_deletions}") # Add memory usage summary if patches were included if include_patch: output.append( f"\nMemory usage: {patch_manager.current_memory_usage}/{patch_manager.max_total_memory} bytes" ) output.append(f"Patches processed: {patch_manager.patches_processed}") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error getting PR files: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error getting PR files: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error getting PR files for PR #{pr_number}: {e}", exc_info=True ) return f"❌ Error getting PR files: {str(e)}" async def github_update_pr( repo_owner: str, repo_name: str, pr_number: int, title: str | None = None, body: str | None = None, state: str | None = None, ) -> str: """Update a pull request's title, body, or state.""" logger.debug(f"🚀 Updating PR #{pr_number} in {repo_owner}/{repo_name}") try: async with github_client_context() as client: payload: dict[str, Any] = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body if state is not None: if state not in ["open", "closed"]: return "❌ State must be 'open' or 'closed'" payload["state"] = state if not payload: return "⚠️ No update parameters provided. Please specify title, body, or state." response = await client.patch( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}", json=payload ) if response.status != 200: error_text = await response.text() return f"❌ Failed to update PR #{pr_number}: {response.status} - {error_text}" result = await response.json() logger.info(f"✅ Successfully updated PR #{pr_number}") return ( f"✅ Successfully updated PR #{result['number']}: {result['html_url']}" ) except ValueError as auth_error: logger.error(f"Authentication error updating PR: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error updating PR: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error updating PR #{pr_number}: {e}", exc_info=True) return f"❌ Error updating PR: {str(e)}" async def github_create_pr( repo_owner: str, repo_name: str, title: str, head: str, base: str, body: str | None = None, draft: bool = False, ) -> str: """Create a new pull request.""" logger.debug(f"🚀 Creating PR in {repo_owner}/{repo_name} from {head} to {base}") try: async with github_client_context() as client: payload = {"title": title, "head": head, "base": base, "draft": draft} if body is not None: payload["body"] = body response = await client.post( f"/repos/{repo_owner}/{repo_name}/pulls", json=payload ) if response.status != 201: error_text = await response.text() # Provide more helpful error for common cases if ( "No commits between" in error_text or "A pull request already exists" in error_text ): return f"❌ Could not create PR. Reason: {error_text}" return f"❌ Failed to create PR: {response.status} - {error_text}" result = await response.json() logger.info(f"✅ Successfully created PR #{result['number']}") return ( f"✅ Successfully created PR #{result['number']}: {result['html_url']}" ) except ValueError as auth_error: logger.error(f"Authentication error creating PR: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error creating PR: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error creating PR: {e}", exc_info=True) return f"❌ Error creating PR: {str(e)}" async def github_merge_pr( repo_owner: str, repo_name: str, pr_number: int, commit_title: str | None = None, commit_message: str | None = None, merge_method: str = "merge", ) -> str: """Merge a pull request.""" logger.debug( f"🚀 Merging PR #{pr_number} in {repo_owner}/{repo_name} using '{merge_method}' method" ) try: async with github_client_context() as client: if merge_method not in ["merge", "squash", "rebase"]: return "❌ merge_method must be one of 'merge', 'squash', or 'rebase'" payload = {"merge_method": merge_method} if commit_title: payload["commit_title"] = commit_title if commit_message: payload["commit_message"] = commit_message response = await client.put( f"/repos/{repo_owner}/{repo_name}/pulls/{pr_number}/merge", json=payload ) if response.status != 200: error_text = await response.text() if response.status in [405, 409]: return f"❌ Could not merge PR. Reason: {error_text}. This may be due to merge conflicts or failing status checks." return f"❌ Failed to merge PR: {response.status} - {error_text}" result = await response.json() if result.get("merged"): logger.info(f"✅ Successfully merged PR #{pr_number}") return f"✅ {result['message']}" else: logger.warning( f"⚠️ Merge attempt for PR #{pr_number} returned 200 OK but 'merged' is false: {result.get('message')}" ) return f"⚠️ {result.get('message', 'Merge was not successful but API returned 200 OK. Check PR status.')}" except ValueError as auth_error: logger.error(f"Authentication error merging PR: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error merging PR: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error merging PR #{pr_number}: {e}", exc_info=True) return f"❌ Error merging PR: {str(e)}" async def github_add_pr_comment( repo_owner: str, repo_name: str, pr_number: int, body: str ) -> str: """Add a comment to a pull request.""" logger.debug(f"🚀 Adding comment to PR #{pr_number} in {repo_owner}/{repo_name}") try: async with github_client_context() as client: # Comments are added to the corresponding issue payload = {"body": body} response = await client.post( f"/repos/{repo_owner}/{repo_name}/issues/{pr_number}/comments", json=payload, ) if response.status != 201: error_text = await response.text() return f"❌ Failed to add comment: {response.status} - {error_text}" result = await response.json() logger.info(f"✅ Successfully added comment to PR #{pr_number}") return f"✅ Successfully added comment: {result['html_url']}" except ValueError as auth_error: logger.error(f"Authentication error adding PR comment: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error adding PR comment: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error adding comment to PR #{pr_number}: {e}", exc_info=True ) return f"❌ Error adding comment: {str(e)}" async def github_close_pr(repo_owner: str, repo_name: str, pr_number: int) -> str: """Close a pull request.""" logger.debug(f"🚀 Closing PR #{pr_number} in {repo_owner}/{repo_name}") return await github_update_pr(repo_owner, repo_name, pr_number, state="closed") async def github_reopen_pr(repo_owner: str, repo_name: str, pr_number: int) -> str: """Reopen a closed pull request.""" logger.debug(f"🚀 Reopening PR #{pr_number} in {repo_owner}/{repo_name}") return await github_update_pr(repo_owner, repo_name, pr_number, state="open") # GitHub Issues API Functions async def github_create_issue( repo_owner: str, repo_name: str, title: str, body: str | None = None, labels: list[str] | None = None, assignees: list[str] | None = None, milestone: int | None = None, ) -> str: """Create a new GitHub issue.""" logger.debug(f"🚀 Creating issue in {repo_owner}/{repo_name}: {title}") try: async with github_client_context() as client: payload: dict[str, Any] = {"title": title} if body is not None: payload["body"] = body if labels is not None: payload["labels"] = labels if assignees is not None: payload["assignees"] = assignees if milestone is not None: payload["milestone"] = milestone response = await client.post( f"/repos/{repo_owner}/{repo_name}/issues", json=payload ) if response.status != 201: error_text = await response.text() return f"❌ Failed to create issue: {response.status} - {error_text}" result = await response.json() logger.info(f"✅ Successfully created issue #{result['number']}") return f"✅ Successfully created issue #{result['number']}: {result['html_url']}" except ValueError as auth_error: logger.error(f"Authentication error creating issue: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error creating issue: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error creating issue: {e}", exc_info=True) return f"❌ Error creating issue: {str(e)}" async def github_list_issues( repo_owner: str, repo_name: str, state: str = "open", labels: list[str] | None = None, assignee: str | None = None, creator: str | None = None, mentioned: str | None = None, milestone: str | None = None, sort: str = "created", direction: str = "desc", since: str | None = None, per_page: int = 30, page: int = 1, ) -> str: """List issues for a repository.""" logger.debug(f"🔍 Listing issues for {repo_owner}/{repo_name}") try: async with github_client_context() as client: params = { "state": state, "sort": sort, "direction": direction, "per_page": per_page, "page": page, } if labels: params["labels"] = ",".join(labels) if assignee: params["assignee"] = assignee if creator: params["creator"] = creator if mentioned: params["mentioned"] = mentioned if milestone: params["milestone"] = milestone if since: params["since"] = since response = await client.get( f"/repos/{repo_owner}/{repo_name}/issues", params=params ) if response.status != 200: error_text = await response.text() return f"❌ Failed to list issues: {response.status} - {error_text}" issues = await response.json() if not issues: return f"No {state} issues found" output = [f"{state.title()} Issues for {repo_owner}/{repo_name}:\n"] for issue in issues: # Skip pull requests (they appear in issues API but have 'pull_request' key) if issue.get("pull_request"): continue state_emoji = {"open": "🟢", "closed": "🔴"}.get( issue.get("state"), "❓" ) output.append(f"{state_emoji} #{issue['number']}: {issue['title']}") output.append(f" Author: {issue.get('user', {}).get('login', 'N/A')}") # Show labels if any if issue.get("labels"): label_names = [label["name"] for label in issue["labels"]] output.append(f" Labels: {', '.join(label_names)}") # Show assignees if any if issue.get("assignees"): assignee_names = [ assignee["login"] for assignee in issue["assignees"] ] output.append(f" Assignees: {', '.join(assignee_names)}") output.append(f" Created: {issue.get('created_at', 'N/A')}") output.append("") return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error listing issues: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error listing issues: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error listing issues: {e}", exc_info=True) return f"❌ Error listing issues: {str(e)}" async def github_update_issue( repo_owner: str, repo_name: str, issue_number: int, title: str | None = None, body: str | None = None, state: str | None = None, labels: list[str] | None = None, assignees: list[str] | None = None, milestone: int | None = None, ) -> str: """Update a GitHub issue.""" logger.debug(f"🚀 Updating issue #{issue_number} in {repo_owner}/{repo_name}") try: async with github_client_context() as client: payload: dict[str, Any] = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body if state is not None: if state not in ["open", "closed"]: return "❌ State must be 'open' or 'closed'" payload["state"] = state if labels is not None: payload["labels"] = labels if assignees is not None: payload["assignees"] = assignees if milestone is not None: payload["milestone"] = milestone if not payload: return "⚠️ No update parameters provided. Please specify title, body, state, labels, assignees, or milestone." response = await client.patch( f"/repos/{repo_owner}/{repo_name}/issues/{issue_number}", json=payload ) if response.status != 200: error_text = await response.text() return f"❌ Failed to update issue #{issue_number}: {response.status} - {error_text}" result = await response.json() logger.info(f"✅ Successfully updated issue #{issue_number}") return f"✅ Successfully updated issue #{result['number']}: {result['html_url']}" except ValueError as auth_error: logger.error(f"Authentication error updating issue: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error updating issue: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error( f"Unexpected error updating issue #{issue_number}: {e}", exc_info=True ) return f"❌ Error updating issue: {str(e)}" async def github_edit_pr_description( repo_owner: str, repo_name: str, pr_number: int, description: str, ) -> str: """Edit a pull request's description/body.""" logger.debug(f"🚀 Updating PR #{pr_number} description in {repo_owner}/{repo_name}") # Use the existing github_update_pr function to update just the body return await github_update_pr( repo_owner=repo_owner, repo_name=repo_name, pr_number=pr_number, body=description, ) async def github_search_issues( repo_owner: str, repo_name: str, query: str, sort: str = "created", order: str = "desc", per_page: int = 30, page: int = 1, ) -> str: """Search issues using GitHub's advanced search API. Supports GitHub's search qualifiers like: - is:issue is:open author:username - label:bug label:"help wanted" - created:2023-01-01..2023-12-31 - updated:>2023-06-01 - milestone:"v1.0" assignee:username """ logger.debug(f"🔍 Searching issues in {repo_owner}/{repo_name}: {query}") try: async with github_client_context() as client: # Add repository scope to query search_query = f"repo:{repo_owner}/{repo_name} is:issue {query}" params = { "q": search_query, "sort": sort, "order": order, "per_page": per_page, "page": page, } response = await client.get("/search/issues", params=params) if response.status != 200: error_text = await response.text() return f"❌ Failed to search issues: {response.status} - {error_text}" data = await response.json() issues = data.get("items", []) total_count = data.get("total_count", 0) if not issues: return f"No issues found matching query: {query}" output = [f"Search Results for '{query}' in {repo_owner}/{repo_name}:\n"] output.append(f"Found {total_count} total issues (showing page {page})\n") for issue in issues: # Skip pull requests if issue.get("pull_request"): continue state_emoji = {"open": "🟢", "closed": "🔴"}.get( issue.get("state"), "❓" ) output.append(f"{state_emoji} #{issue['number']}: {issue['title']}") output.append(f" Author: {issue.get('user', {}).get('login', 'N/A')}") # Show labels if issue.get("labels"): label_names = [label["name"] for label in issue["labels"]] output.append(f" Labels: {', '.join(label_names)}") # Show assignees if issue.get("assignees"): assignee_names = [ assignee["login"] for assignee in issue["assignees"] ] output.append(f" Assignees: {', '.join(assignee_names)}") # Show milestone if issue.get("milestone"): output.append(f" Milestone: {issue['milestone']['title']}") output.append(f" Created: {issue.get('created_at', 'N/A')}") output.append( f" Score: {issue.get('score', 'N/A')}" ) # Search relevance score output.append("") # Add pagination info max_results = min(1000, total_count) # GitHub limits search to 1000 results if total_count > len(issues): max_page = (max_results + per_page - 1) // per_page output.append( f"📄 Page {page} of {max_page} (max {max_results} results from GitHub)" ) return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error searching issues: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error searching issues: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error searching issues: {e}", exc_info=True) return f"❌ Error searching issues: {str(e)}" async def github_create_issue_from_template( repo_owner: str, repo_name: str, title: str, template_name: str = "bug_report", template_data: dict | None = None, ) -> str: """Create a GitHub issue using a predefined template. Templates include: - bug_report: Bug report with reproduction steps - feature_request: Feature request with use cases - question: Question or discussion starter - custom: Use template_data to define custom format """ logger.debug( f"🚀 Creating issue from template '{template_name}' in {repo_owner}/{repo_name}" ) templates = { "bug_report": { "body": f"""## Bug Report **Describe the bug** A clear and concise description of what the bug is. **To Reproduce** Steps to reproduce the behavior: 1. Go to '...' 2. Click on '....' 3. Scroll down to '....' 4. See error **Expected behavior** A clear and concise description of what you expected to happen. **Additional context** Add any other context about the problem here. **Environment** - OS: [e.g. iOS] - Browser [e.g. chrome, safari] - Version [e.g. 22] {template_data.get("additional_info", "") if template_data else ""} """, "labels": ["bug", "triage"], }, "feature_request": { "body": f"""## Feature Request **Is your feature request related to a problem? Please describe.** A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] **Describe the solution you'd like** A clear and concise description of what you want to happen. **Describe alternatives you've considered** A clear and concise description of any alternative solutions or features you've considered. **Additional context** Add any other context or screenshots about the feature request here. {template_data.get("additional_info", "") if template_data else ""} """, "labels": ["enhancement", "feature-request"], }, "question": { "body": f"""## Question **What would you like to know?** Please describe your question clearly. **Context** Provide any relevant context that might help answer your question. **What have you tried?** Let us know what research or attempts you've already made. {template_data.get("additional_info", "") if template_data else ""} """, "labels": ["question"], }, } if template_name == "custom" and template_data: template = { "body": template_data.get("body", ""), "labels": template_data.get("labels", []), } else: template = templates.get(template_name) if not template: available = ", ".join(templates.keys()) + ", custom" return f"❌ Unknown template '{template_name}'. Available templates: {available}" # Apply template data customizations if template_data: if "labels" in template_data: template["labels"] = template["labels"] + template_data["labels"] if "assignees" in template_data: template["assignees"] = template_data["assignees"] if "milestone" in template_data: template["milestone"] = template_data["milestone"] # Create issue using template return await github_create_issue( repo_owner=repo_owner, repo_name=repo_name, title=title, body=template["body"], labels=template.get("labels"), assignees=template.get("assignees"), milestone=template.get("milestone"), ) async def github_bulk_update_issues( repo_owner: str, repo_name: str, issue_numbers: list[int], labels: list[str] | None = None, assignees: list[str] | None = None, milestone: int | None = None, state: str | None = None, ) -> str: """Bulk update multiple issues with common properties. Useful for: - Adding labels to multiple issues - Assigning multiple issues to same milestone - Bulk closing/reopening issues - Mass assignment operations """ logger.debug( f"🚀 Bulk updating {len(issue_numbers)} issues in {repo_owner}/{repo_name}" ) if not issue_numbers: return "⚠️ No issue numbers provided for bulk update" if not any([labels, assignees, milestone is not None, state]): return "⚠️ No update parameters provided. Specify labels, assignees, milestone, or state" results = [] successful_updates = 0 failed_updates = 0 for issue_number in issue_numbers: try: result = await github_update_issue( repo_owner=repo_owner, repo_name=repo_name, issue_number=issue_number, labels=labels, assignees=assignees, milestone=milestone, state=state, ) if result.startswith("✅"): successful_updates += 1 results.append(f"✅ Issue #{issue_number}: Updated") else: failed_updates += 1 results.append(f"❌ Issue #{issue_number}: {result}") except Exception as e: failed_updates += 1 results.append(f"❌ Issue #{issue_number}: Error - {str(e)}") # Summary summary = [ f"Bulk Update Results for {len(issue_numbers)} issues:", f"✅ Successful: {successful_updates}", f"❌ Failed: {failed_updates}", "", ] # Detailed results (limit to first 10 for readability) summary.append("Details:") for result in results[:10]: summary.append(f" {result}") if len(results) > 10: summary.append(f" ... and {len(results) - 10} more results") return "\n".join(summary) async def github_list_workflow_runs( repo_owner: str, repo_name: str, workflow_id: str | None = None, actor: str | None = None, branch: str | None = None, event: str | None = None, status: str | None = None, conclusion: str | None = None, per_page: int = 30, page: int = 1, created: str | None = None, exclude_pull_requests: bool = False, check_suite_id: int | None = None, head_sha: str | None = None, ) -> str: """List workflow runs for a repository with comprehensive filtering options. This provides essential CI/CD monitoring capabilities for GitHub Actions workflows. Args: repo_owner: Repository owner/organization repo_name: Repository name workflow_id: Filter by specific workflow ID or filename (e.g., "ci.yml") actor: Filter by GitHub username who triggered the run branch: Filter by branch name event: Filter by event type (push, pull_request, schedule, etc.) status: Filter by run status (queued, in_progress, completed) conclusion: Filter by conclusion (success, failure, neutral, cancelled, timed_out, action_required, stale) per_page: Number of results per page (1-100, default: 30) page: Page number to retrieve (default: 1) created: Filter by creation date (ISO 8601 format or relative like >2023-01-01) exclude_pull_requests: If true, exclude workflow runs triggered by pull requests check_suite_id: Filter by specific check suite ID head_sha: Filter by specific commit SHA Returns: Formatted string with workflow run information including status, conclusion, timing, and links for CI/CD monitoring and debugging. """ logger.debug(f"🔍 Listing workflow runs for {repo_owner}/{repo_name}") try: async with github_client_context() as client: # Build query parameters with validation params = { "per_page": min(max(per_page, 1), 100), # Enforce GitHub API limits "page": max(page, 1), } # Add optional filters if actor: params["actor"] = actor if branch: params["branch"] = branch if event: params["event"] = event if status and status in ["queued", "in_progress", "completed"]: params["status"] = status if conclusion and conclusion in [ "success", "failure", "neutral", "cancelled", "timed_out", "action_required", "stale", ]: params["conclusion"] = conclusion if created: params["created"] = created if exclude_pull_requests: params["exclude_pull_requests"] = "true" if check_suite_id: params["check_suite_id"] = check_suite_id if head_sha: params["head_sha"] = head_sha # Determine API endpoint - workflow-specific or repository-wide if workflow_id: # Get runs for specific workflow endpoint = f"/repos/{repo_owner}/{repo_name}/actions/workflows/{workflow_id}/runs" logger.debug(f"📡 Fetching workflow-specific runs: {workflow_id}") else: # Get all workflow runs for repository endpoint = f"/repos/{repo_owner}/{repo_name}/actions/runs" logger.debug("📡 Fetching all repository workflow runs") logger.debug(f"📡 Making API call to {endpoint} with params: {params}") response = await client.get(endpoint, params=params) logger.debug(f"📨 GitHub API response status: {response.status}") if response.status == 401: response_text = await response.text() logger.error( f"🔒 GitHub API authentication failed (401): {response_text}" ) return "❌ GitHub API authentication failed: Verify your GITHUB_TOKEN has Actions read permissions" elif response.status == 404: if workflow_id: return f"❌ Workflow '{workflow_id}' not found in {repo_owner}/{repo_name}. Check workflow file name or ID." else: return f"❌ Repository {repo_owner}/{repo_name} not found or Actions not enabled" elif response.status != 200: response_text = await response.text() logger.error(f"❌ GitHub API error {response.status}: {response_text}") return f"❌ Failed to list workflow runs: {response.status} - {response_text}" data = await response.json() workflow_runs = data.get("workflow_runs", []) if not workflow_runs: filter_desc = ( f" (filtered by: {', '.join(f'{k}={v}' for k, v in params.items() if k not in ['per_page', 'page'])})" if len(params) > 2 else "" ) return ( f"No workflow runs found for {repo_owner}/{repo_name}{filter_desc}" ) # Build formatted output filter_info = [] if workflow_id: filter_info.append(f"workflow: {workflow_id}") if actor: filter_info.append(f"actor: {actor}") if branch: filter_info.append(f"branch: {branch}") if event: filter_info.append(f"event: {event}") if status: filter_info.append(f"status: {status}") if conclusion: filter_info.append(f"conclusion: {conclusion}") header = f"Workflow Runs for {repo_owner}/{repo_name}" if filter_info: header += f" ({', '.join(filter_info)})" output = [f"{header}:\n"] # Add summary statistics total_count = data.get("total_count", len(workflow_runs)) if total_count > len(workflow_runs): output.append( f"Showing {len(workflow_runs)} of {total_count} total runs (page {page})\n" ) # Group runs by status for quick overview status_counts = {} for run in workflow_runs: run_status = run.get("status", "unknown") status_counts[run_status] = status_counts.get(run_status, 0) + 1 if len(status_counts) > 1: status_summary = ", ".join( [f"{status}: {count}" for status, count in status_counts.items()] ) output.append(f"Status summary: {status_summary}\n") # Format individual workflow runs for run in workflow_runs: # Status and conclusion emojis status_emoji = { "completed": "✅" if run.get("conclusion") == "success" else "❌", "in_progress": "🔄", "queued": "⏳", "requested": "📋", "waiting": "⏸️", }.get(run.get("status"), "❓") # Enhanced status display status_text = run.get("status", "unknown") if run.get("conclusion"): status_text += f" ({run['conclusion']})" # Workflow name and run number workflow_name = run.get("name", "Unknown Workflow") run_number = run.get("run_number", "?") output.append(f"{status_emoji} {workflow_name} #{run_number}") output.append(f" ID: {run.get('id', 'N/A')}") output.append(f" Status: {status_text}") output.append(f" Branch: {run.get('head_branch', 'N/A')}") output.append(f" Commit: {run.get('head_sha', 'N/A')[:8]}...") output.append(f" Actor: {run.get('actor', {}).get('login', 'N/A')}") output.append(f" Event: {run.get('event', 'N/A')}") # Timing information created_at = run.get("created_at", "N/A") updated_at = run.get("updated_at", "N/A") if created_at != "N/A": output.append(f" Started: {created_at}") if updated_at != "N/A" and updated_at != created_at: output.append(f" Updated: {updated_at}") # Duration calculation for completed runs if ( run.get("status") == "completed" and run.get("created_at") and run.get("updated_at") ): try: from datetime import datetime start = datetime.fromisoformat( run["created_at"].replace("Z", "+00:00") ) end = datetime.fromisoformat( run["updated_at"].replace("Z", "+00:00") ) duration = end - start output.append(f" Duration: {duration}") except Exception: pass # Skip duration calculation if parsing fails # Links for further investigation if run.get("html_url"): output.append(f" URL: {run['html_url']}") output.append("") # Add pagination info if applicable if total_count > len(workflow_runs): max_page = (total_count + per_page - 1) // per_page output.append( f"📄 Page {page} of {max_page} (use page parameter to see more)" ) return "\n".join(output) except ValueError as auth_error: logger.error(f"Authentication error listing workflow runs: {auth_error}") return f"❌ {str(auth_error)}" except ConnectionError as conn_error: logger.error(f"Connection error listing workflow runs: {conn_error}") return f"❌ Network connection failed: {str(conn_error)}" except Exception as e: logger.error(f"Unexpected error listing workflow runs: {e}", exc_info=True) return f"❌ Error listing workflow runs: {str(e)}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server