clone_group_repositories
Clone all repositories from a GitLab group by providing the group ID and optional base path.
Instructions
Clone all repositories from a GitLab group.
Args:
group_id: GitLab group ID
base_path: Base directory for cloned repos (default: ./repos)
token: GitLab Personal Access Token (optional)
ctx: MCP context (automatically injected)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| group_id | Yes | | |
| base_path | No | | ./repos |
| token | No | | |
| ctx | No | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
Implementation Reference
- The `clone_group_repositories` tool handler function. It fetches all projects from a GitLab group via the GitLab API, then clones each repository using `git clone` via subprocess. Returns a summary of successful and failed clones.
def _redact_secret(text: str, secret: str | None) -> str:
    """Return *text* with every occurrence of *secret* masked out."""
    return text.replace(secret, "***") if secret else text


@mcp.tool()
async def clone_group_repositories(
    group_id: int,
    base_path: str = "./repos",
    token: str | None = None,
    ctx=None,
) -> str:
    """Clone all repositories from a GitLab group.

    Args:
        group_id: GitLab group ID
        base_path: Base directory for cloned repos (default: ./repos)
        token: GitLab Personal Access Token (optional)
        ctx: MCP context (automatically injected)

    Returns:
        A human-readable summary listing the repositories that were cloned
        and, for each failure, the project name with the (token-redacted)
        error text. An error message is returned instead when the project
        list cannot be fetched or the base directory cannot be created.
    """
    import asyncio
    import os as os_module
    import subprocess

    # Walk every page of the group's project list; a single request only
    # returns up to `per_page` entries.
    per_page = 100
    projects = []
    page = 1
    while True:
        page_data = await make_gitlab_request(
            f"/groups/{group_id}/projects?per_page={per_page}&page={page}",
            ctx=ctx,
            token=token,
        )
        if isinstance(page_data, dict) and "error" in page_data:
            return f"Error getting group projects: {page_data['error']}"
        if not isinstance(page_data, list) or not page_data:
            break
        projects.extend(page_data)
        if len(page_data) < per_page:
            # A short page is the last page.
            break
        page += 1

    if not projects:
        return "No projects found in group"

    # Create base directory
    try:
        os_module.makedirs(base_path, exist_ok=True)
    except Exception as e:
        return f"Error creating base directory: {str(e)}"

    # Resolve the clone token in the same priority order the API helper
    # uses: explicit parameter, then request headers, then environment.
    clone_token = token
    if not clone_token and ctx is not None:
        request_context = getattr(ctx, "request_context", None)
        request_headers = getattr(request_context, "headers", None)
        if request_headers:
            clone_token = request_headers.get("GITLAB_TOKEN")
    if not clone_token:
        clone_token = os_module.getenv("GITLAB_TOKEN")

    cloned = []
    failed = []

    for project in projects:
        # Resolve the display name up front so the error path below can
        # never raise while building its own message.
        if isinstance(project, dict):
            name = project.get("name", "<unknown>")
        else:
            name = str(project)

        try:
            clone_url = project["http_url_to_repo"]
            if clone_token and clone_url.startswith("https://"):
                # NOTE(review): the token becomes part of the remote URL
                # handed to git, so it presumably ends up in the clone's
                # stored remote configuration - confirm this is acceptable.
                remainder = clone_url.removeprefix("https://")
                clone_url = f"https://gitlab-ci-token:{clone_token}@{remainder}"

            local_path = f"{base_path}/{name}"

            # Run git in a worker thread so a slow clone does not block the
            # event loop; "--" stops git from parsing the URL/path as options.
            result = await asyncio.to_thread(
                subprocess.run,
                ["git", "clone", "--", clone_url, local_path],
                capture_output=True,
                text=True,
                timeout=300,
            )
        except Exception as e:
            # Best-effort per project: record the failure and keep going.
            # The message is redacted because e.g. TimeoutExpired embeds the
            # full command line, including the credentialed URL.
            failed.append(f"{name}: {_redact_secret(str(e), clone_token)}")
            continue

        if result.returncode == 0:
            cloned.append(name)
        else:
            # git echoes the remote URL in many error messages; mask the token.
            stderr_text = _redact_secret(result.stderr.strip(), clone_token)
            failed.append(f"{name}: {stderr_text}")

    result_msg = f"Cloned {len(cloned)} repositories to {base_path}\n"
    if cloned:
        result_msg += f"\nSuccessful: {', '.join(cloned)}"
    if failed:
        result_msg += f"\nFailed: {'; '.join(failed)}"

    return result_msg

# - gitlab_clone_mcp_server/server.py:1032-1033 (registration) Registration of the tool via @mcp.tool() decorator on line 1032.
@mcp.tool()
async def clone_group_repositories(group_id: int, base_path: str = "./repos", token: str = None, ctx=None) -> str:
- The `make_gitlab_request` helper function that is used by `clone_group_repositories` to call the GitLab API for fetching group projects.
async def make_gitlab_request(endpoint: str, method: str = "GET", data: dict = None, ctx=None, token: str = None) -> dict[str, Any] | None: """Make a request to GitLab API with proper error handling.""" # Priority: 1. Explicit token parameter, 2. Context headers, 3. Environment variable # If no explicit token provided, try to get from context if not token and ctx and hasattr(ctx, 'request_context') and ctx.request_context: # Try to get from request headers if hasattr(ctx.request_context, 'headers'): token = ctx.request_context.headers.get('GITLAB_TOKEN') # Fallback to environment variable if not token: token = os.getenv("GITLAB_TOKEN") if not token: return {"error": "GitLab token not provided. Please provide a token parameter, GITLAB_TOKEN in the request headers, or set the environment variable."} # Get GitLab URL (from context or environment) gitlab_url = os.getenv("GITLAB_URL", "https://gitlab.com") headers = { "PRIVATE-TOKEN": token, "Content-Type": "application/json" } url = f"{gitlab_url}/api/v4{endpoint}" async with httpx.AsyncClient() as client: try: if method == "GET": response = await client.get(url, headers=headers, timeout=30.0) elif method == "POST": response = await client.post(url, headers=headers, json=data, timeout=30.0) elif method == "PUT": response = await client.put(url, headers=headers, json=data, timeout=30.0) elif method == "DELETE": response = await client.delete(url, headers=headers, timeout=30.0) response.raise_for_status() return response.json() if response.content else {"success": True} except Exception as e: return {"error": str(e)}