MCP Salesforce Connector
by smn2gnt
- src
- salesforce
# /// script
# dependencies = [
# "mcp",
# "simple-salesforce",
# "python-dotenv"
# ]
# ///
import asyncio
import json
import os
import sys
from typing import Any, Optional

from dotenv import load_dotenv
from simple_salesforce import Salesforce
from simple_salesforce.exceptions import SalesforceError
import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
class SalesforceClient:
    """Handles Salesforce operations and caches object field metadata."""

    # Keys copied from each describe() field entry, in output order.
    _FIELD_KEYS = ('label', 'name', 'updateable', 'type', 'length', 'picklistValues')

    def __init__(self):
        # Live Salesforce session; stays None until connect() succeeds.
        self.sf: Optional[Salesforce] = None
        # Maps an object name to its list of trimmed field descriptors.
        self.sobjects_cache: dict[str, Any] = {}

    def connect(self) -> bool:
        """Establish a connection to Salesforce using environment variables.

        Reads SALESFORCE_USERNAME, SALESFORCE_PASSWORD and
        SALESFORCE_SECURITY_TOKEN from the environment.

        Returns:
            bool: True if the connection succeeded, False otherwise.
        """
        try:
            self.sf = Salesforce(
                username=os.getenv('SALESFORCE_USERNAME'),
                password=os.getenv('SALESFORCE_PASSWORD'),
                security_token=os.getenv('SALESFORCE_SECURITY_TOKEN'),
            )
        except Exception as e:
            # Report on stderr: this server talks to its client over stdio,
            # so anything written to stdout would be mixed into that stream.
            print(f"Salesforce connection failed: {e}", file=sys.stderr)
            return False
        return True

    def get_object_fields(self, object_name: str) -> str:
        """Retrieve field names, labels and types for a Salesforce object.

        The describe() call is made once per object name; later calls are
        answered from ``sobjects_cache``.

        Args:
            object_name (str): The name of the Salesforce object.

        Returns:
            str: JSON representation of the object fields.

        Raises:
            ValueError: If no Salesforce connection has been established.
        """
        if not self.sf:
            raise ValueError("Salesforce connection not established.")

        if object_name not in self.sobjects_cache:
            described = getattr(self.sf, object_name).describe()['fields']
            # Keep only the keys callers need instead of the full describe payload.
            self.sobjects_cache[object_name] = [
                {key: field[key] for key in self._FIELD_KEYS}
                for field in described
            ]

        return json.dumps(self.sobjects_cache[object_name], indent=2)
# Create the server instance that the tool handlers below register against.
server = Server("salesforce-mcp")

# Load environment variables
load_dotenv()

# Configure with Salesforce credentials from environment variables
sf_client = SalesforceClient()
if not sf_client.connect():
    # Report on stderr: the server communicates with its client over stdio,
    # so stdout must stay free of diagnostic text.
    print("Failed to initialize Salesforce connection", file=sys.stderr)
    # Optionally exit here if Salesforce is required
    # sys.exit(1)
