"""MCP tool definitions for GitLab pipeline and job status checking."""
import time
import logging
from typing import Optional, Dict, Any
from fastmcp import FastMCP
from .git_utils import (
get_project_path_from_working_dir,
get_current_branch,
get_current_commit,
validate_working_dir,
GitUtilsError
)
from .gitlab_client import GitLabClient, GitLabClientError
logger = logging.getLogger(__name__)
def register_tools(mcp: FastMCP, gitlab_client: GitLabClient) -> None:
"""Register MCP tools with the server."""
logger.debug("Starting tool registration...")
check_pipeline_status_schema = {
"type": "object",
"properties": {
"working_directory": {
"type": "string",
"description": "Path to the working directory of the git repository"
}
},
"required": ["working_directory"]
}
@mcp.tool(
description="Check pipeline status for current project and branch. Automatically detects the GitLab project from git remote origin, gets the current branch, and retrieves pipeline status with all jobs.",
annotations={"inputSchema": check_pipeline_status_schema}
)
def check_pipeline_status(working_directory: str) -> str:
"""Check pipeline status for current project and branch."""
try:
validate_working_dir(working_directory)
project_path = get_project_path_from_working_dir(working_directory)
current_branch = get_current_branch(working_directory)
current_commit = get_current_commit(working_directory)
logger.info(f"Checking pipeline: {project_path} {current_branch} {current_commit}")
pipeline_info = gitlab_client.get_pipeline_status(
project_path, current_branch, current_commit
)
return _format_pipeline_response(pipeline_info)
except (GitUtilsError, GitLabClientError) as e:
return _format_error_response(str(e))
except Exception as e:
logger.exception("Unexpected error in check_pipeline_status")
return _format_error_response(f"Unexpected error: {str(e)}")
check_job_status_schema = {
"type": "object",
"properties": {
"working_directory": {
"type": "string",
"description": "Path to the working directory of the git repository"
},
"job_name": {
"type": "string",
"description": "Name of the job to check (optional, use job_id if not provided)"
},
"job_id": {
"type": "integer",
"description": "ID of the job to check (optional, use job_name if not provided)"
}
},
"required": ["working_directory"]
}
@mcp.tool(
description="Check job status with automatic polling until completion. Polls every 2 seconds for up to 30 seconds until the job reaches a terminal state (success, failed, canceled, or skipped). Provide either job_name or job_id to identify the job.",
annotations={"inputSchema": check_job_status_schema}
)
def check_job_status(
working_directory: str,
job_name: Optional[str] = None,
job_id: Optional[int] = None
) -> str:
"""
Check job status with automatic polling until completion.
Polls every 2 seconds for up to 30 seconds until the job reaches
a terminal state (success, failed, canceled, or skipped).
Provide either job_name or job_id to identify the job.
"""
try:
validate_working_dir(working_directory)
if not job_name and not job_id:
return _format_error_response("Either job_name or job_id must be provided")
project_path = get_project_path_from_working_dir(working_directory)
current_branch = get_current_branch(working_directory)
current_commit = get_current_commit(working_directory)
logger.info(f"Checking job: {project_path} {current_branch} {job_name or job_id}")
pipeline_info = gitlab_client.get_pipeline_status(
project_path, current_branch, current_commit
)
pipeline_id = pipeline_info['id']
job_info = _poll_job_status(
gitlab_client, project_path, job_name, job_id, timeout_seconds=30
)
return _format_job_response(job_info, pipeline_id)
except (GitUtilsError, GitLabClientError) as e:
return _format_error_response(str(e))
except Exception as e:
logger.exception("Unexpected error in check_job_status")
return _format_error_response(f"Unexpected error: {str(e)}")
get_job_log_schema = {
"type": "object",
"properties": {
"working_directory": {
"type": "string",
"description": "Path to the working directory of the git repository"
},
"job_id": {
"type": "integer",
"description": "ID of the job to retrieve log for"
}
},
"required": ["working_directory", "job_id"]
}
@mcp.tool(
description="Retrieve the whole raw job log. Returns the complete unformatted log output from a GitLab CI/CD job.",
annotations={"inputSchema": get_job_log_schema}
)
def get_job_log(working_directory: str, job_id: int) -> str:
"""Retrieve the whole raw job log."""
try:
validate_working_dir(working_directory)
project_path = get_project_path_from_working_dir(working_directory)
logger.info(f"Retrieving job log: {project_path} job_id={job_id}")
log_content = gitlab_client.get_job_log(project_path, job_id)
return log_content
except (GitUtilsError, GitLabClientError) as e:
return _format_error_response(str(e))
except Exception as e:
logger.exception("Unexpected error in get_job_log")
return _format_error_response(f"Unexpected error: {str(e)}")
logger.debug(f"check_pipeline_status tool registered")
logger.debug(f"check_job_status tool registered")
logger.debug(f"get_job_log tool registered")
def _poll_job_status(
client: GitLabClient,
project_path: str,
job_name: Optional[str],
job_id: Optional[int],
timeout_seconds: int = 30,
poll_interval: float = 2.0
) -> Dict[str, Any]:
"""Poll job status until completion or timeout."""
start_time = time.time()
last_job_info = None
while True:
elapsed = time.time() - start_time
try:
job_info = client.get_job_status(project_path, job_name, job_id)
last_job_info = job_info
status = job_info.get('status')
terminal_states = {'success', 'failed', 'canceled', 'skipped'}
if status in terminal_states:
job_info['is_polling'] = False
job_info['polling_timeout'] = False
job_info['polling_duration_seconds'] = elapsed
return job_info
if elapsed >= timeout_seconds:
job_info['is_polling'] = True
job_info['polling_timeout'] = True
job_info['polling_duration_seconds'] = elapsed
return job_info
logger.debug(f"Job {job_name or job_id} status: {status}, elapsed: {elapsed:.1f}s")
time.sleep(poll_interval)
except GitLabClientError as e:
if elapsed >= timeout_seconds:
if last_job_info:
last_job_info['is_polling'] = True
last_job_info['polling_timeout'] = True
last_job_info['polling_duration_seconds'] = elapsed
return last_job_info
raise
logger.warning(f"Job query failed, retrying: {str(e)}")
time.sleep(poll_interval)
def _format_pipeline_response(pipeline_info: Dict[str, Any]) -> str:
"""Format pipeline status response as readable text."""
jobs_text = '\n'.join([
f" - {job['name']:40} {job['status']:15} (stage: {job['stage']}, id: {job['id']})"
for job in pipeline_info.get('jobs', [])
])
return f"""Pipeline Status Report
==================
Project Pipeline ID: {pipeline_info['id']}
Status: {pipeline_info['status']}
Reference: {pipeline_info['ref']}
Commit: {pipeline_info['sha'][:8]}
Web URL: {pipeline_info['web_url']}
Timing:
Created: {pipeline_info['created_at']}
Updated: {pipeline_info['updated_at']}
Started: {pipeline_info.get('started_at', 'N/A')}
Finished: {pipeline_info.get('finished_at', 'N/A')}
Duration: {pipeline_info.get('duration', 'N/A')} seconds
Jobs:
{jobs_text}
"""
def _format_job_response(job_info: Dict[str, Any], pipeline_id: int) -> str:
"""Format job status response as readable text."""
polling_status = ""
if job_info.get('is_polling'):
if job_info.get('polling_timeout'):
d = job_info['polling_duration_seconds']
polling_status = f"\n⏱️ POLLING TIMEOUT: Waited {d:.1f}s for job to complete"
else:
d = job_info['polling_duration_seconds']
polling_status = f"\n✓ Job completed after {d:.1f}s of polling"
return f"""Job Status Report
=================
Job ID: {job_info['id']}
Name: {job_info['name']}
Status: {job_info['status']}
Stage: {job_info['stage']}
Pipeline ID: {pipeline_id}
Web URL: {job_info['web_url']}
Timing:
Started: {job_info.get('started_at', 'N/A')}
Finished: {job_info.get('finished_at', 'N/A')}
Duration: {job_info.get('duration', 'N/A')} seconds
{polling_status}
"""
def _format_error_response(error_message: str) -> str:
"""Format error response as readable text."""
return f"""Error
=====
{error_message}
"""