get_merge_request_pipeline
Retrieve the latest pipeline data for a GitLab merge request, including job statuses and IDs for accessing detailed logs.
Instructions
Get the last pipeline data for a specific merge request, including all jobs and their statuses. Returns job IDs that can be used with get_job_log to fetch detailed output.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| merge_request_iid | Yes | Internal ID of the merge request | |
Input Schema (JSON Schema)
{
"properties": {
"merge_request_iid": {
"description": "Internal ID of the merge request",
"minimum": 1,
"type": "integer"
}
},
"required": [
"merge_request_iid"
],
"type": "object"
}
Implementation Reference
# The primary MCP tool handler. Fetches the MR pipeline via GitLab API,
# retrieves all jobs, formats a comprehensive Markdown report including
# pipeline status, job lists grouped by status (failed first), timings,
# links, and tips for further actions.
#
# NOTE(review): the emoji inside the string literals below appear mis-encoded
# in this copy of the source; they are reproduced as-is here. Verify them
# against the upstream file before shipping.
def _format_duration(seconds):
    """Render a duration given in seconds as ``"<minutes>m <seconds>s"``.

    Fractional seconds are truncated so float and integer durations render
    the same way.
    """
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes}m {secs}s"


async def get_merge_request_pipeline(gitlab_url, project_id, access_token, args):
    """Get the last pipeline data for a merge request with all jobs.

    Args:
        gitlab_url: Passed through to the pipeline and job fetch helpers.
        project_id: Passed through to the pipeline and job fetch helpers.
        access_token: Passed through to the pipeline and job fetch helpers.
        args: Tool arguments; must contain ``"merge_request_iid"``.

    Returns:
        A single-element list holding a ``TextContent`` whose text is the
        Markdown report (or a "no pipeline found" notice).

    Raises:
        KeyError: If ``args`` has no ``"merge_request_iid"`` entry.
        Exception: If fetching the pipeline raises or returns a non-200
            status. Fetch failures are chained to the original error.
    """
    logging.info(f"get_merge_request_pipeline called with args: {args}")
    mr_iid = args["merge_request_iid"]

    try:
        status, pipeline_data, error = await api_get_merge_request_pipeline(
            gitlab_url, project_id, access_token, mr_iid
        )
    except Exception as e:
        logging.error(f"Error fetching pipeline: {e}")
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"Error fetching merge request pipeline: {e}") from e

    if status != 200:
        logging.error(f"Error fetching pipeline: {status} - {error}")
        raise Exception(f"Error fetching merge request pipeline: {status} - {error}")

    if not pipeline_data:
        result = (
            f"# π§ Pipeline for Merge Request !{mr_iid}\n\n"
            "βΉοΈ No pipeline found for this merge request.\n\n"
            "This could mean:\n"
            "β’ No CI/CD is configured for this project\n"
            "β’ The pipeline hasn't been triggered yet\n"
            "β’ The merge request branch has no commits\n"
        )
        return [TextContent(type="text", text=result)]

    # Get jobs for the pipeline. Job retrieval is best-effort: any failure is
    # logged and the report is produced without the jobs section.
    pipeline_id = pipeline_data.get("id")
    jobs_data = []
    if pipeline_id:
        try:
            jobs_status, jobs_data, jobs_error = await get_pipeline_jobs(
                gitlab_url, project_id, access_token, pipeline_id
            )
            if jobs_status != 200:
                logging.warning(f"Could not fetch jobs: {jobs_status} - {jobs_error}")
                jobs_data = []
        except Exception as e:
            logging.warning(f"Error fetching jobs: {e}")
            jobs_data = []

    # Format the pipeline data. Fragments are collected and joined once.
    pipeline_status = pipeline_data.get("status", "unknown")
    pipeline_icon = get_pipeline_status_icon(pipeline_status)
    # `or` (not a .get default) so a present-but-null sha does not break the slice.
    short_sha = (pipeline_data.get("sha") or "N/A")[:8]

    parts = [
        f"# {pipeline_icon} Pipeline for Merge Request !{mr_iid}\n\n",
        "## π Pipeline Overview\n",
        f"**π Pipeline ID**: #{pipeline_data.get('id', 'N/A')}\n",
        f"**π Status**: {pipeline_icon} {pipeline_status}\n",
        f"**π SHA**: `{short_sha}`\n",
        f"**πΏ Ref**: `{pipeline_data.get('ref', 'N/A')}`\n",
    ]

    if pipeline_data.get("source"):
        parts.append(f"**π Source**: {pipeline_data['source']}\n")

    for key, label in (
        ("created_at", "π Created"),
        ("updated_at", "π Updated"),
        ("started_at", "βΆοΈ Started"),
        ("finished_at", "βΉοΈ Finished"),
    ):
        if pipeline_data.get(key):
            parts.append(f"**{label}**: {format_date(pipeline_data[key])}\n")

    # Duration
    if pipeline_data.get("duration"):
        parts.append(f"**β±οΈ Duration**: {_format_duration(pipeline_data['duration'])}\n")
    if pipeline_data.get("queued_duration"):
        parts.append(f"**β³ Queued**: {_format_duration(pipeline_data['queued_duration'])}\n")
    parts.append("\n")

    # User info
    if pipeline_data.get("user"):
        user = pipeline_data["user"]
        parts.append("## π€ Triggered By\n")
        parts.append(f"**Name**: {user.get('name', 'N/A')}\n")
        parts.append(f"**Username**: @{user.get('username', 'N/A')}\n")
        parts.append("\n")

    # Coverage
    if pipeline_data.get("coverage"):
        parts.append("## π Code Coverage\n")
        parts.append(f"**Coverage**: {pipeline_data['coverage']}%\n")
        parts.append("\n")

    # Web URL
    if pipeline_data.get("web_url"):
        parts.append("## π Actions\n")
        parts.append(f"β’ [View Pipeline Details]({pipeline_data['web_url']})\n")
        parts.append("\n")

    # Jobs information
    if jobs_data:
        parts.append("## π¨ Pipeline Jobs\n\n")

        # Group jobs by status
        failed_jobs = [j for j in jobs_data if j.get("status") == "failed"]
        success_jobs = [j for j in jobs_data if j.get("status") == "success"]
        running_jobs = [j for j in jobs_data if j.get("status") == "running"]
        other_jobs = [j for j in jobs_data if j.get("status") not in ["failed", "success", "running"]]

        parts.append(f"**Total Jobs**: {len(jobs_data)}\n")
        parts.append(f"**β Success**: {len(success_jobs)} | ")
        parts.append(f"**β Failed**: {len(failed_jobs)} | ")
        parts.append(f"**π Running**: {len(running_jobs)} | ")
        parts.append(f"**β³ Other**: {len(other_jobs)}\n\n")

        # Show failed jobs first
        if failed_jobs:
            parts.append("### β Failed Jobs\n\n")
            for job in failed_jobs:
                job_icon = get_pipeline_status_icon(job.get("status"))
                line = f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                line += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                if job.get("duration"):
                    line += f" - {_format_duration(job['duration'])}"
                if job.get("web_url"):
                    line += f" - [View]({job['web_url']})"
                parts.append(line + "\n")
            parts.append("\n*π‘ Tip: Use `get_job_log` with a Job ID to see the full output*\n")
            parts.append("\n")

        # Show running jobs
        if running_jobs:
            parts.append("### π Running Jobs\n\n")
            for job in running_jobs:
                job_icon = get_pipeline_status_icon(job.get("status"))
                line = f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                line += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                if job.get("web_url"):
                    line += f" - [View]({job['web_url']})"
                parts.append(line + "\n")
            parts.append("\n")

        # Show successful jobs (summary)
        if success_jobs:
            parts.append("### β Successful Jobs\n\n")
            for job in success_jobs:
                line = f"- β **{job.get('name', 'Unknown Job')}** "
                line += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                if job.get("duration"):
                    line += f" - {_format_duration(job['duration'])}"
                parts.append(line + "\n")
            parts.append("\n")

        # Show other jobs
        if other_jobs:
            parts.append("### β³ Other Jobs\n\n")
            for job in other_jobs:
                job_icon = get_pipeline_status_icon(job.get("status"))
                line = f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                line += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')}, "
                line += f"Status: {job.get('status', 'N/A')})\n"
                parts.append(line)
            parts.append("\n")

    # Status explanation
    parts.append("## βΉοΈ Status Information\n")
    status_explanations = {
        "success": "β All jobs passed successfully",
        "failed": "β One or more jobs failed",
        "running": "π Pipeline is currently running",
        "pending": "β³ Pipeline is waiting to start",
        "canceled": "βΉοΈ Pipeline was canceled",
        "skipped": "βοΈ Pipeline was skipped",
        "manual": "π€ Waiting for manual action",
        "created": "π Pipeline was created but not started",
        "preparing": "π§ Pipeline is preparing to run",
        "waiting_for_resource": "βΈοΈ Waiting for available resources",
        "scheduled": "π Pipeline is scheduled to run",
    }
    explanation = status_explanations.get(pipeline_status, f"Unknown status: {pipeline_status}")
    parts.append(f"{explanation}\n")

    return [TextContent(type="text", text="".join(parts))]
