"""HubSpot API client for MCP server."""
import logging
from typing import Any, Dict, List, Optional
import time
from mcp_base import BaseAPIClient
LOGGER = logging.getLogger(__name__)
class HubSpotClient(BaseAPIClient):
"""Client for HubSpot CRM API v3.
Extends BaseAPIClient to inherit common HTTP request handling.
This client expects to receive an OAuth access token and uses it
to make authenticated requests to the HubSpot API.
"""
BASE_URL = "https://api.hubapi.com"
# ===== Contact Management =====
def list_contacts(
self,
limit: int = 50,
properties: Optional[List[str]] = None,
after: Optional[str] = None,
) -> Dict[str, Any]:
"""List contacts with pagination."""
params = {"limit": limit}
if properties:
params["properties"] = ",".join(properties)
if after:
params["after"] = after
return self._request("GET", "/crm/v3/objects/contacts", params=params)
def get_contact(
self,
contact_id: str,
properties: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Get contact by ID."""
params = {}
if properties:
params["properties"] = ",".join(properties)
return self._request("GET", f"/crm/v3/objects/contacts/{contact_id}", params=params)
def create_contact(self, properties: Dict[str, str]) -> Dict[str, Any]:
"""Create a new contact."""
return self._request(
"POST",
"/crm/v3/objects/contacts",
json_data={"properties": properties}
)
def update_contact(
self,
contact_id: str,
properties: Dict[str, str],
) -> Dict[str, Any]:
"""Update an existing contact."""
return self._request(
"PATCH",
f"/crm/v3/objects/contacts/{contact_id}",
json_data={"properties": properties}
)
def search_contacts(
self,
query: str,
properties: Optional[List[str]] = None,
limit: int = 10,
) -> Dict[str, Any]:
"""Search contacts using the CRM Search API."""
payload = {
"query": query,
"properties": properties or ["firstname", "lastname", "email", "company"],
"limit": limit,
}
return self._request("POST", "/crm/v3/objects/contacts/search", json_data=payload)
# ===== Company Management =====
def list_companies(
self,
limit: int = 50,
properties: Optional[List[str]] = None,
after: Optional[str] = None,
) -> Dict[str, Any]:
"""List companies with pagination."""
params = {"limit": limit}
if properties:
params["properties"] = ",".join(properties)
if after:
params["after"] = after
return self._request("GET", "/crm/v3/objects/companies", params=params)
def get_company(
self,
company_id: str,
properties: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Get company by ID."""
params = {}
if properties:
params["properties"] = ",".join(properties)
return self._request("GET", f"/crm/v3/objects/companies/{company_id}", params=params)
def create_company(self, properties: Dict[str, str]) -> Dict[str, Any]:
"""Create a new company."""
return self._request(
"POST",
"/crm/v3/objects/companies",
json_data={"properties": properties}
)
def search_companies(
self,
query: str,
properties: Optional[List[str]] = None,
limit: int = 10,
) -> Dict[str, Any]:
"""Search companies using the CRM Search API."""
payload = {
"query": query,
"properties": properties or ["name", "domain", "industry"],
"limit": limit,
}
return self._request("POST", "/crm/v3/objects/companies/search", json_data=payload)
# ===== Deal Management =====
def list_deals(
self,
limit: int = 50,
properties: Optional[List[str]] = None,
after: Optional[str] = None,
) -> Dict[str, Any]:
"""List deals with pagination."""
params = {"limit": limit}
if properties:
params["properties"] = ",".join(properties)
if after:
params["after"] = after
return self._request("GET", "/crm/v3/objects/deals", params=params)
def get_deal(
self,
deal_id: str,
properties: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Get deal by ID."""
params = {}
if properties:
params["properties"] = ",".join(properties)
return self._request("GET", f"/crm/v3/objects/deals/{deal_id}", params=params)
def create_deal(self, properties: Dict[str, str]) -> Dict[str, Any]:
"""Create a new deal."""
return self._request(
"POST",
"/crm/v3/objects/deals",
json_data={"properties": properties}
)
def update_deal(
self,
deal_id: str,
properties: Dict[str, str],
) -> Dict[str, Any]:
"""Update an existing deal."""
return self._request(
"PATCH",
f"/crm/v3/objects/deals/{deal_id}",
json_data={"properties": properties}
)
# ===== Pipeline Management =====
def list_pipelines(self, object_type: str = "deals") -> Dict[str, Any]:
"""List pipelines for an object type."""
return self._request("GET", f"/crm/v3/pipelines/{object_type}")
def get_pipeline(
self,
pipeline_id: str,
object_type: str = "deals",
) -> Dict[str, Any]:
"""Get pipeline by ID."""
return self._request("GET", f"/crm/v3/pipelines/{object_type}/{pipeline_id}")
# ===== Owner Management =====
def list_owners(self, limit: int = 100) -> Dict[str, Any]:
"""List HubSpot owners (users)."""
return self._request("GET", "/crm/v3/owners", params={"limit": limit})
def get_owner(self, owner_id: str) -> Dict[str, Any]:
"""Get owner by ID."""
return self._request("GET", f"/crm/v3/owners/{owner_id}")
def update_company(
self,
company_id: str,
properties: Dict[str, str],
) -> Dict[str, Any]:
"""Update an existing company."""
return self._request(
"PATCH",
f"/crm/v3/objects/companies/{company_id}",
json_data={"properties": properties}
)
def search_deals(
self,
query: str,
properties: Optional[List[str]] = None,
limit: int = 10,
) -> Dict[str, Any]:
"""Search deals using the CRM Search API."""
payload = {
"query": query,
"properties": properties or ["dealname", "amount", "dealstage", "closedate"],
"limit": limit,
}
return self._request("POST", "/crm/v3/objects/deals/search", json_data=payload)
# ===== Contact Lists / Segments =====
def list_contact_lists(self, limit: int = 100) -> Dict[str, Any]:
"""List all contact lists."""
return self._request("GET", "/contacts/v1/lists", params={"count": limit})
def get_contact_list(self, list_id: str) -> Dict[str, Any]:
"""Get a specific contact list by ID."""
return self._request("GET", f"/contacts/v1/lists/{list_id}")
def create_contact_list(
self,
name: str,
dynamic: bool = False,
filters: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Create a new contact list.
For static lists, set dynamic=False.
For dynamic lists, set dynamic=True and provide filters.
"""
payload: Dict[str, Any] = {
"name": name,
"dynamic": dynamic,
}
if filters:
payload["filters"] = filters
return self._request("POST", "/contacts/v1/lists", json_data=payload)
def add_contacts_to_list(
self,
list_id: str,
contact_ids: List[int],
) -> Dict[str, Any]:
"""Add contacts to a static list."""
return self._request(
"POST",
f"/contacts/v1/lists/{list_id}/add",
json_data={"vids": contact_ids}
)
def remove_contacts_from_list(
self,
list_id: str,
contact_ids: List[int],
) -> Dict[str, Any]:
"""Remove contacts from a static list."""
return self._request(
"POST",
f"/contacts/v1/lists/{list_id}/remove",
json_data={"vids": contact_ids}
)
# ===== Engagements / Activities =====
def create_engagement(
self,
engagement_type: str,
associations: Dict[str, List[int]],
metadata: Dict[str, Any],
) -> Dict[str, Any]:
"""Create an engagement (note, email, call, meeting, task).
Args:
engagement_type: One of NOTE, EMAIL, CALL, MEETING, TASK
associations: Dict with contactIds, companyIds, dealIds lists
metadata: Type-specific metadata
"""
payload = {
"engagement": {
"active": True,
"type": engagement_type.upper(),
"timestamp": int(time.time() * 1000),
},
"associations": associations,
"metadata": metadata,
}
return self._request("POST", "/engagements/v1/engagements", json_data=payload)
def create_note(
self,
body: str,
contact_ids: Optional[List[int]] = None,
company_ids: Optional[List[int]] = None,
deal_ids: Optional[List[int]] = None,
) -> Dict[str, Any]:
"""Create a note engagement."""
associations: Dict[str, List[int]] = {}
if contact_ids:
associations["contactIds"] = contact_ids
if company_ids:
associations["companyIds"] = company_ids
if deal_ids:
associations["dealIds"] = deal_ids
return self.create_engagement(
engagement_type="NOTE",
associations=associations,
metadata={"body": body}
)
def create_task(
self,
subject: str,
body: str = "",
due_date: Optional[int] = None,
contact_ids: Optional[List[int]] = None,
company_ids: Optional[List[int]] = None,
deal_ids: Optional[List[int]] = None,
) -> Dict[str, Any]:
"""Create a task engagement."""
associations: Dict[str, List[int]] = {}
if contact_ids:
associations["contactIds"] = contact_ids
if company_ids:
associations["companyIds"] = company_ids
if deal_ids:
associations["dealIds"] = deal_ids
metadata: Dict[str, Any] = {
"subject": subject,
"body": body,
"status": "NOT_STARTED",
}
if due_date:
metadata["dueDate"] = due_date
return self.create_engagement(
engagement_type="TASK",
associations=associations,
metadata=metadata
)
# ===== Associations =====
def create_association(
self,
from_object_type: str,
from_object_id: str,
to_object_type: str,
to_object_id: str,
association_type: str,
) -> Dict[str, Any]:
"""Create an association between two CRM objects."""
return self._request(
"PUT",
f"/crm/v3/objects/{from_object_type}/{from_object_id}/associations/{to_object_type}/{to_object_id}/{association_type}"
)
def get_associations(
self,
object_type: str,
object_id: str,
to_object_type: str,
) -> Dict[str, Any]:
"""Get associations for a CRM object."""
return self._request(
"GET",
f"/crm/v3/objects/{object_type}/{object_id}/associations/{to_object_type}"
)
# ===== Properties =====
def list_properties(self, object_type: str = "contacts") -> Dict[str, Any]:
"""List all properties for an object type."""
return self._request("GET", f"/crm/v3/properties/{object_type}")
def get_property(self, object_type: str, property_name: str) -> Dict[str, Any]:
"""Get a specific property definition."""
return self._request("GET", f"/crm/v3/properties/{object_type}/{property_name}")