# Register the tools this server exposes: SOQL/SOSL queries, record CRUD, and direct API calls
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """
    Describe every tool this server offers.
    Each tool declares its arguments as a JSON Schema object.
    """

    def text_arg(description: str) -> dict:
        # Schema for a plain string argument.
        return {
            "type": "string",
            "description": description,
        }

    def object_name_arg() -> dict:
        # Schema shared by every tool that targets a Salesforce object.
        return text_arg("The name of the Salesforce object (e.g., 'Account', 'Contact')")

    def free_form_arg(description: str) -> dict:
        # Schema for an object argument that accepts arbitrary keys.
        return {
            "type": "object",
            "description": description,
            "properties": {},
            "additionalProperties": True,
        }

    def http_method_arg() -> dict:
        # Schema for the optional HTTP verb on the raw API tools.
        return {
            "type": "string",
            "description": "The HTTP method (default: 'GET')",
            "enum": ["GET", "POST", "PATCH", "DELETE"],
            "default": "GET",
        }

    def tool(name: str, description: str, properties: dict, required: list) -> types.Tool:
        # Assemble a Tool whose input schema is an object with the given properties.
        return types.Tool(
            name=name,
            description=description,
            inputSchema={
                "type": "object",
                "properties": properties,
                "required": required,
            },
        )

    return [
        tool(
            "run_soql_query",
            "Executes a SOQL query against Salesforce",
            {"query": text_arg("The SOQL query to execute")},
            ["query"],
        ),
        tool(
            "run_sosl_search",
            "Executes a SOSL search against Salesforce",
            {
                "search": text_arg(
                    "The SOSL search to execute (e.g., 'FIND {John Smith} IN ALL FIELDS')"
                ),
            },
            ["search"],
        ),
        tool(
            "get_object_fields",
            "Retrieves field Names, labels and types for a specific Salesforce object",
            {"object_name": object_name_arg()},
            ["object_name"],
        ),
        tool(
            "get_record",
            "Retrieves a specific record by ID",
            {
                "object_name": object_name_arg(),
                "record_id": text_arg("The ID of the record to retrieve"),
            },
            ["object_name", "record_id"],
        ),
        tool(
            "create_record",
            "Creates a new record",
            {
                "object_name": object_name_arg(),
                "data": free_form_arg("The data for the new record"),
            },
            ["object_name", "data"],
        ),
        tool(
            "update_record",
            "Updates an existing record",
            {
                "object_name": object_name_arg(),
                "record_id": text_arg("The ID of the record to update"),
                "data": free_form_arg("The updated data for the record"),
            },
            ["object_name", "record_id", "data"],
        ),
        tool(
            "delete_record",
            "Deletes a record",
            {
                "object_name": object_name_arg(),
                "record_id": text_arg("The ID of the record to delete"),
            },
            ["object_name", "record_id"],
        ),
        tool(
            "tooling_execute",
            "Executes a Tooling API request",
            {
                "action": text_arg(
                    "The Tooling API endpoint to call (e.g., 'sobjects/ApexClass')"
                ),
                "method": http_method_arg(),
                "data": free_form_arg("Data for POST/PATCH requests"),
            },
            ["action"],
        ),
        tool(
            "apex_execute",
            "Executes an Apex REST request",
            {
                "action": text_arg("The Apex REST endpoint to call (e.g., '/MyApexClass')"),
                "method": http_method_arg(),
                "data": free_form_arg("Data for POST/PATCH requests"),
            },
            ["action"],
        ),
        tool(
            "restful",
            "Makes a direct REST API call to Salesforce",
            {
                "path": text_arg(
                    "The path of the REST API endpoint (e.g., 'sobjects/Account/describe')"
                ),
                "method": http_method_arg(),
                "params": free_form_arg("Query parameters for the request"),
                "data": free_form_arg("Data for POST/PATCH requests"),
            },
            ["path"],
        ),
    ]
def _require_salesforce():
    """Return the active Salesforce session, or raise if there is none."""
    if not sf_client.sf:
        raise ValueError("Salesforce connection not established.")
    return sf_client.sf


