# get_merge_request_pipeline
Retrieve the latest pipeline data for a GitLab merge request, including job statuses and IDs for accessing detailed logs.
## Instructions
Get the last pipeline data for a specific merge request, including all jobs and their statuses. Returns job IDs that can be used with get_job_log to fetch detailed output.
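As a rough sketch of direct use (connection values are placeholders; the handler signature is shown under Implementation Reference below), the handler returns a list with a single TextContent element containing the Markdown report:

```python
import asyncio

# Placeholder connection details; get_merge_request_pipeline is the handler
# documented in the Implementation Reference section below.
result = asyncio.run(
    get_merge_request_pipeline(
        gitlab_url="https://gitlab.example.com",
        project_id="42",
        access_token="glpat-xxxxxxxxxxxxxxxxxxxx",
        args={"merge_request_iid": 7},
    )
)
print(result[0].text)  # the rendered Markdown report
```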
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| merge_request_iid | Yes | Internal ID (IID) of the merge request; integer, minimum 1 | — |
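For reference, here is what the schema accepts and rejects, checked with the jsonschema package (an assumption — the MCP framework may already enforce the schema before the handler runs):

```python
from jsonschema import ValidationError, validate

INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "merge_request_iid": {"type": "integer", "minimum": 1},
    },
    "required": ["merge_request_iid"],
    "additionalProperties": False,
}

validate({"merge_request_iid": 42}, INPUT_SCHEMA)  # OK

try:
    validate({"merge_request_iid": 0}, INPUT_SCHEMA)  # violates "minimum": 1
except ValidationError as e:
    print(e.message)

try:
    # extra keys are rejected because "additionalProperties" is false
    validate({"merge_request_iid": 42, "project_id": 7}, INPUT_SCHEMA)
except ValidationError as e:
    print(e.message)
```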
## Implementation Reference
- Primary tool handler — extracts the MR IID from the arguments, calls the GitLab API for the pipeline and its jobs, handles errors, and renders a detailed Markdown report: pipeline overview, job lists grouped by status (failed first), timings, links, and a status explanation.

```python
async def get_merge_request_pipeline(gitlab_url, project_id, access_token, args):
    """Get the last pipeline data for a merge request with all jobs"""
    logging.info(f"get_merge_request_pipeline called with args: {args}")
    mr_iid = args["merge_request_iid"]

    try:
        status, pipeline_data, error = await api_get_merge_request_pipeline(
            gitlab_url, project_id, access_token, mr_iid
        )
    except Exception as e:
        logging.error(f"Error fetching pipeline: {e}")
        raise Exception(f"Error fetching merge request pipeline: {e}")

    if status != 200:
        logging.error(f"Error fetching pipeline: {status} - {error}")
        raise Exception(f"Error fetching merge request pipeline: {status} - {error}")

    if not pipeline_data:
        result = f"# 🔧 Pipeline for Merge Request !{mr_iid}\n\n"
        result += "ℹ️ No pipeline found for this merge request.\n\n"
        result += "This could mean:\n"
        result += "• No CI/CD is configured for this project\n"
        result += "• The pipeline hasn't been triggered yet\n"
        result += "• The merge request branch has no commits\n"
        return [TextContent(type="text", text=result)]

    # Get jobs for the pipeline
    pipeline_id = pipeline_data.get("id")
    jobs_data = []
    if pipeline_id:
        try:
            jobs_status, jobs_data, jobs_error = await get_pipeline_jobs(
                gitlab_url, project_id, access_token, pipeline_id
            )
            if jobs_status != 200:
                logging.warning(f"Could not fetch jobs: {jobs_status} - {jobs_error}")
                jobs_data = []
        except Exception as e:
            logging.warning(f"Error fetching jobs: {e}")
            jobs_data = []

    # Format the pipeline data
    pipeline_status = pipeline_data.get("status", "unknown")
    pipeline_icon = get_pipeline_status_icon(pipeline_status)

    result = f"# {pipeline_icon} Pipeline for Merge Request !{mr_iid}\n\n"
    result += "## 📊 Pipeline Overview\n"
    result += f"**🆔 Pipeline ID**: #{pipeline_data.get('id', 'N/A')}\n"
    result += f"**📊 Status**: {pipeline_icon} {pipeline_status}\n"
    result += f"**📝 SHA**: `{pipeline_data.get('sha', 'N/A')[:8]}`\n"
    result += f"**🌿 Ref**: `{pipeline_data.get('ref', 'N/A')}`\n"
    if pipeline_data.get("source"):
        result += f"**🔄 Source**: {pipeline_data['source']}\n"
    if pipeline_data.get("created_at"):
        result += f"**📅 Created**: {format_date(pipeline_data['created_at'])}\n"
    if pipeline_data.get("updated_at"):
        result += f"**📅 Updated**: {format_date(pipeline_data['updated_at'])}\n"
    if pipeline_data.get("started_at"):
        result += f"**▶️ Started**: {format_date(pipeline_data['started_at'])}\n"
    if pipeline_data.get("finished_at"):
        result += f"**⏹️ Finished**: {format_date(pipeline_data['finished_at'])}\n"

    # Duration
    if pipeline_data.get("duration"):
        duration_mins = pipeline_data["duration"] // 60
        duration_secs = pipeline_data["duration"] % 60
        result += f"**⏱️ Duration**: {duration_mins}m {duration_secs}s\n"
    if pipeline_data.get("queued_duration"):
        queued_mins = pipeline_data["queued_duration"] // 60
        queued_secs = pipeline_data["queued_duration"] % 60
        result += f"**⏳ Queued**: {queued_mins}m {queued_secs}s\n"
    result += "\n"

    # User info
    if pipeline_data.get("user"):
        user = pipeline_data["user"]
        result += "## 👤 Triggered By\n"
        result += f"**Name**: {user.get('name', 'N/A')}\n"
        result += f"**Username**: @{user.get('username', 'N/A')}\n"
        result += "\n"

    # Coverage
    if pipeline_data.get("coverage"):
        result += "## 📈 Code Coverage\n"
        result += f"**Coverage**: {pipeline_data['coverage']}%\n"
        result += "\n"

    # Web URL
    if pipeline_data.get("web_url"):
        result += "## 🔗 Actions\n"
        result += f"• [View Pipeline Details]({pipeline_data['web_url']})\n"
        result += "\n"

    # Jobs information
    if jobs_data:
        result += "## 🔨 Pipeline Jobs\n\n"

        # Group jobs by status
        failed_jobs = [j for j in jobs_data if j.get("status") == "failed"]
        success_jobs = [j for j in jobs_data if j.get("status") == "success"]
        running_jobs = [j for j in jobs_data if j.get("status") == "running"]
        other_jobs = [
            j for j in jobs_data
            if j.get("status") not in ["failed", "success", "running"]
        ]

        result += f"**Total Jobs**: {len(jobs_data)}\n"
        result += f"**✅ Success**: {len(success_jobs)} | "
        result += f"**❌ Failed**: {len(failed_jobs)} | "
        result += f"**🔄 Running**: {len(running_jobs)} | "
        result += f"**⏳ Other**: {len(other_jobs)}\n\n"

        # Show failed jobs first
        if failed_jobs:
            result += "### ❌ Failed Jobs\n\n"
            for job in failed_jobs:
                job_icon = get_pipeline_status_icon(job.get("status"))
                result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                if job.get("duration"):
                    duration_mins = int(job["duration"]) // 60
                    duration_secs = int(job["duration"]) % 60
                    result += f" - {duration_mins}m {duration_secs}s"
                if job.get("web_url"):
                    result += f" - [View]({job['web_url']})"
                result += "\n"
            result += "\n*💡 Tip: Use `get_job_log` with a Job ID to see the full output*\n"
            result += "\n"

        # Show running jobs
        if running_jobs:
            result += "### 🔄 Running Jobs\n\n"
            for job in running_jobs:
                job_icon = get_pipeline_status_icon(job.get("status"))
                result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                if job.get("web_url"):
                    result += f" - [View]({job['web_url']})"
                result += "\n"
            result += "\n"

        # Show successful jobs (summary)
        if success_jobs:
            result += "### ✅ Successful Jobs\n\n"
            for job in success_jobs:
                result += f"- ✅ **{job.get('name', 'Unknown Job')}** "
                result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                if job.get("duration"):
                    duration_mins = int(job["duration"]) // 60
                    duration_secs = int(job["duration"]) % 60
                    result += f" - {duration_mins}m {duration_secs}s"
                result += "\n"
            result += "\n"

        # Show other jobs
        if other_jobs:
            result += "### ⏳ Other Jobs\n\n"
            for job in other_jobs:
                job_icon = get_pipeline_status_icon(job.get("status"))
                result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')}, "
                result += f"Status: {job.get('status', 'N/A')})\n"
            result += "\n"

    # Status explanation
    result += "## ℹ️ Status Information\n"
    status_explanations = {
        "success": "✅ All jobs passed successfully",
        "failed": "❌ One or more jobs failed",
        "running": "🔄 Pipeline is currently running",
        "pending": "⏳ Pipeline is waiting to start",
        "canceled": "⏹️ Pipeline was canceled",
        "skipped": "⏭️ Pipeline was skipped",
        "manual": "🤚 Waiting for manual action",
        "created": "🆕 Pipeline was created but not started",
        "preparing": "🔧 Pipeline is preparing to run",
        "waiting_for_resource": "⏸️ Waiting for available resources",
        "scheduled": "📅 Pipeline is scheduled to run",
    }
    explanation = status_explanations.get(pipeline_status, f"Unknown status: {pipeline_status}")
    result += f"{explanation}\n"

    return [TextContent(type="text", text=result)]
```
- main.py:101-121 (schema) — Input schema definition for the tool: requires merge_request_iid as an integer ≥ 1, with no other properties allowed.

```python
Tool(
    name="get_merge_request_pipeline",
    description=(
        "Get the last pipeline data for a specific merge "
        "request, including all jobs and their statuses. "
        "Returns job IDs that can be used with get_job_log "
        "to fetch detailed output."
    ),
    inputSchema={
        "type": "object",
        "properties": {
            "merge_request_iid": {
                "type": "integer",
                "minimum": 1,
                "description": "Internal ID of the merge request",
            }
        },
        "required": ["merge_request_iid"],
        "additionalProperties": False,
    },
),
```
- main.py:312-315 (registration) — Tool dispatch in the server's call_tool handler: maps the tool name to the handler call, passing the configured GitLab connection parameters.

```python
elif name == "get_merge_request_pipeline":
    return await get_merge_request_pipeline(
        self.config["gitlab_url"],
        self.config["project_id"],
        self.config["access_token"],
        arguments,
    )
```
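For orientation, a plausible shape of the surrounding dispatcher; the enclosing class is not shown in this section, so the method name and config attribute are assumptions:

```python
# Hypothetical reconstruction of the enclosing dispatcher, for illustration only.
async def call_tool(self, name: str, arguments: dict):
    if name == "get_job_log":
        ...  # other tools elided
    elif name == "get_merge_request_pipeline":
        return await get_merge_request_pipeline(
            self.config["gitlab_url"],
            self.config["project_id"],
            self.config["access_token"],
            arguments,
        )
    raise ValueError(f"Unknown tool: {name}")
```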
- gitlab_api.py:16-25 (helper) — Low-level API helper: issues an HTTP GET to GitLab's /projects/{project_id}/merge_requests/{mr_iid}/pipelines endpoint with per_page=1 to fetch the latest pipeline for the MR (the handler calls it as api_get_merge_request_pipeline). GitLab returns a JSON list on success but a dict on error, so the result is guarded before indexing.

```python
async def get_merge_request_pipeline(gitlab_url, project_id, access_token, mr_iid):
    """Get the latest pipeline for a merge request"""
    url = (
        f"{gitlab_url}/api/v4/projects/{project_id}/"
        f"merge_requests/{mr_iid}/pipelines"
    )
    headers = _headers(access_token)
    async with aiohttp.ClientSession() as session:
        params = {"per_page": 1}
        async with session.get(url, headers=headers, params=params) as response:
            data = await response.json()
            # GitLab returns a list on success; error bodies are dicts,
            # so guard before indexing.
            latest = data[0] if isinstance(data, list) and data else None
            return (response.status, latest, await response.text())
```
- gitlab_api.py:27-35 (helper) — Supporting helper called by the handler: fetches the pipeline's jobs (up to 100) for listing in the output.

```python
async def get_pipeline_jobs(gitlab_url, project_id, access_token, pipeline_id):
    """Get all jobs for a specific pipeline"""
    url = (
        f"{gitlab_url}/api/v4/projects/{project_id}/"
        f"pipelines/{pipeline_id}/jobs"
    )
    headers = _headers(access_token)
    async with aiohttp.ClientSession() as session:
        params = {"per_page": 100}
        async with session.get(url, headers=headers, params=params) as response:
            return (response.status, await response.json(), await response.text())
```
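One limitation worth noting: with per_page=100 and no pagination loop, a pipeline with more than 100 jobs would be truncated. A paginated variant might look like the following sketch (hypothetical, not part of the source; it relies on GitLab's standard page query parameter):

```python
import aiohttp

async def get_all_pipeline_jobs(gitlab_url, project_id, access_token, pipeline_id):
    """Hypothetical paginated variant of get_pipeline_jobs."""
    url = (
        f"{gitlab_url}/api/v4/projects/{project_id}/"
        f"pipelines/{pipeline_id}/jobs"
    )
    headers = _headers(access_token)
    jobs, page = [], 1
    async with aiohttp.ClientSession() as session:
        while True:
            params = {"per_page": 100, "page": page}
            async with session.get(url, headers=headers, params=params) as resp:
                if resp.status != 200:
                    return resp.status, jobs, await resp.text()
                batch = await resp.json()
            if not batch:  # empty page means we have collected every job
                return 200, jobs, ""
            jobs.extend(batch)
            page += 1
```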