create_destination_connector
Configure data export to destinations like databases, cloud storage, or vector stores by specifying connector type and required parameters.
Instructions
Create a destination connector based on type.
Args:
- ctx: Context object with the request and lifespan context
- name: A unique name for this connector
- destination_type: The type of destination being created
- type_specific_config:
  - astradb:
    - collection_name: The AstraDB collection name
    - keyspace: The AstraDB keyspace
    - batch_size: (Optional[int]) The batch size for inserting documents
  - databricks_delta_table:
    - catalog: Name of the catalog in Databricks Unity Catalog
    - database: The database in Unity Catalog
    - http_path: The cluster’s or SQL warehouse’s HTTP Path value
    - server_hostname: The Databricks cluster’s or SQL warehouse’s Server Hostname value
    - table_name: The name of the table in the schema
    - volume: Name of the volume associated with the schema.
    - schema: (Optional[str]) Name of the schema associated with the volume
    - volume_path: (Optional[str]) Any target folder path within the volume, starting from the root of the volume.
  - databricks_volumes:
    - catalog: Name of the catalog in Databricks
    - host: The Databricks host URL
    - volume: Name of the volume associated with the schema
    - schema: (Optional[str]) Name of the schema associated with the volume. The default value is "default".
    - volume_path: (Optional[str]) Any target folder path within the volume, starting from the root of the volume.
  - mongodb:
    - database: The name of the MongoDB database
    - collection: The name of the MongoDB collection
  - neo4j:
    - database: The Neo4j database, e.g. "neo4j"
    - uri: The Neo4j URI e.g. neo4j+s://<neo4j_instance_id>.databases.neo4j.io
    - batch_size: (Optional[int]) The batch size for the connector
  - pinecone:
    - index_name: The Pinecone index name
    - namespace: (Optional[str]) The pinecone namespace, a folder inside the pinecone index
    - batch_size: (Optional[int]) The batch size
  - s3:
    - remote_url: The S3 URI to the bucket or folder
  - weaviate:
    - cluster_url: URL of the Weaviate cluster
    - collection: Name of the collection in the Weaviate cluster
    - Note: Minimal schema is required for the collection, e.g. record_id: Text

Returns:
String containing the created destination connector information
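
The per-type keys listed above can be checked before the tool is called. The following is a minimal sketch, assuming that every key not marked Optional in the docstring is required; `REQUIRED_KEYS` and `missing_keys` are hypothetical names introduced for illustration and are not part of the tool, and the sample values are made up.

```python
from typing import Any

# Keys transcribed from the docstring above. Keys marked Optional there
# (batch_size, schema, volume_path, namespace) are deliberately left out.
# Assumption: everything not marked Optional is required.
REQUIRED_KEYS: dict[str, set[str]] = {
    "astradb": {"collection_name", "keyspace"},
    "databricks_delta_table": {
        "catalog", "database", "http_path",
        "server_hostname", "table_name", "volume",
    },
    "databricks_volumes": {"catalog", "host", "volume"},
    "mongodb": {"database", "collection"},
    "neo4j": {"database", "uri"},
    "pinecone": {"index_name"},
    "s3": {"remote_url"},
    "weaviate": {"cluster_url", "collection"},
}


def missing_keys(destination_type: str, config: dict[str, Any]) -> set[str]:
    """Return the documented non-optional keys that are absent from config."""
    return REQUIRED_KEYS[destination_type] - set(config)


# Illustrative configs; all values are placeholders.
pinecone_config = {"index_name": "docs-index", "namespace": "team-a"}
astradb_config = {"keyspace": "default_keyspace"}

print(missing_keys("pinecone", pinecone_config))  # nothing missing
print(missing_keys("astradb", astradb_config))    # collection_name is missing
```

A check like this only covers key names; it does not validate values or credentials, which this section does not describe.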
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| name | Yes | | |
| destination_type | Yes | | |
| type_specific_config | Yes | | |
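
The three inputs in the table map onto the `arguments` object of an MCP tool call. The sketch below builds such a request by hand, assuming the standard MCP `tools/call` JSON-RPC shape; the request id, connector name, and bucket URI are made-up values. `ctx` does not appear in the input schema, so it is presumably supplied by the server rather than the caller.

```python
import json

# Arguments matching the input schema table: all three fields are required.
arguments = {
    "name": "example-s3-destination",           # placeholder connector name
    "destination_type": "s3",
    "type_specific_config": {
        "remote_url": "s3://example-bucket/processed/",  # placeholder URI
    },
}

# Assumed wire format: an MCP "tools/call" JSON-RPC request.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "create_destination_connector",
        "arguments": arguments,
    },
}

payload = json.dumps(request, indent=2)
print(payload)
```

In practice an MCP client library would construct and send this message; the hand-built dictionary is only meant to show where each input goes.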
Implementation Reference
- The primary handler function implementing the 'create_destination_connector' MCP tool. It dispatches creation logic to type-specific helper functions based on the provided destination_type.

  ```python
  async def create_destination_connector(
      ctx: Context,
      name: str,
      destination_type: Literal[
          "astradb",
          "databricks_delta_table",
          "databricks_volumes",
          "mongodb",
          "neo4j",
          "pinecone",
          "s3",
          "weaviate",
      ],
      type_specific_config: dict[str, Any],
  ) -> str:
      """Create a destination connector based on type.

      Args:
          ctx: Context object with the request and lifespan context
          name: A unique name for this connector
          destination_type: The type of destination being created

          type_specific_config:
              astradb:
                  collection_name: The AstraDB collection name
                  keyspace: The AstraDB keyspace
                  batch_size: (Optional[int]) The batch size for inserting documents
              databricks_delta_table:
                  catalog: Name of the catalog in Databricks Unity Catalog
                  database: The database in Unity Catalog
                  http_path: The cluster’s or SQL warehouse’s HTTP Path value
                  server_hostname: The Databricks cluster’s or SQL warehouse’s Server Hostname value
                  table_name: The name of the table in the schema
                  volume: Name of the volume associated with the schema.
                  schema: (Optional[str]) Name of the schema associated with the volume
                  volume_path: (Optional[str]) Any target folder path within the volume, starting
                      from the root of the volume.
              databricks_volumes:
                  catalog: Name of the catalog in Databricks
                  host: The Databricks host URL
                  volume: Name of the volume associated with the schema
                  schema: (Optional[str]) Name of the schema associated with the volume. The default
                      value is "default".
                  volume_path: (Optional[str]) Any target folder path within the volume,
                      starting from the root of the volume.
              mongodb:
                  database: The name of the MongoDB database
                  collection: The name of the MongoDB collection
              neo4j:
                  database: The Neo4j database, e.g. "neo4j"
                  uri: The Neo4j URI e.g. neo4j+s://<neo4j_instance_id>.databases.neo4j.io
                  batch_size: (Optional[int]) The batch size for the connector
              pinecone:
                  index_name: The Pinecone index name
                  namespace: (Optional[str]) The pinecone namespace, a folder inside the
                      pinecone index
                  batch_size: (Optional[int]) The batch size
              s3:
                  remote_url: The S3 URI to the bucket or folder
              weaviate:
                  cluster_url: URL of the Weaviate cluster
                  collection: Name of the collection in the Weaviate cluster

                  Note: Minimal schema is required for the collection, e.g. record_id: Text

      Returns:
          String containing the created destination connector information
      """
      destination_functions = {
          "astradb": create_astradb_destination,
          "databricks_delta_table": create_databricks_delta_table_destination,
          "databricks_volumes": create_databricks_volumes_destination,
          "mongodb": create_mongodb_destination,
          "neo4j": create_neo4j_destination,
          "pinecone": create_pinecone_destination,
          "s3": create_s3_destination,
          "weaviate": create_weaviate_destination,
      }

      if destination_type in destination_functions:
          destination_function = destination_functions[destination_type]
          return await destination_function(ctx=ctx, name=name, **type_specific_config)

      return (
          f"Unsupported destination type: {destination_type}. "
          f"Please use a supported destination type {list(destination_functions.keys())}."
      )
  ```
- uns_mcp/connectors/destination/__init__.py:10-14 (registration): Direct registration of the 'create_destination_connector' tool using the mcp.tool() decorator within the destination connectors module.

  ```python
  def register_destination_connectors(mcp: FastMCP):
      """Register all destination connector tools with the MCP server."""
      mcp.tool()(create_destination_connector)
      mcp.tool()(update_destination_connector)
      mcp.tool()(delete_destination_connector)
  ```
- uns_mcp/connectors/__init__.py:14-14 (registration): Top-level call to register destination connectors (including 'create_destination_connector') as part of all connectors registration.

  ```python
  register_destination_connectors(mcp)
  ```
- Type annotations defining the input schema for the tool, including supported destination types via Literal.

  ```python
  async def create_destination_connector(
      ctx: Context,
      name: str,
      destination_type: Literal[
          "astradb",
          "databricks_delta_table",
          "databricks_volumes",
          "mongodb",
          "neo4j",
          "pinecone",
          "s3",
          "weaviate",
      ],
      type_specific_config: dict[str, Any],
  ) -> str:
  ```
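
The dispatch-table pattern used by the handler can be tried in isolation. The sketch below is a cut-down stand-in, not the project's code: it registers a single stub helper, and the body and signature of `create_s3_destination` here are assumptions made for illustration, since the real helpers are not shown in this section.

```python
import asyncio
from typing import Any


async def create_s3_destination(ctx: Any, name: str, remote_url: str) -> str:
    # Stub standing in for the real helper; it only echoes its inputs.
    return f"created s3 destination {name!r} -> {remote_url}"


# Same idea as the handler's destination_functions mapping, with one entry.
DESTINATION_FUNCTIONS = {"s3": create_s3_destination}


async def create_destination_connector(
    ctx: Any,
    name: str,
    destination_type: str,
    type_specific_config: dict[str, Any],
) -> str:
    if destination_type in DESTINATION_FUNCTIONS:
        destination_function = DESTINATION_FUNCTIONS[destination_type]
        # The config dict is unpacked into keyword arguments of the helper.
        return await destination_function(ctx=ctx, name=name, **type_specific_config)
    return (
        f"Unsupported destination type: {destination_type}. "
        f"Please use a supported destination type {list(DESTINATION_FUNCTIONS.keys())}."
    )


ok = asyncio.run(
    create_destination_connector(
        None, "my-bucket", "s3", {"remote_url": "s3://example-bucket/out/"}
    )
)
bad = asyncio.run(create_destination_connector(None, "x", "ftp", {}))
print(ok)
print(bad)
```

Note that an unsupported type produces a returned message rather than an exception. With a stub that declares explicit keyword parameters, as above, an unexpected key in `type_specific_config` raises `TypeError` at the call; whether the real helpers behave the same way depends on their signatures.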