# Databricks MCP Server
# by JustTryAI
# Verified
"""
API for managing Databricks jobs.
"""
import logging
from typing import Any, Dict, List, Optional
from src.core.utils import DatabricksAPIError, make_api_request
# Module-level logger named after this module. This line only obtains the
# logger; it does not set handlers or levels (presumably configured elsewhere).
logger = logging.getLogger(__name__)
async def create_job(job_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Submit a new job definition to the Databricks Jobs API.

    Args:
        job_config: Job configuration, forwarded unchanged as the request data.

    Returns:
        The result of the create request (expected to contain the job ID).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    endpoint = "/api/2.0/jobs/create"
    logger.info("Creating new job")
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    response = make_api_request("POST", endpoint, data=job_config)
    return response
async def run_job(job_id: int, notebook_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Trigger an immediate run of an existing job.

    Args:
        job_id: ID of the job to run
        notebook_params: Optional notebook parameters; only included in the
            request when truthy (None or an empty dict is omitted).

    Returns:
        The result of the run-now request (expected to contain the run ID).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Running job: {job_id}")
    # Only attach "notebook_params" when the caller actually supplied some.
    payload: Dict[str, Any] = (
        {"job_id": job_id, "notebook_params": notebook_params}
        if notebook_params
        else {"job_id": job_id}
    )
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    return make_api_request("POST", "/api/2.0/jobs/run-now", data=payload)
async def list_jobs() -> Dict[str, Any]:
    """
    Fetch the job listing from the Databricks Jobs API.

    Returns:
        The result of the list request (expected to contain a list of jobs).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    endpoint = "/api/2.0/jobs/list"
    logger.info("Listing all jobs")
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    listing = make_api_request("GET", endpoint)
    return listing
async def get_job(job_id: int) -> Dict[str, Any]:
    """
    Look up a single job by its ID.

    Args:
        job_id: ID of the job

    Returns:
        The result of the get request (expected to contain job information).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Getting information for job: {job_id}")
    # The job ID travels as a query parameter on this GET request.
    query = {"job_id": job_id}
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    return make_api_request("GET", "/api/2.0/jobs/get", params=query)
async def update_job(job_id: int, new_settings: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply new settings to an existing job.

    Args:
        job_id: ID of the job to update
        new_settings: New job settings, sent under the "new_settings" key.

    Returns:
        The result of the update request (expected to be empty on success).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Updating job: {job_id}")
    payload = dict(job_id=job_id, new_settings=new_settings)
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    return make_api_request("POST", "/api/2.0/jobs/update", data=payload)
async def delete_job(job_id: int) -> Dict[str, Any]:
    """
    Remove a job by its ID.

    Args:
        job_id: ID of the job to delete

    Returns:
        The result of the delete request (expected to be empty on success).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Deleting job: {job_id}")
    endpoint = "/api/2.0/jobs/delete"
    payload = {"job_id": job_id}
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    return make_api_request("POST", endpoint, data=payload)
async def get_run(run_id: int) -> Dict[str, Any]:
    """
    Look up a single job run by its run ID.

    Args:
        run_id: ID of the run

    Returns:
        The result of the runs/get request (expected to contain run information).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Getting information for run: {run_id}")
    # The run ID travels as a query parameter on this GET request.
    query = {"run_id": run_id}
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    run_info = make_api_request("GET", "/api/2.0/jobs/runs/get", params=query)
    return run_info
async def cancel_run(run_id: int) -> Dict[str, Any]:
    """
    Request cancellation of a job run.

    Args:
        run_id: ID of the run to cancel

    Returns:
        The result of the cancel request (expected to be empty on success).

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Cancelling run: {run_id}")
    endpoint = "/api/2.0/jobs/runs/cancel"
    payload = {"run_id": run_id}
    # NOTE(review): returned without `await` — assumes make_api_request is a
    # plain synchronous call; confirm against src.core.utils.
    return make_api_request("POST", endpoint, data=payload)