Skip to main content
Glama
bitbucket_client.py49.7 kB
"""Bitbucket API client for MCP server. Provides all Bitbucket API operations needed by the MCP tools: - Repositories: get, create, delete, list, update - Pull Requests: create, get, list, merge, approve, decline, comments, diff - Pipelines: trigger, get, list, logs, stop - Branches: list, get - Commits: list, get, compare, statuses - Deployments: environments, deployment history - Webhooks: list, create, get, delete """ from __future__ import annotations import time from typing import Any, Optional import httpx from src.settings import get_settings from src.utils import ensure_uuid_braces class BitbucketError(Exception): """Exception for Bitbucket API errors.""" pass class BitbucketClient: """Client for Bitbucket API operations. Uses connection pooling for better performance when making multiple requests. Includes automatic retry with exponential backoff for rate-limited requests (HTTP 429). Configuration via environment variables: API_TIMEOUT: Request timeout in seconds (default: 30, max: 300) MAX_RETRIES: Max retry attempts for rate limiting (default: 3, max: 10) """ BASE_URL = "https://api.bitbucket.org/2.0" INITIAL_BACKOFF = 1.0 # seconds def __init__( self, workspace: Optional[str] = None, email: Optional[str] = None, api_token: Optional[str] = None, timeout: Optional[int] = None, max_retries: Optional[int] = None, ): """Initialize Bitbucket client. Args: workspace: Bitbucket workspace (default from settings) email: Bitbucket email for auth (default from settings) api_token: Bitbucket access token (default from settings) timeout: Request timeout in seconds (default from settings) max_retries: Max retry attempts for rate limiting (default from settings) """ settings = get_settings() self.workspace = workspace or settings.bitbucket_workspace self.email = email or settings.bitbucket_email # Handle SecretStr - get the secret value if it's a SecretStr token = api_token or settings.bitbucket_api_token self.api_token = token.get_secret_value() if hasattr(token, "get_secret_value") else token # Configurable timeout and retries self.timeout = timeout if timeout is not None else settings.api_timeout self.max_retries = max_retries if max_retries is not None else settings.max_retries # Connection pooling - reuse HTTP client for multiple requests self._client: Optional[httpx.Client] = None def _get_http_client(self) -> httpx.Client: """Get or create the HTTP client with connection pooling.""" if self._client is None: self._client = httpx.Client( timeout=self.timeout, auth=(self.email, self.api_token), follow_redirects=True, ) return self._client def close(self) -> None: """Close the HTTP client and release connections.""" if self._client is not None: self._client.close() self._client = None def __enter__(self) -> "BitbucketClient": """Context manager entry.""" return self def __exit__(self, *args) -> None: """Context manager exit - close client.""" self.close() def _get_auth(self) -> tuple[str, str]: """Get auth tuple for Basic Auth requests.""" return (self.email, self.api_token) def _url(self, path: str) -> str: """Build full API URL.""" return f"{self.BASE_URL}/{path.lstrip('/')}" def _repo_path(self, repo_slug: str, *parts: str) -> str: """Build repository endpoint path. Args: repo_slug: Repository slug *parts: Additional path segments Returns: Full path like "repositories/workspace/repo/pullrequests/123" """ base = f"repositories/{self.workspace}/{repo_slug}" return "/".join([base] + list(parts)) if parts else base def _request( self, method: str, path: str, json: Optional[dict] = None, params: Optional[dict] = None, timeout: Optional[int] = None, ) -> Optional[dict]: """Make an API request using connection pooling with retry logic. Automatically retries on rate limiting (HTTP 429) with exponential backoff. Args: method: HTTP method path: API path (without base URL) json: Request body params: Query parameters timeout: Request timeout in seconds (uses default if not specified) Returns: Response JSON or None for 204/404 """ client = self._get_http_client() backoff = self.INITIAL_BACKOFF for attempt in range(self.max_retries + 1): r = client.request( method, self._url(path), json=json, params=params, headers={"Content-Type": "application/json"} if json else None, timeout=timeout or self.timeout, ) if r.status_code == 404: return None if r.status_code in (200, 201, 202): return r.json() if r.content else {} if r.status_code == 204: return {} # Rate limiting - retry with exponential backoff if r.status_code == 429: if attempt < self.max_retries: # Check Retry-After header for server-suggested wait time retry_after = r.headers.get("Retry-After") if retry_after: try: wait_time = float(retry_after) except ValueError: wait_time = backoff else: wait_time = backoff time.sleep(wait_time) backoff *= 2 # Exponential backoff continue else: raise BitbucketError( f"Rate limited after {self.max_retries} retries. " f"Method: {method} {path}" ) # Truncate error response to avoid huge exception messages error_text = r.text[:500] if len(r.text) > 500 else r.text raise BitbucketError( f"API error {r.status_code}: {error_text}\n" f"Method: {method} {path}" ) # Should not reach here, but satisfy type checker raise BitbucketError(f"Unexpected error in request: {method} {path}") def _paginated_list( self, endpoint: str, limit: int = 50, max_page: int = 100, **extra_params, ) -> list[dict[str, Any]]: """Helper for paginated list endpoints. Args: endpoint: API endpoint path limit: Maximum results to return max_page: Maximum page size (API limit) **extra_params: Additional query parameters Returns: List of result dicts from 'values' key """ params = {"pagelen": min(limit, max_page)} params.update({k: v for k, v in extra_params.items() if v is not None}) result = self._request("GET", endpoint, params=params) return result.get("values", []) if result else [] def _request_text( self, path: str, timeout: Optional[int] = None, ) -> Optional[str]: """Make an API request that returns plain text using connection pooling. Used for endpoints that return text content like logs, diffs, and files. Follows redirects automatically. Includes retry logic for rate limiting. Args: path: API path (without base URL) timeout: Request timeout in seconds (uses default if not specified) Returns: Response text or None for 404 """ client = self._get_http_client() backoff = self.INITIAL_BACKOFF for attempt in range(self.max_retries + 1): r = client.get( self._url(path), timeout=timeout or self.timeout, ) if r.status_code == 200: return r.text elif r.status_code == 404: return None elif r.status_code == 429: if attempt < self.max_retries: retry_after = r.headers.get("Retry-After") wait_time = float(retry_after) if retry_after else backoff time.sleep(wait_time) backoff *= 2 continue else: raise BitbucketError(f"Rate limited after {self.max_retries} retries") else: raise BitbucketError(f"Request failed: {r.status_code}") raise BitbucketError(f"Unexpected error in request: GET {path}") def _require_result( self, result: Optional[dict[str, Any]], action: str, identifier: str = "", ) -> dict[str, Any]: """Validate that a result is not None/empty. Args: result: API response to validate action: Action description for error message (e.g., "create repository") identifier: Optional identifier for error message (e.g., repo name) Returns: The result if valid Raises: BitbucketError: If result is None or empty """ if not result: msg = f"Failed to {action}" if identifier: msg += f": {identifier}" raise BitbucketError(msg) return result # ==================== REPOSITORIES ==================== def get_repository(self, repo_slug: str) -> Optional[dict[str, Any]]: """Get repository information. Args: repo_slug: Repository slug Returns: Repository info or None if not found """ return self._request("GET", self._repo_path(repo_slug)) def create_repository( self, repo_slug: str, project_key: Optional[str] = None, is_private: bool = True, description: str = "", ) -> dict[str, Any]: """Create a new repository. Args: repo_slug: Repository slug project_key: Project key to create repo under is_private: Whether repo is private (default: True) description: Repository description Returns: Created repository info """ payload = { "scm": "git", "is_private": is_private, } if project_key: payload["project"] = {"key": project_key} if description: payload["description"] = description result = self._request("POST", self._repo_path(repo_slug), json=payload) return self._require_result(result, "create repository", repo_slug) def delete_repository(self, repo_slug: str) -> bool: """Delete a repository. Args: repo_slug: Repository slug Returns: True if deleted successfully """ self._request("DELETE", self._repo_path(repo_slug)) return True def list_repositories( self, project_key: Optional[str] = None, query: Optional[str] = None, limit: int = 50, ) -> list[dict[str, Any]]: """List repositories in workspace. Args: project_key: Filter by project (optional) query: Search query using Bitbucket query syntax (optional) Examples: - name ~ "anzsic" (partial name match) - name = "exact-name" (exact name match) - description ~ "api" (search in description) - is_private = true (filter by visibility) limit: Maximum results to return Returns: List of repository info dicts """ params = {"pagelen": min(limit, 100)} # Build query string q_parts = [] if project_key: q_parts.append(f'project.key="{project_key}"') if query: q_parts.append(query) if q_parts: params["q"] = " AND ".join(q_parts) result = self._request( "GET", f"repositories/{self.workspace}", params=params, ) return result.get("values", []) if result else [] # ==================== PULL REQUESTS ==================== def create_pull_request( self, repo_slug: str, title: str, source_branch: str, destination_branch: str = "main", description: str = "", close_source_branch: bool = True, reviewers: Optional[list[str]] = None, ) -> dict[str, Any]: """Create a pull request. Args: repo_slug: Repository slug title: PR title source_branch: Source branch name destination_branch: Target branch (default: main) description: PR description body close_source_branch: Delete branch after merge reviewers: List of reviewer account IDs (optional) Returns: Dict with PR info including 'id', 'links', 'state' """ payload = { "title": title, "source": {"branch": {"name": source_branch}}, "destination": {"branch": {"name": destination_branch}}, "close_source_branch": close_source_branch, } if description: payload["description"] = description if reviewers: # Handle both UUID format and account_id format payload["reviewers"] = [ {"uuid": r} if r.startswith("{") else {"account_id": r} for r in reviewers ] result = self._request( "POST", self._repo_path(repo_slug, "pullrequests"), json=payload, ) return self._require_result( result, "create PR", f"{source_branch} -> {destination_branch}" ) def get_pull_request( self, repo_slug: str, pr_id: int ) -> Optional[dict[str, Any]]: """Get pull request by ID. Args: repo_slug: Repository slug pr_id: Pull request ID Returns: PR info or None if not found """ return self._request( "GET", self._repo_path(repo_slug, "pullrequests", str(pr_id)), ) def list_pull_requests( self, repo_slug: str, state: str = "OPEN", limit: int = 50, ) -> list[dict[str, Any]]: """List pull requests for a repository. Args: repo_slug: Repository slug state: PR state filter (OPEN, MERGED, DECLINED, SUPERSEDED) limit: Maximum results to return Returns: List of PR info dicts """ return self._paginated_list( self._repo_path(repo_slug, "pullrequests"), limit=limit, max_page=50, state=state, ) def merge_pull_request( self, repo_slug: str, pr_id: int, merge_strategy: str = "merge_commit", close_source_branch: bool = True, message: Optional[str] = None, ) -> dict[str, Any]: """Merge a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID merge_strategy: One of 'merge_commit', 'squash', 'fast_forward' close_source_branch: Delete source branch after merge message: Optional merge commit message Returns: Merged PR info """ payload = { "type": merge_strategy, "close_source_branch": close_source_branch, } if message: payload["message"] = message result = self._request( "POST", self._repo_path(repo_slug, "pullrequests", str(pr_id), "merge"), json=payload, ) return self._require_result(result, "merge PR", f"#{pr_id}") # ==================== PIPELINES ==================== def trigger_pipeline( self, repo_slug: str, branch: str = "main", variables: Optional[dict[str, str]] = None, ) -> dict[str, Any]: """Trigger a pipeline run. Args: repo_slug: Repository slug branch: Branch to run pipeline on (default: main) variables: Custom pipeline variables Returns: Pipeline run info including 'uuid', 'state' """ payload = { "target": { "ref_type": "branch", "type": "pipeline_ref_target", "ref_name": branch, } } if variables: payload["variables"] = [ {"key": k, "value": v} for k, v in variables.items() ] result = self._request( "POST", self._repo_path(repo_slug, "pipelines") + "/", json=payload, ) return self._require_result(result, "trigger pipeline on", branch) def get_pipeline( self, repo_slug: str, pipeline_uuid: str ) -> Optional[dict[str, Any]]: """Get pipeline run status. Args: repo_slug: Repository slug pipeline_uuid: Pipeline UUID (with or without braces) Returns: Pipeline info or None if not found """ pipeline_uuid = ensure_uuid_braces(pipeline_uuid) return self._request( "GET", self._repo_path(repo_slug, "pipelines", pipeline_uuid), ) def list_pipelines( self, repo_slug: str, limit: int = 10, ) -> list[dict[str, Any]]: """List recent pipeline runs. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of pipeline info dicts """ return self._paginated_list( self._repo_path(repo_slug, "pipelines") + "/", limit=limit, sort="-created_on", ) def get_pipeline_steps( self, repo_slug: str, pipeline_uuid: str ) -> list[dict[str, Any]]: """Get pipeline steps. Args: repo_slug: Repository slug pipeline_uuid: Pipeline UUID Returns: List of step info dicts """ pipeline_uuid = ensure_uuid_braces(pipeline_uuid) return self._paginated_list( self._repo_path(repo_slug, "pipelines", pipeline_uuid, "steps") + "/", ) def get_pipeline_logs( self, repo_slug: str, pipeline_uuid: str, step_uuid: str, ) -> str: """Get logs for a pipeline step. Args: repo_slug: Repository slug pipeline_uuid: Pipeline UUID step_uuid: Step UUID Returns: Log content as string """ pipeline_uuid = ensure_uuid_braces(pipeline_uuid) step_uuid = ensure_uuid_braces(step_uuid) path = self._repo_path( repo_slug, "pipelines", pipeline_uuid, "steps", step_uuid, "log" ) return self._request_text(path) or "" def stop_pipeline( self, repo_slug: str, pipeline_uuid: str ) -> dict[str, Any]: """Stop a running pipeline. Args: repo_slug: Repository slug pipeline_uuid: Pipeline UUID Returns: Updated pipeline info """ pipeline_uuid = ensure_uuid_braces(pipeline_uuid) result = self._request( "POST", self._repo_path(repo_slug, "pipelines", pipeline_uuid, "stopPipeline"), ) # 204 returns {} which is a success if result is None: raise BitbucketError(f"Failed to stop pipeline {pipeline_uuid}") # Return updated pipeline state return self.get_pipeline(repo_slug, pipeline_uuid) or {"stopped": True} # ==================== PIPELINE VARIABLES ==================== def list_pipeline_variables( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List pipeline variables for a repository. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of pipeline variable info dicts """ return self._paginated_list( self._repo_path(repo_slug, "pipelines_config", "variables"), limit=limit, ) def get_pipeline_variable( self, repo_slug: str, variable_uuid: str ) -> Optional[dict[str, Any]]: """Get a specific pipeline variable. Args: repo_slug: Repository slug variable_uuid: Variable UUID Returns: Variable info or None if not found """ variable_uuid = ensure_uuid_braces(variable_uuid) return self._request( "GET", self._repo_path(repo_slug, "pipelines_config", "variables", variable_uuid), ) def create_pipeline_variable( self, repo_slug: str, key: str, value: str, secured: bool = False, ) -> dict[str, Any]: """Create a pipeline variable. Args: repo_slug: Repository slug key: Variable name (e.g., "PYPI_TOKEN") value: Variable value secured: Whether to encrypt the value (default: False) Returns: Created variable info """ payload = {"key": key, "value": value, "secured": secured} result = self._request( "POST", self._repo_path(repo_slug, "pipelines_config", "variables") + "/", json=payload, ) return self._require_result(result, "create pipeline variable") def update_pipeline_variable( self, repo_slug: str, variable_uuid: str, value: str, ) -> dict[str, Any]: """Update a pipeline variable's value. Args: repo_slug: Repository slug variable_uuid: Variable UUID value: New variable value Returns: Updated variable info """ variable_uuid = ensure_uuid_braces(variable_uuid) result = self._request( "PUT", self._repo_path(repo_slug, "pipelines_config", "variables", variable_uuid), json={"value": value}, ) return self._require_result(result, "update pipeline variable") def delete_pipeline_variable( self, repo_slug: str, variable_uuid: str ) -> bool: """Delete a pipeline variable. Args: repo_slug: Repository slug variable_uuid: Variable UUID Returns: True if deleted successfully """ variable_uuid = ensure_uuid_braces(variable_uuid) self._request( "DELETE", self._repo_path(repo_slug, "pipelines_config", "variables", variable_uuid), ) return True # ==================== PROJECTS ==================== def list_projects( self, limit: int = 50, ) -> list[dict[str, Any]]: """List projects in workspace. Args: limit: Maximum results to return Returns: List of project info dicts """ return self._paginated_list( f"workspaces/{self.workspace}/projects", limit=limit, ) def get_project(self, project_key: str) -> Optional[dict[str, Any]]: """Get project information. Args: project_key: Project key (e.g., "DS") Returns: Project info or None if not found """ return self._request( "GET", f"workspaces/{self.workspace}/projects/{project_key}", ) # ==================== REPOSITORY UPDATE ==================== def update_repository( self, repo_slug: str, project_key: Optional[str] = None, is_private: Optional[bool] = None, description: Optional[str] = None, name: Optional[str] = None, ) -> dict[str, Any]: """Update repository settings. Args: repo_slug: Repository slug project_key: Move to different project (optional) is_private: Change visibility (optional) description: Update description (optional) name: Rename repository (optional) Returns: Updated repository info """ payload = {} if project_key is not None: payload["project"] = {"key": project_key} if is_private is not None: payload["is_private"] = is_private if description is not None: payload["description"] = description if name is not None: payload["name"] = name if not payload: raise BitbucketError("No fields to update") result = self._request("PUT", self._repo_path(repo_slug), json=payload) return self._require_result(result, "update repository", repo_slug) # ==================== COMMITS ==================== def list_commits( self, repo_slug: str, branch: Optional[str] = None, path: Optional[str] = None, limit: int = 20, ) -> list[dict[str, Any]]: """List commits in a repository. Args: repo_slug: Repository slug branch: Filter by branch (optional) path: Filter by file path (optional) limit: Maximum results to return Returns: List of commit info dicts """ return self._paginated_list( self._repo_path(repo_slug, "commits"), limit=limit, include=branch, path=path, ) def get_commit( self, repo_slug: str, commit: str ) -> Optional[dict[str, Any]]: """Get commit details. Args: repo_slug: Repository slug commit: Commit hash (full or short) Returns: Commit info or None if not found """ return self._request("GET", self._repo_path(repo_slug, "commit", commit)) def compare_commits( self, repo_slug: str, base: str, head: str, ) -> Optional[dict[str, Any]]: """Compare two commits or branches (get diff). Args: repo_slug: Repository slug base: Base commit/branch head: Head commit/branch Returns: Diff info including files changed """ # Use diffstat for summary, diff for full content return self._request( "GET", self._repo_path(repo_slug, "diffstat", f"{base}..{head}"), ) # ==================== COMMIT STATUSES ==================== def get_commit_statuses( self, repo_slug: str, commit: str, limit: int = 20, ) -> list[dict[str, Any]]: """Get build/CI statuses for a commit. Args: repo_slug: Repository slug commit: Commit hash limit: Maximum results to return Returns: List of status info dicts """ return self._paginated_list( self._repo_path(repo_slug, "commit", commit, "statuses"), limit=limit, ) def create_commit_status( self, repo_slug: str, commit: str, state: str, key: str, url: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, ) -> dict[str, Any]: """Create a build status for a commit. Args: repo_slug: Repository slug commit: Commit hash state: Status state (SUCCESSFUL, FAILED, INPROGRESS, STOPPED) key: Unique identifier for this status url: URL to the build (optional) name: Display name (optional) description: Status description (optional) Returns: Created status info """ payload = { "state": state, "key": key, } if url: payload["url"] = url if name: payload["name"] = name if description: payload["description"] = description result = self._request( "POST", self._repo_path(repo_slug, "commit", commit, "statuses", "build"), json=payload, ) return self._require_result(result, "create status for commit", commit) # ==================== PR COMMENTS & REVIEWS ==================== def list_pr_comments( self, repo_slug: str, pr_id: int, limit: int = 50, ) -> list[dict[str, Any]]: """List comments on a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID limit: Maximum results to return Returns: List of comment info dicts """ return self._paginated_list( self._repo_path(repo_slug, "pullrequests", str(pr_id), "comments"), limit=limit, ) def add_pr_comment( self, repo_slug: str, pr_id: int, content: str, inline: Optional[dict] = None, ) -> dict[str, Any]: """Add a comment to a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID content: Comment content (markdown supported) inline: Inline comment location (optional) {"path": "file.py", "to": 10} for line comment Returns: Created comment info """ payload = { "content": {"raw": content} } if inline: payload["inline"] = inline result = self._request( "POST", self._repo_path(repo_slug, "pullrequests", str(pr_id), "comments"), json=payload, ) return self._require_result(result, "add comment to PR", f"#{pr_id}") def approve_pr( self, repo_slug: str, pr_id: int ) -> dict[str, Any]: """Approve a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID Returns: Approval info """ result = self._request( "POST", self._repo_path(repo_slug, "pullrequests", str(pr_id), "approve"), ) return self._require_result(result, "approve PR", f"#{pr_id}") def unapprove_pr( self, repo_slug: str, pr_id: int ) -> bool: """Remove approval from a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID Returns: True if successful """ self._request( "DELETE", self._repo_path(repo_slug, "pullrequests", str(pr_id), "approve"), ) return True def request_changes_pr( self, repo_slug: str, pr_id: int ) -> dict[str, Any]: """Request changes on a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID Returns: Request info """ result = self._request( "POST", self._repo_path(repo_slug, "pullrequests", str(pr_id), "request-changes"), ) return self._require_result(result, "request changes on PR", f"#{pr_id}") def decline_pr( self, repo_slug: str, pr_id: int ) -> dict[str, Any]: """Decline (close) a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID Returns: Declined PR info """ result = self._request( "POST", self._repo_path(repo_slug, "pullrequests", str(pr_id), "decline"), ) return self._require_result(result, "decline PR", f"#{pr_id}") def get_pr_diff( self, repo_slug: str, pr_id: int ) -> str: """Get diff of a pull request. Args: repo_slug: Repository slug pr_id: Pull request ID Returns: Diff content as string """ path = self._repo_path(repo_slug, "pullrequests", str(pr_id), "diff") return self._request_text(path) or "" # ==================== DEPLOYMENTS ==================== def list_environments( self, repo_slug: str, limit: int = 20, ) -> list[dict[str, Any]]: """List deployment environments. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of environment info dicts """ return self._paginated_list( self._repo_path(repo_slug, "environments"), limit=limit, ) def get_environment( self, repo_slug: str, environment_uuid: str ) -> Optional[dict[str, Any]]: """Get deployment environment details. Args: repo_slug: Repository slug environment_uuid: Environment UUID Returns: Environment info or None if not found """ environment_uuid = ensure_uuid_braces(environment_uuid) return self._request( "GET", self._repo_path(repo_slug, "environments", environment_uuid), ) def list_deployment_history( self, repo_slug: str, environment_uuid: str, limit: int = 20, ) -> list[dict[str, Any]]: """Get deployment history for an environment. Args: repo_slug: Repository slug environment_uuid: Environment UUID limit: Maximum results to return Returns: List of deployment info dicts """ environment_uuid = ensure_uuid_braces(environment_uuid) return self._paginated_list( self._repo_path(repo_slug, "deployments"), limit=limit, environment=environment_uuid, sort="-state.started_on", ) # ==================== WEBHOOKS ==================== def list_webhooks( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List webhooks for a repository. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of webhook info dicts """ return self._paginated_list( self._repo_path(repo_slug, "hooks"), limit=limit, ) def create_webhook( self, repo_slug: str, url: str, events: list[str], description: str = "", active: bool = True, ) -> dict[str, Any]: """Create a webhook. Args: repo_slug: Repository slug url: Webhook URL to call events: List of events to trigger on e.g., ["repo:push", "pullrequest:created", "pullrequest:merged"] description: Webhook description active: Whether webhook is active Returns: Created webhook info """ payload = { "url": url, "events": events, "active": active, } if description: payload["description"] = description result = self._request( "POST", self._repo_path(repo_slug, "hooks"), json=payload, ) return self._require_result(result, "create webhook") def get_webhook( self, repo_slug: str, webhook_uid: str ) -> Optional[dict[str, Any]]: """Get webhook details. Args: repo_slug: Repository slug webhook_uid: Webhook UID Returns: Webhook info or None if not found """ webhook_uid = ensure_uuid_braces(webhook_uid) return self._request( "GET", self._repo_path(repo_slug, "hooks", webhook_uid), ) def delete_webhook( self, repo_slug: str, webhook_uid: str ) -> bool: """Delete a webhook. Args: repo_slug: Repository slug webhook_uid: Webhook UID Returns: True if deleted successfully """ webhook_uid = ensure_uuid_braces(webhook_uid) self._request("DELETE", self._repo_path(repo_slug, "hooks", webhook_uid)) return True # ==================== BRANCHES ==================== def list_branches( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List branches in a repository. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of branch info dicts """ return self._paginated_list( self._repo_path(repo_slug, "refs", "branches"), limit=limit, ) def get_branch( self, repo_slug: str, branch_name: str ) -> Optional[dict[str, Any]]: """Get branch information. Args: repo_slug: Repository slug branch_name: Branch name Returns: Branch info or None if not found """ return self._request( "GET", self._repo_path(repo_slug, "refs", "branches", branch_name), ) # ==================== TAGS ==================== def list_tags( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List tags in a repository. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of tag info dicts """ return self._paginated_list( self._repo_path(repo_slug, "refs", "tags"), limit=limit, sort="-target.date", ) def create_tag( self, repo_slug: str, name: str, target: str, message: Optional[str] = None, ) -> dict[str, Any]: """Create a tag. Args: repo_slug: Repository slug name: Tag name (e.g., "v1.0.0") target: Commit hash or branch to tag message: Optional tag message (for annotated tags) Returns: Created tag info """ payload = { "name": name, "target": {"hash": target}, } if message: payload["message"] = message result = self._request( "POST", self._repo_path(repo_slug, "refs", "tags"), json=payload, ) return self._require_result(result, "create tag", name) def delete_tag( self, repo_slug: str, tag_name: str ) -> bool: """Delete a tag. Args: repo_slug: Repository slug tag_name: Tag name to delete Returns: True if deleted successfully """ self._request( "DELETE", self._repo_path(repo_slug, "refs", "tags", tag_name), ) return True # ==================== BRANCH RESTRICTIONS ==================== def list_branch_restrictions( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List branch restrictions. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of restriction info dicts """ return self._paginated_list( self._repo_path(repo_slug, "branch-restrictions"), limit=limit, ) def create_branch_restriction( self, repo_slug: str, kind: str, pattern: str = "", branch_match_kind: str = "glob", branch_type: Optional[str] = None, users: Optional[list[dict]] = None, groups: Optional[list[dict]] = None, value: Optional[int] = None, ) -> dict[str, Any]: """Create a branch restriction. Args: repo_slug: Repository slug kind: Restriction type. Options: - require_passing_builds_to_merge - require_approvals_to_merge - require_default_reviewer_approvals_to_merge - require_no_changes_requested - require_tasks_to_be_completed - require_commits_behind - push, force, delete, restrict_merges pattern: Branch pattern (e.g., "main", "release/*") branch_match_kind: "glob" or "branching_model" branch_type: If branch_match_kind is "branching_model": development, production, feature, bugfix, release, hotfix users: List of users exempt from restriction groups: List of groups exempt from restriction value: Number value (e.g., required approvals count) Returns: Created restriction info """ payload: dict[str, Any] = { "kind": kind, "branch_match_kind": branch_match_kind, } if branch_match_kind == "glob" and pattern: payload["pattern"] = pattern if branch_match_kind == "branching_model" and branch_type: payload["branch_type"] = branch_type if users: payload["users"] = users if groups: payload["groups"] = groups if value is not None: payload["value"] = value result = self._request( "POST", self._repo_path(repo_slug, "branch-restrictions"), json=payload, ) return self._require_result(result, "create branch restriction", kind) def delete_branch_restriction( self, repo_slug: str, restriction_id: int ) -> bool: """Delete a branch restriction. Args: repo_slug: Repository slug restriction_id: Restriction ID to delete Returns: True if deleted successfully """ self._request( "DELETE", self._repo_path(repo_slug, "branch-restrictions", str(restriction_id)), ) return True # ==================== SOURCE ==================== def get_file_content( self, repo_slug: str, path: str, ref: str = "main", ) -> Optional[str]: """Get file content from repository. Args: repo_slug: Repository slug path: File path (e.g., "src/main.py") ref: Branch, tag, or commit (default: main) Returns: File content as string or None if not found """ return self._request_text(self._repo_path(repo_slug, "src", ref, path)) def list_directory( self, repo_slug: str, path: str = "", ref: str = "main", limit: int = 100, ) -> list[dict[str, Any]]: """List directory contents. Args: repo_slug: Repository slug path: Directory path (empty for root) ref: Branch, tag, or commit (default: main) limit: Maximum results to return Returns: List of file/directory info dicts """ return self._paginated_list( self._repo_path(repo_slug, "src", ref, path) if path else self._repo_path(repo_slug, "src", ref), limit=limit, ) # ==================== REPOSITORY PERMISSIONS ==================== def list_user_permissions( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List user permissions for a repository. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of user permission info dicts """ return self._paginated_list( self._repo_path(repo_slug, "permissions-config", "users"), limit=limit, ) def get_user_permission( self, repo_slug: str, selected_user: str ) -> Optional[dict[str, Any]]: """Get permission for a specific user. Args: repo_slug: Repository slug selected_user: User UUID or account_id Returns: User permission info or None if not found """ return self._request( "GET", self._repo_path(repo_slug, "permissions-config", "users", selected_user), ) def update_user_permission( self, repo_slug: str, selected_user: str, permission: str, ) -> dict[str, Any]: """Update (or add) user permission. Args: repo_slug: Repository slug selected_user: User UUID or account_id permission: Permission level: "read", "write", or "admin" Returns: Updated permission info """ result = self._request( "PUT", self._repo_path(repo_slug, "permissions-config", "users", selected_user), json={"permission": permission}, ) return self._require_result(result, "update permission for user", selected_user) def delete_user_permission( self, repo_slug: str, selected_user: str ) -> bool: """Remove user permission from repository. Args: repo_slug: Repository slug selected_user: User UUID or account_id Returns: True if deleted successfully """ self._request( "DELETE", self._repo_path(repo_slug, "permissions-config", "users", selected_user), ) return True def list_group_permissions( self, repo_slug: str, limit: int = 50, ) -> list[dict[str, Any]]: """List group permissions for a repository. Args: repo_slug: Repository slug limit: Maximum results to return Returns: List of group permission info dicts """ return self._paginated_list( self._repo_path(repo_slug, "permissions-config", "groups"), limit=limit, ) def get_group_permission( self, repo_slug: str, group_slug: str ) -> Optional[dict[str, Any]]: """Get permission for a specific group. Args: repo_slug: Repository slug group_slug: Group slug Returns: Group permission info or None if not found """ return self._request( "GET", self._repo_path(repo_slug, "permissions-config", "groups", group_slug), ) def update_group_permission( self, repo_slug: str, group_slug: str, permission: str, ) -> dict[str, Any]: """Update (or add) group permission. Args: repo_slug: Repository slug group_slug: Group slug permission: Permission level: "read", "write", or "admin" Returns: Updated permission info """ result = self._request( "PUT", self._repo_path(repo_slug, "permissions-config", "groups", group_slug), json={"permission": permission}, ) return self._require_result(result, "update permission for group", group_slug) def delete_group_permission( self, repo_slug: str, group_slug: str ) -> bool: """Remove group permission from repository. Args: repo_slug: Repository slug group_slug: Group slug Returns: True if deleted successfully """ self._request( "DELETE", self._repo_path(repo_slug, "permissions-config", "groups", group_slug), ) return True # ==================== UTILITIES ==================== @staticmethod def extract_pr_url(pr_response: dict[str, Any]) -> str: """Extract the HTML URL from a PR response.""" return pr_response.get("links", {}).get("html", {}).get("href", "") @staticmethod def extract_clone_urls(repo_response: dict[str, Any]) -> dict[str, str]: """Extract clone URLs from a repository response.""" urls = {} for link in repo_response.get("links", {}).get("clone", []): name = link.get("name", "").lower() if name in ("https", "ssh"): urls[name] = link.get("href", "") urls["html"] = repo_response.get("links", {}).get("html", {}).get("href", "") return urls # Singleton instance _client: Optional[BitbucketClient] = None def get_client() -> BitbucketClient: """Get or create the Bitbucket client singleton.""" global _client if _client is None: _client = BitbucketClient() return _client

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JaviMaligno/mcp-server-bitbucket'

If you have feedback or need assistance with the MCP directory API, please join our Discord server