# client.py • 8.45 kB
"""
GitHub API Client
This module handles all interactions with the GitHub API.
Uses PyGithub library for easy API access.
"""
import structlog
from github import Github, GithubException
from typing import Dict, Any, List, Optional
import asyncio
from config import settings
# Module-level structured logger used by the GitHubClient methods below.
logger = structlog.get_logger()
class GitHubClient:
    """
    GitHub API client for fetching repository data.

    Wraps a PyGithub ``Github`` instance. PyGithub calls are synchronous, so
    the async methods of this class run them in worker threads through
    ``asyncio.to_thread`` to keep the event loop responsive.

    This will be used by the MCP server to retrieve GitHub information
    that will be provided to Claude for the RAG pipeline.
    """

    def __init__(self, access_token: Optional[str] = None):
        """
        Initialize GitHub client.

        Args:
            access_token: GitHub personal access token (optional).
                If not provided, uses token from settings. When neither is
                set, an unauthenticated client is created.
        """
        token = access_token or settings.github_token
        if not token:
            logger.warning("No GitHub token provided - API rate limits will be very low")
            self.github = Github()  # Unauthenticated
        else:
            self.github = Github(token)
            logger.info("GitHub client initialized with authentication")

        # Best-effort connection test: a failure is logged, never raised, so
        # constructing the client cannot fail because of connectivity.
        # Without a token the login is not read and "anonymous" is logged.
        try:
            user = self.github.get_user()
            logger.info("GitHub connection successful", user=user.login if token else "anonymous")
        except Exception as e:
            logger.warning("Could not verify GitHub connection", error=str(e))

    @staticmethod
    def _api_error_message(exc: Exception) -> str:
        """
        Extract a human-readable message from a GitHub API exception.

        The exception's ``data`` payload is only treated as a mapping when it
        actually is a dict; for any other payload (string body, ``None``,
        missing attribute) the exception's own string form is returned, so
        building the error message can never raise by itself.

        Args:
            exc: The exception raised by the GitHub client library.

        Returns:
            The ``"message"`` entry of ``exc.data`` when available,
            otherwise ``str(exc)``.
        """
        data = getattr(exc, "data", None)
        if isinstance(data, dict):
            return data.get("message", str(exc))
        return str(exc)

    async def _fetch_readme_text(self, repository: Any) -> str:
        """
        Return the decoded README of ``repository``, or a placeholder.

        Fetching and decoding both happen in a worker thread. Any ordinary
        failure (no README, API error, undecodable bytes) yields the
        placeholder text; cancellation and interpreter-exit signals are not
        intercepted because only ``Exception`` is caught.
        """
        def _load() -> str:
            return repository.get_readme().decoded_content.decode()

        try:
            return await asyncio.to_thread(_load)
        except Exception as exc:
            logger.debug("README unavailable", error=str(exc))
            return "No README found"

    async def get_repository(self, owner: str, repo: str) -> Dict[str, Any]:
        """
        Get repository information.

        Args:
            owner: Repository owner (user or organization)
            repo: Repository name

        Returns:
            Dictionary with repository information: name, full_name,
            description, stars, forks, language, topics, created_at,
            updated_at, url, readme (first 2000 characters) and
            default_branch.

        Raises:
            ValueError: If the GitHub API reports an error; the original
                ``GithubException`` is attached as ``__cause__``.
        """
        logger.info("Fetching repository", owner=owner, repo=repo)
        try:
            repository = await asyncio.to_thread(
                self.github.get_repo, f"{owner}/{repo}"
            )

            # README is optional; a missing one must not fail the whole call.
            readme_content = await self._fetch_readme_text(repository)

            # get_topics() performs its own API request, so it is run off
            # the event loop like the other PyGithub calls.
            topics = await asyncio.to_thread(repository.get_topics)

            # Compile repository info
            repo_info = {
                "name": repository.name,
                "full_name": repository.full_name,
                "description": repository.description or "No description",
                "stars": repository.stargazers_count,
                "forks": repository.forks_count,
                "language": repository.language,
                "topics": topics,
                "created_at": repository.created_at.isoformat(),
                "updated_at": repository.updated_at.isoformat(),
                "url": repository.html_url,
                "readme": readme_content[:2000],  # First 2000 chars
                "default_branch": repository.default_branch,
            }
            logger.info("Repository fetched successfully", repo=repository.full_name)
            return repo_info
        except GithubException as e:
            logger.error("GitHub API error", error=str(e), status=e.status)
            raise ValueError(f"GitHub API error: {self._api_error_message(e)}") from e
        except Exception as e:
            logger.error("Error fetching repository", error=str(e))
            raise

    async def get_file_content(self, owner: str, repo: str, path: str) -> Dict[str, Any]:
        """
        Get content of a specific file, or the listing of a directory.

        Args:
            owner: Repository owner
            repo: Repository name
            path: File path in repository

        Returns:
            For a directory: ``{"type": "directory", "path", "files"}`` where
            ``files`` lists the paths of the entries.
            For a file: ``{"type": "file", "path", "name", "size", "sha",
            "content", "url"}`` with ``content`` as decoded text.

        Raises:
            ValueError: If the GitHub API reports an error; the original
                ``GithubException`` is attached as ``__cause__``.
            Exception: Any other failure (for example content that cannot be
                decoded as text) is logged and re-raised unchanged.
        """
        logger.info("Fetching file", owner=owner, repo=repo, path=path)
        try:
            repository = await asyncio.to_thread(
                self.github.get_repo, f"{owner}/{repo}"
            )
            file_content = await asyncio.to_thread(
                repository.get_contents, path
            )

            # get_contents returns a list when the path is a directory.
            if isinstance(file_content, list):
                return {
                    "type": "directory",
                    "path": path,
                    "files": [f.path for f in file_content],
                }

            # Evaluate decoded_content inside the worker too, not only the
            # final bytes-to-str decode.
            content = await asyncio.to_thread(
                lambda: file_content.decoded_content.decode()
            )
            return {
                "type": "file",
                "path": file_content.path,
                "name": file_content.name,
                "size": file_content.size,
                "sha": file_content.sha,
                "content": content,
                "url": file_content.html_url,
            }
        except GithubException as e:
            logger.error("GitHub API error", error=str(e), status=e.status)
            raise ValueError(f"GitHub API error: {self._api_error_message(e)}") from e
        except Exception as e:
            logger.error("Error fetching file", error=str(e))
            raise

    async def search_code(self, query: str, repo: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search code in GitHub repositories.

        Args:
            query: Search query
            repo: Optional repository filter (format: owner/repo)

        Returns:
            List of at most 10 search results, each a dictionary with
            name, path, repository, url and sha.

        Raises:
            ValueError: If the GitHub API reports an error; the original
                ``GithubException`` is attached as ``__cause__``.
        """
        logger.info("Searching code", query=query, repo=repo)
        try:
            # Build search query
            search_query = f"{query} repo:{repo}" if repo else query

            def _run_search() -> List[Dict[str, Any]]:
                # Fetching and formatting both happen in the worker thread so
                # no PyGithub attribute access runs on the event loop.
                # Limited to the top 10 results for now.
                hits = self.github.search_code(search_query)[:10]
                return [
                    {
                        "name": hit.name,
                        "path": hit.path,
                        "repository": hit.repository.full_name,
                        "url": hit.html_url,
                        "sha": hit.sha,
                    }
                    for hit in hits
                ]

            formatted_results = await asyncio.to_thread(_run_search)
            logger.info("Code search completed", results_count=len(formatted_results))
            return formatted_results
        except GithubException as e:
            logger.error("GitHub API error", error=str(e), status=e.status)
            raise ValueError(f"GitHub API error: {self._api_error_message(e)}") from e
        except Exception as e:
            logger.error("Error searching code", error=str(e))
            raise

    async def get_repository_structure(self, owner: str, repo: str, max_depth: int = 2) -> Dict[str, Any]:
        """
        Get repository file structure.

        Only the top-level entries of the repository root are listed.

        Args:
            owner: Repository owner
            repo: Repository name
            max_depth: Maximum depth to traverse. NOTE: accepted for
                interface compatibility but not applied yet; the listing
                never descends into subdirectories.

        Returns:
            Dictionary ``{"name", "type": "repository", "children"}`` where
            each child has name, path and type ("file" or "dir"), plus size
            for files.

        Raises:
            Exception: Any failure, including GitHub API errors, is logged
                and re-raised unchanged (no conversion to ``ValueError``
                here).
        """
        logger.info("Fetching repository structure", owner=owner, repo=repo)
        try:
            repository = await asyncio.to_thread(
                self.github.get_repo, f"{owner}/{repo}"
            )
            # Contents of the repository root
            contents = await asyncio.to_thread(
                repository.get_contents, ""
            )

            def _describe(entry: Any) -> Dict[str, Any]:
                item = {
                    "name": entry.name,
                    "path": entry.path,
                    "type": entry.type,  # "file" or "dir"
                }
                if entry.type == "file":
                    item["size"] = entry.size
                return item

            return {
                "name": repository.name,
                "type": "repository",
                "children": [_describe(entry) for entry in contents],
            }
        except Exception as e:
            logger.error("Error fetching structure", error=str(e))
            raise

    def close(self):
        """
        Close GitHub client connections.

        Only logs the shutdown; PyGithub doesn't need explicit cleanup.
        """
        logger.info("Closing GitHub client")