Skip to main content
Glama

HubSpot MCP Server

by peakmojo
#!/usr/bin/env python
"""
Test script for retrieving closed tickets and their conversation threads using the MCP tools.

The script reads the HubSpot access token from the HUBSPOT_ACCESS_TOKEN
environment variable, fetches tickets matching the "Closed" criteria and then
logs the conversation threads of every ticket that was returned.
"""
import os
import sys
import json
import logging
from typing import Dict, Any, List, Optional

# Add src directory to path to allow direct imports
sys.path.append(os.path.join(os.path.dirname(__file__), "src"))

# Import HubSpot client
from mcp_server_hubspot.hubspot_client import HubSpotClient

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("test_mcp_ticket_conversations")

# Maximum number of characters of a message body echoed to the log.
_MAX_TEXT_PREVIEW = 100


def _ticket_properties(ticket: Dict[str, Any]) -> Dict[str, Any]:
    """Return the ticket's ``properties`` mapping, or an empty dict.

    A ``properties`` key that is present but null would otherwise make the
    chained ``.get`` calls fail with ``AttributeError``.
    """
    return ticket.get("properties") or {}


def test_get_closed_tickets(hubspot_client: HubSpotClient) -> List[Dict[str, Any]]:
    """Test getting closed tickets using the MCP tool.

    Calls ``hubspot_client.get_tickets`` with the "Closed" criteria and logs
    the id, subject, content and pipeline stage of each returned ticket.

    Args:
        hubspot_client: Initialized HubSpot client

    Returns:
        List of closed tickets (the ``results`` entry of the response, or an
        empty list when that entry is absent)
    """
    logger.info("=== Testing hubspot_get_tickets with 'Closed' criteria ===")

    # Get closed tickets
    tickets_response = hubspot_client.get_tickets(
        criteria="Closed",
        limit=20,
        max_retries=3,
        retry_delay=1.0
    )

    # Extract ticket data
    tickets = tickets_response.get("results", [])
    logger.info(f"Found {len(tickets)} closed tickets")

    # Print ticket details
    for i, ticket in enumerate(tickets):
        ticket_id = ticket.get("id")
        # Look the properties up once instead of once per field.
        properties = _ticket_properties(ticket)
        subject = properties.get("subject", "No subject")
        content = properties.get("content", "No content")
        pipeline_stage = properties.get("hs_pipeline_stage", "Unknown")

        logger.info(f"Ticket {i+1}:")
        logger.info(f" ID: {ticket_id}")
        logger.info(f" Subject: {subject}")
        logger.info(f" Content: {content}")
        logger.info(f" Pipeline Stage: {pipeline_stage}")

    return tickets


def test_get_ticket_conversation_threads(hubspot_client: HubSpotClient, ticket_id: str) -> Dict[str, Any]:
    """Test getting conversation threads for a specific ticket using the MCP tool.

    Logs the thread/message totals reported by the client and, for every
    message, its sender type, creation time and a preview of its text
    (truncated to ``_MAX_TEXT_PREVIEW`` characters).

    Args:
        hubspot_client: Initialized HubSpot client
        ticket_id: ID of the ticket to retrieve conversations for

    Returns:
        Conversation thread data as returned by the client, or
        ``{"error": <message>}`` if retrieving or logging the threads raised.
    """
    logger.info(f"=== Testing hubspot_get_ticket_conversation_threads for ticket {ticket_id} ===")

    # Get conversation threads
    try:
        threads_response = hubspot_client.get_ticket_conversation_threads(ticket_id=ticket_id)

        # Log response details
        total_threads = threads_response.get("total_threads", 0)
        total_messages = threads_response.get("total_messages", 0)
        threads = threads_response.get("threads", [])

        logger.info(f"Found {total_threads} conversation threads with {total_messages} total messages")

        # Print thread details
        for i, thread in enumerate(threads):
            thread_id = thread.get("id")
            messages = thread.get("messages", [])

            logger.info(f"Thread {i+1} (ID: {thread_id}):")
            logger.info(f" Messages: {len(messages)}")

            # Print message details
            for j, message in enumerate(messages):
                sender_type = message.get("sender_type", "Unknown")
                created_at = message.get("created_at", "Unknown")
                # A message may carry an explicit null text; without this
                # guard len(None) would raise and abort the whole ticket.
                text = message.get("text")
                if text is None:
                    text = "No text"

                logger.info(f" Message {j+1}:")
                logger.info(f" Sender: {sender_type}")
                logger.info(f" Created: {created_at}")
                if len(text) > _MAX_TEXT_PREVIEW:
                    logger.info(f" Text: {text[:_MAX_TEXT_PREVIEW]}...")
                else:
                    logger.info(f" Text: {text}")

        return threads_response

    except Exception as e:
        # Deliberately broad: one failing ticket must not stop the run over
        # the remaining tickets; the failure is reported to the caller.
        logger.error(f"Error getting conversation threads: {str(e)}")
        return {"error": str(e)}


def main():
    """Run the test script.

    Exits with status 1 when HUBSPOT_ACCESS_TOKEN is not set; returns early
    when no closed tickets are found.
    """
    # Initialize HubSpot client
    access_token = os.getenv("HUBSPOT_ACCESS_TOKEN")
    if not access_token:
        logger.error("HUBSPOT_ACCESS_TOKEN environment variable is required")
        sys.exit(1)

    hubspot_client = HubSpotClient(access_token)
    logger.info("HubSpot client initialized")

    # Test getting closed tickets
    closed_tickets = test_get_closed_tickets(hubspot_client)

    if not closed_tickets:
        logger.info("No closed tickets found to test conversation threads")
        return

    # Test getting conversation threads for each closed ticket
    for ticket in closed_tickets:
        ticket_id = ticket.get("id")
        subject = _ticket_properties(ticket).get("subject", "No subject")

        logger.info(f"Testing conversation threads for ticket: {subject} (ID: {ticket_id})")

        # Get conversation threads
        test_get_ticket_conversation_threads(hubspot_client, ticket_id)

        # Add a separator between tickets
        logger.info("=" * 80)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/peakmojo/mcp-hubspot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.