get_merge_request_pipeline

Retrieve the latest pipeline data for a GitLab merge request, including job statuses and IDs for accessing detailed logs.

Instructions

Get the last pipeline data for a specific merge request, including all jobs and their statuses. Returns job IDs that can be used with get_job_log to fetch detailed output.

Input Schema

Name                Required   Description                        Default
merge_request_iid   Yes        Internal ID of the merge request   -

Input Schema (JSON Schema)

{ "properties": { "merge_request_iid": { "description": "Internal ID of the merge request", "minimum": 1, "type": "integer" } }, "required": [ "merge_request_iid" ], "type": "object" }

Implementation Reference

  • The primary MCP tool handler. Fetches the MR pipeline via GitLab API, retrieves all jobs, formats a comprehensive Markdown report including pipeline status, job lists grouped by status (failed first), timings, links, and tips for further actions.
    async def get_merge_request_pipeline(gitlab_url, project_id, access_token, args):
        """Get the last pipeline data for a merge request with all jobs"""
        logging.info(f"get_merge_request_pipeline called with args: {args}")
        mr_iid = args["merge_request_iid"]

        try:
            status, pipeline_data, error = await api_get_merge_request_pipeline(
                gitlab_url, project_id, access_token, mr_iid
            )
        except Exception as e:
            logging.error(f"Error fetching pipeline: {e}")
            raise Exception(f"Error fetching merge request pipeline: {e}")

        if status != 200:
            logging.error(f"Error fetching pipeline: {status} - {error}")
            raise Exception(f"Error fetching merge request pipeline: {status} - {error}")

        if not pipeline_data:
            result = f"# πŸ”§ Pipeline for Merge Request !{mr_iid}\n\n"
            result += "ℹ️ No pipeline found for this merge request.\n\n"
            result += "This could mean:\n"
            result += "β€’ No CI/CD is configured for this project\n"
            result += "β€’ The pipeline hasn't been triggered yet\n"
            result += "β€’ The merge request branch has no commits\n"
            return [TextContent(type="text", text=result)]

        # Get jobs for the pipeline
        pipeline_id = pipeline_data.get("id")
        jobs_data = []
        if pipeline_id:
            try:
                jobs_status, jobs_data, jobs_error = await get_pipeline_jobs(
                    gitlab_url, project_id, access_token, pipeline_id
                )
                if jobs_status != 200:
                    logging.warning(f"Could not fetch jobs: {jobs_status} - {jobs_error}")
                    jobs_data = []
            except Exception as e:
                logging.warning(f"Error fetching jobs: {e}")
                jobs_data = []

        # Format the pipeline data
        pipeline_status = pipeline_data.get("status", "unknown")
        pipeline_icon = get_pipeline_status_icon(pipeline_status)

        result = f"# {pipeline_icon} Pipeline for Merge Request !{mr_iid}\n\n"
        result += "## πŸ“Š Pipeline Overview\n"
        result += f"**πŸ†” Pipeline ID**: #{pipeline_data.get('id', 'N/A')}\n"
        result += f"**πŸ“Š Status**: {pipeline_icon} {pipeline_status}\n"
        result += f"**πŸ”— SHA**: `{pipeline_data.get('sha', 'N/A')[:8]}`\n"
        result += f"**🌿 Ref**: `{pipeline_data.get('ref', 'N/A')}`\n"
        if pipeline_data.get("source"):
            result += f"**πŸ“ Source**: {pipeline_data['source']}\n"
        if pipeline_data.get("created_at"):
            result += f"**πŸ“… Created**: {format_date(pipeline_data['created_at'])}\n"
        if pipeline_data.get("updated_at"):
            result += f"**πŸ”„ Updated**: {format_date(pipeline_data['updated_at'])}\n"
        if pipeline_data.get("started_at"):
            result += f"**▢️ Started**: {format_date(pipeline_data['started_at'])}\n"
        if pipeline_data.get("finished_at"):
            result += f"**⏹️ Finished**: {format_date(pipeline_data['finished_at'])}\n"

        # Duration
        if pipeline_data.get("duration"):
            duration_mins = pipeline_data["duration"] // 60
            duration_secs = pipeline_data["duration"] % 60
            result += f"**⏱️ Duration**: {duration_mins}m {duration_secs}s\n"
        if pipeline_data.get("queued_duration"):
            queued_mins = pipeline_data["queued_duration"] // 60
            queued_secs = pipeline_data["queued_duration"] % 60
            result += f"**⏳ Queued**: {queued_mins}m {queued_secs}s\n"
        result += "\n"

        # User info
        if pipeline_data.get("user"):
            user = pipeline_data["user"]
            result += "## πŸ‘€ Triggered By\n"
            result += f"**Name**: {user.get('name', 'N/A')}\n"
            result += f"**Username**: @{user.get('username', 'N/A')}\n"
            result += "\n"

        # Coverage
        if pipeline_data.get("coverage"):
            result += "## πŸ“ˆ Code Coverage\n"
            result += f"**Coverage**: {pipeline_data['coverage']}%\n"
            result += "\n"

        # Web URL
        if pipeline_data.get("web_url"):
            result += "## πŸ”— Actions\n"
            result += f"β€’ [View Pipeline Details]({pipeline_data['web_url']})\n"
            result += "\n"

        # Jobs information
        if jobs_data:
            result += "## πŸ”¨ Pipeline Jobs\n\n"

            # Group jobs by status
            failed_jobs = [j for j in jobs_data if j.get("status") == "failed"]
            success_jobs = [j for j in jobs_data if j.get("status") == "success"]
            running_jobs = [j for j in jobs_data if j.get("status") == "running"]
            other_jobs = [j for j in jobs_data if j.get("status") not in ["failed", "success", "running"]]

            result += f"**Total Jobs**: {len(jobs_data)}\n"
            result += f"**βœ… Success**: {len(success_jobs)} | "
            result += f"**❌ Failed**: {len(failed_jobs)} | "
            result += f"**πŸ”„ Running**: {len(running_jobs)} | "
            result += f"**⏳ Other**: {len(other_jobs)}\n\n"

            # Show failed jobs first
            if failed_jobs:
                result += "### ❌ Failed Jobs\n\n"
                for job in failed_jobs:
                    job_icon = get_pipeline_status_icon(job.get("status"))
                    result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                    if job.get("duration"):
                        duration_mins = int(job["duration"]) // 60
                        duration_secs = int(job["duration"]) % 60
                        result += f" - {duration_mins}m {duration_secs}s"
                    if job.get("web_url"):
                        result += f" - [View]({job['web_url']})"
                    result += "\n"
                result += "\n*πŸ’‘ Tip: Use `get_job_log` with a Job ID to see the full output*\n"
                result += "\n"

            # Show running jobs
            if running_jobs:
                result += "### πŸ”„ Running Jobs\n\n"
                for job in running_jobs:
                    job_icon = get_pipeline_status_icon(job.get("status"))
                    result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                    if job.get("web_url"):
                        result += f" - [View]({job['web_url']})"
                    result += "\n"
                result += "\n"

            # Show successful jobs (summary)
            if success_jobs:
                result += "### βœ… Successful Jobs\n\n"
                for job in success_jobs:
                    result += f"- βœ… **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                    if job.get("duration"):
                        duration_mins = int(job["duration"]) // 60
                        duration_secs = int(job["duration"]) % 60
                        result += f" - {duration_mins}m {duration_secs}s"
                    result += "\n"
                result += "\n"

            # Show other jobs
            if other_jobs:
                result += "### ⏳ Other Jobs\n\n"
                for job in other_jobs:
                    job_icon = get_pipeline_status_icon(job.get("status"))
                    result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')}, "
                    result += f"Status: {job.get('status', 'N/A')})\n"
                result += "\n"

        # Status explanation
        result += "## ℹ️ Status Information\n"
        status_explanations = {
            "success": "βœ… All jobs passed successfully",
            "failed": "❌ One or more jobs failed",
            "running": "πŸ”„ Pipeline is currently running",
            "pending": "⏳ Pipeline is waiting to start",
            "canceled": "⏹️ Pipeline was canceled",
            "skipped": "⏭️ Pipeline was skipped",
            "manual": "πŸ‘€ Waiting for manual action",
            "created": "πŸ“ Pipeline was created but not started",
            "preparing": "πŸ”§ Pipeline is preparing to run",
            "waiting_for_resource": "⏸️ Waiting for available resources",
            "scheduled": "πŸ“… Pipeline is scheduled to run",
        }
        explanation = status_explanations.get(pipeline_status, f"Unknown status: {pipeline_status}")
        result += f"{explanation}\n"

        return [TextContent(type="text", text=result)]
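
    The handler above depends on a get_pipeline_status_icon helper that is not reproduced on this page. A minimal sketch consistent with the statuses and emoji used in the handler might look like this (the actual mapping in the repository may differ):

    def get_pipeline_status_icon(status):
        """Map a GitLab pipeline/job status to an emoji (illustrative mapping)."""
        icons = {
            "success": "βœ…",
            "failed": "❌",
            "running": "πŸ”„",
            "pending": "⏳",
            "canceled": "⏹️",
            "skipped": "⏭️",
            "manual": "πŸ‘€",
            "created": "πŸ“",
            "preparing": "πŸ”§",
            "waiting_for_resource": "⏸️",
            "scheduled": "πŸ“…",
        }
        return icons.get(status, "❓")  # the "❓" fallback is assumed, not from the repo
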
  • The input schema definition for the tool, specifying merge_request_iid as required integer parameter.
    Tool(
        name="get_merge_request_pipeline",
        description=(
            "Get the last pipeline data for a specific merge "
            "request, including all jobs and their statuses. "
            "Returns job IDs that can be used with get_job_log "
            "to fetch detailed output."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "merge_request_iid": {
                    "type": "integer",
                    "minimum": 1,
                    "description": "Internal ID of the merge request",
                }
            },
            "required": ["merge_request_iid"],
            "additionalProperties": False,
        },
    ),
  • main.py:311-314 (registration)
    Dispatch/registration in the MCP server's call_tool handler, mapping tool name to the handler function.
    elif name == "get_merge_request_pipeline":
        return await get_merge_request_pipeline(
            self.config["gitlab_url"],
            self.config["project_id"],
            self.config["access_token"],
            arguments,
        )
  • Import and export of the handler function in the tools package.
    from .get_merge_request_pipeline import get_merge_request_pipeline
    from .get_merge_request_reviews import get_merge_request_reviews
    from .get_merge_request_test_report import get_merge_request_test_report
    from .get_pipeline_test_summary import get_pipeline_test_summary
    from .list_merge_requests import list_merge_requests
    from .reply_to_review_comment import (
        create_review_comment,
        reply_to_review_comment,
        resolve_review_discussion,
    )

    __all__ = [
        "list_merge_requests",
        "get_merge_request_reviews",
        "get_merge_request_details",
        "get_merge_request_pipeline",
  • Low-level API helper that queries GitLab API for the latest pipeline of a merge request.
    async def get_merge_request_pipeline(gitlab_url, project_id, access_token, mr_iid):
        """Get the latest pipeline for a merge request"""
        url = (
            f"{gitlab_url}/api/v4/projects/{project_id}/"
            f"merge_requests/{mr_iid}/pipelines"
        )
        headers = _headers(access_token)
        async with aiohttp.ClientSession() as session:
            params = {"per_page": 1}
            async with session.get(url, headers=headers, params=params) as response:
                data = await response.json()
                return (response.status, data[0] if data else None, await response.text())
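
    For reference, the raw GitLab REST call this helper wraps can be reproduced with curl. The host, project ID, IID, and token below are placeholders; PRIVATE-TOKEN is GitLab's standard header for personal access tokens, though the _headers helper (not shown here) may use a different scheme:

    curl --header "PRIVATE-TOKEN: <access_token>" \
      "https://gitlab.example.com/api/v4/projects/<project_id>/merge_requests/<mr_iid>/pipelines?per_page=1"

    GitLab lists a merge request's pipelines newest first, which is why per_page=1 is enough to fetch the latest one.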

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/amirsina-mandegari/gitlab-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.