#!/usr/bin/env python3
# server.py
"""
Salesforce Revenue Cloud MCP Server using FastMCP.
This server provides tools for interacting with Salesforce Revenue Cloud via the Model Context Protocol (MCP).
"""
import json
import os
import sys
from typing import Optional

from dotenv import load_dotenv
from fastmcp import FastMCP
from simple_salesforce.exceptions import SalesforceMalformedRequest, SalesforceError

from salesforce_auth import get_salesforce_client
# Load environment variables via python-dotenv before any tool runs
# (presumably the Salesforce credentials read by salesforce_auth — confirm there)
load_dotenv()
# Create MCP server instance; the @mcp.tool() decorators in this module register tools on it
mcp = FastMCP("Salesforce Revenue Cloud")
# Global Salesforce client - initialized lazily by get_sf_client() on first use
sf_client = None
def get_sf_client():
    """Initialize and return the Salesforce client (lazy initialization).

    The client is created on the first call via ``get_salesforce_client()``
    and cached in the module-level ``sf_client``; later calls return the
    cached object without reconnecting.

    Diagnostics are written to stderr, never stdout: when the server is run
    over the stdio transport, stdout carries the MCP protocol messages and
    any stray text there corrupts the stream.

    Returns:
        The cached Salesforce client.

    Raises:
        Exception: Any error raised by ``get_salesforce_client()`` is logged
            and re-raised unchanged; the cache stays empty so the next call
            retries.
    """
    global sf_client
    if sf_client is None:
        try:
            print("INFO: Initializing Salesforce client...", file=sys.stderr)
            sf_client = get_salesforce_client()
            print("INFO: Salesforce client initialized successfully.", file=sys.stderr)
        except Exception as e:
            print(f"ERROR: Failed to initialize Salesforce client: {e}", file=sys.stderr)
            raise
    return sf_client
@mcp.tool()
def get_products(product_family: Optional[str] = None) -> str:
    """
    Fetches a list of products from Salesforce Revenue Cloud.

    Args:
        product_family: Optional category/family of products to filter by (e.g., 'Solar Panels')

    Returns:
        JSON string with product data including id, name, code, description, and family
    """
    # Diagnostics go to stderr: with the stdio transport, stdout is reserved
    # for MCP protocol messages and must not receive log lines.
    print(f"INFO: get_products tool called with product_family={product_family}", file=sys.stderr)
    try:
        client = get_sf_client()

        # Build SOQL query
        soql_query = "SELECT Id, Name, ProductCode, Description, Family FROM Product2"
        if product_family:
            # The value is embedded in the query text, so it is validated and
            # escaped by hand: reject wildcard/control characters outright,
            # then escape backslashes before single quotes (order matters,
            # otherwise the quote's escaping backslash would be doubled).
            if any(char in product_family for char in ('%', '_', '"', ';', '\n', '\r')):
                return json.dumps({"status": "error", "message": "Invalid characters in product_family parameter"})
            safe_family = product_family.replace("\\", "\\\\").replace("'", "\\'")
            soql_query += f" WHERE Family = '{safe_family}'"
        soql_query += " LIMIT 20"

        print(f"INFO: Executing SOQL query: {soql_query}", file=sys.stderr)
        query_result = client.query_all(soql_query)

        # Extract product data
        products = [
            {
                "id": record.get('Id'),
                "name": record.get('Name'),
                "code": record.get('ProductCode'),
                "description": record.get('Description'),
                "family": record.get('Family'),
            }
            for record in query_result.get('records', [])
        ]

        if not products:
            message = "No products found"
            if product_family:
                message += f" for category '{product_family}'"
            message += "."
            return json.dumps({"status": "success", "message": message, "data": []})

        return json.dumps({
            "status": "success",
            "message": f"Found {len(products)} products.",
            "data": products
        })
    except SalesforceMalformedRequest as e:
        error_content = getattr(e, 'content', 'No additional error content')
        print(f"ERROR: Salesforce SOQL Malformed Request: {error_content}", file=sys.stderr)
        error_message = "Salesforce query error."
        if isinstance(error_content, list) and error_content:
            error_detail = error_content[0]
            if isinstance(error_detail, dict):
                error_message = f"Salesforce query error: {error_detail.get('errorCode', 'N/A')} - {error_detail.get('message', 'No message')}"
            else:
                # Unexpected payload shape: report it verbatim instead of
                # raising AttributeError out of this handler.
                error_message = f"Salesforce query error: {error_detail}"
        elif isinstance(error_content, str):
            error_message = f"Salesforce query error: {error_content}"
        return json.dumps({"status": "error", "message": error_message})
    except SalesforceError as e:
        error_content = getattr(e, 'content', 'No additional error content')
        print(f"ERROR: Salesforce API Error: {error_content}", file=sys.stderr)
        return json.dumps({"status": "error", "message": f"Salesforce API error: {error_content}"})
    except Exception as e:
        # Tool boundary: always answer with a JSON error rather than raising.
        print(f"ERROR: Unexpected error in get_products: {e}", file=sys.stderr)
        return json.dumps({"status": "error", "message": f"An unexpected error occurred: {str(e)}"})
