"""Branch and tag management tools for Bitbucket MCP Server."""
from typing import Optional
import httpx
from bitbucket_mcp.server import mcp, get_auth, get_workspace, BITBUCKET_API
@mcp.tool()
def list_branches(
repo_slug: str,
workspace: Optional[str] = None,
page: int = 1,
pagelen: int = 25,
query: Optional[str] = None
) -> dict:
"""
List branches in a repository.
Args:
repo_slug: Repository slug (name)
workspace: Bitbucket workspace (optional if configured)
page: Page number for pagination (default: 1)
pagelen: Number of results per page, max 100 (default: 25)
query: Optional search query to filter branches by name
Returns:
List of branches with their details
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
params = {
"page": page,
"pagelen": min(pagelen, 100)
}
if query:
params["q"] = f'name ~ "{query}"'
with httpx.Client() as client:
response = client.get(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/refs/branches",
auth=auth,
params=params,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
branches = []
for branch in data.get("values", []):
target = branch.get("target", {})
branches.append({
"name": branch.get("name"),
"commit_hash": target.get("hash"),
"commit_message": target.get("message", "").split("\n")[0] if target.get("message") else None,
"commit_author": target.get("author", {}).get("user", {}).get("display_name") if target.get("author") else None,
"commit_date": target.get("date"),
"default": branch.get("name") == data.get("mainbranch", {}).get("name") if data.get("mainbranch") else False
})
return {
"success": True,
"branches": branches,
"total": data.get("size", len(branches)),
"page": data.get("page", page),
"pagelen": data.get("pagelen", pagelen)
}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 404:
return {"success": False, "error": f"Repository '{ws}/{repo_slug}' not found."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}
@mcp.tool()
def create_branch(
repo_slug: str,
name: str,
target: str,
workspace: Optional[str] = None
) -> dict:
"""
Create a new branch in a repository.
Args:
repo_slug: Repository slug (name)
name: Name for the new branch
target: Commit hash, branch name, or tag to branch from
workspace: Bitbucket workspace (optional if configured)
Returns:
Created branch details or error message
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
payload = {
"name": name,
"target": {
"hash": target
}
}
with httpx.Client() as client:
response = client.post(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/refs/branches",
auth=auth,
json=payload,
timeout=30.0
)
if response.status_code in (200, 201):
branch = response.json()
target_info = branch.get("target", {})
return {
"success": True,
"name": branch.get("name"),
"commit_hash": target_info.get("hash"),
"commit_message": target_info.get("message", "").split("\n")[0] if target_info.get("message") else None
}
elif response.status_code == 400:
error_data = response.json()
error_msg = error_data.get("error", {}).get("message", "Bad request")
return {"success": False, "error": error_msg}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 404:
return {"success": False, "error": f"Repository '{ws}/{repo_slug}' not found."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}
@mcp.tool()
def delete_branch(
repo_slug: str,
name: str,
workspace: Optional[str] = None
) -> dict:
"""
Delete a branch from a repository.
Args:
repo_slug: Repository slug (name)
name: Branch name to delete
workspace: Bitbucket workspace (optional if configured)
Returns:
Confirmation or error message
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
with httpx.Client() as client:
response = client.delete(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/refs/branches/{name}",
auth=auth,
timeout=30.0
)
if response.status_code == 204:
return {
"success": True,
"message": f"Branch '{name}' deleted successfully."
}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 403:
return {"success": False, "error": f"Permission denied. Cannot delete branch '{name}'."}
elif response.status_code == 404:
return {"success": False, "error": f"Branch '{name}' not found in '{ws}/{repo_slug}'."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}
@mcp.tool()
def list_tags(
repo_slug: str,
workspace: Optional[str] = None,
page: int = 1,
pagelen: int = 25,
query: Optional[str] = None
) -> dict:
"""
List tags in a repository.
Args:
repo_slug: Repository slug (name)
workspace: Bitbucket workspace (optional if configured)
page: Page number for pagination (default: 1)
pagelen: Number of results per page, max 100 (default: 25)
query: Optional search query to filter tags by name
Returns:
List of tags with their details
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
params = {
"page": page,
"pagelen": min(pagelen, 100)
}
if query:
params["q"] = f'name ~ "{query}"'
with httpx.Client() as client:
response = client.get(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/refs/tags",
auth=auth,
params=params,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
tags = []
for tag in data.get("values", []):
target = tag.get("target", {})
tags.append({
"name": tag.get("name"),
"commit_hash": target.get("hash"),
"commit_message": target.get("message", "").split("\n")[0] if target.get("message") else None,
"commit_date": target.get("date"),
"tagger": target.get("author", {}).get("user", {}).get("display_name") if target.get("author") else None
})
return {
"success": True,
"tags": tags,
"total": data.get("size", len(tags)),
"page": data.get("page", page),
"pagelen": data.get("pagelen", pagelen)
}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 404:
return {"success": False, "error": f"Repository '{ws}/{repo_slug}' not found."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}
@mcp.tool()
def create_tag(
repo_slug: str,
name: str,
target: str,
workspace: Optional[str] = None
) -> dict:
"""
Create a new tag in a repository.
Args:
repo_slug: Repository slug (name)
name: Name for the new tag
target: Commit hash to tag
workspace: Bitbucket workspace (optional if configured)
Returns:
Created tag details or error message
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
payload = {
"name": name,
"target": {
"hash": target
}
}
with httpx.Client() as client:
response = client.post(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/refs/tags",
auth=auth,
json=payload,
timeout=30.0
)
if response.status_code in (200, 201):
tag = response.json()
target_info = tag.get("target", {})
return {
"success": True,
"name": tag.get("name"),
"commit_hash": target_info.get("hash"),
"commit_message": target_info.get("message", "").split("\n")[0] if target_info.get("message") else None
}
elif response.status_code == 400:
error_data = response.json()
error_msg = error_data.get("error", {}).get("message", "Bad request")
return {"success": False, "error": error_msg}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 404:
return {"success": False, "error": f"Repository '{ws}/{repo_slug}' not found."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}
@mcp.tool()
def delete_tag(
repo_slug: str,
name: str,
workspace: Optional[str] = None
) -> dict:
"""
Delete a tag from a repository.
Args:
repo_slug: Repository slug (name)
name: Tag name to delete
workspace: Bitbucket workspace (optional if configured)
Returns:
Confirmation or error message
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
with httpx.Client() as client:
response = client.delete(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/refs/tags/{name}",
auth=auth,
timeout=30.0
)
if response.status_code == 204:
return {
"success": True,
"message": f"Tag '{name}' deleted successfully."
}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 403:
return {"success": False, "error": f"Permission denied. Cannot delete tag '{name}'."}
elif response.status_code == 404:
return {"success": False, "error": f"Tag '{name}' not found in '{ws}/{repo_slug}'."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}
@mcp.tool()
def get_branching_model(
repo_slug: str,
workspace: Optional[str] = None
) -> dict:
"""
Get the branching model configuration for a repository.
Args:
repo_slug: Repository slug (name)
workspace: Bitbucket workspace (optional if configured)
Returns:
Branching model details including branch types and development/production branches
"""
try:
ws = get_workspace(workspace)
auth = get_auth()
with httpx.Client() as client:
response = client.get(
f"{BITBUCKET_API}/repositories/{ws}/{repo_slug}/branching-model",
auth=auth,
timeout=30.0
)
if response.status_code == 200:
data = response.json()
branch_types = []
for bt in data.get("branch_types", []):
branch_types.append({
"kind": bt.get("kind"),
"prefix": bt.get("prefix"),
"enabled": bt.get("enabled", True)
})
development = data.get("development", {})
production = data.get("production", {})
return {
"success": True,
"branch_types": branch_types,
"development_branch": development.get("branch", {}).get("name") if development.get("branch") else development.get("name"),
"production_branch": production.get("branch", {}).get("name") if production.get("branch") else production.get("name") if production else None,
"use_mainbranch": development.get("use_mainbranch", True)
}
elif response.status_code == 401:
return {"success": False, "error": "Authentication failed. Please reconfigure with setup_bitbucket."}
elif response.status_code == 404:
return {"success": False, "error": f"Repository '{ws}/{repo_slug}' not found or branching model not configured."}
else:
return {"success": False, "error": f"API error: {response.status_code} - {response.text}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except httpx.RequestError as e:
return {"success": False, "error": f"Connection error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}