Glama

get_merge_request_pipeline

Retrieve the latest pipeline data for a GitLab merge request, including job statuses and IDs for accessing detailed logs.

Instructions

Get the last pipeline data for a specific merge request, including all jobs and their statuses. Returns job IDs that can be used with get_job_log to fetch detailed output.

Input Schema

| Name | Required | Description | Default |
| ---- | -------- | ----------- | ------- |
| merge_request_iid | Yes | Internal ID of the merge request | (none) |
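
As a quick sketch (hypothetical values, not from the source), a valid arguments payload and a minimal check mirroring the schema's constraints might look like:

```python
def validate_args(args):
    """Minimal check mirroring the input schema: a required integer
    merge_request_iid >= 1 and no additional properties (sketch only)."""
    if set(args) != {"merge_request_iid"}:
        return False
    iid = args["merge_request_iid"]
    return isinstance(iid, int) and not isinstance(iid, bool) and iid >= 1


args = {"merge_request_iid": 42}  # hypothetical MR IID
print(validate_args(args))                      # True
print(validate_args({"merge_request_iid": 0}))  # False: below minimum of 1
```

In practice a JSON Schema validator would enforce this; the sketch just makes the three constraints (required, integer, minimum 1) concrete.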

Implementation Reference

  • Primary tool handler: extracts MR IID from args, calls GitLab API for pipeline and jobs, handles errors, and generates detailed Markdown output with pipeline overview, job lists grouped by status (failed first), timings, links, and status explanations.
    async def get_merge_request_pipeline(gitlab_url, project_id, access_token, args):
        """Get the last pipeline data for a merge request with all jobs"""
        logging.info(f"get_merge_request_pipeline called with args: {args}")
        mr_iid = args["merge_request_iid"]
    
        try:
            status, pipeline_data, error = await api_get_merge_request_pipeline(
                gitlab_url, project_id, access_token, mr_iid
            )
        except Exception as e:
            logging.error(f"Error fetching pipeline: {e}")
            raise Exception(f"Error fetching merge request pipeline: {e}")
    
        if status != 200:
            logging.error(f"Error fetching pipeline: {status} - {error}")
            raise Exception(f"Error fetching merge request pipeline: {status} - {error}")
    
        if not pipeline_data:
            result = f"# πŸ”§ Pipeline for Merge Request !{mr_iid}\n\n"
            result += "ℹ️ No pipeline found for this merge request.\n\n"
            result += "This could mean:\n"
            result += "β€’ No CI/CD is configured for this project\n"
            result += "β€’ The pipeline hasn't been triggered yet\n"
            result += "β€’ The merge request branch has no commits\n"
            return [TextContent(type="text", text=result)]
    
        # Get jobs for the pipeline
        pipeline_id = pipeline_data.get("id")
        jobs_data = []
        if pipeline_id:
            try:
                jobs_status, jobs_data, jobs_error = await get_pipeline_jobs(
                    gitlab_url, project_id, access_token, pipeline_id
                )
                if jobs_status != 200:
                    logging.warning(f"Could not fetch jobs: {jobs_status} - {jobs_error}")
                    jobs_data = []
            except Exception as e:
                logging.warning(f"Error fetching jobs: {e}")
                jobs_data = []
    
        # Format the pipeline data
        pipeline_status = pipeline_data.get("status", "unknown")
        pipeline_icon = get_pipeline_status_icon(pipeline_status)
    
        result = f"# {pipeline_icon} Pipeline for Merge Request !{mr_iid}\n\n"
    
        result += "## πŸ“Š Pipeline Overview\n"
        result += f"**πŸ†” Pipeline ID**: #{pipeline_data.get('id', 'N/A')}\n"
        result += f"**πŸ“Š Status**: {pipeline_icon} {pipeline_status}\n"
        result += f"**πŸ”— SHA**: `{pipeline_data.get('sha', 'N/A')[:8]}`\n"
        result += f"**🌿 Ref**: `{pipeline_data.get('ref', 'N/A')}`\n"
    
        if pipeline_data.get("source"):
            result += f"**πŸ“ Source**: {pipeline_data['source']}\n"
    
        if pipeline_data.get("created_at"):
            result += f"**πŸ“… Created**: {format_date(pipeline_data['created_at'])}\n"
    
        if pipeline_data.get("updated_at"):
            result += f"**πŸ”„ Updated**: {format_date(pipeline_data['updated_at'])}\n"
    
        if pipeline_data.get("started_at"):
            result += f"**▢️ Started**: {format_date(pipeline_data['started_at'])}\n"
    
        if pipeline_data.get("finished_at"):
            result += f"**⏹️ Finished**: {format_date(pipeline_data['finished_at'])}\n"
    
        # Duration
        if pipeline_data.get("duration"):
            duration_mins = pipeline_data["duration"] // 60
            duration_secs = pipeline_data["duration"] % 60
            result += f"**⏱️ Duration**: {duration_mins}m {duration_secs}s\n"
    
        if pipeline_data.get("queued_duration"):
            queued_mins = pipeline_data["queued_duration"] // 60
            queued_secs = pipeline_data["queued_duration"] % 60
            result += f"**⏳ Queued**: {queued_mins}m {queued_secs}s\n"
    
        result += "\n"
    
        # User info
        if pipeline_data.get("user"):
            user = pipeline_data["user"]
            result += "## πŸ‘€ Triggered By\n"
            result += f"**Name**: {user.get('name', 'N/A')}\n"
            result += f"**Username**: @{user.get('username', 'N/A')}\n"
            result += "\n"
    
        # Coverage
        if pipeline_data.get("coverage"):
            result += "## πŸ“ˆ Code Coverage\n"
            result += f"**Coverage**: {pipeline_data['coverage']}%\n"
            result += "\n"
    
        # Web URL
        if pipeline_data.get("web_url"):
            result += "## πŸ”— Actions\n"
            result += f"β€’ [View Pipeline Details]({pipeline_data['web_url']})\n"
            result += "\n"
    
        # Jobs information
        if jobs_data:
            result += "## πŸ”¨ Pipeline Jobs\n\n"
    
            # Group jobs by status
            failed_jobs = [j for j in jobs_data if j.get("status") == "failed"]
            success_jobs = [j for j in jobs_data if j.get("status") == "success"]
            running_jobs = [j for j in jobs_data if j.get("status") == "running"]
            other_jobs = [j for j in jobs_data if j.get("status") not in ["failed", "success", "running"]]
    
            result += f"**Total Jobs**: {len(jobs_data)}\n"
            result += f"**βœ… Success**: {len(success_jobs)} | "
            result += f"**❌ Failed**: {len(failed_jobs)} | "
            result += f"**πŸ”„ Running**: {len(running_jobs)} | "
            result += f"**⏳ Other**: {len(other_jobs)}\n\n"
    
            # Show failed jobs first
            if failed_jobs:
                result += "### ❌ Failed Jobs\n\n"
                for job in failed_jobs:
                    job_icon = get_pipeline_status_icon(job.get("status"))
                    result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
    
                    if job.get("duration"):
                        duration_mins = int(job["duration"]) // 60
                        duration_secs = int(job["duration"]) % 60
                        result += f" - {duration_mins}m {duration_secs}s"
    
                    if job.get("web_url"):
                        result += f" - [View]({job['web_url']})"
    
                    result += "\n"
    
                result += "\n*πŸ’‘ Tip: Use `get_job_log` with a Job ID to see the full output*\n"
                result += "\n"
    
            # Show running jobs
            if running_jobs:
                result += "### πŸ”„ Running Jobs\n\n"
                for job in running_jobs:
                    job_icon = get_pipeline_status_icon(job.get("status"))
                    result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                    if job.get("web_url"):
                        result += f" - [View]({job['web_url']})"
                    result += "\n"
                result += "\n"
    
            # Show successful jobs (summary)
            if success_jobs:
                result += "### βœ… Successful Jobs\n\n"
                for job in success_jobs:
                    result += f"- βœ… **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')})"
                    if job.get("duration"):
                        duration_mins = int(job["duration"]) // 60
                        duration_secs = int(job["duration"]) % 60
                        result += f" - {duration_mins}m {duration_secs}s"
                    result += "\n"
                result += "\n"
    
            # Show other jobs
            if other_jobs:
                result += "### ⏳ Other Jobs\n\n"
                for job in other_jobs:
                    job_icon = get_pipeline_status_icon(job.get("status"))
                    result += f"- {job_icon} **{job.get('name', 'Unknown Job')}** "
                    result += f"(Job ID: `{job.get('id')}`, Stage: {job.get('stage', 'N/A')}, "
                    result += f"Status: {job.get('status', 'N/A')})\n"
                result += "\n"
    
        # Status explanation
        result += "## ℹ️ Status Information\n"
        status_explanations = {
            "success": "βœ… All jobs passed successfully",
            "failed": "❌ One or more jobs failed",
            "running": "πŸ”„ Pipeline is currently running",
            "pending": "⏳ Pipeline is waiting to start",
            "canceled": "⏹️ Pipeline was canceled",
            "skipped": "⏭️ Pipeline was skipped",
            "manual": "πŸ‘€ Waiting for manual action",
            "created": "πŸ“ Pipeline was created but not started",
            "preparing": "πŸ”§ Pipeline is preparing to run",
            "waiting_for_resource": "⏸️ Waiting for available resources",
            "scheduled": "πŸ“… Pipeline is scheduled to run",
        }
    
        explanation = status_explanations.get(pipeline_status, f"Unknown status: {pipeline_status}")
        result += f"{explanation}\n"
    
        return [TextContent(type="text", text=result)]
  • Input schema definition for the tool: requires 'merge_request_iid' as integer >=1, no other properties.
    Tool(
        name="get_merge_request_pipeline",
        description=(
            "Get the last pipeline data for a specific merge "
            "request, including all jobs and their statuses. "
            "Returns job IDs that can be used with get_job_log "
            "to fetch detailed output."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "merge_request_iid": {
                    "type": "integer",
                    "minimum": 1,
                    "description": ("Internal ID of the merge request"),
                }
            },
            "required": ["merge_request_iid"],
            "additionalProperties": False,
        },
    ),
  • main.py:312-315 (registration)
    Tool dispatch/registration in the server's call_tool handler: maps tool name to the handler function call with config params.
    elif name == "get_merge_request_pipeline":
        return await get_merge_request_pipeline(
            self.config["gitlab_url"], self.config["project_id"], self.config["access_token"], arguments
        )
  • Low-level API helper: makes HTTP GET to GitLab /merge_requests/{mr_iid}/pipelines?per_page=1 to fetch the latest pipeline for the MR.
    async def get_merge_request_pipeline(gitlab_url, project_id, access_token, mr_iid):
        """Get the latest pipeline for a merge request"""
        url = f"{gitlab_url}/api/v4/projects/{project_id}/merge_requests/{mr_iid}/pipelines"
        headers = _headers(access_token)
        async with aiohttp.ClientSession() as session:
            params = {"per_page": 1}
            async with session.get(url, headers=headers, params=params) as response:
                # Read the body once; parse JSON only on success so that
                # non-JSON error pages don't raise a ContentTypeError.
                text = await response.text()
                if response.status != 200:
                    return (response.status, None, text)
                data = await response.json()
                return (response.status, data[0] if data else None, text)
  • Supporting helper called by handler: fetches all jobs for the pipeline (up to 100) to list in the output.
    async def get_pipeline_jobs(gitlab_url, project_id, access_token, pipeline_id):
        """Get all jobs for a specific pipeline"""
        url = f"{gitlab_url}/api/v4/projects/{project_id}/pipelines/{pipeline_id}/jobs"
        headers = _headers(access_token)
        async with aiohttp.ClientSession() as session:
            params = {"per_page": 100}
            async with session.get(url, headers=headers, params=params) as response:
                # Read the body once; only parse JSON when the request succeeded.
                text = await response.text()
                if response.status != 200:
                    return (response.status, [], text)
                return (response.status, await response.json(), text)
Behavior: 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden. It discloses that the tool returns pipeline data with job statuses and IDs, which is useful behavioral context. However, it says nothing about error handling, rate limits, authentication requirements, or whether the operation is read-only; for a tool with no annotation coverage, those are significant gaps.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is efficiently structured in two sentences: the first states the purpose and scope, the second explains the utility of returned data. Every sentence adds value without redundancy, making it appropriately sized and front-loaded.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given no annotations and no output schema, the description partially compensates by explaining that the return data includes jobs and statuses, with IDs usable in get_job_log. However, for a tool that fetches pipeline data, it omits the response format, error cases, and data freshness, so completeness is only partial.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so the schema already fully documents the merge_request_iid parameter. The description doesn't add any parameter-specific details beyond what the schema provides, such as format examples or constraints. Baseline 3 is appropriate when the schema does the heavy lifting.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the specific action ('Get the last pipeline data'), target resource ('for a specific merge request'), and scope ('including all jobs and their statuses'). It distinguishes from siblings like get_merge_request_details or get_pipeline_test_summary by focusing on pipeline data rather than general MR info or test summaries.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 4/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides clear context by specifying it retrieves 'the last pipeline data' and mentions job IDs can be used with get_job_log, implicitly suggesting when to use this tool versus get_job_log. However, it doesn't explicitly state when not to use it or compare with alternatives like get_merge_request_test_report or get_pipeline_test_summary.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
