HubSpot MCP Server

by KaranThink41
Verified
MIT License
  • Linux
  • Apple
"""MCP server exposing HubSpot CRM operations and shared-space summaries as tools."""

import argparse
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import mcp.server.stdio
import mcp.types as types
from dateutil.tz import tzlocal
from dotenv import load_dotenv
from hubspot import HubSpot
from hubspot.crm.contacts import SimplePublicObjectInputForCreate
from hubspot.crm.contacts.exceptions import ApiException
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from pydantic import AnyUrl

# Load environment variables if needed
load_dotenv()

logger = logging.getLogger('mcp_hubspot_server')


def convert_datetime_fields(obj: Any) -> Any:
    """Recursively replace datetime-related values with JSON-friendly strings.

    Dicts and lists are walked recursively. ``datetime`` instances become
    their ISO-8601 string, and ``tzlocal`` instances become a ``UTC+HH:MM``
    style label built from the current local offset. Every other value is
    returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: convert_datetime_fields(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_datetime_fields(item) for item in obj]
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, tzlocal):
        # strftime('%z') yields e.g. "+0530"; split it into "+05" and "30".
        offset = datetime.now(tzlocal()).strftime('%z')
        return f"UTC{offset[:3]}:{offset[3:]}"
    return obj


def _format_party(party: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the raw/email/firstName/lastName view of an email participant."""
    party = party or {}
    return {
        "raw": party.get("raw", ""),
        "email": party.get("email", ""),
        "firstName": party.get("firstName", ""),
        "lastName": party.get("lastName", ""),
    }


