Skip to main content
Glama

Jira-GitLab MCP Server

by gabbar910
github_client.py17.9 kB
import asyncio import logging from typing import Dict, List, Any, Optional import base64 import json from github import Github, GithubException from github.Repository import Repository from github.Branch import Branch from utils.error_handler import GitLabError, AuthenticationError, retry_on_failure logger = logging.getLogger(__name__) class GitHubClient: def __init__(self, config: Dict[str, str]): self.config = config self.base_url = config.get("base_url", "https://api.github.com") self.access_token = config["access_token"] # Initialize PyGithub client if self.base_url == "https://api.github.com": self.github = Github(self.access_token) else: # For GitHub Enterprise self.github = Github(base_url=self.base_url, login_or_token=self.access_token) logger.info(f"GitHub client initialized for {self.base_url}") @retry_on_failure(max_retries=3, delay=1.0) async def test_connection(self) -> bool: """Test the GitHub connection""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._test_connection_sync) logger.info("GitHub connection test successful") return result except Exception as e: logger.error(f"GitHub connection test failed: {e}") raise AuthenticationError(f"Failed to connect to GitHub: {str(e)}", "github") def _test_connection_sync(self) -> bool: """Synchronous connection test""" try: # Test authentication by getting current user user = self.github.get_user() user_login = user.login # This will trigger the API call logger.info(f"GitHub connection successful for user: {user_login}") return True except GithubException as e: if e.status == 401: raise AuthenticationError("Invalid GitHub access token", "github") else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def create_branch(self, project_id: str, branch_name: str, ref: str = "main") -> str: """Create a new branch in GitHub repository""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._create_branch_sync, project_id, branch_name, ref) logger.info(f"Created branch '{branch_name}' in repository {project_id}") return result except Exception as e: logger.error(f"Failed to create branch '{branch_name}' in repository {project_id}: {e}") raise GitLabError(f"Failed to create branch: {str(e)}") def _create_branch_sync(self, project_id: str, branch_name: str, ref: str) -> str: """Synchronous branch creation""" try: repo = self.github.get_repo(project_id) # Check if branch already exists try: existing_branch = repo.get_branch(branch_name) if existing_branch: logger.warning(f"Branch '{branch_name}' already exists in repository {project_id}") return f"https://github.com/{project_id}/tree/{branch_name}" except GithubException as e: if e.status == 404: # Branch doesn't exist, which is what we want pass else: raise # Get the reference branch ref_branch = repo.get_branch(ref) ref_sha = ref_branch.commit.sha # Create the new branch repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=ref_sha) branch_url = f"https://github.com/{project_id}/tree/{branch_name}" return branch_url except GithubException as e: if e.status == 404: raise GitLabError(f"Repository {project_id} or reference branch '{ref}' not found") elif e.status == 422: # Branch might already exist if "already exists" in str(e.data).lower(): logger.warning(f"Branch '{branch_name}' already exists in repository {project_id}") return f"https://github.com/{project_id}/tree/{branch_name}" else: raise GitLabError(f"Failed to create branch: {e.data}") else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error creating branch: {str(e)}") async def get_projects(self, owned: bool = True) -> List[Dict[str, Any]]: """Get GitHub repositories""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_projects_sync, owned) logger.info(f"Retrieved {len(result)} repositories from GitHub") return result except Exception as e: logger.error(f"Failed to get GitHub repositories: {e}") raise GitLabError(f"Failed to fetch repositories: {str(e)}") def _get_projects_sync(self, owned: bool) -> List[Dict[str, Any]]: """Synchronous repository retrieval""" try: if owned: # Get user's own repositories repos = self.github.get_user().get_repos(type="owner", sort="updated") else: # Get all repositories user has access to repos = self.github.get_user().get_repos(type="all", sort="updated") project_list = [] for repo in repos: project_data = { "id": repo.full_name, # Use owner/repo format "name": repo.name, "path": repo.name, "web_url": repo.html_url, "default_branch": repo.default_branch or "main", "description": repo.description or "", "visibility": "private" if repo.private else "public" } project_list.append(project_data) return project_list except GithubException as e: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error fetching repositories: {str(e)}") async def get_project(self, project_id: str) -> Optional[Dict[str, Any]]: """Get a specific GitHub repository""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_project_sync, project_id) logger.info(f"Retrieved repository {project_id}") return result except Exception as e: logger.error(f"Failed to get repository {project_id}: {e}") raise GitLabError(f"Failed to fetch repository {project_id}: {str(e)}") def _get_project_sync(self, project_id: str) -> Optional[Dict[str, Any]]: """Synchronous single repository retrieval""" try: repo = self.github.get_repo(project_id) return { "id": repo.full_name, "name": repo.name, "path": repo.name, "web_url": repo.html_url, "default_branch": repo.default_branch or "main", "description": repo.description or "", "visibility": "private" if repo.private else "public", "created_at": repo.created_at.isoformat() if repo.created_at else "", "last_activity_at": repo.updated_at.isoformat() if repo.updated_at else "" } except GithubException as e: if e.status == 404: return None else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error fetching repository: {str(e)}") async def get_branches(self, project_id: str) -> List[Dict[str, Any]]: """Get branches for a GitHub repository""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_branches_sync, project_id) logger.info(f"Retrieved {len(result)} branches for repository {project_id}") return result except Exception as e: logger.error(f"Failed to get branches for repository {project_id}: {e}") raise GitLabError(f"Failed to fetch branches: {str(e)}") def _get_branches_sync(self, project_id: str) -> List[Dict[str, Any]]: """Synchronous branch retrieval""" try: repo = self.github.get_repo(project_id) branches = repo.get_branches() default_branch = repo.default_branch or "main" branch_list = [] for branch in branches: branch_data = { "name": branch.name, "protected": branch.protected, "merged": False, # GitHub API doesn't provide this directly "default": branch.name == default_branch, "web_url": f"https://github.com/{project_id}/tree/{branch.name}" } branch_list.append(branch_data) return branch_list except GithubException as e: if e.status == 404: raise GitLabError(f"Repository {project_id} not found") else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error fetching branches: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def commit_files(self, project_id: str, branch_name: str, files: List[Dict[str, Any]], commit_message: str) -> str: """Commit files to a GitHub branch""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._commit_files_sync, project_id, branch_name, files, commit_message) logger.info(f"Committed {len(files)} files to branch '{branch_name}' in repository {project_id}") return result except Exception as e: logger.error(f"Failed to commit files to branch '{branch_name}': {e}") raise GitLabError(f"Failed to commit files: {str(e)}") def _commit_files_sync(self, project_id: str, branch_name: str, files: List[Dict[str, Any]], commit_message: str) -> str: """Synchronous file commit""" try: repo = self.github.get_repo(project_id) # Get the current commit of the branch branch = repo.get_branch(branch_name) base_tree = repo.get_git_tree(branch.commit.sha) # Prepare tree elements tree_elements = [] for file_data in files: action = file_data.get('action', 'create') if action == 'delete': # For deletion, we don't include the file in the tree continue content = file_data.get('content', '') # Create blob for the file content blob = repo.create_git_blob(content, "utf-8") tree_element = { "path": file_data['path'], "mode": "100644", # Regular file "type": "blob", "sha": blob.sha } tree_elements.append(tree_element) # Create new tree tree = repo.create_git_tree(tree_elements, base_tree) # Create commit commit = repo.create_git_commit(commit_message, tree, [branch.commit]) # Update branch reference ref = repo.get_git_ref(f"heads/{branch_name}") ref.edit(commit.sha) commit_url = f"https://github.com/{project_id}/commit/{commit.sha}" return commit_url except GithubException as e: if e.status == 404: raise GitLabError(f"Repository {project_id} or branch '{branch_name}' not found") else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error committing files: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def create_merge_request(self, project_id: str, source_branch: str, target_branch: str = "main", title: str = "", description: str = "", draft: bool = True) -> str: """Create a pull request in GitHub""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._create_merge_request_sync, project_id, source_branch, target_branch, title, description, draft) logger.info(f"Created pull request from '{source_branch}' to '{target_branch}' in repository {project_id}") return result except Exception as e: logger.error(f"Failed to create pull request: {e}") raise GitLabError(f"Failed to create pull request: {str(e)}") def _create_merge_request_sync(self, project_id: str, source_branch: str, target_branch: str, title: str, description: str, draft: bool) -> str: """Synchronous pull request creation""" try: repo = self.github.get_repo(project_id) # Prepare PR data pr_title = title or f"AI-generated fix from {source_branch}" pr_body = description or "Automated fix generated by AI assistant" # Mark as draft if requested if draft: pr_title = f"Draft: {pr_title}" # Create pull request pr = repo.create_pull( title=pr_title, body=pr_body, head=source_branch, base=target_branch, draft=draft ) return pr.html_url except GithubException as e: if e.status == 404: raise GitLabError(f"Repository {project_id} not found") elif e.status == 422: # PR might already exist if "already exists" in str(e.data).lower(): # Try to find existing PR try: prs = repo.get_pulls(head=f"{repo.owner.login}:{source_branch}", base=target_branch) for pr in prs: return pr.html_url except: pass raise GitLabError(f"Pull request already exists") else: raise GitLabError(f"Failed to create pull request: {e.data}") else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error creating pull request: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def get_file_content(self, project_id: str, file_path: str, branch: str = "main") -> Optional[str]: """Get file content from GitHub repository""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_file_content_sync, project_id, file_path, branch) logger.info(f"Retrieved file '{file_path}' from branch '{branch}' in repository {project_id}") return result except Exception as e: logger.error(f"Failed to get file content: {e}") raise GitLabError(f"Failed to get file content: {str(e)}") def _get_file_content_sync(self, project_id: str, file_path: str, branch: str) -> Optional[str]: """Synchronous file content retrieval""" try: repo = self.github.get_repo(project_id) try: file_content = repo.get_contents(file_path, ref=branch) if file_content.type == "file": # Decode base64 content content = base64.b64decode(file_content.content).decode('utf-8') return content else: raise GitLabError(f"'{file_path}' is not a file") except GithubException as e: if e.status == 404: return None # File doesn't exist else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except GithubException as e: if e.status == 404: raise GitLabError(f"Repository {project_id} not found") else: raise GitLabError(f"GitHub API error: {e.status} - {e.data}") except Exception as e: raise GitLabError(f"Unexpected error getting file content: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gabbar910/MCPJiraGitlab'

If you have feedback or need assistance with the MCP directory API, please join our Discord server