Databricks MCP Server

by JustTryAI
Verified
""" API for managing Databricks clusters. """ import logging from typing import Any, Dict, List, Optional from src.core.utils import DatabricksAPIError, make_api_request # Configure logging logger = logging.getLogger(__name__) async def create_cluster(cluster_config: Dict[str, Any]) -> Dict[str, Any]: """ Create a new Databricks cluster. Args: cluster_config: Cluster configuration Returns: Response containing the cluster ID Raises: DatabricksAPIError: If the API request fails """ logger.info("Creating new cluster") return make_api_request("POST", "/api/2.0/clusters/create", data=cluster_config) async def terminate_cluster(cluster_id: str) -> Dict[str, Any]: """ Terminate a Databricks cluster. Args: cluster_id: ID of the cluster to terminate Returns: Empty response on success Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Terminating cluster: {cluster_id}") return make_api_request("POST", "/api/2.0/clusters/delete", data={"cluster_id": cluster_id}) async def list_clusters() -> Dict[str, Any]: """ List all Databricks clusters. Returns: Response containing a list of clusters Raises: DatabricksAPIError: If the API request fails """ logger.info("Listing all clusters") return make_api_request("GET", "/api/2.0/clusters/list") async def get_cluster(cluster_id: str) -> Dict[str, Any]: """ Get information about a specific cluster. Args: cluster_id: ID of the cluster Returns: Response containing cluster information Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Getting information for cluster: {cluster_id}") return make_api_request("GET", "/api/2.0/clusters/get", params={"cluster_id": cluster_id}) async def start_cluster(cluster_id: str) -> Dict[str, Any]: """ Start a terminated Databricks cluster. Args: cluster_id: ID of the cluster to start Returns: Empty response on success Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Starting cluster: {cluster_id}") return make_api_request("POST", "/api/2.0/clusters/start", data={"cluster_id": cluster_id}) async def resize_cluster(cluster_id: str, num_workers: int) -> Dict[str, Any]: """ Resize a cluster by changing the number of workers. Args: cluster_id: ID of the cluster to resize num_workers: New number of workers Returns: Empty response on success Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Resizing cluster {cluster_id} to {num_workers} workers") return make_api_request( "POST", "/api/2.0/clusters/resize", data={"cluster_id": cluster_id, "num_workers": num_workers} ) async def restart_cluster(cluster_id: str) -> Dict[str, Any]: """ Restart a Databricks cluster. Args: cluster_id: ID of the cluster to restart Returns: Empty response on success Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Restarting cluster: {cluster_id}") return make_api_request("POST", "/api/2.0/clusters/restart", data={"cluster_id": cluster_id})