get_pipelines
Fetch CI/CD pipelines for a specific GitLab project, optionally filtered by status such as running, failed, or success.
Instructions
Get CI/CD pipelines for a GitLab project.
Args:
project_id: GitLab project ID
status: Pipeline status (running, pending, success, failed, canceled, skipped)
token: GitLab Personal Access Token (optional)
ctx: MCP context (automatically injected)

Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| project_id | Yes | | |
| status | No | | running |
| token | No | | |
| ctx | No | | |
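
To illustrate how these inputs might be passed by a client, here is a minimal sketch that builds an MCP `tools/call` JSON-RPC request for this tool. The helper name `build_get_pipelines_call`, the `request_id` parameter, and the project ID `42` are hypothetical. The client-side status check is an assumption added for illustration; the handler itself does not validate `status`. `ctx` is left out because the server injects it.

```python
import json
from typing import Optional

# Status values listed in the tool's docstring.
ALLOWED_STATUSES = {"running", "pending", "success", "failed", "canceled", "skipped"}


def build_get_pipelines_call(project_id: int, status: str = "running",
                             token: Optional[str] = None,
                             request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request (hypothetical helper)."""
    # Assumption: reject unknown statuses on the client side.
    if status not in ALLOWED_STATUSES:
        raise ValueError(f"unsupported status: {status!r}")
    arguments = {"project_id": project_id, "status": status}
    # Only send a token when the caller supplies one.
    if token is not None:
        arguments["token"] = token
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "get_pipelines", "arguments": arguments},
    }


if __name__ == "__main__":
    print(json.dumps(build_get_pipelines_call(42, status="failed"), indent=2))
```

Omitting `status` falls back to the schema default of `running`.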
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
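
The `result` value is a plain string, so a caller that wants structured data has to parse it. The sketch below assumes the line format produced by the handler (`Pipeline #<id>: <status> - <ref> (<user>)`), along with its `Error: ...` and `No <status> pipelines found.` messages. The function name `parse_pipelines_result` is hypothetical, and the regular expression is a best-effort pattern rather than a guaranteed grammar.

```python
import re
from typing import Any, Dict, List

# One line per pipeline, as formatted by the handler.
_LINE = re.compile(r"^Pipeline #(\d+): (\S+) - (.+) \((.*)\)$")


def parse_pipelines_result(result: str) -> List[Dict[str, Any]]:
    """Turn the tool's text result back into a list of dicts (hypothetical helper)."""
    if result.startswith("Error: "):
        # Surface API errors as exceptions instead of silently returning nothing.
        raise RuntimeError(result[len("Error: "):])
    rows = []
    for line in result.splitlines():
        match = _LINE.match(line)
        if match:  # "No ... pipelines found." does not match and is skipped
            rows.append({
                "id": int(match.group(1)),
                "status": match.group(2),
                "ref": match.group(3),
                "user": match.group(4),
            })
    return rows
```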
Implementation Reference
- The `get_pipelines` async function is the handler, decorated with `@mcp.tool()`. It takes a `project_id`, an optional `status` filter (default `'running'`), a `token`, and `ctx`. It calls the GitLab API endpoint `/projects/{project_id}/pipelines?status={status}` and formats up to 10 pipelines, one per line, with ID, status, ref, and user name. If the API returns an error, or no pipelines match, it returns a short message instead.
  ```python
  async def get_pipelines(project_id: int, status: str = "running", token: str = None, ctx=None) -> str:
      """Get CI/CD pipelines for a GitLab project.

      Args:
          project_id: GitLab project ID
          status: Pipeline status (running, pending, success, failed, canceled, skipped)
          token: GitLab Personal Access Token (optional)
          ctx: MCP context (automatically injected)
      """
      endpoint = f"/projects/{project_id}/pipelines?status={status}"
      data = await make_gitlab_request(endpoint, ctx=ctx, token=token)

      if isinstance(data, dict) and "error" in data:
          return f"Error: {data['error']}"

      if not data:
          return f"No {status} pipelines found."

      pipelines = []
      for pipeline in data[:10]:  # Limit to 10 pipelines
          pipelines.append(f"Pipeline #{pipeline['id']}: {pipeline['status']} - {pipeline['ref']} ({pipeline.get('user', {}).get('name', 'Unknown')})")

      return "\n".join(pipelines)
  ```
- gitlab_clone_mcp_server/server.py:195-196 (registration) The tool is registered via the `@mcp.tool()` decorator on line 195, which registers the Python function as an MCP tool named 'get_pipelines' in the FastMCP server instance.
  ```python
  @mcp.tool()
  async def get_pipelines(project_id: int, status: str = "running", token: str = None, ctx=None) -> str:
  ```
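
The handler's behavior can be exercised without a network call by separating the endpoint construction and the formatting from the request. The following is a sketch under stated assumptions, not the server's code: `build_endpoint` and `format_pipelines` are hypothetical names. Two deliberate variations are included: the query string is URL-encoded, and a `user` value of `None` is treated the same as a missing user.

```python
from typing import Any, Dict, List, Union
from urllib.parse import urlencode


def build_endpoint(project_id: int, status: str) -> str:
    """Build the pipelines endpoint path, URL-encoding the status (variation)."""
    return f"/projects/{project_id}/pipelines?{urlencode({'status': status})}"


def format_pipelines(data: Union[List[Dict[str, Any]], Dict[str, Any]],
                     status: str, limit: int = 10) -> str:
    """Format an already-fetched API response the way the handler does."""
    if isinstance(data, dict) and "error" in data:
        return f"Error: {data['error']}"
    if not data:
        return f"No {status} pipelines found."
    lines = []
    for pipeline in data[:limit]:  # the handler caps output at 10 pipelines
        # Variation: `or {}` also covers a user key that is present but None.
        user = (pipeline.get("user") or {}).get("name", "Unknown")
        lines.append(
            f"Pipeline #{pipeline['id']}: {pipeline['status']} - {pipeline['ref']} ({user})"
        )
    return "\n".join(lines)


if __name__ == "__main__":
    sample = [
        {"id": 7, "status": "failed", "ref": "main", "user": {"name": "Ada"}},
        {"id": 8, "status": "failed", "ref": "dev", "user": None},
    ]
    print(build_endpoint(42, "failed"))
    print(format_pipelines(sample, "failed"))
```

Keeping the formatting in a pure function makes the 10-item cap and the fallback messages easy to unit test.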