list_merge_requests
Retrieve merge requests from a GitLab project with filters for state, target branch, and result limits to manage code review workflows.
Instructions
List merge requests for the GitLab project
Input Schema
Table (JSON Schema)
| Name | Required | Description | Default |
|---|---|---|---|
| state | No | Filter by merge request state | opened |
| target_branch | No | Filter by target branch (optional) | |
| limit | No | Maximum number of results | 10 |
Implementation Reference
# tools/list_merge_requests.py:48-217 (handler): The core handler function implementing
# the tool: fetches merge requests via the GitLab API, gathers enhanced data (pipelines,
# changes) in parallel, analyzes readiness/priority, and returns a comprehensive
# Markdown-formatted list with summaries, action items, and breakdowns.
async def list_merge_requests(gitlab_url, project_id, access_token, args):
    """List merge requests for a GitLab project as a Markdown report.

    Args:
        gitlab_url: Base URL of the GitLab instance.
        project_id: Project identifier used in the API path.
        access_token: Token used to authenticate the API calls.
        args: Tool arguments dict; reads "state" (default "opened"),
            "target_branch" (optional filter) and "limit" (default 10).

    Returns:
        A single-element list holding a TextContent with the Markdown report.

    Raises:
        Exception: when the GitLab API responds with a non-200 status.
    """
    # NOTE(review): emoji literals in this excerpt are mojibake-garbled and
    # whitespace inside some string literals may have been collapsed by the
    # extraction; both are reproduced as-is here — restore the intended
    # characters from the canonical source file.
    logging.info(f"list_merge_requests called with args: {args}")

    state = args.get("state", "opened")
    target_branch = args.get("target_branch")
    limit = args.get("limit", 10)

    params = {"state": state, "per_page": limit, "order_by": "updated_at", "sort": "desc"}
    if target_branch:
        params["target_branch"] = target_branch

    status, data, error = await get_merge_requests(gitlab_url, project_id, access_token, params)
    if status != 200:
        logging.error(f"Error listing merge requests: {status} - {error}")
        raise Exception(f"Error listing merge requests: {status} - {error}")

    state_filter = f" ({state})" if state != "all" else ""
    result = f"# π Merge Requests{state_filter}\n"
    result += f"*Found {len(data)} merge request{'s' if len(data) != 1 else ''}*\n\n"

    if not data:
        result += "π No merge requests found.\n"
        if state == "opened":
            result += "π‘ **Tip**: Create a merge request to start the development workflow.\n"
        return [TextContent(type="text", text=result)]

    # Fetch pipeline/changes details for the first five MRs in parallel;
    # enhanced_results is positionally aligned with `data`.
    enhanced_data_tasks = [
        get_enhanced_mr_data(gitlab_url, project_id, access_token, mr["iid"]) for mr in data[:5]
    ]
    try:
        enhanced_results = await asyncio.gather(*enhanced_data_tasks)
    except Exception as e:
        logging.warning(f"Error in parallel enhanced data fetch: {e}")
        enhanced_results = [(None, None)] * len(data[:5])

    for i, mr in enumerate(data):
        if i < len(enhanced_results):
            pipeline_data, changes_data = enhanced_results[i]
        else:
            pipeline_data, changes_data = None, None

        if mr["state"] == "merged":
            state_icon = "β "
        elif mr["state"] == "opened":
            state_icon = "π"
        else:
            state_icon = "β"

        result += f"## {state_icon} !{mr['iid']}: {mr['title']}\n"
        author_name = mr["author"]["name"]
        author_username = mr["author"]["username"]
        result += f"**π€ Author**: {author_name} (@{author_username})\n"
        result += f"**π Status**: {mr['state']} ({get_state_explanation(mr['state'])})\n"

        priority = get_mr_priority(mr)
        readiness = analyze_mr_readiness(mr, pipeline_data)
        result += f"**π·οΈ Priority**: {priority}\n"
        result += f"**π¦ Merge Status**: {readiness}\n"
        result += f"**π Created**: {format_date(mr['created_at'])}\n"
        result += f"**π Updated**: {format_date(mr['updated_at'])}\n"

        # BUGFIX: use a loop-local name for the MR's target branch so the
        # request-level `target_branch` filter is not clobbered in the loop.
        source_branch = mr["source_branch"]
        mr_target_branch = mr["target_branch"]
        result += f"**πΏ Branches**: `{source_branch}` β `{mr_target_branch}`\n"

        if pipeline_data:
            pipeline_status = pipeline_data.get("status")
            pipeline_icon = get_pipeline_status_icon(pipeline_status)
            result += f"**π§ Pipeline**: {pipeline_icon} {pipeline_status}\n"
            if pipeline_data.get("web_url"):
                result += f" *[View Pipeline]({pipeline_data['web_url']})*\n"
        elif mr.get("pipeline"):
            # Fall back to the (less fresh) pipeline info embedded in the MR.
            pipeline_status = mr["pipeline"].get("status")
            pipeline_icon = get_pipeline_status_icon(pipeline_status)
            result += f"**π§ Pipeline**: {pipeline_icon} {pipeline_status or 'unknown'}\n"

        if changes_data:
            change_stats = calculate_change_stats(changes_data)
            result += f"**π Changes**: {change_stats}\n"

        if mr.get("labels"):
            labels_str = ", ".join(f"`{label}`" for label in mr["labels"])
            result += f"**π·οΈ Labels**: {labels_str}\n"

        if mr.get("draft") or mr.get("work_in_progress"):
            result += "**β οΈ Status**: π§ Draft/Work in Progress\n"
        if mr.get("has_conflicts"):
            result += "**β οΈ Warning**: π₯ Has merge conflicts\n"

        result += f"**π Actions**: [View MR]({mr['web_url']})"
        if mr["state"] == "opened":
            result += f" | [Review]({mr['web_url']})"
        result += "\n\n"

    result += "## π Summary\n"

    # BUGFIX: count states in a loop-local variable; the original reused
    # `state`, clobbering the requested state filter that is still needed
    # for the "no open MRs" branch below.
    state_counts = {}
    for mr in data:
        mr_state = mr["state"]
        state_counts[mr_state] = state_counts.get(mr_state, 0) + 1

    result += "**State Breakdown**:\n"
    for mr_state, count in state_counts.items():
        if mr_state == "merged":
            icon = "β "
        elif mr_state == "opened":
            icon = "π"
        else:
            icon = "β"
        result += f" β’ {icon} {mr_state.title()}: {count}\n"

    priority_counts = {}
    for mr in data:
        priority = get_mr_priority(mr)
        priority_counts[priority] = priority_counts.get(priority, 0) + 1
    if len(priority_counts) > 1:
        result += "\n**Priority Breakdown**:\n"
        for priority, count in priority_counts.items():
            result += f" β’ {priority}: {count}\n"

    opened_mrs = [mr for mr in data if mr["state"] == "opened"]
    if opened_mrs:
        result += "\n**π― Action Items**:\n"
        has_conflicts = sum(1 for mr in opened_mrs if mr.get("has_conflicts"))
        drafts = sum(1 for mr in opened_mrs if mr.get("draft") or mr.get("work_in_progress"))

        # BUGFIX: enhanced_results is aligned with `data`, not `opened_mrs`;
        # index by position in `data` so each pipeline matches the right MR.
        failed_pipelines = 0
        for i, mr in enumerate(data):
            if mr["state"] != "opened" or i >= len(enhanced_results):
                continue
            pipeline_data, _ = enhanced_results[i]
            if pipeline_data and pipeline_data.get("status") == "failed":
                failed_pipelines += 1

        if has_conflicts:
            result += f" β’ π₯ {has_conflicts} MR{'s' if has_conflicts > 1 else ''} with merge conflicts\n"
        if drafts:
            result += f" β’ π§ {drafts} draft MR{'s' if drafts > 1 else ''} in progress\n"
        if failed_pipelines:
            result += f" β’ β {failed_pipelines} MR{'s' if failed_pipelines > 1 else ''} with failed pipelines\n"

        ready_count = len(opened_mrs) - has_conflicts - drafts - failed_pipelines
        if ready_count > 0:
            result += f" β’ β {ready_count} MR{'s' if ready_count > 1 else ''} ready for review\n"

        result += "\n**π Next Steps**:\n"
        if has_conflicts:
            result += " β’ π§ Resolve merge conflicts to unblock development\n"
        if failed_pipelines:
            result += " β’ π§ Fix failing pipelines to ensure quality\n"
        if ready_count > 0:
            result += " β’ π Review and approve ready merge requests\n"
    else:
        result += "\n**π― Action Items**:\n"
        # `state` is the request filter here (no longer shadowed above).
        if state == "opened":
            result += " β’ π No open merge requests - ready for new features!\n"
        else:
            result += " β’ π Consider filtering by 'opened' state to see active work\n"

    return [TextContent(type="text", text=result)]
