Unstructured API MCP Server (Official)
UNS-MCP / connectors / destination

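This module defines the MCP tools for managing a Databricks Volumes Delta Tables destination connector: create, update, and delete. Each tool uses the Unstructured API client stored in the server's lifespan context, and Databricks credentials are read from the DATABRICKS_CLIENT_ID and DATABRICKS_CLIENT_SECRET environment variables.
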
import os
from typing import Optional

from mcp.server.fastmcp import Context
from unstructured_client.models.operations import (
    CreateDestinationRequest,
    DeleteDestinationRequest,
    GetDestinationRequest,
    UpdateDestinationRequest,
)
from unstructured_client.models.shared import (
    CreateDestinationConnector,
    DatabricksVDTDestinationConnectorConfigInput,
    DestinationConnectorType,
    UpdateDestinationConnector,
)

from connectors.utils import (
    create_log_for_created_updated_connector,
)

def _prepare_databricks_delta_table_dest_config(
    catalog: str,
    database: str,
    http_path: str,
    server_hostname: str,
    table_name: str,
    volume: str,
    schema: Optional[str] = "default",
    volume_path: Optional[str] = "/",
) -> DatabricksVDTDestinationConnectorConfigInput:
    """Prepare the Databricks Volumes Delta Tables destination connector configuration."""
    client_id = os.getenv("DATABRICKS_CLIENT_ID")
    client_secret = os.getenv("DATABRICKS_CLIENT_SECRET")
    if client_id is None or client_secret is None:
        raise ValueError(
            "Both the DATABRICKS_CLIENT_ID and DATABRICKS_CLIENT_SECRET "
            "environment variables must be set",
        )
    return DatabricksVDTDestinationConnectorConfigInput(
        catalog=catalog,
        database=database,
        http_path=http_path,
        server_hostname=server_hostname,
        table_name=table_name,
        schema_=schema,
        volume=volume,
        volume_path=volume_path,
        client_id=client_id,
        client_secret=client_secret,
    )
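
# A minimal usage sketch for the helper above (the catalog, warehouse, and volume
# values below are placeholders, and DATABRICKS_CLIENT_ID / DATABRICKS_CLIENT_SECRET
# must be set in the environment):
#
#     config = _prepare_databricks_delta_table_dest_config(
#         catalog="main",
#         database="default",
#         http_path="/sql/1.0/warehouses/abc123",
#         server_hostname="dbc-1234abcd-5678.cloud.databricks.com",
#         table_name="elements",
#         volume="landing_zone",
#     )
#     # schema defaults to "default" and volume_path to "/".
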
async def create_databricks_delta_table_destination(
    ctx: Context,
    name: str,
    catalog: str,
    database: str,
    http_path: str,
    server_hostname: str,
    table_name: str,
    volume: str,
    schema: Optional[str] = "default",
    volume_path: Optional[str] = "/",
) -> str:
    """Create a Databricks Volumes Delta Tables destination connector.

    Args:
        name: A unique name for this connector
        catalog: Name of the catalog in the Databricks Unity Catalog service for the workspace
        database: The name of the schema (formerly known as a database)
            in Unity Catalog for the target table
        http_path: The cluster’s or SQL warehouse’s HTTP Path value
        server_hostname: The Databricks cluster’s or SQL warehouse’s Server Hostname value
        table_name: The name of the table in the schema
        volume: Name of the volume associated with the schema
        schema: Name of the schema associated with the volume. The default value is "default".
        volume_path: Any target folder path within the volume, starting from the root
            of the volume. The default value is "/".

    Returns:
        String containing the created destination connector information
    """
    client = ctx.request_context.lifespan_context.client
    config = _prepare_databricks_delta_table_dest_config(
        catalog,
        database,
        http_path,
        server_hostname,
        table_name,
        volume,
        schema,
        volume_path,
    )
    destination_connector = CreateDestinationConnector(
        name=name,
        type=DestinationConnectorType.DATABRICKS_VOLUME_DELTA_TABLES,
        config=config,
    )

    try:
        response = await client.destinations.create_destination_async(
            request=CreateDestinationRequest(create_destination_connector=destination_connector),
        )
        result = create_log_for_created_updated_connector(
            response,
            connector_name="Databricks Volumes Delta Table",
            connector_type="Destination",
            created_or_updated="Created",
        )
        return result
    except Exception as e:
        return f"Error creating Databricks Volumes Delta Table destination connector: {str(e)}"
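
# Once registered as an MCP tool, a client is expected to supply only the connector
# fields; the Context argument is injected by the server at call time. A hypothetical
# tool-call payload (placeholder values):
#
#     {
#         "name": "databricks-delta-dest",
#         "catalog": "main",
#         "database": "default",
#         "http_path": "/sql/1.0/warehouses/abc123",
#         "server_hostname": "dbc-1234abcd-5678.cloud.databricks.com",
#         "table_name": "elements",
#         "volume": "landing_zone"
#     }
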
async def update_databricks_delta_table_destination(
    ctx: Context,
    destination_id: str,
    catalog: Optional[str] = None,
    database: Optional[str] = None,
    http_path: Optional[str] = None,
    server_hostname: Optional[str] = None,
    table_name: Optional[str] = None,
    schema: Optional[str] = None,
    volume: Optional[str] = None,
    volume_path: Optional[str] = None,
) -> str:
    """Update a Databricks Volumes Delta Tables destination connector.

    Only the provided fields are changed; all other settings keep their current values.

    Args:
        destination_id: ID of the destination connector to update
        catalog: Name of the catalog in the Databricks Unity Catalog service for the workspace
        database: The name of the schema (formerly known as a database)
            in Unity Catalog for the target table
        http_path: The cluster’s or SQL warehouse’s HTTP Path value
        server_hostname: The Databricks cluster’s or SQL warehouse’s Server Hostname value
        table_name: The name of the table in the schema
        schema: Name of the schema associated with the volume
        volume: Name of the volume associated with the schema
        volume_path: Any target folder path within the volume to update,
            starting from the root of the volume

    Returns:
        String containing the updated destination connector information
    """
    client = ctx.request_context.lifespan_context.client

    # Get the current destination connector configuration
    try:
        get_response = await client.destinations.get_destination_async(
            request=GetDestinationRequest(destination_id=destination_id),
        )
        current_config = get_response.destination_connector_information.config
    except Exception as e:
        return f"Error retrieving destination connector: {str(e)}"

    # Update configuration with new values
    config = dict(current_config)

    if catalog is not None:
        config["catalog"] = catalog
    if database is not None:
        config["database"] = database
    if server_hostname is not None:
        config["server_hostname"] = server_hostname
    if http_path is not None:
        config["http_path"] = http_path
    if table_name is not None:
        config["table_name"] = table_name
    if volume is not None:
        config["volume"] = volume
    if schema is not None:
        config["schema_"] = schema
    if volume_path is not None:
        config["volume_path"] = volume_path

    destination_connector = UpdateDestinationConnector(config=config)

    try:
        response = await client.destinations.update_destination_async(
            request=UpdateDestinationRequest(
                destination_id=destination_id,
                update_destination_connector=destination_connector,
            ),
        )
        result = create_log_for_created_updated_connector(
            response,
            connector_name="Databricks Volumes Delta Table",
            connector_type="Destination",
            created_or_updated="Updated",
        )
        return result
    except Exception as e:
        return f"Error updating Databricks Volumes Delta Table destination connector: {str(e)}"
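
# Example of a partial update (a sketch with placeholder values): point the connector
# at a different SQL warehouse and target folder while leaving everything else intact.
#
#     result = await update_databricks_delta_table_destination(
#         ctx,
#         destination_id="<connector-uuid>",
#         http_path="/sql/1.0/warehouses/def456",
#         volume_path="/ingest/2024",
#     )
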
async def delete_databricks_delta_table_destination(ctx: Context, destination_id: str) -> str:
    """Delete a Databricks Volumes Delta Tables destination connector.

    Args:
        destination_id: ID of the destination connector to delete

    Returns:
        String containing the result of the deletion
    """
    client = ctx.request_context.lifespan_context.client

    try:
        _ = await client.destinations.delete_destination_async(
            request=DeleteDestinationRequest(destination_id=destination_id),
        )
        return (
            f"Databricks Volumes Delta Table destination connector with ID {destination_id} "
            "deleted successfully"
        )
    except Exception as e:
        return f"Error deleting Databricks Volumes Delta Table destination connector: {str(e)}"
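
# How these coroutines are assumed to be exposed as MCP tools (a sketch, not the
# actual UNS-MCP server wiring; the server name is a placeholder). FastMCP injects
# the Context argument at call time and derives each tool's schema from the type
# hints and docstrings above.
#
#     from mcp.server.fastmcp import FastMCP
#
#     mcp = FastMCP("UNS_MCP")
#     mcp.add_tool(create_databricks_delta_table_destination)
#     mcp.add_tool(update_databricks_delta_table_destination)
#     mcp.add_tool(delete_databricks_delta_table_destination)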