Databricks MCP Server
by JustTryAI
Verified
"""
API for managing Databricks File System (DBFS).
"""
import base64
import logging
import os
from typing import Any, Dict, List, Optional, BinaryIO
from src.core.utils import DatabricksAPIError, make_api_request
# Configure logging
logger = logging.getLogger(__name__)
async def put_file(
dbfs_path: str,
file_content: bytes,
overwrite: bool = True,
) -> Dict[str, Any]:
"""
Upload a file to DBFS.
Args:
dbfs_path: The path where the file should be stored in DBFS
file_content: The content of the file as bytes
overwrite: Whether to overwrite an existing file
Returns:
Empty response on success
Raises:
DatabricksAPIError: If the API request fails
"""
logger.info(f"Uploading file to DBFS path: {dbfs_path}")
# Convert bytes to base64
content_base64 = base64.b64encode(file_content).decode("utf-8")
return make_api_request(
"POST",
"/api/2.0/dbfs/put",
data={
"path": dbfs_path,
"contents": content_base64,
"overwrite": overwrite,
},
)
async def upload_large_file(
dbfs_path: str,
local_file_path: str,
overwrite: bool = True,
buffer_size: int = 1024 * 1024, # 1MB chunks
) -> Dict[str, Any]:
"""
Upload a large file to DBFS in chunks.
Args:
dbfs_path: The path where the file should be stored in DBFS
local_file_path: Local path to the file to upload
overwrite: Whether to overwrite an existing file
buffer_size: Size of chunks to upload
Returns:
Empty response on success
Raises:
DatabricksAPIError: If the API request fails
FileNotFoundError: If the local file does not exist
"""
logger.info(f"Uploading large file from {local_file_path} to DBFS path: {dbfs_path}")
if not os.path.exists(local_file_path):
raise FileNotFoundError(f"Local file not found: {local_file_path}")
# Create a handle for the upload
create_response = make_api_request(
"POST",
"/api/2.0/dbfs/create",
data={
"path": dbfs_path,
"overwrite": overwrite,
},
)
handle = create_response.get("handle")
try:
with open(local_file_path, "rb") as f:
chunk_index = 0
while True:
chunk = f.read(buffer_size)
if not chunk:
break
# Convert chunk to base64
chunk_base64 = base64.b64encode(chunk).decode("utf-8")
# Add to handle
make_api_request(
"POST",
"/api/2.0/dbfs/add-block",
data={
"handle": handle,
"data": chunk_base64,
},
)
chunk_index += 1
logger.debug(f"Uploaded chunk {chunk_index}")
# Close the handle
return make_api_request(
"POST",
"/api/2.0/dbfs/close",
data={"handle": handle},
)
except Exception as e:
# Attempt to abort the upload on error
try:
make_api_request(
"POST",
"/api/2.0/dbfs/close",
data={"handle": handle},
)
except Exception:
pass
logger.error(f"Error uploading file: {str(e)}")
raise
async def get_file(
dbfs_path: str,
offset: int = 0,
length: int = 1024 * 1024, # Default to 1MB
) -> Dict[str, Any]:
"""
Get the contents of a file from DBFS.
Args:
dbfs_path: The path of the file in DBFS
offset: Starting byte position
length: Number of bytes to read
Returns:
Response containing the file content
Raises:
DatabricksAPIError: If the API request fails
"""
logger.info(f"Reading file from DBFS path: {dbfs_path}")
response = make_api_request(
"GET",
"/api/2.0/dbfs/read",
params={
"path": dbfs_path,
"offset": offset,
"length": length,
},
)
# Decode base64 content
if "data" in response:
try:
response["decoded_data"] = base64.b64decode(response["data"])
except Exception as e:
logger.warning(f"Failed to decode file content: {str(e)}")
return response
async def list_files(dbfs_path: str) -> Dict[str, Any]:
"""
List files and directories in a DBFS path.
Args:
dbfs_path: The path to list
Returns:
Response containing the directory listing
Raises:
DatabricksAPIError: If the API request fails
"""
logger.info(f"Listing files in DBFS path: {dbfs_path}")
return make_api_request("GET", "/api/2.0/dbfs/list", params={"path": dbfs_path})
async def delete_file(
dbfs_path: str,
recursive: bool = False,
) -> Dict[str, Any]:
"""
Delete a file or directory from DBFS.
Args:
dbfs_path: The path to delete
recursive: Whether to recursively delete directories
Returns:
Empty response on success
Raises:
DatabricksAPIError: If the API request fails
"""
logger.info(f"Deleting DBFS path: {dbfs_path}")
return make_api_request(
"POST",
"/api/2.0/dbfs/delete",
data={
"path": dbfs_path,
"recursive": recursive,
},
)
async def get_status(dbfs_path: str) -> Dict[str, Any]:
"""
Get the status of a file or directory.
Args:
dbfs_path: The path to check
Returns:
Response containing file status
Raises:
DatabricksAPIError: If the API request fails
"""
logger.info(f"Getting status of DBFS path: {dbfs_path}")
return make_api_request("GET", "/api/2.0/dbfs/get-status", params={"path": dbfs_path})
async def create_directory(dbfs_path: str) -> Dict[str, Any]:
"""
Create a directory in DBFS.
Args:
dbfs_path: The path to create
Returns:
Empty response on success
Raises:
DatabricksAPIError: If the API request fails
"""
logger.info(f"Creating DBFS directory: {dbfs_path}")
return make_api_request("POST", "/api/2.0/dbfs/mkdirs", data={"path": dbfs_path})