HubSpot MCP Server
by KaranThink41
Verified
- HubSpot_MCP
- src
- mcp_server_hubspot
import logging
from typing import Any, Dict, List, Optional
import os
from dotenv import load_dotenv
from hubspot import HubSpot
from hubspot.crm.contacts import SimplePublicObjectInputForCreate
from hubspot.crm.contacts.exceptions import ApiException
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
import json
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
import argparse
# Populate the process environment from a .env file (python-dotenv) before
# anything reads it; HubSpotClient looks up HUBSPOT_ACCESS_TOKEN via os.getenv.
load_dotenv()
# Named logger shared by the client class and the server entry point below.
logger = logging.getLogger('mcp_hubspot_server')
def convert_datetime_fields(obj: Any) -> Any:
    """Return *obj* with date/time values replaced by JSON-friendly strings.

    Dicts and lists are rebuilt recursively (the input is not mutated).
    ``datetime`` values become their ISO-8601 text, ``tzlocal`` values become
    a ``"UTC+HH:MM"`` style label built from the current local offset, and
    every other value is returned unchanged.
    """
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = convert_datetime_fields(value)
        return converted

    if isinstance(obj, list):
        return list(map(convert_datetime_fields, obj))

    if isinstance(obj, datetime):
        return obj.isoformat()

    if isinstance(obj, tzlocal):
        # '%z' renders as e.g. "+0530"; split it into sign+hours and minutes.
        utc_offset = datetime.now(tzlocal()).strftime('%z')
        hours, minutes = utc_offset[:3], utc_offset[3:]
        return f"UTC{hours}:{minutes}"

    return obj
