Skip to main content
Glama

Unstructured API MCP Server

Official
salesforce.py4.4 kB
import os from typing import List, Optional from mcp.server.fastmcp import Context from unstructured_client.models.operations import ( CreateSourceRequest, GetSourceRequest, UpdateSourceRequest, ) from unstructured_client.models.shared import ( CreateSourceConnector, SalesforceSourceConnectorConfigInput, UpdateSourceConnector, ) from uns_mcp.connectors.utils import ( create_log_for_created_updated_connector, ) def _prepare_salesforce_source_config( username: str, categories: Optional[List[str]] = None, ) -> SalesforceSourceConnectorConfigInput: """Prepare the Salesforce source connector configuration.""" if os.getenv("SALESFORCE_CONSUMER_KEY") is None or os.getenv("SALESFORCE_PRIVATE_KEY") is None: raise ValueError( "SALESFORCE_CONSUMER_KEY or SALESFORCE_PRIVATE_KEY environment variables are not set", ) config = SalesforceSourceConnectorConfigInput( username=username, consumer_key=os.getenv("SALESFORCE_CONSUMER_KEY"), private_key=os.getenv("SALESFORCE_PRIVATE_KEY"), categories=categories, ) return config async def create_salesforce_source( ctx: Context, name: str, username: str, categories: Optional[list[str]] = None, ) -> str: """Create a Salesforce source connector. Args: name: A unique name for this connector username: The Salesforce username categories: Optional Salesforce domain,the names of the Salesforce categories (objects) that you want to access, specified as a comma-separated list. Available categories include Account, Campaign, Case, EmailMessage, and Lead. Returns: String containing the created source connector information """ client = ctx.request_context.lifespan_context.client config = _prepare_salesforce_source_config(username, categories) source_connector = CreateSourceConnector(name=name, type="salesforce", config=config) try: response = await client.sources.create_source_async( request=CreateSourceRequest(create_source_connector=source_connector), ) result = create_log_for_created_updated_connector( response, connector_name="Salesforce", connector_type="Source", created_or_updated="Created", ) return result except Exception as e: return f"Error creating Salesforce source connector: {str(e)}" async def update_salesforce_source( ctx: Context, source_id: str, username: Optional[str] = None, categories: Optional[List[str]] = None, ) -> str: """Update a Salesforce source connector. Args: source_id: ID of the source connector to update username: The Salesforce username categories: Optional Salesforce domain,the names of the Salesforce categories (objects) that you want to access, specified as a comma-separated list. Available categories include Account, Campaign, Case, EmailMessage, and Lead. Returns: String containing the updated source connector information """ client = ctx.request_context.lifespan_context.client # Get the current source connector configuration try: get_response = await client.sources.get_source_async( request=GetSourceRequest(source_id=source_id), ) current_config = get_response.source_connector_information.config except Exception as e: return f"Error retrieving source connector: {str(e)}" # Update configuration with new values config = dict(current_config) if username is not None: config["username"] = username if categories is not None: config["categories"] = categories source_connector = UpdateSourceConnector(config=config) try: response = await client.sources.update_source_async( request=UpdateSourceRequest( source_id=source_id, update_source_connector=source_connector, ), ) result = create_log_for_created_updated_connector( response, connector_name="Salesforce", connector_type="Source", created_or_updated="Updated", ) return result except Exception as e: return f"Error updating Salesforce source connector: {str(e)}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Unstructured-IO/UNS-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server