HubSpot MCP Server
by baryhuang
- mcp-hubspot
- src
- mcp_server_hubspot
import logging
from typing import Any, Dict, List, Optional
import os
from dotenv import load_dotenv
from hubspot import HubSpot
from hubspot.crm.contacts import SimplePublicObjectInputForCreate
from hubspot.crm.contacts.exceptions import ApiException
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
import json
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
import argparse
logger = logging.getLogger('mcp_hubspot_server')
def convert_datetime_fields(obj: Any) -> Any:
"""Convert any datetime or tzlocal objects to string in the given object"""
if isinstance(obj, dict):
return {k: convert_datetime_fields(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_datetime_fields(item) for item in obj]
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, tzlocal):
# Get the current timezone offset
offset = datetime.now(tzlocal()).strftime('%z')
return f"UTC{offset[:3]}:{offset[3:]}" # Format like "UTC+08:00" or "UTC-05:00"
return obj
class HubSpotClient:
def __init__(self, access_token: Optional[str] = None):
access_token = access_token or os.getenv("HUBSPOT_ACCESS_TOKEN")
logger.debug(f"Using access token: {'[MASKED]' if access_token else 'None'}")
if not access_token:
raise ValueError("HUBSPOT_ACCESS_TOKEN environment variable is required")
self.client = HubSpot(access_token=access_token)
def get_recent_companies(self, limit: int = 10) -> str:
"""Get most recently active companies from HubSpot
Args:
limit: Maximum number of companies to return (default: 10)
"""
try:
from hubspot.crm.companies import PublicObjectSearchRequest
# Create search request with sort by lastmodifieddate
search_request = PublicObjectSearchRequest(
sorts=[{
"propertyName": "lastmodifieddate",
"direction": "DESCENDING"
}],
limit=limit,
properties=["name", "domain", "website", "phone", "industry", "hs_lastmodifieddate"]
)
# Execute the search
search_response = self.client.crm.companies.search_api.do_search(
public_object_search_request=search_request
)
# Convert the response to a dictionary
companies_dict = [company.to_dict() for company in search_response.results]
converted_companies = convert_datetime_fields(companies_dict)
return json.dumps(converted_companies)
except ApiException as e:
logger.error(f"API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception: {str(e)}")
return json.dumps({"error": str(e)})
def get_recent_contacts(self, limit: int = 10) -> str:
"""Get most recently active contacts from HubSpot
Args:
limit: Maximum number of contacts to return (default: 10)
"""
try:
from hubspot.crm.contacts import PublicObjectSearchRequest
# Create search request with sort by lastmodifieddate
search_request = PublicObjectSearchRequest(
sorts=[{
"propertyName": "lastmodifieddate",
"direction": "DESCENDING"
}],
limit=limit,
properties=["firstname", "lastname", "email", "phone", "company", "hs_lastmodifieddate", "lastmodifieddate"]
)
# Execute the search
search_response = self.client.crm.contacts.search_api.do_search(
public_object_search_request=search_request
)
# Convert the response to a dictionary
contacts_dict = [contact.to_dict() for contact in search_response.results]
converted_contacts = convert_datetime_fields(contacts_dict)
return json.dumps(converted_contacts)
except ApiException as e:
logger.error(f"API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception: {str(e)}")
return json.dumps({"error": str(e)})
def get_company_activity(self, company_id: str) -> str:
"""Get activity history for a specific company"""
try:
# Step 1: Get all engagement IDs associated with the company using CRM Associations v4 API
associated_engagements = self.client.crm.associations.v4.basic_api.get_page(
object_type="companies",
object_id=company_id,
to_object_type="engagements",
limit=500
)
# Extract engagement IDs from the associations response
engagement_ids = []
if hasattr(associated_engagements, 'results'):
for result in associated_engagements.results:
engagement_ids.append(result.to_object_id)
# Step 2: Get detailed information for each engagement
activities = []
for engagement_id in engagement_ids:
engagement_response = self.client.api_request({
"method": "GET",
"path": f"/engagements/v1/engagements/{engagement_id}"
}).json()
engagement_data = engagement_response.get('engagement', {})
metadata = engagement_response.get('metadata', {})
# Format the engagement
formatted_engagement = {
"id": engagement_data.get("id"),
"type": engagement_data.get("type"),
"created_at": engagement_data.get("createdAt"),
"last_updated": engagement_data.get("lastUpdated"),
"created_by": engagement_data.get("createdBy"),
"modified_by": engagement_data.get("modifiedBy"),
"timestamp": engagement_data.get("timestamp"),
"associations": engagement_response.get("associations", {})
}
# Add type-specific metadata formatting
if engagement_data.get("type") == "NOTE":
formatted_engagement["content"] = metadata.get("body", "")
elif engagement_data.get("type") == "EMAIL":
formatted_engagement["content"] = {
"subject": metadata.get("subject", ""),
"from": {
"raw": metadata.get("from", {}).get("raw", ""),
"email": metadata.get("from", {}).get("email", ""),
"firstName": metadata.get("from", {}).get("firstName", ""),
"lastName": metadata.get("from", {}).get("lastName", "")
},
"to": [{
"raw": recipient.get("raw", ""),
"email": recipient.get("email", ""),
"firstName": recipient.get("firstName", ""),
"lastName": recipient.get("lastName", "")
} for recipient in metadata.get("to", [])],
"cc": [{
"raw": recipient.get("raw", ""),
"email": recipient.get("email", ""),
"firstName": recipient.get("firstName", ""),
"lastName": recipient.get("lastName", "")
} for recipient in metadata.get("cc", [])],
"bcc": [{
"raw": recipient.get("raw", ""),
"email": recipient.get("email", ""),
"firstName": recipient.get("firstName", ""),
"lastName": recipient.get("lastName", "")
} for recipient in metadata.get("bcc", [])],
"sender": {
"email": metadata.get("sender", {}).get("email", "")
},
"body": metadata.get("text", "") or metadata.get("html", "")
}
elif engagement_data.get("type") == "TASK":
formatted_engagement["content"] = {
"subject": metadata.get("subject", ""),
"body": metadata.get("body", ""),
"status": metadata.get("status", ""),
"for_object_type": metadata.get("forObjectType", "")
}
elif engagement_data.get("type") == "MEETING":
formatted_engagement["content"] = {
"title": metadata.get("title", ""),
"body": metadata.get("body", ""),
"start_time": metadata.get("startTime"),
"end_time": metadata.get("endTime"),
"internal_notes": metadata.get("internalMeetingNotes", "")
}
elif engagement_data.get("type") == "CALL":
formatted_engagement["content"] = {
"body": metadata.get("body", ""),
"from_number": metadata.get("fromNumber", ""),
"to_number": metadata.get("toNumber", ""),
"duration_ms": metadata.get("durationMilliseconds"),
"status": metadata.get("status", ""),
"disposition": metadata.get("disposition", "")
}
activities.append(formatted_engagement)
# Convert any datetime fields and return
converted_activities = convert_datetime_fields(activities)
return json.dumps(converted_activities)
except ApiException as e:
logger.error(f"API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception: {str(e)}")
return json.dumps({"error": str(e)})
def get_recent_engagements(self, days: int = 7, limit: int = 50) -> str:
"""Get recent engagements across all contacts/companies
Args:
days: Number of days to look back (default: 7)
limit: Maximum number of engagements to return (default: 50)
"""
try:
# Calculate the date range (past N days)
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
# Format timestamps for API call
start_timestamp = int(start_time.timestamp() * 1000) # Convert to milliseconds
end_timestamp = int(end_time.timestamp() * 1000) # Convert to milliseconds
# Get all recent engagements
engagements_response = self.client.api_request({
"method": "GET",
"path": f"/engagements/v1/engagements/recent/modified",
"params": {
"count": limit,
"since": start_timestamp,
"offset": 0
}
}).json()
# Format the engagements similar to company_activity
formatted_engagements = []
for engagement in engagements_response.get('results', []):
engagement_data = engagement.get('engagement', {})
metadata = engagement.get('metadata', {})
formatted_engagement = {
"id": engagement_data.get("id"),
"type": engagement_data.get("type"),
"created_at": engagement_data.get("createdAt"),
"last_updated": engagement_data.get("lastUpdated"),
"created_by": engagement_data.get("createdBy"),
"modified_by": engagement_data.get("modifiedBy"),
"timestamp": engagement_data.get("timestamp"),
"associations": engagement.get("associations", {})
}
# Add type-specific metadata formatting identical to company_activity
if engagement_data.get("type") == "NOTE":
formatted_engagement["content"] = metadata.get("body", "")
elif engagement_data.get("type") == "EMAIL":
formatted_engagement["content"] = {
"subject": metadata.get("subject", ""),
"from": {
"raw": metadata.get("from", {}).get("raw", ""),
"email": metadata.get("from", {}).get("email", ""),
"firstName": metadata.get("from", {}).get("firstName", ""),
"lastName": metadata.get("from", {}).get("lastName", "")
},
"to": [{
"raw": recipient.get("raw", ""),
"email": recipient.get("email", ""),
"firstName": recipient.get("firstName", ""),
"lastName": recipient.get("lastName", "")
} for recipient in metadata.get("to", [])],
"cc": [{
"raw": recipient.get("raw", ""),
"email": recipient.get("email", ""),
"firstName": recipient.get("firstName", ""),
"lastName": recipient.get("lastName", "")
} for recipient in metadata.get("cc", [])],
"bcc": [{
"raw": recipient.get("raw", ""),
"email": recipient.get("email", ""),
"firstName": recipient.get("firstName", ""),
"lastName": recipient.get("lastName", "")
} for recipient in metadata.get("bcc", [])],
"sender": {
"email": metadata.get("sender", {}).get("email", "")
},
"body": metadata.get("text", "") or metadata.get("html", "")
}
elif engagement_data.get("type") == "TASK":
formatted_engagement["content"] = {
"subject": metadata.get("subject", ""),
"body": metadata.get("body", ""),
"status": metadata.get("status", ""),
"for_object_type": metadata.get("forObjectType", "")
}
elif engagement_data.get("type") == "MEETING":
formatted_engagement["content"] = {
"title": metadata.get("title", ""),
"body": metadata.get("body", ""),
"start_time": metadata.get("startTime"),
"end_time": metadata.get("endTime"),
"internal_notes": metadata.get("internalMeetingNotes", "")
}
elif engagement_data.get("type") == "CALL":
formatted_engagement["content"] = {
"body": metadata.get("body", ""),
"from_number": metadata.get("fromNumber", ""),
"to_number": metadata.get("toNumber", ""),
"duration_ms": metadata.get("durationMilliseconds"),
"status": metadata.get("status", ""),
"disposition": metadata.get("disposition", "")
}
formatted_engagements.append(formatted_engagement)
# Convert any datetime fields and return
converted_engagements = convert_datetime_fields(formatted_engagements)
return json.dumps(converted_engagements)
except ApiException as e:
logger.error(f"API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception: {str(e)}")
return json.dumps({"error": str(e)})
async def main(access_token: Optional[str] = None):
"""Run the HubSpot MCP server."""
logger.info("Server starting")
hubspot = HubSpotClient(access_token)
server = Server("hubspot-manager")
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return []
@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> str:
if uri.scheme != "hubspot":
raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
path = str(uri).replace("hubspot://", "")
return ""
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools"""
return [
types.Tool(
name="hubspot_create_contact",
description="Create a new contact in HubSpot",
inputSchema={
"type": "object",
"properties": {
"firstname": {"type": "string", "description": "Contact's first name"},
"lastname": {"type": "string", "description": "Contact's last name"},
"email": {"type": "string", "description": "Contact's email address"},
"properties": {"type": "object", "description": "Additional contact properties"}
},
"required": ["firstname", "lastname"]
},
),
types.Tool(
name="hubspot_create_company",
description="Create a new company in HubSpot",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Company name"},
"properties": {"type": "object", "description": "Additional company properties"}
},
"required": ["name"]
},
),
types.Tool(
name="hubspot_get_company_activity",
description="Get activity history for a specific company",
inputSchema={
"type": "object",
"properties": {
"company_id": {"type": "string", "description": "HubSpot company ID"}
},
"required": ["company_id"]
},
),
types.Tool(
name="hubspot_get_recent_engagements",
description="Get recent engagement activities across all contacts and companies",
inputSchema={
"type": "object",
"properties": {
"days": {"type": "integer", "description": "Number of days to look back (default: 7)"},
"limit": {"type": "integer", "description": "Maximum number of engagements to return (default: 50)"}
},
},
),
types.Tool(
name="hubspot_get_active_companies",
description="Get most recently active companies from HubSpot",
inputSchema={
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Maximum number of companies to return (default: 10)"}
},
},
),
types.Tool(
name="hubspot_get_active_contacts",
description="Get most recently active contacts from HubSpot",
inputSchema={
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Maximum number of contacts to return (default: 10)"}
},
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict[str, Any] | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""Handle tool execution requests"""
try:
if name == "hubspot_create_contact":
if not arguments:
raise ValueError("Missing arguments for create_contact")
firstname = arguments["firstname"]
lastname = arguments["lastname"]
company = arguments.get("properties", {}).get("company")
# Search for existing contacts with same name and company
try:
from hubspot.crm.contacts import PublicObjectSearchRequest
filter_groups = [{
"filters": [
{
"propertyName": "firstname",
"operator": "EQ",
"value": firstname
},
{
"propertyName": "lastname",
"operator": "EQ",
"value": lastname
}
]
}]
# Add company filter if provided
if company:
filter_groups[0]["filters"].append({
"propertyName": "company",
"operator": "EQ",
"value": company
})
search_request = PublicObjectSearchRequest(
filter_groups=filter_groups
)
search_response = hubspot.client.crm.contacts.search_api.do_search(
public_object_search_request=search_request
)
if search_response.total > 0:
# Contact already exists
return [types.TextContent(
type="text",
text=f"Contact already exists: {search_response.results[0].to_dict()}"
)]
# If no existing contact found, proceed with creation
properties = {
"firstname": firstname,
"lastname": lastname
}
# Add email if provided
if "email" in arguments:
properties["email"] = arguments["email"]
# Add any additional properties
if "properties" in arguments:
properties.update(arguments["properties"])
# Create contact using SimplePublicObjectInputForCreate
simple_public_object_input = SimplePublicObjectInputForCreate(
properties=properties
)
api_response = hubspot.client.crm.contacts.basic_api.create(
simple_public_object_input_for_create=simple_public_object_input
)
return [types.TextContent(type="text", text=str(api_response.to_dict()))]
except ApiException as e:
return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")]
elif name == "hubspot_create_company":
if not arguments:
raise ValueError("Missing arguments for create_company")
company_name = arguments["name"]
# Search for existing companies with same name
try:
from hubspot.crm.companies import PublicObjectSearchRequest
search_request = PublicObjectSearchRequest(
filter_groups=[{
"filters": [
{
"propertyName": "name",
"operator": "EQ",
"value": company_name
}
]
}]
)
search_response = hubspot.client.crm.companies.search_api.do_search(
public_object_search_request=search_request
)
if search_response.total > 0:
# Company already exists
return [types.TextContent(
type="text",
text=f"Company already exists: {search_response.results[0].to_dict()}"
)]
# If no existing company found, proceed with creation
properties = {
"name": company_name
}
# Add any additional properties
if "properties" in arguments:
properties.update(arguments["properties"])
# Create company using SimplePublicObjectInputForCreate
simple_public_object_input = SimplePublicObjectInputForCreate(
properties=properties
)
api_response = hubspot.client.crm.companies.basic_api.create(
simple_public_object_input_for_create=simple_public_object_input
)
return [types.TextContent(type="text", text=str(api_response.to_dict()))]
except ApiException as e:
return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")]
elif name == "hubspot_get_company_activity":
if not arguments:
raise ValueError("Missing arguments for get_company_activity")
results = hubspot.get_company_activity(arguments["company_id"])
return [types.TextContent(type="text", text=results)]
elif name == "hubspot_get_recent_engagements":
# Extract parameters with defaults if not provided
days = arguments.get("days", 7) if arguments else 7
limit = arguments.get("limit", 50) if arguments else 50
# Ensure days and limit are integers
days = int(days) if days is not None else 7
limit = int(limit) if limit is not None else 50
# Get recent engagements
results = hubspot.get_recent_engagements(days=days, limit=limit)
return [types.TextContent(type="text", text=results)]
elif name == "hubspot_get_active_companies":
# Extract parameters with defaults if not provided
limit = arguments.get("limit", 10) if arguments else 10
# Ensure limit is an integer
limit = int(limit) if limit is not None else 10
# Get recent companies
results = hubspot.get_recent_companies(limit=limit)
return [types.TextContent(type="text", text=results)]
elif name == "hubspot_get_active_contacts":
# Extract parameters with defaults if not provided
limit = arguments.get("limit", 10) if arguments else 10
# Ensure limit is an integer
limit = int(limit) if limit is not None else 10
# Get recent contacts
results = hubspot.get_recent_contacts(limit=limit)
return [types.TextContent(type="text", text=results)]
else:
raise ValueError(f"Unknown tool: {name}")
except ApiException as e:
return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")]
except Exception as e:
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
logger.info("Server running with stdio transport")
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="hubspot",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
import asyncio
import argparse
# Set up command line argument parser
parser = argparse.ArgumentParser(description="Run the HubSpot MCP server")
parser.add_argument("--access-token",
help="HubSpot API access token (overrides HUBSPOT_ACCESS_TOKEN environment variable)")
args = parser.parse_args()
asyncio.run(main(access_token=args.access_token))