@mcp.tool()
def get_price_books() -> str:
    """
    Fetches a list of price books from Salesforce Revenue Cloud.

    Returns:
        JSON string with price book data including id, name, description, and active status
    """
    # Diagnostics go to stderr: with the stdio transport, stdout is reserved
    # for MCP protocol messages and must not receive log lines.
    print("INFO: get_price_books tool called", file=sys.stderr)
    try:
        client = get_sf_client()

        # Query Pricebook2 object
        soql_query = "SELECT Id, Name, Description, IsActive FROM Pricebook2 LIMIT 20"
        print(f"INFO: Executing SOQL query: {soql_query}", file=sys.stderr)
        query_result = client.query_all(soql_query)

        # Extract price book data
        price_books = [
            {
                "id": record.get('Id'),
                "name": record.get('Name'),
                "description": record.get('Description'),
                "is_active": record.get('IsActive'),
            }
            for record in query_result.get('records', [])
        ]

        if not price_books:
            return json.dumps({"status": "success", "message": "No price books found.", "data": []})

        return json.dumps({
            "status": "success",
            "message": f"Found {len(price_books)} price books.",
            "data": price_books
        })
    except Exception as e:
        # Tool boundary: always answer with a JSON error rather than raising.
        print(f"ERROR: Error in get_price_books: {e}", file=sys.stderr)
        return json.dumps({"status": "error", "message": f"Error fetching price books: {str(e)}"})
@mcp.tool()
def get_quotes(limit: int = 20) -> str:
    """
    Fetches a list of quotes from Salesforce Revenue Cloud.

    Args:
        limit: Maximum number of quotes to return (default: 20, max: 100)

    Returns:
        JSON string with quote data including id, name, status, total price, and expiration date
    """
    # Diagnostics go to stderr: with the stdio transport, stdout is reserved
    # for MCP protocol messages and must not receive log lines.
    print(f"INFO: get_quotes tool called with limit={limit}", file=sys.stderr)

    # Validate limit - ensure it's an integer in acceptable range.
    # OverflowError covers int(float("inf")), which ValueError/TypeError miss.
    try:
        limit = int(limit)
    except (ValueError, TypeError, OverflowError):
        return json.dumps({"status": "error", "message": "Invalid limit parameter: must be an integer"})
    if limit < 1:
        # Non-positive values fall back to the default page size.
        limit = 20
    elif limit > 100:
        limit = 100

    try:
        client = get_sf_client()

        # Query Quote object - limit is validated as int above, so it is safe to interpolate
        soql_query = f"SELECT Id, Name, Status, TotalPrice, ExpirationDate, OpportunityId FROM Quote LIMIT {limit}"
        print(f"INFO: Executing SOQL query: {soql_query}", file=sys.stderr)
        query_result = client.query_all(soql_query)

        # Extract quote data
        quotes = [
            {
                "id": record.get('Id'),
                "name": record.get('Name'),
                "status": record.get('Status'),
                "total_price": record.get('TotalPrice'),
                "expiration_date": record.get('ExpirationDate'),
                "opportunity_id": record.get('OpportunityId'),
            }
            for record in query_result.get('records', [])
        ]

        if not quotes:
            return json.dumps({"status": "success", "message": "No quotes found.", "data": []})

        return json.dumps({
            "status": "success",
            "message": f"Found {len(quotes)} quotes.",
            "data": quotes
        })
    except Exception as e:
        # Tool boundary: always answer with a JSON error rather than raising.
        print(f"ERROR: Error in get_quotes: {e}", file=sys.stderr)
        return json.dumps({"status": "error", "message": f"Error fetching quotes: {str(e)}"})
