"""
GitHub repository analyzer for extracting business value using Gemma3.
This module analyzes GitHub repositories to extract business insights,
technical benefits, and value propositions for case study generation.
"""
import logging
import re
import asyncio
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse
import requests
from github import Github, GithubException
from gemma3_client import get_gemma3_client, Gemma3ClientError
from prompts import get_github_prompt, get_analysis_config
# Configure logging
# NOTE(review): calling basicConfig at import time configures the *root*
# logger for the whole process; for a library module, consider leaving
# logging configuration to the application entry point.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GitHubAnalyzerError(Exception):
    """Raised for any failure inside the GitHub analyzer: invalid
    repository URLs, repository fetch errors, or analysis failures."""
class GitHubAnalyzer:
    """
    Analyzes GitHub repositories for business value using Gemma3.

    Fetches repository metadata, analyzes key files, and extracts
    structured business insights suitable for case study generation.
    """

    # Hard caps that keep GitHub API usage and prompt size bounded.
    MAX_FILES = 100          # file paths retained for tech-stack detection
    MAX_KEY_FILES = 5        # key files embedded into the analysis prompt
    README_MAX_CHARS = 3000  # README truncation limit
    KEY_FILE_MAX_CHARS = 1000  # per-key-file truncation limit

    # Response fields that must be lists of strings after validation.
    _LIST_FIELDS = ("key_features", "technical_benefits", "integration_points")

    def __init__(self, gemma3_client=None, github_token=None):
        """
        Initialize GitHub analyzer.

        Args:
            gemma3_client: Optional Gemma3Client instance; defaults to the
                shared client returned by get_gemma3_client().
            github_token: Optional GitHub API token for higher rate limits.
                Without a token, anonymous access is used (much lower limits).
        """
        self.gemma3 = gemma3_client or get_gemma3_client()
        self.github = Github(github_token) if github_token else Github()

        # File-name patterns used to detect the technology stack.
        self.tech_indicators = {
            "Python": [r"\.py$", r"requirements\.txt", r"setup\.py", r"pyproject\.toml"],
            "JavaScript/Node.js": [r"\.js$", r"\.ts$", r"package\.json", r"yarn\.lock"],
            "Java": [r"\.java$", r"pom\.xml", r"build\.gradle"],
            "C#/.NET": [r"\.cs$", r"\.csproj$", r"\.sln$"],
            "Go": [r"\.go$", r"go\.mod"],
            "Rust": [r"\.rs$", r"Cargo\.toml"],
            "Docker": [r"Dockerfile", r"docker-compose\.yml"],
            # NOTE(review): any repo containing .yaml/.yml files is flagged
            # "Kubernetes"; likewise package.json alone does not prove React.
            # Kept as-is to preserve existing detection output.
            "Kubernetes": [r"\.yaml$", r"\.yml$"],
            "React": [r"package\.json"],
            "Vue": [r"vue\.config\.js"],
            "Angular": [r"angular\.json"]
        }

        # Files whose contents are worth feeding into the analysis prompt.
        self.key_files_patterns = [
            r"README\.md$",
            r"README\.rst$",
            r"CHANGELOG\.md$",
            r"package\.json$",
            r"requirements\.txt$",
            r"pyproject\.toml$",
            r"Dockerfile$",
            r"docker-compose\.yml$",
            r"\.github/workflows/.*\.yml$"
        ]

    def _parse_repository_url(self, repo_url: str) -> Tuple[str, str]:
        """
        Parse a GitHub repository URL into (owner, repo_name).

        Args:
            repo_url: GitHub repository URL, with or without scheme.

        Returns:
            Tuple of (owner, repo_name), with any trailing ".git" removed.

        Raises:
            GitHubAnalyzerError: If the URL cannot be parsed or lacks an
                owner/repo path.
        """
        url = repo_url.strip()
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        try:
            parsed = urlparse(url)
        except Exception as e:
            raise GitHubAnalyzerError(f"Failed to parse repository URL: {e}") from e

        # Fix: the deliberate "Invalid repository URL" raise previously sat
        # inside the catch-all and was re-wrapped into a second error message.
        path_parts = [p for p in parsed.path.split('/') if p]
        if len(path_parts) < 2:
            raise GitHubAnalyzerError(f"Invalid repository URL: {repo_url}")

        owner, repo_name = path_parts[0], path_parts[1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-4]
        return owner, repo_name

    def _detect_tech_stack(self, files: List[str]) -> List[str]:
        """
        Detect the technology stack from a repository file list.

        Args:
            files: List of file paths in the repository.

        Returns:
            List of detected technology names (unordered, no duplicates).
        """
        detected = set()
        for tech, patterns in self.tech_indicators.items():
            if any(
                re.search(pattern, path, re.IGNORECASE)
                for pattern in patterns
                for path in files
            ):
                detected.add(tech)
        return list(detected)

    def _collect_file_paths(self, repo, max_depth: int = 2) -> List[str]:
        """
        Walk the repository tree (depth-limited) and collect file paths.

        Args:
            repo: PyGithub Repository object.
            max_depth: Maximum directory depth to descend into.

        Returns:
            List of file paths, capped at MAX_FILES entries; empty list if
            the listing fails entirely.
        """
        paths: List[str] = []

        def walk(entries, depth: int) -> None:
            if depth > max_depth:
                return
            for entry in entries:
                if entry.type == "file":
                    paths.append(entry.path)
                elif entry.type == "dir" and depth < max_depth:
                    try:
                        walk(repo.get_contents(entry.path), depth + 1)
                    except Exception:
                        # Skip directories we can't access (submodules, perms).
                        continue

        try:
            walk(repo.get_contents(""), 0)
        except Exception as e:
            logger.warning(f"Could not fetch file list: {e}")
            return []
        return paths[:self.MAX_FILES]

    def _fetch_readme(self, repo) -> str:
        """Return the repository README (truncated), or a placeholder text."""
        try:
            readme = repo.get_readme()
            # errors="replace" so a non-UTF-8 README cannot crash the fetch
            # (previously a bare decode inside a bare except).
            content = readme.decoded_content.decode('utf-8', errors='replace')
        except Exception:
            return "No README available"
        if len(content) > self.README_MAX_CHARS:
            content = content[:self.README_MAX_CHARS] + "... [truncated]"
        return content

    def _fetch_key_files(self, repo, files: List[str]) -> Dict[str, str]:
        """
        Fetch contents of recognized key files, truncated per file.

        Args:
            repo: PyGithub Repository object.
            files: Candidate file paths (already collected).

        Returns:
            Mapping of path -> truncated content, at most MAX_KEY_FILES
            entries, to avoid overwhelming the prompt.
        """
        key_files: Dict[str, str] = {}
        for path in files:
            if not any(re.search(p, path, re.IGNORECASE) for p in self.key_files_patterns):
                continue
            try:
                blob = repo.get_contents(path)
                content = blob.decoded_content.decode('utf-8', errors='replace')
            except Exception:
                continue  # Skip files we can't read
            if len(content) > self.KEY_FILE_MAX_CHARS:
                content = content[:self.KEY_FILE_MAX_CHARS] + "... [truncated]"
            key_files[path] = content
            if len(key_files) >= self.MAX_KEY_FILES:
                break
        return key_files

    def _fetch_repository_data(self, repo_url: str) -> Dict[str, Any]:
        """
        Fetch repository metadata and key files from GitHub.

        Args:
            repo_url: GitHub repository URL.

        Returns:
            Repository data dictionary with metadata, file list, detected
            tech stack, README content, and key-file contents.

        Raises:
            GitHubAnalyzerError: If the repository fetch fails (not found,
                access denied, API error, or any unexpected failure).
        """
        # Parse outside the try so a GitHubAnalyzerError from URL parsing is
        # not re-wrapped by the generic handler below (previous behavior).
        owner, repo_name = self._parse_repository_url(repo_url)
        logger.info(f"Fetching repository data for {owner}/{repo_name}")
        try:
            repo = self.github.get_repo(f"{owner}/{repo_name}")
            repo_data: Dict[str, Any] = {
                "name": repo.full_name,
                "description": repo.description or "No description available",
                "language": repo.language,
                "stars": repo.stargazers_count,
                "forks": repo.forks_count,
                "created_at": repo.created_at.isoformat() if repo.created_at else None,
                "updated_at": repo.updated_at.isoformat() if repo.updated_at else None,
                "topics": repo.get_topics(),
                "license": repo.license.name if repo.license else "No license",
                "homepage": repo.homepage
            }
            repo_data["files"] = self._collect_file_paths(repo)
            repo_data["tech_stack"] = self._detect_tech_stack(repo_data["files"])
            repo_data["readme_content"] = self._fetch_readme(repo)
            repo_data["key_files"] = self._fetch_key_files(repo, repo_data["files"])
            return repo_data
        except GithubException as e:
            if e.status == 404:
                raise GitHubAnalyzerError(f"Repository not found: {repo_url}") from e
            if e.status == 403:
                raise GitHubAnalyzerError(f"Access denied to repository: {repo_url}") from e
            raise GitHubAnalyzerError(f"GitHub API error: {e}") from e
        except Exception as e:
            raise GitHubAnalyzerError(f"Failed to fetch repository data: {e}") from e

    def _validate_analysis_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate and sanitize a Gemma3 analysis response.

        Args:
            response: Raw response dict from Gemma3.

        Returns:
            Dict with all expected fields present: list fields coerced to
            lists of non-empty strings, scalar fields coerced to strings.
            Falls back to _create_fallback_analysis on an "error" payload.
        """
        if "error" in response:
            logger.warning(f"Gemma3 analysis error: {response.get('error')}")
            return self._create_fallback_analysis(response.get("raw_response", ""))

        expected_fields: Dict[str, Any] = {
            "problem_solved": "",
            "key_features": [],
            "business_value": "",
            "technical_benefits": [],
            "target_users": "",
            "scalability": "",
            "integration_points": []
        }

        validated: Dict[str, Any] = {}
        for field, default in expected_fields.items():
            value = response.get(field, default)
            if field in self._LIST_FIELDS:
                # Promote a bare string to a one-element list; anything else
                # non-list falls back to the empty default.
                if not isinstance(value, list):
                    value = [value] if isinstance(value, str) and value else default
                # Drop empty or non-string entries.
                value = [item for item in value if item and isinstance(item, str)]
            elif not isinstance(value, str):
                value = str(value) if value else default
            validated[field] = value
        return validated

    def _create_fallback_analysis(self, raw_response: str) -> Dict[str, Any]:
        """
        Create a fallback analysis when Gemma3 processing fails.

        Args:
            raw_response: Original response text (sampled into the result).

        Returns:
            Basic structured analysis with placeholder values.
        """
        return {
            "problem_solved": "Repository analysis failed - manual review required",
            "key_features": ["Code repository with technical implementation"],
            "business_value": "Technical solution with potential business applications",
            "technical_benefits": ["Open source code available for review"],
            "target_users": "Developers and technical teams",
            "scalability": "Scalability assessment requires manual review",
            "integration_points": ["Integration capabilities require further analysis"],
            "analysis_note": "Fallback response due to processing error",
            "raw_response_sample": raw_response[:200] + "..." if len(raw_response) > 200 else raw_response
        }

    @staticmethod
    def _prepare_analysis_data(repo_data: Dict[str, Any]) -> Dict[str, str]:
        """Flatten fetched repository data into the fields the prompt expects."""
        tech = ", ".join(repo_data["tech_stack"]) if repo_data["tech_stack"] else "Not detected"
        key_files = "\n".join(
            f"{path}: {content[:200]}..."
            for path, content in repo_data["key_files"].items()
        )
        return {
            "repo_name": repo_data["name"],
            "description": repo_data["description"],
            "tech_stack": tech,
            "key_files": key_files,
            "readme_content": repo_data["readme_content"]
        }

    @staticmethod
    def _attach_repo_metadata(result: Dict[str, Any], repo_url: str,
                              repo_data: Dict[str, Any]) -> Dict[str, Any]:
        """Merge repository metadata into a validated analysis result (in place)."""
        result.update({
            "repository_url": repo_url,
            "repository_name": repo_data["name"],
            "tech_stack": repo_data["tech_stack"],
            "stars": repo_data["stars"],
            "forks": repo_data["forks"],
            "language": repo_data["language"],
            "license": repo_data["license"],
            "topics": repo_data["topics"],
            "analysis_success": True
        })
        return result

    def _gemma3_fallback(self, error: Exception, repo_url: str) -> Dict[str, Any]:
        """Build the fallback payload returned when the Gemma3 client fails."""
        logger.error(f"Gemma3 client error: {error}")
        fallback = self._create_fallback_analysis(str(error))
        fallback["repository_url"] = repo_url
        fallback["analysis_success"] = False
        fallback["error_type"] = "gemma3_error"
        return fallback

    def analyze_repository(self, repo_url: str) -> Dict[str, Any]:
        """
        Analyze a GitHub repository for business value and insights.

        Args:
            repo_url: GitHub repository URL.

        Returns:
            Structured repository analysis; on Gemma3 failure, a fallback
            payload with analysis_success=False instead of raising.

        Raises:
            GitHubAnalyzerError: If repository fetch or analysis fails
                critically.
        """
        try:
            repo_data = self._fetch_repository_data(repo_url)
            logger.info(f"Analyzing repository: {repo_data['name']}")
            prompt = get_github_prompt(self._prepare_analysis_data(repo_data))
            response = self.gemma3.process_with_json(prompt, "github_analysis")
            validated = self._validate_analysis_response(response)
            self._attach_repo_metadata(validated, repo_url, repo_data)
            logger.info("Repository analysis completed successfully")
            return validated
        except Gemma3ClientError as e:
            # Degrade gracefully: callers still get a structured result.
            return self._gemma3_fallback(e, repo_url)
        except GitHubAnalyzerError:
            raise  # Re-raise GitHub-specific errors unchanged
        except Exception as e:
            logger.error(f"Repository analysis failed: {e}")
            raise GitHubAnalyzerError(f"Analysis failed: {e}") from e

    async def analyze_repository_async(self, repo_url: str) -> Dict[str, Any]:
        """
        Analyze a GitHub repository asynchronously.

        Same contract as analyze_repository(); the blocking GitHub fetch is
        offloaded to a thread and the Gemma3 call uses the async client API.

        Args:
            repo_url: GitHub repository URL.

        Returns:
            Structured repository analysis (or a fallback payload on
            Gemma3 failure).

        Raises:
            GitHubAnalyzerError: If repository fetch or analysis fails
                critically.
        """
        try:
            # Blocking PyGithub calls run in a worker thread.
            repo_data = await asyncio.to_thread(self._fetch_repository_data, repo_url)
            logger.info(f"Analyzing repository: {repo_data['name']} [async]")
            prompt = get_github_prompt(self._prepare_analysis_data(repo_data))
            response = await self.gemma3.process_with_json_async(prompt, "github_analysis")
            validated = self._validate_analysis_response(response)
            self._attach_repo_metadata(validated, repo_url, repo_data)
            logger.info("Repository analysis completed successfully [async]")
            return validated
        except Gemma3ClientError as e:
            return self._gemma3_fallback(e, repo_url)
        except GitHubAnalyzerError:
            raise  # Re-raise GitHub-specific errors unchanged
        except Exception as e:
            logger.error(f"Repository analysis failed: {e}")
            raise GitHubAnalyzerError(f"Analysis failed: {e}") from e

    def get_rate_limit_info(self) -> Dict[str, Any]:
        """
        Get GitHub API rate limit information.

        Returns:
            Rate limit status dict, or {"error": ...} on failure (this
            method never raises).
        """
        try:
            rate_limit = self.github.get_rate_limit()
            return {
                "core_limit": rate_limit.core.limit,
                "core_remaining": rate_limit.core.remaining,
                "core_reset": rate_limit.core.reset.isoformat() if rate_limit.core.reset else None,
                "search_limit": rate_limit.search.limit,
                "search_remaining": rate_limit.search.remaining
            }
        except Exception as e:
            return {"error": f"Failed to get rate limit info: {e}"}

    def health_check(self) -> Dict[str, Any]:
        """
        Perform a health check on the GitHub analyzer.

        Returns:
            Dict with overall status plus GitHub API and Gemma3 sub-statuses;
            never raises (errors are reported in the payload).
        """
        try:
            rate_limit = self.get_rate_limit_info()
            gemma_health = self.gemma3.health_check()
            return {
                "status": "healthy",
                "github_api": "accessible",
                "rate_limit": rate_limit,
                "gemma3_status": gemma_health["status"]
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "github_api": "error",
                "gemma3_status": "unknown"
            }