Unstructured API MCP Server

Official
import os
from typing import Optional

from mcp.server.fastmcp import Context
from unstructured_client.models.operations import (
    CreateDestinationRequest,
    DeleteDestinationRequest,
    GetDestinationRequest,
    UpdateDestinationRequest,
)
from unstructured_client.models.shared import (
    CreateDestinationConnector,
    DestinationConnectorType,
    UpdateDestinationConnector,
    WeaviateDestinationConnectorConfigInput,
)

from connectors.utils import (
    create_log_for_created_updated_connector,
)


def _prepare_weaviate_dest_config(
    collection: str,
    cluster_url: str,
) -> WeaviateDestinationConnectorConfigInput:
    """Prepare the Weaviate destination connector configuration.

    The API key is not a parameter: it is read from the
    ``WEAVIATE_CLOUD_API_KEY`` environment variable. ``os.getenv`` yields
    ``None`` when the variable is unset, and that value is passed through
    to the config object unchanged.

    Args:
        collection: Name of the collection to use in the weaviate cluster
        cluster_url: URL of the weaviate cluster

    Returns:
        The populated Weaviate destination connector configuration
    """
    return WeaviateDestinationConnectorConfigInput(
        cluster_url=cluster_url,
        api_key=os.getenv("WEAVIATE_CLOUD_API_KEY"),
        collection=collection,
    )


async def create_weaviate_destination(
    ctx: Context,
    name: str,
    cluster_url: str,
    collection: str,
) -> str:
    """Create a weaviate vector database destination connector.

    Args:
        name: Name to give the new destination connector
        cluster_url: URL of the weaviate cluster
        collection: Name of the collection to use in the weaviate cluster

    Note:
        The collection is a table in the weaviate cluster. The platform has
        dedicated code that generates the collection for users; to keep this
        server simple, the collection is not generated here.

    Returns:
        String containing the created destination connector information, or
        an error message if the create request fails
    """
    client = ctx.request_context.lifespan_context.client

    # Built before the try block: only failures of the API call itself are
    # converted into the returned error string.
    destination_connector = CreateDestinationConnector(
        name=name,
        type=DestinationConnectorType.WEAVIATE_CLOUD,
        config=_prepare_weaviate_dest_config(collection, cluster_url),
    )

    try:
        response = await client.destinations.create_destination_async(
            request=CreateDestinationRequest(
                create_destination_connector=destination_connector,
            ),
        )
        return create_log_for_created_updated_connector(
            response,
            connector_name="Weaviate",
            connector_type="Destination",
            created_or_updated="Created",
        )
    except Exception as e:
        return f"Error creating weaviate destination connector: {e}"


async def update_weaviate_destination(
    ctx: Context,
    destination_id: str,
    cluster_url: Optional[str] = None,
    collection: Optional[str] = None,
) -> str:
    """Update a weaviate destination connector.

    The connector's current configuration is fetched first, and only the
    fields passed as non-``None`` are overridden before it is sent back.

    Args:
        destination_id: ID of the destination connector to update
        cluster_url (optional): URL of the weaviate cluster
        collection (optional): Name of the collection to use in the
            weaviate cluster

    Returns:
        String containing the updated destination connector information, or
        an error message if fetching or updating the connector fails
    """
    client = ctx.request_context.lifespan_context.client

    # Get the current destination connector configuration
    try:
        get_response = await client.destinations.get_destination_async(
            request=GetDestinationRequest(destination_id=destination_id),
        )
        current_config = get_response.destination_connector_information.config
    except Exception as e:
        return f"Error retrieving destination connector: {e}"

    # Copy into a plain dict so individual keys can be overridden.
    # NOTE(review): assumes the returned config object supports dict()
    # conversion into field-name keys - confirm against the SDK model.
    config = dict(current_config)
    if cluster_url is not None:
        config["cluster_url"] = cluster_url
    if collection is not None:
        config["collection"] = collection

    destination_connector = UpdateDestinationConnector(config=config)

    try:
        response = await client.destinations.update_destination_async(
            request=UpdateDestinationRequest(
                destination_id=destination_id,
                update_destination_connector=destination_connector,
            ),
        )
        return create_log_for_created_updated_connector(
            response,
            connector_name="Weaviate",
            connector_type="Destination",
            created_or_updated="Updated",
        )
    except Exception as e:
        return f"Error updating weaviate destination connector: {e}"


async def delete_weaviate_destination(ctx: Context, destination_id: str) -> str:
    """Delete a weaviate destination connector.

    Args:
        destination_id: ID of the destination connector to delete

    Returns:
        String containing the result of the deletion, or an error message
        if the delete request fails
    """
    client = ctx.request_context.lifespan_context.client

    try:
        # The response body is not used; only success or failure matters.
        await client.destinations.delete_destination_async(
            request=DeleteDestinationRequest(destination_id=destination_id),
        )
        return f"weaviate Destination Connector with ID {destination_id} deleted successfully"
    except Exception as e:
        return f"Error deleting weaviate destination connector: {e}"