@mcp.tool()
def get_orders(limit: int = 20) -> str:
    """
    Fetches a list of orders from Salesforce Revenue Cloud.

    Args:
        limit: Maximum number of orders to return (default: 20, max: 100)

    Returns:
        JSON string with order data including id, order number, status, total amount, and effective date
    """
    # Diagnostics go to stderr: with the stdio transport, stdout is reserved
    # for MCP protocol messages and must not receive log lines.
    print(f"INFO: get_orders tool called with limit={limit}", file=sys.stderr)

    # Validate limit - ensure it's an integer in acceptable range.
    # OverflowError covers int(float("inf")), which ValueError/TypeError miss.
    try:
        limit = int(limit)
    except (ValueError, TypeError, OverflowError):
        return json.dumps({"status": "error", "message": "Invalid limit parameter: must be an integer"})
    if limit < 1:
        # Non-positive values fall back to the default page size.
        limit = 20
    elif limit > 100:
        limit = 100

    try:
        client = get_sf_client()

        # Query Order object - limit is validated as int above, so it is safe to interpolate
        soql_query = f"SELECT Id, OrderNumber, Status, TotalAmount, EffectiveDate, AccountId FROM Order LIMIT {limit}"
        print(f"INFO: Executing SOQL query: {soql_query}", file=sys.stderr)
        query_result = client.query_all(soql_query)

        # Extract order data
        orders = [
            {
                "id": record.get('Id'),
                "order_number": record.get('OrderNumber'),
                "status": record.get('Status'),
                "total_amount": record.get('TotalAmount'),
                "effective_date": record.get('EffectiveDate'),
                "account_id": record.get('AccountId'),
            }
            for record in query_result.get('records', [])
        ]

        if not orders:
            return json.dumps({"status": "success", "message": "No orders found.", "data": []})

        return json.dumps({
            "status": "success",
            "message": f"Found {len(orders)} orders.",
            "data": orders
        })
    except Exception as e:
        # Tool boundary: always answer with a JSON error rather than raising.
        print(f"ERROR: Error in get_orders: {e}", file=sys.stderr)
        return json.dumps({"status": "error", "message": f"Error fetching orders: {str(e)}"})
@mcp.tool()
def query_salesforce(soql: str) -> str:
    """
    Execute a custom SOQL query against Salesforce.

    WARNING: This function executes raw SOQL queries. Use with caution in production environments
    as it may expose security risks if user input is not properly validated by the caller.

    Args:
        soql: The SOQL query to execute (e.g., "SELECT Id, Name FROM Account LIMIT 10")

    Returns:
        JSON string with query results
    """
    # Diagnostics go to stderr: with the stdio transport, stdout is reserved
    # for MCP protocol messages and must not receive log lines.
    print(f"INFO: query_salesforce tool called with SOQL: {soql}", file=sys.stderr)

    # A non-string argument would raise AttributeError on .strip() below,
    # outside the try block; report it as a JSON error like every other failure.
    if not isinstance(soql, str):
        return json.dumps({
            "status": "error",
            "message": "Invalid soql parameter: must be a string"
        })

    # Basic validation: ensure query starts with SELECT (read-only)
    if not soql.strip().upper().startswith("SELECT"):
        return json.dumps({
            "status": "error",
            "message": "Only SELECT queries are allowed for security reasons"
        })

    try:
        client = get_sf_client()

        # Execute the query
        print(f"INFO: Executing custom SOQL query: {soql}", file=sys.stderr)
        query_result = client.query_all(soql)

        # Clean up records: drop the top-level 'attributes' entry of each record
        cleaned_records = [
            {k: v for k, v in record.items() if k != 'attributes'}
            for record in query_result.get('records', [])
        ]

        return json.dumps({
            "status": "success",
            "message": f"Query executed successfully. Found {len(cleaned_records)} records.",
            "data": cleaned_records
        })
    except SalesforceMalformedRequest as e:
        error_content = getattr(e, 'content', 'No additional error content')
        print(f"ERROR: Salesforce SOQL Malformed Request: {error_content}", file=sys.stderr)
        error_message = "Salesforce query error."
        if isinstance(error_content, list) and error_content:
            error_detail = error_content[0]
            if isinstance(error_detail, dict):
                error_message = f"Salesforce query error: {error_detail.get('errorCode', 'N/A')} - {error_detail.get('message', 'No message')}"
            else:
                # Unexpected payload shape: report it verbatim instead of
                # raising AttributeError out of this handler.
                error_message = f"Salesforce query error: {error_detail}"
        elif isinstance(error_content, str):
            error_message = f"Salesforce query error: {error_content}"
        return json.dumps({"status": "error", "message": error_message})
    except Exception as e:
        # Tool boundary: always answer with a JSON error rather than raising.
        print(f"ERROR: Error in query_salesforce: {e}", file=sys.stderr)
        return json.dumps({"status": "error", "message": f"Error executing query: {str(e)}"})
if __name__ == "__main__":
    # Run the MCP server only when this file is executed as a script, not on import.
    # run() is called with no arguments, so FastMCP uses whatever transport it defaults to.
    mcp.run()