- main.py:46-68 (schema)The input schema definition for the list_merge_requests tool, specifying parameters: state (enum), target_branch (optional string), limit (1-100, default 10). Defined within the list_tools() handler.name="list_merge_requests", description="List merge requests for the GitLab project", inputSchema={ "type": "object", "properties": { "state": { "type": "string", "enum": ["opened", "closed", "merged", "all"], "default": "opened", "description": "Filter by merge request state", }, "target_branch": {"type": "string", "description": ("Filter by target branch (optional)")}, "limit": { "type": "integer", "default": 10, "minimum": 1, "maximum": 100, "description": "Maximum number of results", }, }, "additionalProperties": False, }, ),
- main.py:300-303 (registration)Registration/dispatch logic in the call_tool handler: matches tool name and invokes the list_merge_requests function with config and arguments.if name == "list_merge_requests": return await list_merge_requests( self.config["gitlab_url"], self.config["project_id"], self.config["access_token"], arguments )
- tools/__init__.py:16-20 (registration)Import and export registration of the list_merge_requests function in the tools package __init__.from .list_merge_requests import list_merge_requests from .reply_to_review_comment import create_review_comment, reply_to_review_comment, resolve_review_discussion __all__ = [ "list_merge_requests",
# tools/list_merge_requests.py:17-45 (helper): Fetches enhanced MR data
# (pipeline status and change set) for a single merge request via two
# parallel API calls; returns (None, None) pieces on any failure.
async def get_enhanced_mr_data(gitlab_url, project_id, access_token, mr_iid):
    """Get enhanced data for a single MR using parallel API calls.

    Returns a (pipeline_data, changes_data) tuple; either element is None
    when its fetch raised, or when the API answered with a non-200 status.
    """
    try:
        # Run both fetches concurrently; exceptions are returned in-band
        # rather than raised, so each outcome is handled independently.
        pipeline_outcome, changes_outcome = await asyncio.gather(
            get_merge_request_pipeline(gitlab_url, project_id, access_token, mr_iid),
            get_merge_request_changes(gitlab_url, project_id, access_token, mr_iid),
            return_exceptions=True,
        )

        pipeline_data = None
        if isinstance(pipeline_outcome, Exception):
            logging.warning(f"Pipeline fetch failed for MR {mr_iid}: {pipeline_outcome}")
        else:
            code, payload, _ = pipeline_outcome
            if code == 200:
                pipeline_data = payload

        changes_data = None
        if isinstance(changes_outcome, Exception):
            logging.warning(f"Changes fetch failed for MR {mr_iid}: {changes_outcome}")
        else:
            code, payload, _ = changes_outcome
            if code == 200:
                changes_data = payload

        return pipeline_data, changes_data
    except Exception as e:
        # Best-effort helper: never propagate, just degrade to no data.
        logging.warning(f"Error fetching enhanced data for MR {mr_iid}: {e}")
        return None, None