def _text_result(text: str) -> list[types.TextContent]:
    """Wrap *text* in the single-item TextContent list every tool returns."""
    return [types.TextContent(type="text", text=text)]


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: Optional[dict[str, Any]]
) -> list[types.TextContent]:
    """Dispatch a tool invocation to the matching Salesforce operation.

    Args:
        name: The tool name, one of those returned by ``handle_list_tools``.
        arguments: The tool arguments; values may be strings or nested
            objects (``data``, ``params``). ``None`` is treated as empty.

    Returns:
        A one-element list holding the textual result of the operation.

    Raises:
        ValueError: If a required argument is missing, the Salesforce
            connection was never established, or the tool name is unknown.
    """
    # Treat a missing mapping as empty so the checks below report the
    # missing argument instead of failing on None.
    arguments = arguments or {}

    if name == "run_soql_query":
        query = arguments.get("query")
        if not query:
            raise ValueError("Missing 'query' argument")
        # Check the connection here as well, so a failed login surfaces as
        # the same ValueError the other tools raise.
        results = _require_salesforce().query_all(query)
        return _text_result(
            f"SOQL Query Results (JSON):\n{json.dumps(results, indent=2)}"
        )

    elif name == "run_sosl_search":
        search = arguments.get("search")
        if not search:
            raise ValueError("Missing 'search' argument")
        results = _require_salesforce().search(search)
        return _text_result(
            f"SOSL Search Results (JSON):\n{json.dumps(results, indent=2)}"
        )

    elif name == "get_object_fields":
        object_name = arguments.get("object_name")
        if not object_name:
            raise ValueError("Missing 'object_name' argument")
        _require_salesforce()
        # get_object_fields already returns a JSON string.
        results = sf_client.get_object_fields(object_name)
        return _text_result(f"{object_name} Metadata (JSON):\n{results}")

    elif name == "get_record":
        object_name = arguments.get("object_name")
        record_id = arguments.get("record_id")
        if not object_name or not record_id:
            raise ValueError("Missing 'object_name' or 'record_id' argument")
        sf_object = getattr(_require_salesforce(), object_name)
        results = sf_object.get(record_id)
        return _text_result(
            f"{object_name} Record (JSON):\n{json.dumps(results, indent=2)}"
        )

    elif name == "create_record":
        object_name = arguments.get("object_name")
        data = arguments.get("data")
        if not object_name or not data:
            raise ValueError("Missing 'object_name' or 'data' argument")
        sf_object = getattr(_require_salesforce(), object_name)
        results = sf_object.create(data)
        return _text_result(
            f"Create {object_name} Record Result (JSON):\n{json.dumps(results, indent=2)}"
        )

    elif name == "update_record":
        object_name = arguments.get("object_name")
        record_id = arguments.get("record_id")
        data = arguments.get("data")
        if not object_name or not record_id or not data:
            raise ValueError("Missing 'object_name', 'record_id', or 'data' argument")
        sf_object = getattr(_require_salesforce(), object_name)
        results = sf_object.update(record_id, data)
        return _text_result(f"Update {object_name} Record Result: {results}")

    elif name == "delete_record":
        object_name = arguments.get("object_name")
        record_id = arguments.get("record_id")
        if not object_name or not record_id:
            raise ValueError("Missing 'object_name' or 'record_id' argument")
        sf_object = getattr(_require_salesforce(), object_name)
        results = sf_object.delete(record_id)
        return _text_result(f"Delete {object_name} Record Result: {results}")

    elif name == "tooling_execute":
        action = arguments.get("action")
        method = arguments.get("method", "GET")
        data = arguments.get("data")
        if not action:
            raise ValueError("Missing 'action' argument")
        results = _require_salesforce().toolingexecute(action, method=method, data=data)
        return _text_result(
            f"Tooling Execute Result (JSON):\n{json.dumps(results, indent=2)}"
        )

    elif name == "apex_execute":
        action = arguments.get("action")
        method = arguments.get("method", "GET")
        data = arguments.get("data")
        if not action:
            raise ValueError("Missing 'action' argument")
        results = _require_salesforce().apexecute(action, method=method, data=data)
        return _text_result(
            f"Apex Execute Result (JSON):\n{json.dumps(results, indent=2)}"
        )

    elif name == "restful":
        path = arguments.get("path")
        method = arguments.get("method", "GET")
        params = arguments.get("params")
        data = arguments.get("data")
        if not path:
            raise ValueError("Missing 'path' argument")
        results = _require_salesforce().restful(
            path, method=method, params=params, json=data
        )
        return _text_result(
            f"RESTful API Call Result (JSON):\n{json.dumps(results, indent=2)}"
        )

    raise ValueError(f"Unknown tool: {name}")
# Entry point: run the server over the stdio transport
async def run():
    """Run the MCP server over the stdio transport."""
    async with mcp.server.stdio.stdio_server() as streams:
        reader, writer = streams
        # Advertise what the registered handlers support.
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="salesforce-mcp",
            server_version="0.1.0",
            capabilities=capabilities,
        )
        await server.run(reader, writer, init_options)


if __name__ == "__main__":
    asyncio.run(run())