def _format_engagement(record: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one engagement record into the dict returned to tool callers.

    ``record`` is expected to carry ``engagement``, ``metadata`` and
    ``associations`` keys. A ``content`` entry is added only for the NOTE,
    EMAIL, TASK, MEETING and CALL types; other types get the common fields
    only. Missing or null sub-objects are treated as empty so that one
    malformed record does not abort the whole response.
    """
    engagement = record.get("engagement") or {}
    metadata = record.get("metadata") or {}
    kind = engagement.get("type")

    formatted: Dict[str, Any] = {
        "id": engagement.get("id"),
        "type": kind,
        "created_at": engagement.get("createdAt"),
        "last_updated": engagement.get("lastUpdated"),
        "created_by": engagement.get("createdBy"),
        "modified_by": engagement.get("modifiedBy"),
        "timestamp": engagement.get("timestamp"),
        "associations": record.get("associations", {}),
    }

    if kind == "NOTE":
        formatted["content"] = metadata.get("body", "")
    elif kind == "EMAIL":
        formatted["content"] = {
            "subject": metadata.get("subject", ""),
            "from": _format_party(metadata.get("from")),
            "to": [_format_party(p) for p in metadata.get("to") or []],
            "cc": [_format_party(p) for p in metadata.get("cc") or []],
            "bcc": [_format_party(p) for p in metadata.get("bcc") or []],
            "sender": {"email": (metadata.get("sender") or {}).get("email", "")},
            # Prefer the plain-text body, falling back to HTML when it is empty.
            "body": metadata.get("text", "") or metadata.get("html", ""),
        }
    elif kind == "TASK":
        formatted["content"] = {
            "subject": metadata.get("subject", ""),
            "body": metadata.get("body", ""),
            "status": metadata.get("status", ""),
            "for_object_type": metadata.get("forObjectType", ""),
        }
    elif kind == "MEETING":
        formatted["content"] = {
            "title": metadata.get("title", ""),
            "body": metadata.get("body", ""),
            "start_time": metadata.get("startTime"),
            "end_time": metadata.get("endTime"),
            "internal_notes": metadata.get("internalMeetingNotes", ""),
        }
    elif kind == "CALL":
        formatted["content"] = {
            "body": metadata.get("body", ""),
            "from_number": metadata.get("fromNumber", ""),
            "to_number": metadata.get("toNumber", ""),
            "duration_ms": metadata.get("durationMilliseconds"),
            "status": metadata.get("status", ""),
            "disposition": metadata.get("disposition", ""),
        }
    return formatted


def _int_argument(arguments: Optional[Dict[str, Any]], key: str, default: int) -> int:
    """Read ``key`` from ``arguments`` as an int; use ``default`` if absent or null."""
    value = arguments.get(key, default) if arguments else default
    return int(value) if value is not None else default


class HubSpotClient:
    """Thin wrapper around the HubSpot SDK client.

    Every query method returns a JSON string. On failure the string encodes
    ``{"error": "<message>"}`` instead of raising.
    """

    def __init__(self, access_token: Optional[str] = None):
        """Create the SDK client.

        Args:
            access_token: HubSpot access token. Falls back to the
                ``HUBSPOT_ACCESS_TOKEN`` environment variable when omitted.

        Raises:
            ValueError: If no token is supplied and the variable is unset.
        """
        access_token = access_token or os.getenv("HUBSPOT_ACCESS_TOKEN")
        # Never log the token itself, only whether one was found.
        logger.debug("Using access token: %s", "[MASKED]" if access_token else "None")
        if not access_token:
            raise ValueError("HUBSPOT_ACCESS_TOKEN environment variable is required")
        self.client = HubSpot(access_token=access_token)

    def get_recent_companies(self, limit: int = 10) -> str:
        """Return up to ``limit`` companies, most recently modified first, as JSON."""
        try:
            from hubspot.crm.companies import PublicObjectSearchRequest

            search_request = PublicObjectSearchRequest(
                # Companies expose their modification time as
                # `hs_lastmodifieddate`; `lastmodifieddate` is the contact
                # property, so sorting on it does not order companies.
                sorts=[{"propertyName": "hs_lastmodifieddate", "direction": "DESCENDING"}],
                limit=limit,
                properties=["name", "domain", "website", "phone", "industry", "hs_lastmodifieddate"],
            )
            search_response = self.client.crm.companies.search_api.do_search(
                public_object_search_request=search_request
            )
            companies = [company.to_dict() for company in search_response.results]
            return json.dumps(convert_datetime_fields(companies))
        except ApiException as e:
            logger.error("API Exception: %s", e)
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error("Exception: %s", e)
            return json.dumps({"error": str(e)})

    def get_recent_contacts(self, limit: int = 10) -> str:
        """Return up to ``limit`` contacts, most recently modified first, as JSON."""
        try:
            from hubspot.crm.contacts import PublicObjectSearchRequest

            search_request = PublicObjectSearchRequest(
                sorts=[{"propertyName": "lastmodifieddate", "direction": "DESCENDING"}],
                limit=limit,
                properties=[
                    "firstname", "lastname", "email", "phone", "company",
                    "hs_lastmodifieddate", "lastmodifieddate",
                ],
            )
            search_response = self.client.crm.contacts.search_api.do_search(
                public_object_search_request=search_request
            )
            contacts = [contact.to_dict() for contact in search_response.results]
            return json.dumps(convert_datetime_fields(contacts))
        except ApiException as e:
            logger.error("API Exception: %s", e)
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error("Exception: %s", e)
            return json.dumps({"error": str(e)})

    def get_company_activity(self, company_id: str) -> str:
        """Return the engagements associated with ``company_id`` as a JSON list.

        Only the first page (up to 500) of associations is read, and each
        engagement is then fetched with its own request.
        """
        try:
            associated_engagements = self.client.crm.associations.v4.basic_api.get_page(
                object_type="companies",
                object_id=company_id,
                to_object_type="engagements",
                limit=500,
            )
            # A missing or null `results` attribute means no associations.
            results = getattr(associated_engagements, 'results', None) or []
            engagement_ids = [result.to_object_id for result in results]

            activities = []
            for engagement_id in engagement_ids:
                engagement_response = self.client.api_request({
                    "method": "GET",
                    "path": f"/engagements/v1/engagements/{engagement_id}",
                }).json()
                activities.append(_format_engagement(engagement_response))

            return json.dumps(convert_datetime_fields(activities))
        except ApiException as e:
            logger.error("API Exception: %s", e)
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error("Exception: %s", e)
            return json.dumps({"error": str(e)})

    def get_recent_engagements(self, days: int = 7, limit: int = 50) -> str:
        """Return engagements modified within the last ``days`` days as a JSON list.

        Args:
            days: Size of the look-back window, counted back from now.
            limit: Sent to the API as the ``count`` parameter.
        """
        try:
            start_time = datetime.now() - timedelta(days=days)
            # The request passes `since` as epoch milliseconds.
            start_timestamp = int(start_time.timestamp() * 1000)

            engagements_response = self.client.api_request({
                "method": "GET",
                "path": "/engagements/v1/engagements/recent/modified",
                "params": {"count": limit, "since": start_timestamp, "offset": 0},
            }).json()

            formatted_engagements = [
                _format_engagement(engagement)
                for engagement in engagements_response.get('results', [])
            ]
            return json.dumps(convert_datetime_fields(formatted_engagements))
        except ApiException as e:
            logger.error("API Exception: %s", e)
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error("Exception: %s", e)
            return json.dumps({"error": str(e)})


# Import shared space functionality from our module.
# Kept below load_dotenv() so the import still happens after the environment
# has been loaded, exactly as before.
from .shared_space import create_summary, get_summaries, update_summary, delete_summary


async def main(access_token: Optional[str] = None):
    """Build the MCP server, register its handlers, and serve over stdio.

    Args:
        access_token: Optional HubSpot token forwarded to ``HubSpotClient``.

    Raises:
        ValueError: From ``HubSpotClient`` when no access token is available.
    """
    logger.info("Server starting")
    hubspot = HubSpotClient(access_token)
    server = Server("hubspot-manager")

    def text_result(text: str) -> list[types.TextContent]:
        """Wrap ``text`` in the single-item content list that tools return."""
        return [types.TextContent(type="text", text=text)]

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        """Report that this server exposes no resources."""
        return []

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        """Reject non-``hubspot`` URIs; hubspot URIs currently read as empty."""
        if uri.scheme != "hubspot":
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
        return ""

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools including HubSpot and shared space tools."""
        return [
            types.Tool(
                name="hubspot_create_contact",
                description="Create a new contact in HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "firstname": {"type": "string", "description": "Contact's first name"},
                        "lastname": {"type": "string", "description": "Contact's last name"},
                        "email": {"type": "string", "description": "Contact's email address"},
                        "properties": {"type": "object", "description": "Additional contact properties"},
                    },
                    "required": ["firstname", "lastname"],
                },
            ),
            types.Tool(
                name="hubspot_create_company",
                description="Create a new company in HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Company name"},
                        "properties": {"type": "object", "description": "Additional company properties"},
                    },
                    "required": ["name"],
                },
            ),
            types.Tool(
                name="hubspot_get_company_activity",
                description="Get activity history for a specific company",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "company_id": {"type": "string", "description": "HubSpot company ID"},
                    },
                    "required": ["company_id"],
                },
            ),
            types.Tool(
                name="hubspot_get_recent_engagements",
                description="Get recent engagement activities across all contacts and companies",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "days": {"type": "integer", "description": "Number of days to look back (default: 7)"},
                        "limit": {"type": "integer", "description": "Maximum number of engagements to return (default: 50)"},
                    },
                },
            ),
            types.Tool(
                name="hubspot_get_active_companies",
                description="Get most recently active companies from HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "limit": {"type": "integer", "description": "Maximum number of companies to return (default: 10)"},
                    },
                },
            ),
            types.Tool(
                name="hubspot_get_active_contacts",
                description="Get most recently active contacts from HubSpot",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "limit": {"type": "integer", "description": "Maximum number of contacts to return (default: 10)"},
                    },
                },
            ),
            # Shared space tools
            types.Tool(
                name="create_shared_summary",
                description="Create a new conversation summary in the shared space",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "User email/ID creating the summary"},
                        "company_id": {"type": "string", "description": "Company ID"},
                        "summary": {"type": "string", "description": "The conversation summary text"},
                    },
                    "required": ["user_id", "company_id", "summary"],
                },
            ),
            types.Tool(
                name="get_shared_summaries",
                description="Retrieve all conversation summaries for the company",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "User email/ID requesting summaries"},
                        "company_id": {"type": "string", "description": "Company ID"},
                    },
                    "required": ["user_id", "company_id"],
                },
            ),
            # Additional tools for update/delete can be added similarly
        ]

    @server.call_tool()
    async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Dispatch a tool call by name and return its result as text content.

        Failures are never raised to the caller: HubSpot ``ApiException``s
        come back as ``"HubSpot API error: ..."`` text and every other
        exception (including unknown tool names and missing arguments) as
        ``"Error: ..."`` text.
        """
        try:
            if name == "hubspot_create_contact":
                if not arguments:
                    raise ValueError("Missing arguments for create_contact")
                firstname = arguments["firstname"]
                lastname = arguments["lastname"]
                # A null "properties" value is treated the same as an absent one.
                extra_properties = arguments.get("properties") or {}
                company = extra_properties.get("company")

                from hubspot.crm.contacts import PublicObjectSearchRequest

                # Look for an existing contact with the same name (and
                # company, when given) before creating a duplicate.
                filters = [
                    {"propertyName": "firstname", "operator": "EQ", "value": firstname},
                    {"propertyName": "lastname", "operator": "EQ", "value": lastname},
                ]
                if company:
                    filters.append({"propertyName": "company", "operator": "EQ", "value": company})
                search_request = PublicObjectSearchRequest(filter_groups=[{"filters": filters}])
                search_response = hubspot.client.crm.contacts.search_api.do_search(
                    public_object_search_request=search_request
                )
                if search_response.total > 0:
                    return text_result(f"Contact already exists: {search_response.results[0].to_dict()}")

                properties = {"firstname": firstname, "lastname": lastname}
                if "email" in arguments:
                    properties["email"] = arguments["email"]
                properties.update(extra_properties)
                simple_public_object_input = SimplePublicObjectInputForCreate(properties=properties)
                api_response = hubspot.client.crm.contacts.basic_api.create(
                    simple_public_object_input_for_create=simple_public_object_input
                )
                return text_result(str(api_response.to_dict()))

            elif name == "hubspot_create_company":
                if not arguments:
                    raise ValueError("Missing arguments for create_company")
                company_name = arguments["name"]

                from hubspot.crm.companies import PublicObjectSearchRequest

                # Refuse to create a second company with the same name.
                search_request = PublicObjectSearchRequest(
                    filter_groups=[{"filters": [{"propertyName": "name", "operator": "EQ", "value": company_name}]}]
                )
                search_response = hubspot.client.crm.companies.search_api.do_search(
                    public_object_search_request=search_request
                )
                if search_response.total > 0:
                    return text_result(f"Company already exists: {search_response.results[0].to_dict()}")

                properties = {"name": company_name}
                properties.update(arguments.get("properties") or {})
                simple_public_object_input = SimplePublicObjectInputForCreate(properties=properties)
                api_response = hubspot.client.crm.companies.basic_api.create(
                    simple_public_object_input_for_create=simple_public_object_input
                )
                return text_result(str(api_response.to_dict()))

            elif name == "hubspot_get_company_activity":
                if not arguments:
                    raise ValueError("Missing arguments for get_company_activity")
                return text_result(hubspot.get_company_activity(arguments["company_id"]))

            elif name == "hubspot_get_recent_engagements":
                days = _int_argument(arguments, "days", 7)
                limit = _int_argument(arguments, "limit", 50)
                return text_result(hubspot.get_recent_engagements(days=days, limit=limit))

            elif name == "hubspot_get_active_companies":
                limit = _int_argument(arguments, "limit", 10)
                return text_result(hubspot.get_recent_companies(limit=limit))

            elif name == "hubspot_get_active_contacts":
                limit = _int_argument(arguments, "limit", 10)
                return text_result(hubspot.get_recent_contacts(limit=limit))

            # Shared space tools handling
            elif name == "create_shared_summary":
                if not arguments:
                    raise ValueError("Missing arguments for create_shared_summary")
                user_id = arguments["user_id"]
                company_id = arguments["company_id"]
                summary_text = arguments["summary"]
                try:
                    result = create_summary(user_id, company_id, summary_text)
                    return text_result(json.dumps(result))
                except Exception as e:
                    return text_result(f"Error: {str(e)}")

            elif name == "get_shared_summaries":
                if not arguments:
                    raise ValueError("Missing arguments for get_shared_summaries")
                user_id = arguments["user_id"]
                company_id = arguments["company_id"]
                try:
                    result = get_summaries(user_id, company_id)
                    return text_result(json.dumps(result))
                except Exception as e:
                    return text_result(f"Error: {str(e)}")

            else:
                raise ValueError(f"Unknown tool: {name}")
        except ApiException as e:
            return text_result(f"HubSpot API error: {str(e)}")
        except Exception as e:
            return text_result(f"Error: {str(e)}")

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="hubspot",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    import asyncio

    parser = argparse.ArgumentParser(description="Run the HubSpot MCP server")
    parser.add_argument(
        "--access-token",
        help="HubSpot API access token (overrides HUBSPOT_ACCESS_TOKEN environment variable)",
    )
    args = parser.parse_args()
    asyncio.run(main(access_token=args.access_token))