HubSpot MCP Server

by SheffieldP
Verified
import logging from typing import Any, Dict, List, Optional import os from dotenv import load_dotenv from hubspot import HubSpot from hubspot.crm.contacts import SimplePublicObjectInputForCreate from hubspot.crm.contacts.exceptions import ApiException from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server import mcp.server.stdio from pydantic import AnyUrl import json from datetime import datetime, timedelta from dateutil.tz import tzlocal logger = logging.getLogger('mcp_hubspot_server') def convert_datetime_fields(obj: Any) -> Any: """Convert any datetime or tzlocal objects to string in the given object""" if isinstance(obj, dict): return {k: convert_datetime_fields(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_datetime_fields(item) for item in obj] elif isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, tzlocal): # Get the current timezone offset offset = datetime.now(tzlocal()).strftime('%z') return f"UTC{offset[:3]}:{offset[3:]}" # Format like "UTC+08:00" or "UTC-05:00" return obj class HubSpotClient: def __init__(self, access_token: str): logger.debug(f"Using access token: {'[MASKED]' if access_token else 'None'}") if not access_token: raise ValueError("HubSpot access token is required. It must be provided as an argument.") # Initialize HubSpot client with the provided token # This allows for multi-user support by passing user-specific tokens self.client = HubSpot(access_token=access_token) def get_contacts(self) -> str: """Get all contacts from HubSpot (requires optional crm.objects.contacts.read scope)""" try: contacts = self.client.crm.contacts.get_all() contacts_dict = [contact.to_dict() for contact in contacts] converted_contacts = convert_datetime_fields(contacts_dict) return json.dumps(converted_contacts) except ApiException as e: logger.error(f"API Exception in get_contacts: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_contacts: {str(e)}") return json.dumps({"error": str(e)}) def get_companies(self) -> str: """Get all companies from HubSpot (requires optional crm.objects.companies.read scope)""" try: companies = self.client.crm.companies.get_all() companies_dict = [company.to_dict() for company in companies] converted_companies = convert_datetime_fields(companies_dict) return json.dumps(converted_companies) except ApiException as e: logger.error(f"API Exception in get_companies: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_companies: {str(e)}") return json.dumps({"error": str(e)}) def get_company_activity(self, company_id: str) -> str: """Get activity history for a specific company (requires optional crm.objects.companies.read scope)""" try: # Note: This method only requires standard read scopes, not sensitive scopes # Step 1: Get all engagement IDs associated with the company using CRM Associations v4 API associated_engagements = self.client.crm.associations.v4.basic_api.get_page( object_type="companies", object_id=company_id, to_object_type="engagements", limit=500 ) # Extract engagement IDs from the associations response engagement_ids = [] if hasattr(associated_engagements, 'results'): for result in associated_engagements.results: engagement_ids.append(result.to_object_id) # Step 2: Get detailed information for each engagement activities = [] for engagement_id in engagement_ids: engagement_response = self.client.api_request({ "method": "GET", "path": f"/engagements/v1/engagements/{engagement_id}" }).json() engagement_data = engagement_response.get('engagement', {}) metadata = engagement_response.get('metadata', {}) # Format the engagement formatted_engagement = { "id": engagement_data.get("id"), "type": engagement_data.get("type"), "created_at": engagement_data.get("createdAt"), "last_updated": engagement_data.get("lastUpdated"), "created_by": engagement_data.get("createdBy"), "modified_by": engagement_data.get("modifiedBy"), "timestamp": engagement_data.get("timestamp"), "associations": engagement_response.get("associations", {}) } # Add type-specific metadata formatting if engagement_data.get("type") == "NOTE": formatted_engagement["content"] = metadata.get("body", "") elif engagement_data.get("type") == "EMAIL": formatted_engagement["content"] = { "subject": metadata.get("subject", ""), "from": { "raw": metadata.get("from", {}).get("raw", ""), "email": metadata.get("from", {}).get("email", ""), "firstName": metadata.get("from", {}).get("firstName", ""), "lastName": metadata.get("from", {}).get("lastName", "") }, "to": [{ "raw": recipient.get("raw", ""), "email": recipient.get("email", ""), "firstName": recipient.get("firstName", ""), "lastName": recipient.get("lastName", "") } for recipient in metadata.get("to", [])], "cc": [{ "raw": recipient.get("raw", ""), "email": recipient.get("email", ""), "firstName": recipient.get("firstName", ""), "lastName": recipient.get("lastName", "") } for recipient in metadata.get("cc", [])], "bcc": [{ "raw": recipient.get("raw", ""), "email": recipient.get("email", ""), "firstName": recipient.get("firstName", ""), "lastName": recipient.get("lastName", "") } for recipient in metadata.get("bcc", [])], "sender": { "email": metadata.get("sender", {}).get("email", "") }, "body": metadata.get("text", "") or metadata.get("html", "") } elif engagement_data.get("type") == "TASK": formatted_engagement["content"] = { "subject": metadata.get("subject", ""), "body": metadata.get("body", ""), "status": metadata.get("status", ""), "for_object_type": metadata.get("forObjectType", "") } elif engagement_data.get("type") == "MEETING": formatted_engagement["content"] = { "title": metadata.get("title", ""), "body": metadata.get("body", ""), "start_time": metadata.get("startTime"), "end_time": metadata.get("endTime"), "internal_notes": metadata.get("internalMeetingNotes", "") } elif engagement_data.get("type") == "CALL": formatted_engagement["content"] = { "body": metadata.get("body", ""), "from_number": metadata.get("fromNumber", ""), "to_number": metadata.get("toNumber", ""), "duration_ms": metadata.get("durationMilliseconds"), "status": metadata.get("status", ""), "disposition": metadata.get("disposition", "") } activities.append(formatted_engagement) # Convert any datetime fields and return converted_activities = convert_datetime_fields(activities) return json.dumps(converted_activities) except ApiException as e: logger.error(f"API Exception: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception: {str(e)}") return json.dumps({"error": str(e)}) async def main(access_token: str): """ Run the HubSpot MCP server. Args: access_token: HubSpot access token Note: This server requires the following HubSpot scopes: Required: - oauth Optional: - crm.dealsplits.read_write - crm.objects.companies.read - crm.objects.companies.write - crm.objects.contacts.read - crm.objects.contacts.write - crm.objects.deals.read """ logger.info("Server starting") hubspot = HubSpotClient(access_token) server = Server("hubspot-manager") @server.list_resources() async def handle_list_resources() -> list[types.Resource]: return [ types.Resource( uri=AnyUrl("hubspot://hubspot_contacts"), name="HubSpot Contacts", description="List of HubSpot contacts (requires optional crm.objects.contacts.read scope)", mimeType="application/json", ), types.Resource( uri=AnyUrl("hubspot://hubspot_companies"), name="HubSpot Companies", description="List of HubSpot companies (requires optional crm.objects.companies.read scope)", mimeType="application/json", ), ] @server.read_resource() async def handle_read_resource(uri: AnyUrl) -> str: if uri.scheme != "hubspot": raise ValueError(f"Unsupported URI scheme: {uri.scheme}") path = str(uri).replace("hubspot://", "") if path == "hubspot_contacts": return str(hubspot.get_contacts()) elif path == "hubspot_companies": return str(hubspot.get_companies()) else: raise ValueError(f"Unknown resource path: {path}") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools""" return [ types.Tool( name="hubspot_get_contacts", description="Get contacts from HubSpot (requires optional crm.objects.contacts.read scope)", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="hubspot_create_contact", description="Create a new contact in HubSpot (requires optional crm.objects.contacts.write scope)", inputSchema={ "type": "object", "properties": { "firstname": {"type": "string", "description": "Contact's first name"}, "lastname": {"type": "string", "description": "Contact's last name"}, "email": {"type": "string", "description": "Contact's email address"}, "properties": {"type": "object", "description": "Additional contact properties"} }, "required": ["firstname", "lastname"] }, ), types.Tool( name="hubspot_get_companies", description="Get companies from HubSpot (requires optional crm.objects.companies.read scope)", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="hubspot_create_company", description="Create a new company in HubSpot (requires optional crm.objects.companies.write scope)", inputSchema={ "type": "object", "properties": { "name": {"type": "string", "description": "Company name"}, "properties": {"type": "object", "description": "Additional company properties"} }, "required": ["name"] }, ), types.Tool( name="hubspot_get_company_activity", description="Get activity history for a specific company (requires optional crm.objects.companies.read scope)", inputSchema={ "type": "object", "properties": { "company_id": {"type": "string", "description": "HubSpot company ID"} }, "required": ["company_id"] }, ), ] @server.call_tool() async def handle_call_tool( name: str, arguments: dict[str, Any] | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """Handle tool execution requests""" try: if name == "hubspot_get_contacts": results = hubspot.get_contacts() return [types.TextContent(type="text", text=str(results))] elif name == "hubspot_create_contact": if not arguments: raise ValueError("Missing arguments for create_contact") firstname = arguments["firstname"] lastname = arguments["lastname"] company = arguments.get("properties", {}).get("company") # Search for existing contacts with same name and company try: from hubspot.crm.contacts import PublicObjectSearchRequest filter_groups = [{ "filters": [ { "propertyName": "firstname", "operator": "EQ", "value": firstname }, { "propertyName": "lastname", "operator": "EQ", "value": lastname } ] }] # Add company filter if provided if company: filter_groups[0]["filters"].append({ "propertyName": "company", "operator": "EQ", "value": company }) search_request = PublicObjectSearchRequest( filter_groups=filter_groups ) search_response = hubspot.client.crm.contacts.search_api.do_search( public_object_search_request=search_request ) if search_response.total > 0: # Contact already exists return [types.TextContent( type="text", text=f"Contact already exists: {search_response.results[0].to_dict()}" )] # If no existing contact found, proceed with creation properties = { "firstname": firstname, "lastname": lastname } # Add email if provided if "email" in arguments: properties["email"] = arguments["email"] # Add any additional properties if "properties" in arguments: properties.update(arguments["properties"]) # Create contact using SimplePublicObjectInputForCreate simple_public_object_input = SimplePublicObjectInputForCreate( properties=properties ) api_response = hubspot.client.crm.contacts.basic_api.create( simple_public_object_input_for_create=simple_public_object_input ) return [types.TextContent(type="text", text=str(api_response.to_dict()))] except ApiException as e: return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")] elif name == "hubspot_get_companies": results = hubspot.get_companies() return [types.TextContent(type="text", text=str(results))] elif name == "hubspot_create_company": if not arguments: raise ValueError("Missing arguments for create_company") company_name = arguments["name"] # Search for existing companies with same name try: from hubspot.crm.companies import PublicObjectSearchRequest search_request = PublicObjectSearchRequest( filter_groups=[{ "filters": [ { "propertyName": "name", "operator": "EQ", "value": company_name } ] }] ) search_response = hubspot.client.crm.companies.search_api.do_search( public_object_search_request=search_request ) if search_response.total > 0: # Company already exists return [types.TextContent( type="text", text=f"Company already exists: {search_response.results[0].to_dict()}" )] # If no existing company found, proceed with creation properties = { "name": company_name } # Add any additional properties if "properties" in arguments: properties.update(arguments["properties"]) # Create company using SimplePublicObjectInputForCreate simple_public_object_input = SimplePublicObjectInputForCreate( properties=properties ) api_response = hubspot.client.crm.companies.basic_api.create( simple_public_object_input_for_create=simple_public_object_input ) return [types.TextContent(type="text", text=str(api_response.to_dict()))] except ApiException as e: return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")] elif name == "hubspot_get_company_activity": if not arguments: raise ValueError("Missing arguments for get_company_activity") results = hubspot.get_company_activity(arguments["company_id"]) return [types.TextContent(type="text", text=results)] else: raise ValueError(f"Unknown tool: {name}") except ApiException as e: return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): logger.info("Server running with stdio transport") await server.run( read_stream, write_stream, InitializationOptions( server_name="hubspot", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": import asyncio import sys if len(sys.argv) != 2: print("Usage: python server.py <access_token>") sys.exit(1) asyncio.run(main(sys.argv[1]))