update_astradb_destination
Modifies an AstraDB destination connector by updating its collection name, keyspace, or batch size. Ensures seamless integration with your data pipeline using the Unstructured API MCP Server.
Instructions
Update an AstraDB destination connector.
Args:
destination_id: ID of the destination connector to update
collection_name: The name of the collection to use (optional)
keyspace: The AstraDB keyspace (optional)
batch_size: The batch size for inserting documents (optional)
Note: We require the users to create their own collection and keyspace before creating the connector.
Returns:
String containing the updated destination connector information
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| batch_size | No | ||
| collection_name | No | ||
| destination_id | Yes | ||
| keyspace | No |
Implementation Reference
- The core asynchronous handler function that implements the logic for updating an AstraDB destination connector. It retrieves current config, prepares updates, and calls the unstructured_client API to perform the update.async def update_astradb_destination( ctx: Context, destination_id: str, collection_name: Optional[str] = None, keyspace: Optional[str] = None, batch_size: Optional[int] = None, ) -> str: """Update an AstraDB destination connector. Args: destination_id: ID of the destination connector to update collection_name: The name of the collection to use (optional) keyspace: The AstraDB keyspace (optional) batch_size: The batch size for inserting documents (optional) Note: We require the users to create their own collection and keyspace before creating the connector. Returns: String containing the updated destination connector information """ client = ctx.request_context.lifespan_context.client # Get the current destination connector configuration try: get_response = await client.destinations.get_destination_async( request=GetDestinationRequest(destination_id=destination_id), ) current_config = get_response.destination_connector_information.config except Exception as e: return f"Error retrieving destination connector: {str(e)}" # Use current values if new ones aren't provided current_config = dict(current_config) if collection_name is None and "collection_name" in current_config: collection_name = current_config["collection_name"] if keyspace is None and "keyspace" in current_config: keyspace = current_config["keyspace"] if batch_size is None and "batch_size" in current_config: batch_size = current_config["batch_size"] try: config = _prepare_astra_dest_config( collection_name=collection_name, keyspace=keyspace, batch_size=batch_size, ) except ValueError as e: return f"Error: {str(e)}" destination_connector = UpdateDestinationConnector(config=config) try: response = await client.destinations.update_destination_async( request=UpdateDestinationRequest( destination_id=destination_id, update_destination_connector=destination_connector, ), ) result = create_log_for_created_updated_connector( response, connector_name="AstraDB", connector_type="Destination", created_or_updated="Updated", ) return result except Exception as e: return f"Error updating AstraDB destination connector: {str(e)}"
- uns_mcp/connectors/destination/destination_tool.py:200-209 (registration)Dispatch registration mapping 'astradb' type to the update_astradb_destination handler function within the generic update_destination_connector tool.update_functions = { "astradb": update_astradb_destination, "databricks_delta_table": update_databricks_delta_table_destination, "databricks_volumes": update_databricks_volumes_destination, "mongodb": update_mongodb_destination, "neo4j": update_neo4j_destination, "pinecone": update_pinecone_destination, "s3": update_s3_destination, "weaviate": update_weaviate_destination, }
- Helper function used by the handler to prepare and validate the AstraDB connector configuration from input parameters and environment variables.def _prepare_astra_dest_config( collection_name: Optional[str] = None, keyspace: Optional[str] = None, batch_size: Optional[int] = None, ) -> AstraDBConnectorConfigInput: """Prepare the AstraDB destination connector configuration.""" token = os.getenv("ASTRA_DB_APPLICATION_TOKEN") api_endpoint = os.getenv("ASTRA_DB_API_ENDPOINT") # Validate required parameters if not token: return ( "Error: AstraDB application token is required. " "Set ASTRA_DB_APPLICATION_TOKEN environment variable." ) if not api_endpoint: return ( "Error: AstraDB API endpoint is required. " "Set ASTRA_DB_API_ENDPOINT environment variable." ) if not collection_name: return "Error: AstraDB collection name is required." if not keyspace: return "Error: AstraDB keyspace is required." config = AstraDBConnectorConfigInput( token=token, api_endpoint=api_endpoint, collection_name=collection_name, keyspace=keyspace, ) # Set optional parameters if provided if batch_size is not None: # Use default if batch_size is not positive if batch_size <= 0: batch_size = 20 logging.info( f"\n Note: Provided batch_size was invalid, using default value of {batch_size}", ) config.batch_size = batch_size return config