- main.py:100-120 (schema) The input schema definition for the tool, specifying merge_request_iid as a required integer parameter.
Tool( name="get_merge_request_pipeline", description=( "Get the last pipeline data for a specific merge " "request, including all jobs and their statuses. " "Returns job IDs that can be used with get_job_log " "to fetch detailed output." ), inputSchema={ "type": "object", "properties": { "merge_request_iid": { "type": "integer", "minimum": 1, "description": ("Internal ID of the merge request"), } }, "required": ["merge_request_iid"], "additionalProperties": False, }, ),
- main.py:311-314 (registration) Dispatch/registration in the MCP server's call_tool handler, mapping the tool name to the handler function.
elif name == "get_merge_request_pipeline": return await get_merge_request_pipeline( self.config["gitlab_url"], self.config["project_id"], self.config["access_token"], arguments )
- tools/__init__.py:12-23 (registration) Import and export of the handler function in the tools package.
from .get_merge_request_pipeline import get_merge_request_pipeline from .get_merge_request_reviews import get_merge_request_reviews from .get_merge_request_test_report import get_merge_request_test_report from .get_pipeline_test_summary import get_pipeline_test_summary from .list_merge_requests import list_merge_requests from .reply_to_review_comment import create_review_comment, reply_to_review_comment, resolve_review_discussion __all__ = [ "list_merge_requests", "get_merge_request_reviews", "get_merge_request_details", "get_merge_request_pipeline",
# gitlab_api.py:16-25 (helper) Low-level API helper that queries GitLab API
# for the latest pipeline of a merge request.
async def get_merge_request_pipeline(gitlab_url, project_id, access_token, mr_iid):
    """Get the latest pipeline for a merge request.

    Requests the merge request's pipelines with ``per_page=1`` and returns
    the first entry of the response list.

    Args:
        gitlab_url: Base URL that ``/api/v4/...`` is appended to.
        project_id: Project identifier interpolated into the request path.
        access_token: Token handed to ``_headers`` to build request headers.
        mr_iid: Merge request IID interpolated into the request path.

    Returns:
        A ``(status, pipeline, raw_text)`` tuple. ``pipeline`` is the first
        element of the response list, or ``None`` when the list is empty or
        the response status is not 200. ``raw_text`` is the response body as
        text, so callers can report error details.
    """
    url = (
        f"{gitlab_url}/api/v4/projects/{project_id}/"
        f"merge_requests/{mr_iid}/pipelines"
    )
    headers = _headers(access_token)
    params = {"per_page": 1}

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, params=params) as response:
            raw_text = await response.text()

            # Check the status before parsing: an error body is typically a
            # JSON object (indexing it with [0] raises KeyError) or not JSON
            # at all, and either would hide the real HTTP status from callers.
            if response.status != 200:
                return (response.status, None, raw_text)

            data = await response.json()
            latest = data[0] if isinstance(data, list) and data else None
            return (response.status, latest, raw_text)