class HubSpotClient:
    """Wrapper around the HubSpot SDK client whose queries return JSON text.

    Each public ``get_*`` method catches every ``Exception``: the failure is
    logged and reported as the JSON document ``{"error": "<message>"}``
    instead of being raised.
    """

    # Fields copied for every person-like entry (the "from" party and each
    # recipient) of an EMAIL engagement.
    _PERSON_FIELDS = ("raw", "email", "firstName", "lastName")

    def __init__(self, access_token: Optional[str] = None):
        """Create the underlying SDK client.

        Args:
            access_token: HubSpot access token. When omitted or empty, the
                ``HUBSPOT_ACCESS_TOKEN`` environment variable is used.

        Raises:
            ValueError: If no token is available from either source.
        """
        access_token = access_token or os.getenv("HUBSPOT_ACCESS_TOKEN")
        # Only record whether a token was found, never the token itself.
        logger.debug("Using access token: %s", "[MASKED]" if access_token else "None")
        if not access_token:
            raise ValueError("HUBSPOT_ACCESS_TOKEN environment variable is required")
        self.client = HubSpot(access_token=access_token)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _error_json(label: str, exc: Exception) -> str:
        """Log *exc* under *label* and return the ``{"error": ...}`` document."""
        logger.error("%s: %s", label, exc)
        return json.dumps({"error": str(exc)})

    @staticmethod
    def _format_person(person: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the raw/email/firstName/lastName view of an email party.

        A missing or ``None`` party yields empty strings for every field.
        """
        person = person or {}
        return {field: person.get(field, "") for field in HubSpotClient._PERSON_FIELDS}

    @staticmethod
    def _format_engagement(record: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten one raw engagement record into the shape this server emits.

        Args:
            record: A dict with optional ``engagement``, ``metadata`` and
                ``associations`` keys, as read from the engagements endpoint
                responses used in this class.

        Returns:
            A dict of common fields plus a type-specific ``content`` entry for
            NOTE, EMAIL, TASK, MEETING and CALL engagements. Other types get
            no ``content`` key.
        """
        # ``or {}`` also covers keys that are present but explicitly null,
        # which would otherwise raise AttributeError on the next ``.get``.
        engagement = record.get("engagement") or {}
        metadata = record.get("metadata") or {}
        kind = engagement.get("type")

        formatted: Dict[str, Any] = {
            "id": engagement.get("id"),
            "type": kind,
            "created_at": engagement.get("createdAt"),
            "last_updated": engagement.get("lastUpdated"),
            "created_by": engagement.get("createdBy"),
            "modified_by": engagement.get("modifiedBy"),
            "timestamp": engagement.get("timestamp"),
            "associations": record.get("associations", {}),
        }

        person = HubSpotClient._format_person
        if kind == "NOTE":
            formatted["content"] = metadata.get("body", "")
        elif kind == "EMAIL":
            formatted["content"] = {
                "subject": metadata.get("subject", ""),
                "from": person(metadata.get("from")),
                "to": [person(p) for p in metadata.get("to") or []],
                "cc": [person(p) for p in metadata.get("cc") or []],
                "bcc": [person(p) for p in metadata.get("bcc") or []],
                "sender": {"email": (metadata.get("sender") or {}).get("email", "")},
                # Prefer the plain-text body; fall back to HTML when it is empty.
                "body": metadata.get("text", "") or metadata.get("html", ""),
            }
        elif kind == "TASK":
            formatted["content"] = {
                "subject": metadata.get("subject", ""),
                "body": metadata.get("body", ""),
                "status": metadata.get("status", ""),
                "for_object_type": metadata.get("forObjectType", ""),
            }
        elif kind == "MEETING":
            formatted["content"] = {
                "title": metadata.get("title", ""),
                "body": metadata.get("body", ""),
                "start_time": metadata.get("startTime"),
                "end_time": metadata.get("endTime"),
                "internal_notes": metadata.get("internalMeetingNotes", ""),
            }
        elif kind == "CALL":
            formatted["content"] = {
                "body": metadata.get("body", ""),
                "from_number": metadata.get("fromNumber", ""),
                "to_number": metadata.get("toNumber", ""),
                "duration_ms": metadata.get("durationMilliseconds"),
                "status": metadata.get("status", ""),
                "disposition": metadata.get("disposition", ""),
            }
        return formatted

    @staticmethod
    def _search_recent(search_api: Any, request_cls: Any, properties: List[str], limit: int) -> str:
        """Run a "most recently modified first" search and return it as JSON.

        Args:
            search_api: SDK search API object exposing ``do_search``.
            request_cls: The ``PublicObjectSearchRequest`` class matching
                *search_api*'s object type.
            properties: Property names to request for each result.
            limit: Maximum number of results to request.
        """
        # NOTE(review): HubSpot's search docs appear to name this property
        # ``hs_lastmodifieddate`` for non-contact objects - confirm that
        # ``lastmodifieddate`` sorts companies as intended.
        request = request_cls(
            sorts=[{"propertyName": "lastmodifieddate", "direction": "DESCENDING"}],
            limit=limit,
            properties=properties,
        )
        response = search_api.do_search(public_object_search_request=request)
        records = [item.to_dict() for item in response.results]
        return json.dumps(convert_datetime_fields(records))

    # ------------------------------------------------------------------
    # Public queries
    # ------------------------------------------------------------------
    def get_recent_companies(self, limit: int = 10) -> str:
        """Return up to *limit* companies, most recently modified first, as JSON."""
        try:
            from hubspot.crm.companies import PublicObjectSearchRequest
            return self._search_recent(
                self.client.crm.companies.search_api,
                PublicObjectSearchRequest,
                ["name", "domain", "website", "phone", "industry", "hs_lastmodifieddate"],
                limit,
            )
        except ApiException as e:
            return self._error_json("API Exception", e)
        except Exception as e:
            return self._error_json("Exception", e)

    def get_recent_contacts(self, limit: int = 10) -> str:
        """Return up to *limit* contacts, most recently modified first, as JSON."""
        try:
            from hubspot.crm.contacts import PublicObjectSearchRequest
            return self._search_recent(
                self.client.crm.contacts.search_api,
                PublicObjectSearchRequest,
                ["firstname", "lastname", "email", "phone", "company",
                 "hs_lastmodifieddate", "lastmodifieddate"],
                limit,
            )
        except ApiException as e:
            return self._error_json("API Exception", e)
        except Exception as e:
            return self._error_json("Exception", e)

    def get_company_activity(self, company_id: str) -> str:
        """Return the engagements associated with a company as a JSON list.

        Engagement ids come from one associations page (at most 500 entries);
        each engagement is then fetched individually and flattened.

        Args:
            company_id: HubSpot id of the company.
        """
        try:
            associated_engagements = self.client.crm.associations.v4.basic_api.get_page(
                object_type="companies",
                object_id=company_id,
                to_object_type="engagements",
                limit=500,
            )
            # Tolerate a response with no (or a null) ``results`` attribute.
            results = getattr(associated_engagements, "results", None) or []
            engagement_ids = [result.to_object_id for result in results]

            activities = []
            for engagement_id in engagement_ids:
                record = self.client.api_request({
                    "method": "GET",
                    "path": f"/engagements/v1/engagements/{engagement_id}",
                }).json()
                activities.append(self._format_engagement(record))

            return json.dumps(convert_datetime_fields(activities))
        except ApiException as e:
            return self._error_json("API Exception", e)
        except Exception as e:
            return self._error_json("Exception", e)

    def get_recent_engagements(self, days: int = 7, limit: int = 50) -> str:
        """Return recently modified engagements as a JSON list.

        Args:
            days: How far back to look, in days, counted from now.
            limit: Value sent as the endpoint's ``count`` parameter.
        """
        try:
            window_start = datetime.now() - timedelta(days=days)
            # The endpoint takes "since" as epoch milliseconds.
            start_timestamp = int(window_start.timestamp() * 1000)

            # NOTE(review): confirm the SDK's api_request honours a "params"
            # key for the query string (its README appears to use "qs"); if it
            # does not, count/since/offset are silently dropped.
            engagements_response = self.client.api_request({
                "method": "GET",
                "path": "/engagements/v1/engagements/recent/modified",
                "params": {"count": limit, "since": start_timestamp, "offset": 0},
            }).json()

            formatted_engagements = [
                self._format_engagement(record)
                for record in engagements_response.get("results") or []
            ]
            return json.dumps(convert_datetime_fields(formatted_engagements))
        except ApiException as e:
            return self._error_json("API Exception", e)
        except Exception as e:
            return self._error_json("Exception", e)
# Import shared space functionality from our module
from .shared_space import create_summary, get_summaries, update_summary, delete_summary
async def main(access_token: Optional[str] = None):
    """Build the HubSpot MCP server and serve it over the stdio transport.

    Registers the resource and tool handlers on a ``Server`` named
    ``"hubspot-manager"`` and then runs it on the stdio streams.

    Args:
        access_token: HubSpot access token passed to ``HubSpotClient``; when
            ``None`` the client falls back to ``HUBSPOT_ACCESS_TOKEN``.

    Raises:
        ValueError: From ``HubSpotClient`` when no access token is available.
    """
    logger.info("Server starting")
    hubspot = HubSpotClient(access_token)
    server = Server("hubspot-manager")

    def text_result(body: str) -> list[types.TextContent]:
        """Wrap *body* in the single-item text response every tool returns."""
        return [types.TextContent(type="text", text=body)]

    def require_arguments(arguments: dict[str, Any] | None, label: str, *required: str) -> dict[str, Any]:
        """Return *arguments* after checking it is non-empty and has *required* keys.

        Raises:
            ValueError: Naming the tool (*label*) and any missing keys, so the
                caller sees more than a bare ``KeyError`` repr.
        """
        if not arguments:
            raise ValueError(f"Missing arguments for {label}")
        missing = [key for key in required if key not in arguments]
        if missing:
            raise ValueError(f"Missing required argument(s) for {label}: {', '.join(missing)}")
        return arguments

    def optional_int(arguments: dict[str, Any] | None, key: str, default: int) -> int:
        """Read an optional integer argument, using *default* when absent or null."""
        value = arguments.get(key, default) if arguments else default
        return int(value) if value is not None else default

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        """This server exposes no resources."""
        return []

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        """Accept only ``hubspot://`` URIs; no resource content is served yet."""
        if uri.scheme != "hubspot":
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
        return ""

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools including HubSpot and shared space tools."""
        return [
            types.Tool(
                name="hubspot_create_contact",
                description="Create a new contact in HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "firstname": {"type": "string", "description": "Contact's first name"},
                        "lastname": {"type": "string", "description": "Contact's last name"},
                        "email": {"type": "string", "description": "Contact's email address"},
                        "properties": {"type": "object", "description": "Additional contact properties"}
                    },
                    "required": ["firstname", "lastname"]
                },
            ),
            types.Tool(
                name="hubspot_create_company",
                description="Create a new company in HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Company name"},
                        "properties": {"type": "object", "description": "Additional company properties"}
                    },
                    "required": ["name"]
                },
            ),
            types.Tool(
                name="hubspot_get_company_activity",
                description="Get activity history for a specific company",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "company_id": {"type": "string", "description": "HubSpot company ID"}
                    },
                    "required": ["company_id"]
                },
            ),
            types.Tool(
                name="hubspot_get_recent_engagements",
                description="Get recent engagement activities across all contacts and companies",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "days": {"type": "integer", "description": "Number of days to look back (default: 7)"},
                        "limit": {"type": "integer", "description": "Maximum number of engagements to return (default: 50)"}
                    },
                },
            ),
            types.Tool(
                name="hubspot_get_active_companies",
                description="Get most recently active companies from HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "limit": {"type": "integer", "description": "Maximum number of companies to return (default: 10)"}
                    },
                },
            ),
            types.Tool(
                name="hubspot_get_active_contacts",
                description="Get most recently active contacts from HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "limit": {"type": "integer", "description": "Maximum number of contacts to return (default: 10)"}
                    },
                },
            ),
            # Shared space tools
            types.Tool(
                name="create_shared_summary",
                description="Create a new conversation summary in the shared space",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "User email/ID creating the summary"},
                        "company_id": {"type": "string", "description": "Company ID"},
                        "summary": {"type": "string", "description": "The conversation summary text"}
                    },
                    "required": ["user_id", "company_id", "summary"]
                }
            ),
            types.Tool(
                name="get_shared_summaries",
                description="Retrieve all conversation summaries for the company",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "User email/ID requesting summaries"},
                        "company_id": {"type": "string", "description": "Company ID"}
                    },
                    "required": ["user_id", "company_id"]
                }
            ),
            # Additional tools for update/delete can be added similarly
        ]

    @server.call_tool()
    async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Dispatch a tool call by *name* and return its result as text content.

        Failures are not raised to the caller: HubSpot ``ApiException``s come
        back as ``"HubSpot API error: ..."`` text and any other exception as
        ``"Error: ..."`` text.
        """
        try:
            if name == "hubspot_create_contact":
                args = require_arguments(arguments, "create_contact", "firstname", "lastname")
                firstname = args["firstname"]
                lastname = args["lastname"]
                # ``or {}`` tolerates an explicit null for the optional object.
                extra_properties = args.get("properties") or {}
                company = extra_properties.get("company")

                from hubspot.crm.contacts import PublicObjectSearchRequest
                # Look for an existing contact first so duplicates are not created.
                filters = [
                    {"propertyName": "firstname", "operator": "EQ", "value": firstname},
                    {"propertyName": "lastname", "operator": "EQ", "value": lastname},
                ]
                if company:
                    filters.append({"propertyName": "company", "operator": "EQ", "value": company})
                search_request = PublicObjectSearchRequest(filter_groups=[{"filters": filters}])
                search_response = hubspot.client.crm.contacts.search_api.do_search(public_object_search_request=search_request)
                if search_response.total > 0:
                    return text_result(f"Contact already exists: {search_response.results[0].to_dict()}")

                properties = {"firstname": firstname, "lastname": lastname}
                if "email" in args:
                    properties["email"] = args["email"]
                # Caller-supplied properties are applied last and may override the above.
                properties.update(extra_properties)
                simple_public_object_input = SimplePublicObjectInputForCreate(properties=properties)
                api_response = hubspot.client.crm.contacts.basic_api.create(simple_public_object_input_for_create=simple_public_object_input)
                return text_result(str(api_response.to_dict()))

            elif name == "hubspot_create_company":
                args = require_arguments(arguments, "create_company", "name")
                company_name = args["name"]

                from hubspot.crm.companies import PublicObjectSearchRequest
                # Look for an existing company with the same name first.
                search_request = PublicObjectSearchRequest(
                    filter_groups=[{"filters": [{"propertyName": "name", "operator": "EQ", "value": company_name}]}]
                )
                search_response = hubspot.client.crm.companies.search_api.do_search(public_object_search_request=search_request)
                if search_response.total > 0:
                    return text_result(f"Company already exists: {search_response.results[0].to_dict()}")

                properties = {"name": company_name}
                properties.update(args.get("properties") or {})
                simple_public_object_input = SimplePublicObjectInputForCreate(properties=properties)
                api_response = hubspot.client.crm.companies.basic_api.create(simple_public_object_input_for_create=simple_public_object_input)
                return text_result(str(api_response.to_dict()))

            elif name == "hubspot_get_company_activity":
                args = require_arguments(arguments, "get_company_activity", "company_id")
                return text_result(hubspot.get_company_activity(args["company_id"]))

            elif name == "hubspot_get_recent_engagements":
                days = optional_int(arguments, "days", 7)
                limit = optional_int(arguments, "limit", 50)
                return text_result(hubspot.get_recent_engagements(days=days, limit=limit))

            elif name == "hubspot_get_active_companies":
                limit = optional_int(arguments, "limit", 10)
                return text_result(hubspot.get_recent_companies(limit=limit))

            elif name == "hubspot_get_active_contacts":
                limit = optional_int(arguments, "limit", 10)
                return text_result(hubspot.get_recent_contacts(limit=limit))

            # Shared space tools handling
            elif name == "create_shared_summary":
                args = require_arguments(arguments, "create_shared_summary", "user_id", "company_id", "summary")
                try:
                    result = create_summary(args["user_id"], args["company_id"], args["summary"])
                    return text_result(json.dumps(result))
                except Exception as e:
                    return text_result(f"Error: {str(e)}")

            elif name == "get_shared_summaries":
                args = require_arguments(arguments, "get_shared_summaries", "user_id", "company_id")
                try:
                    result = get_summaries(args["user_id"], args["company_id"])
                    return text_result(json.dumps(result))
                except Exception as e:
                    return text_result(f"Error: {str(e)}")

            else:
                raise ValueError(f"Unknown tool: {name}")
        except ApiException as e:
            return text_result(f"HubSpot API error: {str(e)}")
        except Exception as e:
            return text_result(f"Error: {str(e)}")

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="hubspot",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
if __name__ == "__main__":
    import argparse
    import asyncio

    # Command-line entry point: an optional flag supplies the HubSpot token.
    cli = argparse.ArgumentParser(description="Run the HubSpot MCP server")
    cli.add_argument(
        "--access-token",
        help="HubSpot API access token (overrides HUBSPOT_ACCESS_TOKEN environment variable)",
    )
    cli_args = cli.parse_args()
    # The flag defaults to None, which main() forwards unchanged.
    asyncio.run(main(access_token=cli_args.access_token))