Skip to main content
Glama

HubSpot MCP Server

by peakmojo
""" Client for HubSpot ticket-related operations. """ import json import logging import time import requests from typing import Any, Dict, List, Optional, Literal from datetime import datetime, timedelta from hubspot import HubSpot from hubspot.crm.tickets import PublicObjectSearchRequest from hubspot.crm.contacts.exceptions import ApiException from ..core.formatters import convert_datetime_fields from ..core.error_handler import handle_hubspot_errors logger = logging.getLogger('mcp_hubspot_client.ticket') class TicketClient: """Client for HubSpot ticket-related operations.""" def __init__(self, hubspot_client: HubSpot, access_token: str): """Initialize with HubSpot client instance. Args: hubspot_client: Initialized HubSpot client access_token: HubSpot API access token """ self.client = hubspot_client self.access_token = access_token @handle_hubspot_errors def get_tickets( self, criteria: Literal["default", "Closed"] = "default", limit: int = 50, max_retries: int = 3, retry_delay: float = 1.0 ) -> Dict[str, Any]: """Get tickets from HubSpot based on configurable selection criteria. 
Args: criteria: Selection criteria for tickets - "default": Tickets with "close date" or "last close date" > 1 day ago - "Closed": Tickets with status equals "Closed" limit: Maximum number of tickets to return (default: 50) max_retries: Maximum number of retry attempts for rate limiting (default: 3) retry_delay: Initial delay between retries in seconds (default: 1.0) Returns: Dictionary containing ticket data and pagination information """ # Create filter groups based on criteria filter_groups = self._create_filter_groups_for_criteria(criteria) # Create search request search_request = self._create_ticket_search_request(filter_groups, limit) # Implement retry logic with exponential backoff for rate limiting return self._execute_ticket_search_with_retry( search_request, max_retries, retry_delay ) def _create_filter_groups_for_criteria( self, criteria: Literal["default", "Closed"] ) -> List[Dict[str, Any]]: """Create filter groups based on ticket selection criteria. Args: criteria: Selection criteria for tickets Returns: List of filter groups for the search request """ if criteria == "default": return self._create_default_criteria_filters() elif criteria == "Closed": return self._create_closed_criteria_filters() else: raise ValueError(f"Invalid criteria: {criteria}. Must be 'default' or 'Closed'") def _create_default_criteria_filters(self) -> List[Dict[str, Any]]: """Create filter groups for the default criteria. 
Default: Tickets with "close date" or "last modified date" > 1 day ago Returns: List of filter groups """ one_day_ago = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ") # Create filter group for close_date > 1 day ago close_date_filter = { "filters": [ { "propertyName": "closedate", "operator": "GT", "value": one_day_ago } ] } # Create filter group for hs_lastmodifieddate > 1 day ago last_close_date_filter = { "filters": [ { "propertyName": "hs_lastmodifieddate", "operator": "GT", "value": one_day_ago } ] } # Add both filter groups (either condition can match) return [close_date_filter, last_close_date_filter] def _create_closed_criteria_filters(self) -> List[Dict[str, Any]]: """Create filter groups for the closed criteria. Closed: Ticket status equals "Closed" Returns: List of filter groups """ return [ # Primary approach: using the pipeline stage ID { "filters": [ { "propertyName": "hs_pipeline_stage", "operator": "EQ", "value": "4" # Using the stage ID from the pipeline data } ] }, # Alternative approach: using the properly capitalized stage name { "filters": [ { "propertyName": "hs_pipeline_stage", "operator": "EQ", "value": "Closed" # Using correct capitalization } ] } ] def _create_ticket_search_request( self, filter_groups: List[Dict[str, Any]], limit: int ) -> PublicObjectSearchRequest: """Create a search request for tickets with specified filters. 
Args: filter_groups: List of filter groups for the search limit: Maximum number of tickets to return Returns: Configured search request object """ return PublicObjectSearchRequest( filter_groups=filter_groups, sorts=[{ "propertyName": "hs_lastmodifieddate", "direction": "DESCENDING" }], limit=limit, properties=[ "subject", "content", "hs_pipeline", "hs_pipeline_stage", "hs_ticket_status", "status", "hs_ticket_priority", "createdate", "closedate", "hs_lastmodifieddate" ] ) def _execute_ticket_search_with_retry( self, search_request: PublicObjectSearchRequest, max_retries: int, retry_delay: float ) -> Dict[str, Any]: """Execute a ticket search with retry logic for rate limiting. Args: search_request: Search request to execute max_retries: Maximum number of retry attempts retry_delay: Initial delay between retries in seconds Returns: Dictionary with search results and pagination information """ retry_count = 0 current_delay = retry_delay while True: try: # Log filter groups for debugging logger.debug(f"Executing ticket search with filter groups: {json.dumps(search_request.filter_groups)}") # Execute the search search_response = self.client.crm.tickets.search_api.do_search( public_object_search_request=search_request ) # Log raw response logger.debug(f"Search response total results: {search_response.total}") # Convert the response to a dictionary tickets_dict = [ticket.to_dict() for ticket in search_response.results] converted_tickets = convert_datetime_fields(tickets_dict) # Log ticket data if available if tickets_dict: logger.debug(f"First ticket pipeline stage: {tickets_dict[0].get('properties', {}).get('hs_pipeline_stage')}") logger.debug(f"First ticket status: {tickets_dict[0].get('properties', {}).get('hs_ticket_status')}") # Get pagination information next_after = None if hasattr(search_response, 'paging') and hasattr(search_response.paging, 'next'): next_after = search_response.paging.next.after return { "results": converted_tickets, "pagination": { "next": 
{"after": next_after} }, "total": search_response.total } except ApiException as e: # Check if it's a rate limiting error (429) or server error (5xx) if e.status == 429 or (e.status >= 500 and e.status < 600): retry_count += 1 # Check if we've reached max retries if retry_count > max_retries: logger.error(f"Max retries ({max_retries}) exceeded for API request") raise # Calculate exponential backoff delay sleep_time = current_delay * (2 ** (retry_count - 1)) logger.warning(f"Rate limit hit or server error ({e.status}). Retrying in {sleep_time:.2f} seconds (attempt {retry_count}/{max_retries})") time.sleep(sleep_time) else: # Not a rate limiting or server error, re-raise raise @handle_hubspot_errors def get_conversation_threads(self, ticket_id: str) -> Dict[str, Any]: """Get conversation threads associated with a specific ticket. Args: ticket_id: The ID of the ticket to retrieve conversation threads for Returns: Dictionary containing conversation threads with their messages """ logger.debug(f"Fetching conversation threads for ticket {ticket_id}") try: # Step 1: Use Associations API to retrieve conversation threads associated with the ticket associated_conversations = self._get_associated_conversations(ticket_id) # Step 2: Extract thread IDs from the associated conversations thread_ids = self._extract_thread_ids(associated_conversations) logger.debug(f"Found {len(thread_ids)} conversation threads associated with ticket {ticket_id}") if not thread_ids: logger.info(f"No conversation threads found for ticket {ticket_id}") return self._create_empty_ticket_threads_response(ticket_id) # Step 3: Retrieve all messages for each thread threads, total_messages = self._get_thread_messages(thread_ids) # Convert datetime fields converted_threads = convert_datetime_fields(threads) return { "ticket_id": ticket_id, "threads": converted_threads, "total_threads": len(converted_threads), "total_messages": total_messages } except Exception as e: logger.error(f"Error retrieving 
conversation threads for ticket {ticket_id}: {str(e)}", exc_info=True) return self._create_empty_ticket_threads_response(ticket_id) def _get_associated_conversations(self, ticket_id: str) -> Dict[str, Any]: """Get conversation threads associated with a ticket. Args: ticket_id: Ticket ID Returns: API response with conversation associations """ url = f"https://api.hubapi.com/crm/v4/objects/tickets/{ticket_id}/associations/conversation" headers = { 'accept': "application/json", 'authorization': f"Bearer {self.access_token}" } response = requests.get(url, headers=headers) response.raise_for_status() # Raise exception for HTTP errors return response.json() def _extract_thread_ids(self, associated_conversations: Dict[str, Any]) -> List[str]: """Extract thread IDs from the conversations association response. Args: associated_conversations: Conversations association response Returns: List of thread IDs """ thread_ids = [] for conversation in associated_conversations.get('results', []): # Check if the conversation has a toObjectId field (this is the conversation ID) if 'toObjectId' in conversation: thread_ids.append(str(conversation['toObjectId'])) elif 'id' in conversation: # Fallback to id if it exists thread_ids.append(str(conversation['id'])) else: # Log warning for debugging logger.warning(f"No 'id' or 'toObjectId' field in conversation: {json.dumps(conversation)}") return thread_ids def _create_empty_ticket_threads_response(self, ticket_id: str) -> Dict[str, Any]: """Create an empty ticket threads response. Args: ticket_id: Ticket ID Returns: Empty response structure """ return { "ticket_id": ticket_id, "threads": [], "total_threads": 0, "total_messages": 0 } def _get_thread_messages(self, thread_ids: List[str]) -> tuple[List[Dict[str, Any]], int]: """Get messages for each thread and format them. 
Args: thread_ids: List of thread IDs Returns: Tuple of (formatted threads, total message count) """ threads = [] total_messages = 0 for thread_id in thread_ids: try: # Get messages for this thread messages_data = self._fetch_thread_messages(thread_id) message_results = messages_data.get("results", []) # Only keep actual messages (not system messages) actual_messages = [msg for msg in message_results if msg.get("type") == "MESSAGE"] # Format thread with its messages formatted_thread = { "id": thread_id, "messages": [] } # Add formatted messages for msg in actual_messages: formatted_message = self._format_message(msg) formatted_thread["messages"].append(formatted_message) # Sort messages by creation time (ascending) formatted_thread["messages"].sort(key=lambda x: x.get("created_at", "")) # Add thread to the list threads.append(formatted_thread) total_messages += len(formatted_thread["messages"]) except Exception as e: logger.error(f"Error fetching messages for thread {thread_id}: {str(e)}") return threads, total_messages def _fetch_thread_messages(self, thread_id: str) -> Dict[str, Any]: """Fetch messages for a specific thread. Args: thread_id: Thread ID Returns: API response with message results """ messages_url = f"https://api.hubapi.com/conversations/v3/conversations/threads/{thread_id}/messages" headers = { 'accept': "application/json", 'authorization': f"Bearer {self.access_token}" } response = requests.get(messages_url, headers=headers) response.raise_for_status() return response.json() def _format_message(self, msg: Dict[str, Any]) -> Dict[str, Any]: """Format a message. 
Args: msg: Message data Returns: Formatted message """ # Determine sender type (AGENT or CUSTOMER) sender_type = self._determine_sender_type(msg) # Format the message with required metadata return { "id": msg.get("id"), "created_at": msg.get("createdAt"), "sender_type": sender_type, "text": msg.get("text", ""), # Focus only on text content, ignore attachments } def _determine_sender_type(self, msg: Dict[str, Any]) -> str: """Determine the sender type (AGENT or CUSTOMER). Args: msg: Message data Returns: Sender type string """ sender_type = "UNKNOWN" if msg.get("senders") and len(msg.get("senders")) > 0: sender = msg.get("senders")[0] # In HubSpot, agents typically have senderField as "FROM" and actorId starting with specific prefixes if sender.get("senderField") == "FROM" and sender.get("actorId", "").startswith(("0-1", "0-2")): sender_type = "AGENT" else: sender_type = "CUSTOMER" return sender_type

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/peakmojo/mcp-hubspot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.