# github_client.py
import asyncio
import logging
from typing import Dict, List, Any, Optional
import base64
import json
from github import Github, GithubException
from github.Repository import Repository
from github.Branch import Branch
from utils.error_handler import GitLabError, AuthenticationError, retry_on_failure
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class GitHubClient:
    """Async-friendly wrapper around PyGithub for repository automation.

    Each public coroutine delegates to a blocking ``_*_sync`` counterpart that
    runs on the event loop's default executor, so PyGithub's blocking HTTP
    calls do not stall the loop. Failures are reported as ``GitLabError`` or
    ``AuthenticationError`` (both imported from ``utils.error_handler``); the
    original exception is attached as ``__cause__``.
    """

    def __init__(self, config: Dict[str, str]):
        """Build the PyGithub client from *config*.

        Args:
            config: Must contain ``"access_token"``. ``"base_url"`` is
                optional and defaults to ``https://api.github.com``; any
                other value is handed to PyGithub as a custom API base URL
                (GitHub Enterprise).

        Raises:
            KeyError: If ``"access_token"`` is missing from *config*.
        """
        self.config = config
        self.base_url = config.get("base_url", "https://api.github.com")
        self.access_token = config["access_token"]

        if self.base_url == "https://api.github.com":
            self.github = Github(self.access_token)
        else:
            # For GitHub Enterprise
            self.github = Github(base_url=self.base_url, login_or_token=self.access_token)
        logger.info(f"GitHub client initialized for {self.base_url}")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_sync(self, func, *args) -> Any:
        """Run blocking *func* with *args* on the default executor and await it."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    @staticmethod
    def _repo_web_url(repo, project_id: str) -> str:
        """Return the browser URL of *repo*.

        Uses the ``html_url`` reported by the API so that non-github.com
        hosts are respected, and falls back to the public github.com form
        built from *project_id* when the API did not supply one.
        """
        return repo.html_url or f"https://github.com/{project_id}"

    @staticmethod
    def _repo_summary(repo) -> Dict[str, Any]:
        """Build the dictionary of common repository fields returned to callers."""
        return {
            "id": repo.full_name,  # Use owner/repo format
            "name": repo.name,
            "path": repo.name,
            "web_url": repo.html_url,
            "default_branch": repo.default_branch or "main",
            "description": repo.description or "",
            "visibility": "private" if repo.private else "public",
        }

    @staticmethod
    def _find_existing_pull_url(repo, source_branch: str, target_branch: str) -> Optional[str]:
        """Best-effort lookup of a pull request for the given head/base pair.

        Returns the URL of the first match, or ``None`` when nothing is found
        or the lookup itself fails (the failure is logged, not raised).
        """
        try:
            pulls = repo.get_pulls(head=f"{repo.owner.login}:{source_branch}", base=target_branch)
            for pull in pulls:
                return pull.html_url
        except Exception as lookup_error:
            # Deliberately best-effort: the caller still reports the
            # "already exists" condition when the lookup cannot be done.
            logger.warning(f"Could not look up existing pull request: {lookup_error}")
        return None

    # ------------------------------------------------------------------
    # Connection
    # ------------------------------------------------------------------

    @retry_on_failure(max_retries=3, delay=1.0)
    async def test_connection(self) -> bool:
        """Test the GitHub connection.

        Returns:
            ``True`` when the authenticated user could be fetched.

        Raises:
            AuthenticationError: On any failure of the underlying check.
        """
        try:
            result = await self._run_sync(self._test_connection_sync)
            logger.info("GitHub connection test successful")
            return result
        except Exception as e:
            logger.error(f"GitHub connection test failed: {e}")
            raise AuthenticationError(f"Failed to connect to GitHub: {str(e)}", "github") from e

    def _test_connection_sync(self) -> bool:
        """Synchronous connection test.

        Raises:
            AuthenticationError: If the API answers with HTTP 401.
            GitLabError: For any other API or unexpected error.
        """
        try:
            # Reading ``login`` forces PyGithub to perform the API request.
            user_login = self.github.get_user().login
            logger.info(f"GitHub connection successful for user: {user_login}")
            return True
        except GithubException as e:
            if e.status == 401:
                raise AuthenticationError("Invalid GitHub access token", "github") from e
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error: {str(e)}") from e

    # ------------------------------------------------------------------
    # Branches
    # ------------------------------------------------------------------

    @retry_on_failure(max_retries=2, delay=0.5)
    async def create_branch(self, project_id: str, branch_name: str, ref: str = "main") -> str:
        """Create a new branch in a GitHub repository.

        Args:
            project_id: Repository in ``owner/repo`` form.
            branch_name: Name of the branch to create.
            ref: Existing branch whose head commit the new branch starts from.

        Returns:
            Browser URL of the (new or already existing) branch.

        Raises:
            GitLabError: If the branch could not be created.
        """
        try:
            result = await self._run_sync(self._create_branch_sync, project_id, branch_name, ref)
            logger.info(f"Created branch '{branch_name}' in repository {project_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to create branch '{branch_name}' in repository {project_id}: {e}")
            raise GitLabError(f"Failed to create branch: {str(e)}") from e

    def _create_branch_sync(self, project_id: str, branch_name: str, ref: str) -> str:
        """Synchronous branch creation; an existing branch is treated as success."""
        # Fallback URL, replaced below once the repository's own URL is known.
        branch_url = f"https://github.com/{project_id}/tree/{branch_name}"
        try:
            repo = self.github.get_repo(project_id)
            branch_url = f"{self._repo_web_url(repo, project_id)}/tree/{branch_name}"

            try:
                repo.get_branch(branch_name)
            except GithubException as e:
                # 404 means the branch doesn't exist yet, which is what we want.
                if e.status != 404:
                    raise
            else:
                logger.warning(f"Branch '{branch_name}' already exists in repository {project_id}")
                return branch_url

            ref_sha = repo.get_branch(ref).commit.sha
            repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=ref_sha)
            return branch_url
        except GithubException as e:
            if e.status == 404:
                raise GitLabError(
                    f"Repository {project_id} or reference branch '{ref}' not found"
                ) from e
            if e.status == 422:
                # The branch may have been created between the check and the create.
                if "already exists" in str(e.data).lower():
                    logger.warning(f"Branch '{branch_name}' already exists in repository {project_id}")
                    return branch_url
                raise GitLabError(f"Failed to create branch: {e.data}") from e
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error creating branch: {str(e)}") from e

    async def get_branches(self, project_id: str) -> List[Dict[str, Any]]:
        """Get the branches of a GitHub repository.

        Returns:
            One dictionary per branch with the keys ``name``, ``protected``,
            ``merged``, ``default`` and ``web_url``.

        Raises:
            GitLabError: If the branches could not be fetched.
        """
        try:
            result = await self._run_sync(self._get_branches_sync, project_id)
            logger.info(f"Retrieved {len(result)} branches for repository {project_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to get branches for repository {project_id}: {e}")
            raise GitLabError(f"Failed to fetch branches: {str(e)}") from e

    def _get_branches_sync(self, project_id: str) -> List[Dict[str, Any]]:
        """Synchronous branch retrieval."""
        try:
            repo = self.github.get_repo(project_id)
            default_branch = repo.default_branch or "main"
            web_base = self._repo_web_url(repo, project_id)
            return [
                {
                    "name": branch.name,
                    "protected": branch.protected,
                    "merged": False,  # GitHub API doesn't provide this directly
                    "default": branch.name == default_branch,
                    "web_url": f"{web_base}/tree/{branch.name}",
                }
                for branch in repo.get_branches()
            ]
        except GithubException as e:
            if e.status == 404:
                raise GitLabError(f"Repository {project_id} not found") from e
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error fetching branches: {str(e)}") from e

    # ------------------------------------------------------------------
    # Repositories
    # ------------------------------------------------------------------

    async def get_projects(self, owned: bool = True) -> List[Dict[str, Any]]:
        """Get GitHub repositories of the authenticated user.

        Args:
            owned: When true only repositories of type ``"owner"`` are
                requested, otherwise type ``"all"``.

        Raises:
            GitLabError: If the repositories could not be fetched.
        """
        try:
            result = await self._run_sync(self._get_projects_sync, owned)
            logger.info(f"Retrieved {len(result)} repositories from GitHub")
            return result
        except Exception as e:
            logger.error(f"Failed to get GitHub repositories: {e}")
            raise GitLabError(f"Failed to fetch repositories: {str(e)}") from e

    def _get_projects_sync(self, owned: bool) -> List[Dict[str, Any]]:
        """Synchronous repository retrieval, sorted by last update."""
        try:
            repo_type = "owner" if owned else "all"
            repos = self.github.get_user().get_repos(type=repo_type, sort="updated")
            return [self._repo_summary(repo) for repo in repos]
        except GithubException as e:
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error fetching repositories: {str(e)}") from e

    async def get_project(self, project_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific GitHub repository.

        Returns:
            The repository description, or ``None`` if the API answered 404.

        Raises:
            GitLabError: If the repository could not be fetched.
        """
        try:
            result = await self._run_sync(self._get_project_sync, project_id)
            logger.info(f"Retrieved repository {project_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to get repository {project_id}: {e}")
            raise GitLabError(f"Failed to fetch repository {project_id}: {str(e)}") from e

    def _get_project_sync(self, project_id: str) -> Optional[Dict[str, Any]]:
        """Synchronous single repository retrieval."""
        try:
            repo = self.github.get_repo(project_id)
            details = self._repo_summary(repo)
            details["created_at"] = repo.created_at.isoformat() if repo.created_at else ""
            details["last_activity_at"] = repo.updated_at.isoformat() if repo.updated_at else ""
            return details
        except GithubException as e:
            if e.status == 404:
                return None
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error fetching repository: {str(e)}") from e

    # ------------------------------------------------------------------
    # Commits
    # ------------------------------------------------------------------

    @retry_on_failure(max_retries=2, delay=0.5)
    async def commit_files(self, project_id: str, branch_name: str, files: List[Dict[str, Any]],
                           commit_message: str) -> str:
        """Commit files to a GitHub branch as a single commit.

        Args:
            project_id: Repository in ``owner/repo`` form.
            branch_name: Branch that receives the commit.
            files: One dictionary per file with ``path``, optional ``content``
                (defaults to ``''``) and optional ``action`` (defaults to
                ``'create'``; ``'delete'`` removes the path).
            commit_message: Message of the new commit.

        Returns:
            Browser URL of the created commit.

        Raises:
            GitLabError: If the commit could not be created.
        """
        try:
            result = await self._run_sync(self._commit_files_sync,
                                          project_id, branch_name, files, commit_message)
            logger.info(f"Committed {len(files)} files to branch '{branch_name}' in repository {project_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to commit files to branch '{branch_name}': {e}")
            raise GitLabError(f"Failed to commit files: {str(e)}") from e

    def _commit_files_sync(self, project_id: str, branch_name: str, files: List[Dict[str, Any]],
                           commit_message: str) -> str:
        """Synchronous file commit built on the Git data API (blob, tree, commit, ref)."""
        # Imported here so this method stays self-contained; ``github`` is
        # already a dependency of this module.
        from github import InputGitTreeElement

        try:
            repo = self.github.get_repo(project_id)

            # PyGithub requires a GitCommit (not the Commit held by the
            # branch) as parent, so fetch it explicitly by SHA.
            branch = repo.get_branch(branch_name)
            parent_commit = repo.get_git_commit(branch.commit.sha)
            base_tree = parent_commit.tree

            # PyGithub only accepts InputGitTreeElement instances here.
            tree_elements = []
            for file_data in files:
                path = file_data['path']
                if file_data.get('action', 'create') == 'delete':
                    # With a base tree, a path is only removed when it is
                    # listed with a null SHA; omitting it keeps the file.
                    tree_elements.append(
                        InputGitTreeElement(path=path, mode="100644", type="blob", sha=None)
                    )
                    continue

                blob = repo.create_git_blob(file_data.get('content', ''), "utf-8")
                tree_elements.append(
                    InputGitTreeElement(path=path, mode="100644", type="blob", sha=blob.sha)
                )

            tree = repo.create_git_tree(tree_elements, base_tree)
            commit = repo.create_git_commit(commit_message, tree, [parent_commit])

            # Move the branch to the new commit.
            repo.get_git_ref(f"heads/{branch_name}").edit(commit.sha)

            return f"{self._repo_web_url(repo, project_id)}/commit/{commit.sha}"
        except GithubException as e:
            if e.status == 404:
                raise GitLabError(
                    f"Repository {project_id} or branch '{branch_name}' not found"
                ) from e
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error committing files: {str(e)}") from e

    # ------------------------------------------------------------------
    # Pull requests
    # ------------------------------------------------------------------

    @retry_on_failure(max_retries=2, delay=0.5)
    async def create_merge_request(self, project_id: str, source_branch: str, target_branch: str = "main",
                                   title: str = "", description: str = "", draft: bool = True) -> str:
        """Create a pull request in GitHub.

        Args:
            project_id: Repository in ``owner/repo`` form.
            source_branch: Head branch of the pull request.
            target_branch: Base branch of the pull request.
            title: Title; a default mentioning *source_branch* is used when empty.
            description: Body; a default text is used when empty.
            draft: Whether to open the pull request as a draft.

        Returns:
            Browser URL of the created (or already existing) pull request.

        Raises:
            GitLabError: If the pull request could not be created.
        """
        try:
            result = await self._run_sync(self._create_merge_request_sync,
                                          project_id, source_branch, target_branch,
                                          title, description, draft)
            logger.info(f"Created pull request from '{source_branch}' to '{target_branch}' in repository {project_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to create pull request: {e}")
            raise GitLabError(f"Failed to create pull request: {str(e)}") from e

    def _create_merge_request_sync(self, project_id: str, source_branch: str, target_branch: str,
                                   title: str, description: str, draft: bool) -> str:
        """Synchronous pull request creation; an existing PR is returned when found."""
        repo = None  # stays None if the repository lookup itself fails
        try:
            repo = self.github.get_repo(project_id)

            pr_title = title or f"AI-generated fix from {source_branch}"
            pr_body = description or "Automated fix generated by AI assistant"

            # NOTE(review): the title prefix is applied in addition to the
            # ``draft`` flag sent to the API; kept for compatibility.
            if draft:
                pr_title = f"Draft: {pr_title}"

            pr = repo.create_pull(
                title=pr_title,
                body=pr_body,
                head=source_branch,
                base=target_branch,
                draft=draft,
            )
            return pr.html_url
        except GithubException as e:
            if e.status == 404:
                raise GitLabError(f"Repository {project_id} not found") from e
            if e.status != 422:
                raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
            if "already exists" not in str(e.data).lower():
                raise GitLabError(f"Failed to create pull request: {e.data}") from e

            # A PR for this head/base pair exists already: try to return it.
            existing_url = None
            if repo is not None:
                existing_url = self._find_existing_pull_url(repo, source_branch, target_branch)
            if existing_url is not None:
                return existing_url
            raise GitLabError("Pull request already exists") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error creating pull request: {str(e)}") from e

    # ------------------------------------------------------------------
    # File content
    # ------------------------------------------------------------------

    @retry_on_failure(max_retries=2, delay=0.5)
    async def get_file_content(self, project_id: str, file_path: str, branch: str = "main") -> Optional[str]:
        """Get file content from a GitHub repository.

        Returns:
            The file decoded as UTF-8 text, or ``None`` if the contents
            request answered 404.

        Raises:
            GitLabError: If the content could not be fetched or the path is
                not a regular file.
        """
        try:
            result = await self._run_sync(self._get_file_content_sync,
                                          project_id, file_path, branch)
            logger.info(f"Retrieved file '{file_path}' from branch '{branch}' in repository {project_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to get file content: {e}")
            raise GitLabError(f"Failed to get file content: {str(e)}") from e

    def _get_file_content_sync(self, project_id: str, file_path: str, branch: str) -> Optional[str]:
        """Synchronous file content retrieval."""
        try:
            repo = self.github.get_repo(project_id)

            try:
                file_content = repo.get_contents(file_path, ref=branch)
            except GithubException as e:
                if e.status == 404:
                    return None  # File doesn't exist
                raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e

            # A directory path yields a list of entries instead of one file.
            if isinstance(file_content, list) or file_content.type != "file":
                raise GitLabError(f"'{file_path}' is not a file")

            # The API delivers the content base64 encoded.
            return base64.b64decode(file_content.content).decode('utf-8')
        except GitLabError:
            # Already a deliberate, descriptive error: do not re-wrap it as
            # an "unexpected" one below.
            raise
        except GithubException as e:
            if e.status == 404:
                raise GitLabError(f"Repository {project_id} not found") from e
            raise GitLabError(f"GitHub API error: {e.status} - {e.data}") from e
        except Exception as e:
            raise GitLabError(f"Unexpected error getting file content: {str(e)}") from e