Skip to main content
Glama

HubSpot MCP Server

by peakmojo
#!/usr/bin/env python
"""
Script to retrieve closed tickets and their conversation threads from HubSpot.

The tickets are fetched with a single ``get_tickets`` call (default limit of
20), each ticket's conversation threads are fetched in turn, and the combined
data is written to ``closed_ticket_conversations.json`` in the current
working directory.
"""
import os
import sys
import json
import logging
from typing import Dict, Any, List

# Add src directory to path to allow direct imports
sys.path.append(os.path.join(os.path.dirname(__file__), "src"))

# Import HubSpot client
from mcp_server_hubspot.hubspot_client import HubSpotClient

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("get_closed_ticket_conversations")


def get_closed_tickets(hubspot_client: HubSpotClient, limit: int = 20) -> List[Dict[str, Any]]:
    """Get closed tickets from HubSpot.

    Args:
        hubspot_client: Initialized HubSpot client
        limit: Value passed as ``limit`` to ``get_tickets`` (defaults to 20,
            the value this script has always used)

    Returns:
        List of closed tickets; empty if none were returned or the client
        reported an error
    """
    logger.info("Retrieving closed tickets...")

    # Get closed tickets
    tickets_response = hubspot_client.get_tickets(
        criteria="Closed",
        limit=limit,
        max_retries=3,
        retry_delay=1.0
    )

    # Mirror the string-as-error check used for conversation threads below.
    # NOTE(review): get_tickets' error contract is not visible from this
    # script; this assumes it can also return an error message string.
    if isinstance(tickets_response, str):
        logger.error(f"Error retrieving closed tickets: {tickets_response}")
        return []

    # Extract ticket data; "or []" guards against an explicit null "results"
    tickets = tickets_response.get("results") or []
    logger.info(f"Found {len(tickets)} closed tickets")

    return tickets


def get_ticket_conversations(hubspot_client: HubSpotClient, ticket_id: str) -> Dict[str, Any]:
    """Get conversation threads for a specific ticket.

    Args:
        hubspot_client: Initialized HubSpot client
        ticket_id: ID of the ticket to retrieve conversations for

    Returns:
        Conversation thread data as returned by the client, or a dictionary
        with an "error" key and zeroed counts if retrieval failed
    """
    logger.info(f"Retrieving conversation threads for ticket {ticket_id}...")

    try:
        # Get conversation threads
        threads_response = hubspot_client.get_ticket_conversation_threads(ticket_id=ticket_id)
    except Exception as e:
        # Best-effort: one failing ticket must not abort the whole run.
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Exception retrieving conversation threads: {e}")
        return {"error": str(e), "total_threads": 0, "total_messages": 0, "threads": []}

    # Check if response is a string (error message)
    if isinstance(threads_response, str):
        logger.error(f"Error retrieving conversation threads: {threads_response}")
        return {"error": threads_response, "total_threads": 0, "total_messages": 0, "threads": []}

    return threads_response


def main():
    """Run the script to get closed tickets and their conversations."""
    # Initialize HubSpot client
    access_token = os.getenv("HUBSPOT_ACCESS_TOKEN")
    if not access_token:
        logger.error("HUBSPOT_ACCESS_TOKEN environment variable is required")
        sys.exit(1)

    hubspot_client = HubSpotClient(access_token)
    logger.info("HubSpot client initialized")

    # Get closed tickets
    closed_tickets = get_closed_tickets(hubspot_client)

    if not closed_tickets:
        logger.info("No closed tickets found")
        return

    # Get conversation threads for each ticket
    all_conversations = []

    for ticket in closed_tickets:
        ticket_id = ticket.get("id")
        properties = ticket.get("properties", {})
        subject = properties.get("subject", "No subject")

        logger.info(f"Processing ticket: {subject} (ID: {ticket_id})")

        # Get conversation threads
        conversation_data = get_ticket_conversations(hubspot_client, ticket_id)

        # Create a new dictionary with ticket info and conversation data
        all_conversations.append({
            "ticket_id": ticket_id,
            "ticket_subject": subject,
            "ticket_content": properties.get("content", ""),
            "conversation_data": conversation_data,
        })

    # Save results to file (explicit encoding so output does not depend on
    # the platform's default locale)
    output_file = "closed_ticket_conversations.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_conversations, f, indent=2)

    logger.info(f"All conversation data saved to {output_file}")

    # Print summary. Only dict payloads carry counts; the client response is
    # passed through unchanged, so skip anything that is not a dict rather
    # than crashing after the file has already been written.
    conversation_dicts = [
        conv["conversation_data"]
        for conv in all_conversations
        if isinstance(conv["conversation_data"], dict)
    ]
    total_threads = sum(data.get("total_threads", 0) for data in conversation_dicts)
    total_messages = sum(data.get("total_messages", 0) for data in conversation_dicts)

    logger.info(f"Summary: {len(closed_tickets)} tickets, {total_threads} threads, {total_messages} messages")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/peakmojo/mcp-hubspot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.