"""GitLab API client with retry logic and session caching."""
import time
import logging
from typing import Optional, List, Dict, Any
import gitlab
from gitlab.exceptions import GitlabError
# Module-level logger named after this module; used for retry warnings.
logger = logging.getLogger(__name__)
class GitLabClientError(Exception):
    """Root exception raised by the GitLab client for any failure it reports."""
class GitLabClient:
    """GitLab API client with retry logic and project ID caching.

    Public methods resolve the project path to a numeric project ID (cached
    per instance) and run their API calls through ``_retry_request``.
    """

    # Seconds to wait after a failed attempt before trying again. The number
    # of entries determines the number of attempts; the last entry is never
    # slept on because no retry follows the final attempt.
    RETRY_DELAYS = [1, 5, 9]
    MAX_RETRIES = len(RETRY_DELAYS)

    def __init__(self, gitlab_url: str, gitlab_token: str):
        """
        Initialize GitLab client.

        Args:
            gitlab_url: GitLab server URL (e.g., https://gitlab.example.com)
            gitlab_token: GitLab API token
        """
        self.gitlab_url = gitlab_url
        self.gitlab_token = gitlab_token
        self.gl = gitlab.Gitlab(gitlab_url, private_token=gitlab_token)
        # Session cache: project path -> numeric project ID
        self._project_id_cache: Dict[str, int] = {}

    def _retry_request(self, func, *args, **kwargs) -> Any:
        """
        Call ``func``, retrying on failure with the delays in RETRY_DELAYS.

        A ``GitLabClientError`` raised by ``func`` is treated as a definitive
        answer (e.g. "not found", invalid arguments): it is propagated
        immediately, without retrying and without being re-wrapped.

        Args:
            func: Callable to retry
            *args: Positional arguments for func
            **kwargs: Keyword arguments for func

        Returns:
            Result from func

        Raises:
            GitLabClientError: If func raises it, or if all attempts fail
                (the last underlying error is attached as ``__cause__``)
        """
        last_error = None
        for attempt in range(self.MAX_RETRIES):
            try:
                return func(*args, **kwargs)
            except GitLabClientError:
                # Our own errors describe a final outcome; sleeping and
                # retrying would only delay the same failure.
                raise
            except Exception as e:
                last_error = e
                if attempt < self.MAX_RETRIES - 1:
                    delay = self.RETRY_DELAYS[attempt]
                    logger.warning(
                        "Attempt %d/%d failed, retrying in %ss: %s",
                        attempt + 1,
                        self.MAX_RETRIES,
                        delay,
                        e,
                    )
                    time.sleep(delay)
        raise GitLabClientError(
            f"Failed after {self.MAX_RETRIES} attempts: {last_error}"
        ) from last_error

    @staticmethod
    def _job_summary(job) -> Dict[str, Any]:
        """Convert a job object into the plain dict returned to callers."""
        return {
            'id': job.id,
            'name': job.name,
            'status': job.status,
            'stage': job.stage,
            # Timing fields are read defensively; they may be absent.
            'started_at': getattr(job, 'started_at', None),
            'finished_at': getattr(job, 'finished_at', None),
            'duration': getattr(job, 'duration', None),
            'web_url': job.web_url,
        }

    def get_project_id(self, project_path: str) -> int:
        """
        Get GitLab project ID from project path.

        Uses session cache to avoid repeated API calls.

        Args:
            project_path: Project path (e.g., 'group/project')

        Returns:
            GitLab project ID (numeric)

        Raises:
            GitLabClientError: If project not found or API fails
        """
        if project_path in self._project_id_cache:
            return self._project_id_cache[project_path]

        def fetch_project():
            try:
                return self.gl.projects.get(project_path).id
            except gitlab.exceptions.GitlabGetError as e:
                # Only HTTP 404 means "not found". Other failures (auth,
                # server errors) are re-raised so the retry logic handles
                # them and the real cause is reported.
                if getattr(e, 'response_code', None) == 404:
                    raise GitLabClientError(
                        f"Project not found: {project_path}"
                    ) from e
                raise

        project_id = self._retry_request(fetch_project)
        self._project_id_cache[project_path] = project_id
        return project_id

    def get_pipeline_status(
        self,
        project_path: str,
        branch: str,
        commit: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get pipeline status for a branch or commit.

        The most recently updated matching pipeline is used.

        Args:
            project_path: Project path (e.g., 'group/project')
            branch: Branch name
            commit: Optional commit SHA; if provided, pipelines are looked up
                by SHA instead of by branch

        Returns:
            Dictionary with pipeline status and jobs

        Raises:
            GitLabClientError: If pipeline not found or API fails
        """
        project_id = self.get_project_id(project_path)

        def fetch_pipeline():
            project = self.gl.projects.get(project_id)
            # Filter by commit SHA when given, otherwise by branch ref.
            selector = {'sha': commit} if commit else {'ref': branch}
            pipelines = project.pipelines.list(
                order_by='updated_at',
                sort='desc',
                get_all=False,
                **selector,
            )
            if not pipelines:
                raise GitLabClientError(
                    f"No pipeline found for {branch} (commit: {commit})"
                )
            pipeline = pipelines[0]
            # Re-fetch the pipeline before reading its fields.
            pipeline.refresh()
            jobs = pipeline.jobs.list(get_all=True)
            return {
                'id': pipeline.id,
                'status': pipeline.status,
                'ref': pipeline.ref,
                'sha': pipeline.sha,
                'created_at': pipeline.created_at,
                'updated_at': pipeline.updated_at,
                'started_at': getattr(pipeline, 'started_at', None),
                'finished_at': getattr(pipeline, 'finished_at', None),
                'duration': getattr(pipeline, 'duration', None),
                'web_url': pipeline.web_url,
                'jobs': [self._job_summary(job) for job in jobs],
            }

        return self._retry_request(fetch_pipeline)

    def get_job_status(
        self,
        project_path: str,
        job_name: Optional[str] = None,
        job_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Get status of a specific job.

        ``job_id`` takes precedence when both are given. Lookup by name only
        inspects the jobs returned by a single non-exhaustive listing
        (``get_all=False``), so older jobs may not be found.

        Args:
            project_path: Project path (e.g., 'group/project')
            job_name: Job name (will search for matching job)
            job_id: Job ID (specific job)

        Returns:
            Dictionary with job details

        Raises:
            GitLabClientError: If neither job_id nor job_name is given, the
                job is not found, or the API fails
        """
        # Validate before touching the network: a caller error should not
        # cost API calls or retries.
        if not job_id and not job_name:
            raise GitLabClientError("Either job_id or job_name must be provided")

        project_id = self.get_project_id(project_path)

        def fetch_job():
            project = self.gl.projects.get(project_id)
            if job_id:
                job = project.jobs.get(job_id)
            else:
                # Search for job by name in recent jobs; first match wins.
                recent_jobs = project.jobs.list(get_all=False)
                job = next((j for j in recent_jobs if j.name == job_name), None)
                if job is None:
                    raise GitLabClientError(f"Job not found: {job_name}")
            return self._job_summary(job)

        return self._retry_request(fetch_job)

    def get_job_log(self, project_path: str, job_id: int) -> str:
        """
        Get the raw log content for a job.

        Args:
            project_path: Project path (e.g., 'group/project')
            job_id: Job ID

        Returns:
            Raw job log content as string

        Raises:
            GitLabClientError: If job not found or API fails
        """
        project_id = self.get_project_id(project_path)

        def fetch_log():
            project = self.gl.projects.get(project_id)
            trace = project.jobs.get(job_id).trace()
            # The trace arrives as bytes; decode so the declared ``str``
            # return type holds. Undecodable bytes are replaced rather than
            # failing the whole request.
            if isinstance(trace, bytes):
                return trace.decode('utf-8', errors='replace')
            return trace

        return self._retry_request(fetch_log)

    def get_merge_request_for_branch(
        self,
        project_path: str,
        branch: str,
    ) -> Optional[Dict[str, Any]]:
        """
        Get open merge request for a branch.

        If several open merge requests use the branch as source, the first
        one returned by the API is used.

        Args:
            project_path: Project path (e.g., 'group/project')
            branch: Source branch name

        Returns:
            Dictionary with MR details or None if no open MR. The 'id' key
            holds the merge request's ``iid``.

        Raises:
            GitLabClientError: If API fails
        """
        project_id = self.get_project_id(project_path)

        def fetch_mr():
            project = self.gl.projects.get(project_id)
            # Search for open MRs on this branch
            mrs = project.mergerequests.list(
                source_branch=branch,
                state='opened',
                get_all=False,
            )
            if not mrs:
                return None
            mr = mrs[0]
            return {
                'id': mr.iid,
                'title': mr.title,
                'state': mr.state,
                'web_url': mr.web_url,
            }

        return self._retry_request(fetch_mr)