update_destination_connector
Modify configuration settings for data destination connectors like AstraDB, Databricks, MongoDB, Neo4j, Pinecone, S3, or Weaviate to adjust storage parameters and batch processing.
Instructions
Update a destination connector based on type.
Args:
ctx: Context object with the request and lifespan context
destination_id: ID of the destination connector to update
destination_type: The type of destination being updated
type_specific_config:
astradb:
collection_name: (Optional[str]): The AstraDB collection name
keyspace: (Optional[str]): The AstraDB keyspace
batch_size: (Optional[int]) The batch size for inserting documents
databricks_delta_table:
catalog: (Optional[str]): Name of the catalog in Databricks Unity Catalog
database: (Optional[str]): The database in Unity Catalog
http_path: (Optional[str]): The cluster’s or SQL warehouse’s HTTP Path value
server_hostname: (Optional[str]): The Databricks cluster’s or SQL warehouse’s
Server Hostname value
table_name: (Optional[str]): The name of the table in the schema
volume: (Optional[str]): Name of the volume associated with the schema.
schema: (Optional[str]) Name of the schema associated with the volume
volume_path: (Optional[str]) Any target folder path within the volume, starting
from the root of the volume.
databricks_volumes:
catalog: (Optional[str]): Name of the catalog in Databricks
host: (Optional[str]): The Databricks host URL
volume: (Optional[str]): Name of the volume associated with the schema
schema: (Optional[str]) Name of the schema associated with the volume. The default
value is "default".
volume_path: (Optional[str]) Any target folder path within the volume,
starting from the root of the volume.
mongodb:
database: (Optional[str]): The name of the MongoDB database
collection: (Optional[str]): The name of the MongoDB collection
neo4j:
database: (Optional[str]): The Neo4j database, e.g. "neo4j"
uri: (Optional[str]): The Neo4j URI
e.g. neo4j+s://<neo4j_instance_id>.databases.neo4j.io
batch_size: (Optional[int]) The batch size for the connector
pinecone:
index_name: (Optional[str]): The Pinecone index name
namespace: (Optional[str]) The pinecone namespace, a folder inside the
pinecone index
batch_size: (Optional[int]) The batch size
s3:
remote_url: (Optional[str]): The S3 URI to the bucket or folder
weaviate:
cluster_url: (Optional[str]): URL of the Weaviate cluster
collection: (Optional[str]): Name of the collection in the Weaviate cluster
Note: Minimal schema is required for the collection, e.g. record_id: Text
Returns:
String containing the updated destination connector information
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| destination_id | Yes | ||
| destination_type | Yes | ||
| type_specific_config | Yes |
Implementation Reference
- The primary handler for the 'update_destination_connector' tool. Dispatches to type-specific update functions based on the destination_type, handling input validation via type hints and docstring.async def update_destination_connector( ctx: Context, destination_id: str, destination_type: Literal[ "astradb", "databricks_delta_table", "databricks_volumes", "mongodb", "neo4j", "pinecone", "s3", "weaviate", ], type_specific_config: dict[str, Any], ) -> str: """Update a destination connector based on type. Args: ctx: Context object with the request and lifespan context destination_id: ID of the destination connector to update destination_type: The type of destination being updated type_specific_config: astradb: collection_name: (Optional[str]): The AstraDB collection name keyspace: (Optional[str]): The AstraDB keyspace batch_size: (Optional[int]) The batch size for inserting documents databricks_delta_table: catalog: (Optional[str]): Name of the catalog in Databricks Unity Catalog database: (Optional[str]): The database in Unity Catalog http_path: (Optional[str]): The cluster’s or SQL warehouse’s HTTP Path value server_hostname: (Optional[str]): The Databricks cluster’s or SQL warehouse’s Server Hostname value table_name: (Optional[str]): The name of the table in the schema volume: (Optional[str]): Name of the volume associated with the schema. schema: (Optional[str]) Name of the schema associated with the volume volume_path: (Optional[str]) Any target folder path within the volume, starting from the root of the volume. databricks_volumes: catalog: (Optional[str]): Name of the catalog in Databricks host: (Optional[str]): The Databricks host URL volume: (Optional[str]): Name of the volume associated with the schema schema: (Optional[str]) Name of the schema associated with the volume. The default value is "default". volume_path: (Optional[str]) Any target folder path within the volume, starting from the root of the volume. mongodb: database: (Optional[str]): The name of the MongoDB database collection: (Optional[str]): The name of the MongoDB collection neo4j: database: (Optional[str]): The Neo4j database, e.g. "neo4j" uri: (Optional[str]): The Neo4j URI e.g. neo4j+s://<neo4j_instance_id>.databases.neo4j.io batch_size: (Optional[int]) The batch size for the connector pinecone: index_name: (Optional[str]): The Pinecone index name namespace: (Optional[str]) The pinecone namespace, a folder inside the pinecone index batch_size: (Optional[int]) The batch size s3: remote_url: (Optional[str]): The S3 URI to the bucket or folder weaviate: cluster_url: (Optional[str]): URL of the Weaviate cluster collection: (Optional[str]): Name of the collection in the Weaviate cluster Note: Minimal schema is required for the collection, e.g. record_id: Text Returns: String containing the updated destination connector information """ update_functions = { "astradb": update_astradb_destination, "databricks_delta_table": update_databricks_delta_table_destination, "databricks_volumes": update_databricks_volumes_destination, "mongodb": update_mongodb_destination, "neo4j": update_neo4j_destination, "pinecone": update_pinecone_destination, "s3": update_s3_destination, "weaviate": update_weaviate_destination, } if destination_type in update_functions: update_function = update_functions[destination_type] return await update_function(ctx=ctx, destination_id=destination_id, **type_specific_config) return ( f"Unsupported destination type: {destination_type}. " f"Please use a supported destination type: {list(update_functions.keys())}." )
- uns_mcp/connectors/destination/__init__.py:10-15 (registration)Registers the update_destination_connector tool (along with create and delete) using mcp.tool() decorator on the FastMCP server.def register_destination_connectors(mcp: FastMCP): """Register all destination connector tools with the MCP server.""" mcp.tool()(create_destination_connector) mcp.tool()(update_destination_connector) mcp.tool()(delete_destination_connector)
- Example type-specific helper function for updating S3 destination connectors, invoked by the main handler. Similar implementations exist for other destination types.async def update_s3_destination( ctx: Context, destination_id: str, remote_url: Optional[str] = None, recursive: Optional[bool] = None, ) -> str: """Update an S3 destination connector. Args: destination_id: ID of the destination connector to update remote_url: The S3 URI to the bucket or folder Returns: String containing the updated destination connector information """ client = ctx.request_context.lifespan_context.client # Get the current destination connector configuration try: get_response = await client.destinations.get_destination_async( request=GetDestinationRequest(destination_id=destination_id), ) current_config = get_response.destination_connector_information.config except Exception as e: return f"Error retrieving destination connector: {str(e)}" # Update configuration with new values config = dict(current_config) if remote_url is not None: config["remote_url"] = remote_url if recursive is not None: config["recursive"] = recursive destination_connector = UpdateDestinationConnector(config=config) try: response = await client.destinations.update_destination_async( request=UpdateDestinationRequest( destination_id=destination_id, update_destination_connector=destination_connector, ), ) result = create_log_for_created_updated_connector( response, connector_name="S3", connector_type="Destination", created_or_updated="Updated", ) return result except Exception as e: return f"Error updating S3 destination connector: {str(e)}"