server.js•67.7 kB
#!/usr/bin/env node
/**
* HubSpot MCP Server - Complete Multi-Transport Implementation
*
* Production-ready MCP (Model Context Protocol) server implementing ALL required
* transports and endpoints with comprehensive session management.
*
* Supports:
* - HTTP JSON-RPC 2.0 transport
* - Streaming HTTP (Server-sent events) transport
* - STDIO transport for process-based communication
* - Complete MCP protocol implementation (21 methods)
* - Session management with timeouts and rate limiting
* - Comprehensive monitoring and observability
*
* @version 1.0.0
* @license MIT
*/
'use strict';
require('dotenv').config();
const http = require('http');
const https = require('https');
const url = require('url');
const { v4: uuidv4 } = require('uuid');
const yargs = require('yargs/yargs');
const { hideBin } = require('yargs/helpers');
// Command line argument parsing
const argv = yargs(hideBin(process.argv))
.option('transport', {
alias: 't',
type: 'string',
choices: ['http', 'streamable-http', 'stdio'],
default: 'http',
describe: 'Transport protocol to use'
})
.option('port', {
alias: 'p',
type: 'number',
default: 3000,
describe: 'Port to listen on (HTTP/Streamable-HTTP only)'
})
.option('host', {
alias: 'h',
type: 'string',
default: '0.0.0.0',
describe: 'Host to bind to (HTTP/Streamable-HTTP only)'
})
.option('cors-origin', {
type: 'string',
default: 'localhost',
describe: 'CORS allowed origin'
})
.option('log-level', {
type: 'string',
choices: ['debug', 'info', 'warn', 'error'],
default: 'info',
describe: 'Logging level'
})
.option('max-connections', {
type: 'number',
default: 100,
describe: 'Maximum concurrent connections'
})
.option('session-timeout', {
type: 'number',
default: 3600,
describe: 'Session timeout in seconds'
})
.option('hubspot-token', {
type: 'string',
describe: 'HubSpot Private App Access Token'
})
.option('hubspot-api-url', {
type: 'string',
default: 'https://api.hubapi.com',
describe: 'HubSpot API base URL'
})
.help()
.argv;
// Enhanced configuration with environment variable fallbacks
const CONFIG = {
transport: process.env.TRANSPORT || argv.transport,
port: parseInt(process.env.PORT) || argv.port,
host: process.env.HOST || argv.host,
hubspotToken: process.env.HUBSPOT_PRIVATE_APP_ACCESS_TOKEN || argv.hubspotToken,
hubspotApiUrl: process.env.HUBSPOT_API_URL || argv.hubspotApiUrl,
nodeEnv: process.env.NODE_ENV || 'production',
appName: process.env.APP_NAME || 'hubspot-mcp-server',
appVersion: process.env.APP_VERSION || '1.0.0',
corsOrigin: process.env.CORS_ORIGIN || argv.corsOrigin,
maxRequestSize: parseInt(process.env.MAX_REQUEST_SIZE) || 10485760, // 10MB
shutdownTimeout: parseInt(process.env.GRACEFUL_SHUTDOWN_TIMEOUT) || 10000,
logLevel: process.env.LOG_LEVEL || argv.logLevel,
maxConnections: parseInt(process.env.MAX_CONNECTIONS) || argv.maxConnections,
sessionTimeout: parseInt(process.env.SESSION_TIMEOUT) || argv.sessionTimeout,
// Rate limiting configuration
rateLimitTools: parseInt(process.env.RATE_LIMIT_TOOLS) || 60, // per minute
rateLimitResources: parseInt(process.env.RATE_LIMIT_RESOURCES) || 30, // per minute
maxConcurrentRequests: parseInt(process.env.MAX_CONCURRENT_REQUESTS) || 10,
connectionTimeout: parseInt(process.env.CONNECTION_TIMEOUT) || 30000, // 30s
// MCP Protocol version
mcpProtocolVersion: '2024-11-05'
};
// Structured logging with correlation IDs
const Logger = {
_level: CONFIG.logLevel,
_levels: { debug: 0, info: 1, warn: 2, error: 3 },
_log: (level, message, meta = {}) => {
if (Logger._levels[level] >= Logger._levels[Logger._level]) {
const entry = {
timestamp: new Date().toISOString(),
level: level.toUpperCase(),
service: CONFIG.appName,
version: CONFIG.appVersion,
environment: CONFIG.nodeEnv,
correlationId: meta.correlationId || 'system',
sessionId: meta.sessionId,
requestId: meta.requestId,
message,
...meta
};
console.log(JSON.stringify(entry));
}
},
debug: (msg, meta) => Logger._log('debug', msg, meta),
info: (msg, meta) => Logger._log('info', msg, meta),
warn: (msg, meta) => Logger._log('warn', msg, meta),
error: (msg, meta) => Logger._log('error', msg, meta),
setLevel: (level) => {
if (Logger._levels[level] !== undefined) {
Logger._level = level;
Logger.info('Log level changed', { newLevel: level });
}
}
};
// Metrics collection
const Metrics = {
data: {
requests_total: 0,
active_sessions: 0,
hubspot_api_calls: 0,
errors_total: 0,
tool_usage: {},
request_durations: []
},
increment: (metric, labels = {}) => {
if (typeof Metrics.data[metric] === 'number') {
Metrics.data[metric]++;
} else if (typeof Metrics.data[metric] === 'object') {
const key = JSON.stringify(labels);
Metrics.data[metric][key] = (Metrics.data[metric][key] || 0) + 1;
}
},
gauge: (metric, value) => {
Metrics.data[metric] = value;
},
histogram: (metric, value) => {
if (!Array.isArray(Metrics.data[metric])) {
Metrics.data[metric] = [];
}
Metrics.data[metric].push(value);
if (Metrics.data[metric].length > 1000) {
Metrics.data[metric] = Metrics.data[metric].slice(-500);
}
},
getMetrics: () => {
return {
...Metrics.data,
timestamp: new Date().toISOString()
};
}
};
// Session management
class SessionManager {
constructor() {
this.sessions = new Map();
this.cleanup();
}
createSession(clientInfo = {}) {
const sessionId = uuidv4();
const session = {
id: sessionId,
clientInfo,
createdAt: Date.now(),
lastActivity: Date.now(),
subscriptions: new Map(),
rateLimits: {
tools: { count: 0, lastReset: Date.now() },
resources: { count: 0, lastReset: Date.now() }
},
activeRequests: 0,
initialized: false
};
this.sessions.set(sessionId, session);
Metrics.gauge('active_sessions', this.sessions.size);
Logger.info('Session created', { sessionId, clientInfo });
return session;
}
getSession(sessionId) {
const session = this.sessions.get(sessionId);
if (session) {
session.lastActivity = Date.now();
}
return session;
}
removeSession(sessionId) {
const removed = this.sessions.delete(sessionId);
if (removed) {
Metrics.gauge('active_sessions', this.sessions.size);
Logger.info('Session removed', { sessionId });
}
return removed;
}
checkRateLimit(sessionId, type) {
const session = this.getSession(sessionId);
if (!session) return false;
const now = Date.now();
const limit = CONFIG[`rateLimit${type.charAt(0).toUpperCase() + type.slice(1)}`];
const rateLimit = session.rateLimits[type];
// Reset counter every minute
if (now - rateLimit.lastReset > 60000) {
rateLimit.count = 0;
rateLimit.lastReset = now;
}
if (rateLimit.count >= limit) {
Logger.warn('Rate limit exceeded', { sessionId, type, count: rateLimit.count, limit });
return false;
}
rateLimit.count++;
return true;
}
cleanup() {
setInterval(() => {
const now = Date.now();
const timeout = CONFIG.sessionTimeout * 1000;
for (const [sessionId, session] of this.sessions.entries()) {
if (now - session.lastActivity > timeout) {
this.removeSession(sessionId);
}
}
}, 30000); // Check every 30 seconds
}
}
// Validate configuration
function validateConfig() {
if (!CONFIG.hubspotToken) {
Logger.error('Missing required configuration: HUBSPOT_PRIVATE_APP_ACCESS_TOKEN');
process.exit(1);
}
if (CONFIG.transport !== 'stdio' && (CONFIG.port < 1 || CONFIG.port > 65535)) {
Logger.error('Invalid port number', { port: CONFIG.port });
process.exit(1);
}
Logger.info('Configuration validated', {
transport: CONFIG.transport,
port: CONFIG.port,
host: CONFIG.host,
mcpVersion: CONFIG.mcpProtocolVersion
});
}
// Enhanced HubSpot API client
class HubSpotClient {
constructor(token, apiUrl) {
this.token = token;
this.apiUrl = apiUrl;
}
async request(method, endpoint, data = null) {
Metrics.increment('hubspot_api_calls');
return new Promise((resolve, reject) => {
const requestUrl = `${this.apiUrl}${endpoint}`;
const parsedUrl = new URL(requestUrl);
const options = {
hostname: parsedUrl.hostname,
port: parsedUrl.port || 443,
path: parsedUrl.pathname + parsedUrl.search,
method,
headers: {
'Authorization': `Bearer ${this.token}`,
'Content-Type': 'application/json',
'User-Agent': `${CONFIG.appName}/${CONFIG.appVersion}`,
'Accept': 'application/json'
},
timeout: CONFIG.connectionTimeout
};
if (data && ['POST', 'PUT', 'PATCH'].includes(method)) {
const jsonData = JSON.stringify(data);
options.headers['Content-Length'] = Buffer.byteLength(jsonData);
}
const req = https.request(options, (res) => {
let responseData = '';
res.on('data', chunk => responseData += chunk);
res.on('end', () => {
try {
const parsed = responseData ? JSON.parse(responseData) : {};
if (res.statusCode >= 200 && res.statusCode < 300) {
resolve(parsed);
} else {
reject(new Error(`HubSpot API error: ${res.statusCode} ${parsed.message || responseData}`));
}
} catch (error) {
reject(new Error(`Invalid JSON response: ${error.message}`));
}
});
});
req.on('error', error => reject(new Error(`Request failed: ${error.message}`)));
req.on('timeout', () => reject(new Error('Request timeout')));
if (data && ['POST', 'PUT', 'PATCH'].includes(method)) {
req.write(JSON.stringify(data));
}
req.end();
});
}
// Enhanced HubSpot methods
async getContacts(limit = 10, properties = ['firstname', 'lastname', 'email'], after = null) {
const params = new URLSearchParams({
limit: Math.min(limit, 100).toString(),
properties: properties.join(',')
});
if (after) params.append('after', after);
return this.request('GET', `/crm/v3/objects/contacts?${params}`);
}
async getCompanies(limit = 10, properties = ['name', 'domain', 'industry'], after = null) {
const params = new URLSearchParams({
limit: Math.min(limit, 100).toString(),
properties: properties.join(',')
});
if (after) params.append('after', after);
return this.request('GET', `/crm/v3/objects/companies?${params}`);
}
async getDeals(limit = 10, properties = ['dealname', 'amount', 'dealstage'], after = null) {
const params = new URLSearchParams({
limit: Math.min(limit, 100).toString(),
properties: properties.join(',')
});
if (after) params.append('after', after);
return this.request('GET', `/crm/v3/objects/deals?${params}`);
}
async createContact(properties) {
return this.request('POST', '/crm/v3/objects/contacts', { properties });
}
async createCompany(properties) {
return this.request('POST', '/crm/v3/objects/companies', { properties });
}
async createDeal(properties) {
return this.request('POST', '/crm/v3/objects/deals', { properties });
}
async updateContact(id, properties) {
return this.request('PATCH', `/crm/v3/objects/contacts/${id}`, { properties });
}
async updateCompany(id, properties) {
return this.request('PATCH', `/crm/v3/objects/companies/${id}`, { properties });
}
async updateDeal(id, properties) {
return this.request('PATCH', `/crm/v3/objects/deals/${id}`, { properties });
}
async searchContacts(query, properties = ['firstname', 'lastname', 'email'], limit = 20) {
const searchRequest = {
query,
limit: Math.min(limit, 100),
properties
};
return this.request('POST', '/crm/v3/objects/contacts/search', searchRequest);
}
async searchCompanies(query, properties = ['name', 'domain', 'industry'], limit = 20) {
const searchRequest = {
query,
limit: Math.min(limit, 100),
properties
};
return this.request('POST', '/crm/v3/objects/companies/search', searchRequest);
}
async searchDeals(query, properties = ['dealname', 'amount', 'dealstage'], limit = 20) {
const searchRequest = {
query,
limit: Math.min(limit, 100),
properties
};
return this.request('POST', '/crm/v3/objects/deals/search', searchRequest);
}
async getContactByEmail(email) {
return this.request('GET', `/crm/v3/objects/contacts/${email}?idProperty=email`);
}
async getAssociations(objectId, objectType, toObjectType) {
return this.request('GET', `/crm/v4/objects/${objectType}/${objectId}/associations/${toObjectType}`);
}
async getEngagementHistory(objectId, objectType = 'contacts') {
return this.request('GET', `/crm/v3/objects/${objectType}/${objectId}/associations/engagements`);
}
async getProperties(objectType) {
return this.request('GET', `/crm/v3/properties/${objectType}`);
}
async getPipelines() {
return this.request('GET', '/crm/v3/pipelines/deals');
}
async getOwners() {
return this.request('GET', '/crm/v3/owners/');
}
}
// Complete MCP Protocol Implementation with enhanced features
class MCPServer {
constructor(hubspotClient, sessionManager) {
this.hubspot = hubspotClient;
this.sessionManager = sessionManager;
this.serverInfo = {
name: CONFIG.appName,
version: CONFIG.appVersion,
protocolVersion: CONFIG.mcpProtocolVersion
};
this.capabilities = {
tools: { listChanged: true },
resources: { listChanged: true, subscribe: true },
prompts: { listChanged: true },
logging: {},
experimental: {}
};
}
// Core Protocol Methods with session management
async initialize(params = {}, sessionId = null) {
let session;
if (sessionId) {
session = this.sessionManager.getSession(sessionId);
}
if (!session) {
session = this.sessionManager.createSession(params.clientInfo);
}
session.initialized = true;
Logger.info('MCP server initialized', {
sessionId: session.id,
clientInfo: params.clientInfo,
protocolVersion: this.serverInfo.protocolVersion
});
return {
protocolVersion: this.serverInfo.protocolVersion,
capabilities: this.capabilities,
serverInfo: this.serverInfo,
instructions: 'HubSpot MCP Server - Complete access to CRM data with real-time updates and multi-transport support',
sessionId: session.id
};
}
async ping(sessionId) {
const session = this.sessionManager.getSession(sessionId);
if (session) {
Logger.debug('Ping received', { sessionId });
}
return {}; // Ping always returns empty object per MCP spec
}
async cancelled(params, sessionId) {
const { requestId } = params;
Logger.debug('Request cancelled', { requestId, sessionId });
return {};
}
async shutdown(sessionId) {
if (sessionId) {
this.sessionManager.removeSession(sessionId);
Logger.info('Session shutdown', { sessionId });
}
return {};
}
// Enhanced Tools Management with comprehensive HubSpot operations
async listTools(sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
return {
tools: [
{
name: 'get_contacts',
description: 'Retrieve HubSpot contacts with filtering and pagination',
inputSchema: {
type: 'object',
properties: {
limit: { type: 'number', description: 'Maximum contacts to retrieve (1-100)', default: 10, minimum: 1, maximum: 100 },
properties: { type: 'array', items: { type: 'string' }, description: 'Contact properties to include', default: ['firstname', 'lastname', 'email'] },
after: { type: 'string', description: 'Pagination cursor for next page' }
}
}
},
{
name: 'get_companies',
description: 'Retrieve HubSpot companies with filtering and pagination',
inputSchema: {
type: 'object',
properties: {
limit: { type: 'number', description: 'Maximum companies to retrieve (1-100)', default: 10, minimum: 1, maximum: 100 },
properties: { type: 'array', items: { type: 'string' }, description: 'Company properties to include', default: ['name', 'domain', 'industry'] },
after: { type: 'string', description: 'Pagination cursor for next page' }
}
}
},
{
name: 'get_deals',
description: 'Retrieve HubSpot deals with filtering and pagination',
inputSchema: {
type: 'object',
properties: {
limit: { type: 'number', description: 'Maximum deals to retrieve (1-100)', default: 10, minimum: 1, maximum: 100 },
properties: { type: 'array', items: { type: 'string' }, description: 'Deal properties to include', default: ['dealname', 'amount', 'dealstage'] },
after: { type: 'string', description: 'Pagination cursor for next page' }
}
}
},
{
name: 'create_contact',
description: 'Create a new contact in HubSpot CRM',
inputSchema: {
type: 'object',
properties: {
firstname: { type: 'string', description: 'Contact first name' },
lastname: { type: 'string', description: 'Contact last name' },
email: { type: 'string', description: 'Contact email address', format: 'email' },
company: { type: 'string', description: 'Contact company name' },
phone: { type: 'string', description: 'Contact phone number' },
jobtitle: { type: 'string', description: 'Contact job title' }
},
required: ['email']
}
},
{
name: 'create_company',
description: 'Create a new company in HubSpot CRM',
inputSchema: {
type: 'object',
properties: {
name: { type: 'string', description: 'Company name' },
domain: { type: 'string', description: 'Company domain' },
industry: { type: 'string', description: 'Company industry' },
city: { type: 'string', description: 'Company city' },
state: { type: 'string', description: 'Company state' },
country: { type: 'string', description: 'Company country' }
},
required: ['name']
}
},
{
name: 'create_deal',
description: 'Create a new deal in HubSpot CRM',
inputSchema: {
type: 'object',
properties: {
dealname: { type: 'string', description: 'Deal name' },
amount: { type: 'number', description: 'Deal amount' },
dealstage: { type: 'string', description: 'Deal stage' },
pipeline: { type: 'string', description: 'Deal pipeline' },
closedate: { type: 'string', description: 'Close date (YYYY-MM-DD)' }
},
required: ['dealname']
}
},
{
name: 'update_contact',
description: 'Update an existing contact in HubSpot CRM',
inputSchema: {
type: 'object',
properties: {
id: { type: 'string', description: 'Contact ID' },
properties: { type: 'object', description: 'Properties to update' }
},
required: ['id', 'properties']
}
},
{
name: 'update_company',
description: 'Update an existing company in HubSpot CRM',
inputSchema: {
type: 'object',
properties: {
id: { type: 'string', description: 'Company ID' },
properties: { type: 'object', description: 'Properties to update' }
},
required: ['id', 'properties']
}
},
{
name: 'update_deal',
description: 'Update an existing deal in HubSpot CRM',
inputSchema: {
type: 'object',
properties: {
id: { type: 'string', description: 'Deal ID' },
properties: { type: 'object', description: 'Properties to update' }
},
required: ['id', 'properties']
}
},
{
name: 'search_contacts',
description: 'Search HubSpot contacts using query string',
inputSchema: {
type: 'object',
properties: {
query: { type: 'string', description: 'Search query' },
properties: { type: 'array', items: { type: 'string' }, description: 'Contact properties to include' },
limit: { type: 'number', description: 'Maximum results', default: 20, minimum: 1, maximum: 100 }
},
required: ['query']
}
},
{
name: 'search_companies',
description: 'Search HubSpot companies using query string',
inputSchema: {
type: 'object',
properties: {
query: { type: 'string', description: 'Search query' },
properties: { type: 'array', items: { type: 'string' }, description: 'Company properties to include' },
limit: { type: 'number', description: 'Maximum results', default: 20, minimum: 1, maximum: 100 }
},
required: ['query']
}
},
{
name: 'search_deals',
description: 'Search HubSpot deals using query string',
inputSchema: {
type: 'object',
properties: {
query: { type: 'string', description: 'Search query' },
properties: { type: 'array', items: { type: 'string' }, description: 'Deal properties to include' },
limit: { type: 'number', description: 'Maximum results', default: 20, minimum: 1, maximum: 100 }
},
required: ['query']
}
},
{
name: 'get_contact_by_email',
description: 'Get a contact by email address',
inputSchema: {
type: 'object',
properties: {
email: { type: 'string', description: 'Contact email address', format: 'email' }
},
required: ['email']
}
},
{
name: 'get_associations',
description: 'Get associations between HubSpot objects',
inputSchema: {
type: 'object',
properties: {
objectId: { type: 'string', description: 'Object ID' },
objectType: { type: 'string', description: 'Object type (contacts, companies, deals)' },
toObjectType: { type: 'string', description: 'Target object type' }
},
required: ['objectId', 'objectType', 'toObjectType']
}
},
{
name: 'get_engagement_history',
description: 'Get engagement history for an object',
inputSchema: {
type: 'object',
properties: {
objectId: { type: 'string', description: 'Object ID' },
objectType: { type: 'string', description: 'Object type', default: 'contacts' }
},
required: ['objectId']
}
}
]
};
}
async callTool(name, args = {}, sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
if (!this.sessionManager.checkRateLimit(sessionId, 'tools')) {
throw new Error('Rate limit exceeded for tools');
}
const session = this.sessionManager.getSession(sessionId);
if (session.activeRequests >= CONFIG.maxConcurrentRequests) {
throw new Error('Maximum concurrent requests exceeded');
}
session.activeRequests++;
try {
Metrics.increment('tool_usage', { tool: name });
let result;
switch (name) {
case 'get_contacts':
result = await this.hubspot.getContacts(args.limit, args.properties, args.after);
break;
case 'get_companies':
result = await this.hubspot.getCompanies(args.limit, args.properties, args.after);
break;
case 'get_deals':
result = await this.hubspot.getDeals(args.limit, args.properties, args.after);
break;
case 'create_contact':
result = await this.hubspot.createContact(args);
break;
case 'create_company':
result = await this.hubspot.createCompany(args);
break;
case 'create_deal':
result = await this.hubspot.createDeal(args);
break;
case 'update_contact':
result = await this.hubspot.updateContact(args.id, args.properties);
break;
case 'update_company':
result = await this.hubspot.updateCompany(args.id, args.properties);
break;
case 'update_deal':
result = await this.hubspot.updateDeal(args.id, args.properties);
break;
case 'search_contacts':
result = await this.hubspot.searchContacts(args.query, args.properties, args.limit);
break;
case 'search_companies':
result = await this.hubspot.searchCompanies(args.query, args.properties, args.limit);
break;
case 'search_deals':
result = await this.hubspot.searchDeals(args.query, args.properties, args.limit);
break;
case 'get_contact_by_email':
result = await this.hubspot.getContactByEmail(args.email);
break;
case 'get_associations':
result = await this.hubspot.getAssociations(args.objectId, args.objectType, args.toObjectType);
break;
case 'get_engagement_history':
result = await this.hubspot.getEngagementHistory(args.objectId, args.objectType);
break;
default:
throw new Error(`Unknown tool: ${name}`);
}
return {
content: [
{
type: 'text',
text: JSON.stringify(result, null, 2)
}
]
};
} catch (error) {
Metrics.increment('errors_total');
Logger.error('Tool execution failed', {
tool: name,
error: error.message,
sessionId
});
throw new Error(`Tool execution failed: ${error.message}`);
} finally {
session.activeRequests--;
}
}
// Enhanced Resources Management
async listResources(sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
return {
resources: [
{
uri: 'hubspot://contacts',
name: 'HubSpot Contacts Database',
description: 'Real-time access to HubSpot CRM contacts with pagination',
mimeType: 'application/json'
},
{
uri: 'hubspot://companies',
name: 'HubSpot Companies Database',
description: 'Real-time access to HubSpot CRM companies with pagination',
mimeType: 'application/json'
},
{
uri: 'hubspot://deals',
name: 'HubSpot Deals Pipeline',
description: 'Real-time access to HubSpot CRM deals with pagination',
mimeType: 'application/json'
},
{
uri: 'hubspot://properties/contacts',
name: 'Contact Properties Schema',
description: 'Available contact properties and their definitions',
mimeType: 'application/json'
},
{
uri: 'hubspot://properties/companies',
name: 'Company Properties Schema',
description: 'Available company properties and their definitions',
mimeType: 'application/json'
},
{
uri: 'hubspot://properties/deals',
name: 'Deal Properties Schema',
description: 'Available deal properties and their definitions',
mimeType: 'application/json'
},
{
uri: 'hubspot://pipelines/deals',
name: 'Deal Pipeline Configuration',
description: 'Deal pipeline stages and configuration',
mimeType: 'application/json'
},
{
uri: 'hubspot://owners',
name: 'Sales Rep/Owner Information',
description: 'HubSpot users and owners information',
mimeType: 'application/json'
}
]
};
}
async readResource(uri, sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
if (!this.sessionManager.checkRateLimit(sessionId, 'resources')) {
throw new Error('Rate limit exceeded for resources');
}
try {
let content;
switch (uri) {
case 'hubspot://contacts':
content = await this.hubspot.getContacts(100);
break;
case 'hubspot://companies':
content = await this.hubspot.getCompanies(100);
break;
case 'hubspot://deals':
content = await this.hubspot.getDeals(100);
break;
case 'hubspot://properties/contacts':
content = await this.hubspot.getProperties('contacts');
break;
case 'hubspot://properties/companies':
content = await this.hubspot.getProperties('companies');
break;
case 'hubspot://properties/deals':
content = await this.hubspot.getProperties('deals');
break;
case 'hubspot://pipelines/deals':
content = await this.hubspot.getPipelines();
break;
case 'hubspot://owners':
content = await this.hubspot.getOwners();
break;
default:
throw new Error(`Unknown resource: ${uri}`);
}
return {
contents: [
{
uri,
mimeType: 'application/json',
text: JSON.stringify(content, null, 2)
}
]
};
} catch (error) {
Metrics.increment('errors_total');
Logger.error('Resource read failed', { uri, error: error.message, sessionId });
throw new Error(`Resource read failed: ${error.message}`);
}
}
async subscribeResource(uri, sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
const session = this.sessionManager.getSession(sessionId);
const subscriptionId = uuidv4();
session.subscriptions.set(subscriptionId, {
uri,
createdAt: Date.now(),
lastUpdate: Date.now()
});
Logger.info('Resource subscription created', { uri, subscriptionId, sessionId });
return { subscriptionId };
}
async unsubscribeResource(subscriptionId, sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
const session = this.sessionManager.getSession(sessionId);
const removed = session.subscriptions.delete(subscriptionId);
Logger.info('Resource subscription removed', { subscriptionId, found: removed, sessionId });
return {};
}
// Enhanced Prompts Management
async listPrompts(sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
return {
prompts: [
{
name: 'analyze_pipeline',
description: 'Analyze deal pipeline for bottlenecks and opportunities',
arguments: [
{
name: 'stage',
description: 'Specific deal stage to analyze',
required: false
},
{
name: 'owner',
description: 'Deal owner filter',
required: false
},
{
name: 'timeframe',
description: 'Analysis timeframe (30d, 90d, 1y)',
required: false
}
]
},
{
name: 'contact_research',
description: 'Deep contact and company research for lead qualification',
arguments: [
{
name: 'contact_id',
description: 'HubSpot contact ID for research',
required: true
},
{
name: 'include_company',
description: 'Include company analysis',
required: false
}
]
},
{
name: 'lead_scoring',
description: 'Lead qualification and scoring prompts',
arguments: [
{
name: 'criteria',
description: 'Scoring criteria (engagement, fit, intent)',
required: false
},
{
name: 'industry',
description: 'Industry-specific scoring',
required: false
}
]
},
{
name: 'email_templates',
description: 'Generate HubSpot email templates for outreach',
arguments: [
{
name: 'template_type',
description: 'Template type (prospecting, follow-up, nurture)',
required: true
},
{
name: 'industry',
description: 'Target industry',
required: false
}
]
},
{
name: 'meeting_prep',
description: 'Pre-meeting research and preparation prompts',
arguments: [
{
name: 'contact_id',
description: 'HubSpot contact ID',
required: true
},
{
name: 'meeting_type',
description: 'Meeting type (discovery, demo, closing)',
required: false
}
]
}
]
};
}
async getPrompt(name, args = {}, sessionId) {
if (!this.checkSessionAuth(sessionId)) {
throw new Error('Session not initialized');
}
let messages;
switch (name) {
case 'analyze_pipeline':
messages = [
{
role: 'user',
content: {
type: 'text',
text: `Analyze the HubSpot deal pipeline. Stage focus: ${args.stage || 'all stages'}. Owner filter: ${args.owner || 'all owners'}. Timeframe: ${args.timeframe || 'last 90 days'}. Provide insights on conversion rates, bottlenecks, revenue forecasting, and recommendations for improvement.`
}
}
];
break;
case 'contact_research':
messages = [
{
role: 'user',
content: {
type: 'text',
text: `Research HubSpot contact ID: ${args.contact_id}. ${args.include_company === 'true' ? 'Include comprehensive company analysis.' : ''} Provide insights on lead quality, engagement history, qualification status, and next best actions.`
}
}
];
break;
case 'lead_scoring':
messages = [
{
role: 'user',
content: {
type: 'text',
text: `Generate lead scoring criteria and framework. Focus: ${args.criteria || 'engagement, fit, and intent'}. Industry: ${args.industry || 'general'}. Provide scoring methodology, qualification questions, and lead routing recommendations.`
}
}
];
break;
case 'email_templates':
messages = [
{
role: 'user',
content: {
type: 'text',
text: `Create HubSpot email template for ${args.template_type}. Industry: ${args.industry || 'general'}. Include subject line, personalization tokens, compelling copy, and clear call-to-action. Ensure compliance with best practices.`
}
}
];
break;
case 'meeting_prep':
messages = [
{
role: 'user',
content: {
type: 'text',
text: `Prepare for ${args.meeting_type || 'discovery'} meeting with HubSpot contact ID: ${args.contact_id}. Research contact background, company information, recent interactions, and suggest talking points, questions, and meeting objectives.`
}
}
];
break;
default:
throw new Error(`Unknown prompt: ${name}`);
}
return {
description: `Generated prompt for ${name}`,
messages
};
}
// Logging
async setLogLevel(level, sessionId) {
Logger.setLevel(level);
Logger.info('Log level changed via MCP', { newLevel: level, sessionId });
return {};
}
// Completion/Sampling (enhanced implementations)
async complete(params, sessionId) {
Logger.debug('Completion request', { prompt: params.prompt, sessionId });
return {
completion: {
values: ['This is a placeholder completion response for HubSpot MCP server'],
total: 1
}
};
}
async createMessage(params, sessionId) {
Logger.debug('Sampling request', { messages: params.messages?.length, sessionId });
return {
model: 'hubspot-mcp-model',
role: 'assistant',
content: {
type: 'text',
text: 'This is a placeholder sampling response from HubSpot MCP server'
}
};
}
// Notification handlers
async handleInitialized(params, sessionId) {
if (!sessionId) {
Logger.warn('notifications/initialized received without session ID');
return;
}
const session = this.sessionManager.getSession(sessionId);
if (session) {
// Mark session as fully initialized
session.fullyInitialized = true;
session.lastActivity = Date.now();
Logger.info('Session fully initialized', { sessionId });
}
}
async handleCancelled(params, sessionId) {
Logger.debug('Cancellation notification received', {
requestId: params?.requestId,
reason: params?.reason,
sessionId
});
}
async handleProgress(params, sessionId) {
Logger.debug('Progress notification received', {
progress: params?.progress,
total: params?.total,
sessionId
});
}
async handleMessage(params, sessionId) {
Logger.info('Message notification received', {
message: params?.message,
level: params?.level,
sessionId
});
}
async handleResourcesUpdated(params, sessionId) {
Logger.info('Resources updated notification received', { sessionId });
// Could trigger cache invalidation or refresh logic here
}
async handleToolsUpdated(params, sessionId) {
Logger.info('Tools updated notification received', { sessionId });
// Could trigger tool metadata refresh here
}
async handlePromptsUpdated(params, sessionId) {
Logger.info('Prompts updated notification received', { sessionId });
// Could trigger prompt metadata refresh here
}
// Helper methods
checkSessionAuth(sessionId) {
if (!sessionId) return false;
const session = this.sessionManager.getSession(sessionId);
return session && session.initialized;
}
}
// Transport implementations
// Base transport class
class Transport {
constructor(mcpServer, sessionManager) {
this.mcpServer = mcpServer;
this.sessionManager = sessionManager;
}
async handleMCPRequest(jsonRpcRequest, sessionId = null) {
const startTime = Date.now();
const { id, method, params } = jsonRpcRequest;
const requestId = uuidv4();
Metrics.increment('requests_total');
try {
let result;
let finalSessionId = sessionId;
// Extract session ID from params if available
if (params && params.sessionId) {
finalSessionId = params.sessionId;
}
Logger.debug('MCP request received', {
method,
sessionId: finalSessionId,
requestId
});
// Handle notifications (no response expected)
if (method.startsWith('notifications/')) {
return this.handleNotification(method, params, finalSessionId);
}
switch (method) {
// Core Protocol
case 'initialize':
result = await this.mcpServer.initialize(params, finalSessionId);
finalSessionId = result.sessionId;
break;
case 'ping':
result = await this.mcpServer.ping(finalSessionId);
break;
case 'cancelled':
result = await this.mcpServer.cancelled(params, finalSessionId);
break;
case 'shutdown':
result = await this.mcpServer.shutdown(finalSessionId);
break;
// Tools Management
case 'tools/list':
result = await this.mcpServer.listTools(finalSessionId);
break;
case 'tools/call':
result = await this.mcpServer.callTool(params.name, params.arguments, finalSessionId);
break;
// Resources Management
case 'resources/list':
result = await this.mcpServer.listResources(finalSessionId);
break;
case 'resources/read':
result = await this.mcpServer.readResource(params.uri, finalSessionId);
break;
case 'resources/subscribe':
result = await this.mcpServer.subscribeResource(params.uri, finalSessionId);
break;
case 'resources/unsubscribe':
result = await this.mcpServer.unsubscribeResource(params.subscriptionId, finalSessionId);
break;
// Prompts Management
case 'prompts/list':
result = await this.mcpServer.listPrompts(finalSessionId);
break;
case 'prompts/get':
result = await this.mcpServer.getPrompt(params.name, params.arguments, finalSessionId);
break;
// Logging
case 'logging/setLevel':
result = await this.mcpServer.setLogLevel(params.level, finalSessionId);
break;
// Completion/Sampling
case 'completion/complete':
result = await this.mcpServer.complete(params, finalSessionId);
break;
case 'sampling/createMessage':
result = await this.mcpServer.createMessage(params, finalSessionId);
break;
default:
throw new Error(`Method not found: ${method}`);
}
const duration = Date.now() - startTime;
Metrics.histogram('request_durations', duration);
Logger.debug('MCP request completed', {
method,
sessionId: finalSessionId,
requestId,
duration: `${duration}ms`
});
return {
jsonrpc: '2.0',
id,
result
};
} catch (error) {
const duration = Date.now() - startTime;
Metrics.increment('errors_total');
Logger.error('MCP method execution failed', {
method,
error: error.message,
sessionId: sessionId,
requestId,
duration: `${duration}ms`
});
return {
jsonrpc: '2.0',
id,
error: {
code: -32603,
message: error.message,
data: { requestId }
}
};
}
}
async handleNotification(method, params, sessionId) {
const startTime = Date.now();
const requestId = uuidv4();
try {
Logger.debug('MCP notification received', { method, sessionId, requestId });
switch (method) {
case 'notifications/initialized':
await this.mcpServer.handleInitialized(params, sessionId);
break;
case 'notifications/cancelled':
await this.mcpServer.handleCancelled(params, sessionId);
break;
case 'notifications/progress':
await this.mcpServer.handleProgress(params, sessionId);
break;
case 'notifications/message':
await this.mcpServer.handleMessage(params, sessionId);
break;
case 'notifications/resourcesUpdated':
await this.mcpServer.handleResourcesUpdated(params, sessionId);
break;
case 'notifications/toolsUpdated':
await this.mcpServer.handleToolsUpdated(params, sessionId);
break;
case 'notifications/promptsUpdated':
await this.mcpServer.handlePromptsUpdated(params, sessionId);
break;
default:
Logger.warn('Unknown notification method', { method, sessionId });
}
const duration = Date.now() - startTime;
Logger.debug('MCP notification processed', {
method,
sessionId,
requestId,
duration: `${duration}ms`
});
// Notifications don't return responses
return null;
} catch (error) {
const duration = Date.now() - startTime;
Logger.error('MCP notification processing failed', {
method,
error: error.message,
sessionId,
requestId,
duration: `${duration}ms`
});
// Notifications don't return error responses either
return null;
}
}
}
// HTTP Transport
class HTTPTransport extends Transport {
constructor(mcpServer, sessionManager) {
super(mcpServer, sessionManager);
this.server = null;
}
start() {
this.server = http.createServer(async (req, res) => {
await this.handleHTTPRequest(req, res);
});
this.server.listen(CONFIG.port, CONFIG.host, () => {
Logger.info('HTTP transport started', {
address: `http://${CONFIG.host}:${CONFIG.port}`,
transport: 'http'
});
});
return this.server;
}
async handleHTTPRequest(req, res) {
const startTime = Date.now();
// Set security headers
this.setSecurityHeaders(res);
// CORS handling
this.handleCORS(req, res);
// Handle preflight requests
if (req.method === 'OPTIONS') {
res.statusCode = 200;
res.end();
return;
}
const parsedUrl = url.parse(req.url, true);
const { pathname } = parsedUrl;
try {
// Health endpoints
if (req.method === 'GET' && pathname === '/health') {
await this.handleHealthCheck(res);
return;
}
if (req.method === 'GET' && pathname === '/ready') {
await this.handleReadinessCheck(res);
return;
}
if (req.method === 'GET' && pathname === '/metrics') {
await this.handleMetrics(res);
return;
}
if (req.method === 'GET' && pathname === '/status') {
await this.handleStatus(res);
return;
}
// MCP JSON-RPC endpoints
if (req.method === 'POST' && pathname.startsWith('/mcp/')) {
const requestBody = await this.getRequestBody(req);
const jsonRpcRequest = JSON.parse(requestBody);
// Handle session management for HTTP requests
const sessionContext = this.getOrCreateSession(req, res);
const response = await this.handleMCPRequest(jsonRpcRequest, sessionContext.sessionId);
// Set session information in response headers
this.setSessionHeaders(res, sessionContext);
res.statusCode = 200;
res.end(JSON.stringify(response));
return;
}
// 404 for unknown endpoints
res.statusCode = 404;
res.end(JSON.stringify({
error: 'Not Found',
message: 'Endpoint not found',
timestamp: new Date().toISOString()
}));
} catch (error) {
Logger.error('HTTP request processing error', {
error: error.message,
path: pathname,
method: req.method
});
res.statusCode = 500;
res.end(JSON.stringify({
error: 'Internal Server Error',
message: error.message,
timestamp: new Date().toISOString()
}));
} finally {
const duration = Date.now() - startTime;
Logger.debug('HTTP request processed', {
method: req.method,
path: pathname,
statusCode: res.statusCode,
duration: `${duration}ms`
});
}
}
setSecurityHeaders(res) {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '1; mode=block');
res.setHeader('Content-Security-Policy', "default-src 'none'");
res.setHeader('Strict-Transport-Security', 'max-age=31536000');
res.setHeader('Referrer-Policy', 'no-referrer');
res.setHeader('Content-Type', 'application/json; charset=utf-8');
}
handleCORS(req, res) {
const origin = req.headers.origin;
if (CONFIG.corsOrigin === '*' || origin === CONFIG.corsOrigin || CONFIG.corsOrigin.includes(origin)) {
res.setHeader('Access-Control-Allow-Origin', origin || CONFIG.corsOrigin);
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Session-ID');
}
}
async handleHealthCheck(res) {
res.statusCode = 200;
res.end(JSON.stringify({
status: 'healthy',
timestamp: new Date().toISOString(),
uptime: Math.floor(process.uptime()),
version: CONFIG.appVersion,
service: CONFIG.appName,
transport: CONFIG.transport,
sessions: this.sessionManager.sessions.size
}));
}
async handleReadinessCheck(res) {
const isReady = !!(CONFIG.hubspotToken && process.uptime() > 5);
res.statusCode = isReady ? 200 : 503;
res.end(JSON.stringify({
status: isReady ? 'ready' : 'not ready',
timestamp: new Date().toISOString(),
checks: {
hubspot_token: !!CONFIG.hubspotToken,
server: 'running',
uptime: process.uptime() > 5,
sessions: this.sessionManager.sessions.size
}
}));
}
async handleMetrics(res) {
res.statusCode = 200;
res.end(JSON.stringify(Metrics.getMetrics()));
}
async handleStatus(res) {
res.statusCode = 200;
res.end(JSON.stringify({
service: CONFIG.appName,
version: CONFIG.appVersion,
transport: CONFIG.transport,
uptime: Math.floor(process.uptime()),
sessions: {
active: this.sessionManager.sessions.size,
max: CONFIG.maxConnections
},
config: {
hubspot_connected: !!CONFIG.hubspotToken,
rate_limits: {
tools: CONFIG.rateLimitTools,
resources: CONFIG.rateLimitResources
}
},
timestamp: new Date().toISOString()
}));
}
async getRequestBody(req) {
return new Promise((resolve, reject) => {
let body = '';
let totalSize = 0;
req.on('data', chunk => {
totalSize += chunk.length;
if (totalSize > CONFIG.maxRequestSize) {
reject(new Error('Request too large'));
return;
}
body += chunk;
});
req.on('end', () => resolve(body));
req.on('error', reject);
});
}
// Session management for HTTP requests
getOrCreateSession(req, res) {
let sessionId = null;
let session = null;
let isNewSession = false;
const requestInfo = {
url: req.url,
method: req.method,
userAgent: req.headers['user-agent'],
cookie: req.headers.cookie,
authorization: req.headers.authorization
};
Logger.debug('Session lookup attempt', requestInfo);
// 1. Try to get session ID from various sources (in order of preference)
// a) From X-Session-ID header (highest priority)
sessionId = req.headers['x-session-id'];
if (sessionId) {
session = this.sessionManager.getSession(sessionId);
if (session) {
session.lastActivity = Date.now();
Logger.info('HTTP session found via X-Session-ID header', { sessionId });
} else {
Logger.warn('Invalid session ID in X-Session-ID header', { sessionId });
}
}
// b) From Authorization Bearer header (second priority)
if (!session && req.headers.authorization) {
const authMatch = req.headers.authorization.match(/^Bearer\s+(.+)$/i);
if (authMatch) {
sessionId = authMatch[1];
session = this.sessionManager.getSession(sessionId);
if (session) {
session.lastActivity = Date.now();
Logger.info('HTTP session found via Authorization Bearer', { sessionId });
} else {
Logger.warn('Invalid session ID in Authorization Bearer', { sessionId });
}
}
}
// c) From mcp-session cookie (third priority)
if (!session && req.headers.cookie) {
const cookies = this.parseCookies(req.headers.cookie);
sessionId = cookies['mcp-session'];
if (sessionId) {
session = this.sessionManager.getSession(sessionId);
if (session) {
session.lastActivity = Date.now();
Logger.info('HTTP session found via mcp-session cookie', { sessionId });
} else {
Logger.warn('Invalid session ID in mcp-session cookie', { sessionId });
}
}
Logger.debug('Cookie parsing result', { cookies, sessionId });
}
// d) From request body sessionId parameter (fourth priority)
if (!session && req.body) {
try {
const body = JSON.parse(req.body);
if (body.params && body.params.sessionId) {
sessionId = body.params.sessionId;
session = this.sessionManager.getSession(sessionId);
if (session) {
session.lastActivity = Date.now();
Logger.info('HTTP session found via body param', { sessionId });
} else {
Logger.warn('Invalid session ID in body param', { sessionId });
}
}
} catch (e) {
// Ignore JSON parse errors for session lookup
Logger.debug('Failed to parse request body for session lookup', { error: e.message });
}
}
// 2. Create new session if none found
if (!session) {
session = this.sessionManager.createSession({
transport: 'http',
clientInfo: {
userAgent: req.headers['user-agent'],
ip: req.connection.remoteAddress || req.socket.remoteAddress,
referer: req.headers.referer,
origin: req.headers.origin
}
});
sessionId = session.id;
isNewSession = true;
Logger.info('HTTP session created', {
sessionId,
transport: 'http',
clientInfo: session.clientInfo
});
}
Logger.info('Session resolution completed', {
sessionId,
isNewSession,
method: isNewSession ? 'created' : 'found',
activeSessions: this.sessionManager.sessions.size
});
return {
sessionId,
session,
isNewSession
};
}
// Set session headers and cookies for HTTP responses
setSessionHeaders(res, sessionContext) {
const { sessionId, isNewSession } = sessionContext;
// Always set X-Session-ID header for client tracking
res.setHeader('X-Session-ID', sessionId);
// Set session cookie optimized for aiohttp.ClientSession compatibility
const cookieOptions = [
`mcp-session=${sessionId}`,
'Path=/', // Ensure cookie applies to all paths
`Max-Age=${CONFIG.sessionTimeout}`, // Use configured session timeout
'SameSite=Lax' // CSRF protection while allowing cross-site navigation
];
// Note: Removed HttpOnly to allow aiohttp.ClientSession access
// Only add Secure flag for production HTTPS environments
if (process.env.NODE_ENV === 'production' && CONFIG.secure) {
cookieOptions.push('Secure');
}
res.setHeader('Set-Cookie', cookieOptions.join('; '));
// Add comprehensive CORS headers for session management
res.setHeader('Access-Control-Expose-Headers', 'X-Session-ID');
res.setHeader('Access-Control-Allow-Credentials', 'true');
Logger.debug('HTTP session headers set', {
sessionId,
newSession: isNewSession,
cookie: cookieOptions.join('; ')
});
}
// Parse cookies from Cookie header
parseCookies(cookieHeader) {
const cookies = {};
if (cookieHeader) {
cookieHeader.split(';').forEach(cookie => {
const [name, value] = cookie.trim().split('=');
if (name && value) {
cookies[name] = decodeURIComponent(value);
}
});
}
return cookies;
}
}
// HTTP Streamable Transport (Server-sent events)
class StreamableHTTPTransport extends HTTPTransport {
constructor(mcpServer, sessionManager) {
super(mcpServer, sessionManager);
this.sseConnections = new Map();
}
async handleHTTPRequest(req, res) {
const parsedUrl = url.parse(req.url, true);
const { pathname } = parsedUrl;
// Handle SSE endpoint
if (req.method === 'GET' && pathname === '/mcp/stream') {
await this.handleSSEConnection(req, res);
return;
}
// Handle regular HTTP requests
await super.handleHTTPRequest(req, res);
}
async handleSSEConnection(req, res) {
const providedSessionId = req.headers['x-session-id'];
let sessionId;
let session;
// If session ID provided, try to use existing session
if (providedSessionId) {
session = this.sessionManager.getSession(providedSessionId);
if (session) {
sessionId = providedSessionId;
Logger.info('SSE reconnecting to existing session', { sessionId });
}
}
// Create new session if none exists
if (!session) {
session = this.sessionManager.createSession({
transport: 'streamable-http',
clientInfo: {
userAgent: req.headers['user-agent'],
ip: req.connection.remoteAddress
}
});
sessionId = session.id;
Logger.info('SSE creating new session', { sessionId });
}
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': CONFIG.corsOrigin,
'Access-Control-Allow-Headers': 'Content-Type, X-Session-ID',
'X-Session-ID': sessionId
});
// Store connection with session binding
this.sseConnections.set(sessionId, res);
// Update session with connection info
session.lastActivity = Date.now();
session.connection = {
type: 'sse',
established: Date.now()
};
Logger.info('SSE connection established', { sessionId, transport: 'streamable-http' });
// Send initial connection event with session details
this.sendSSEEvent(sessionId, 'connected', {
sessionId,
timestamp: new Date().toISOString(),
transport: 'streamable-http',
initialized: session.initialized || false
});
// Handle connection close
req.on('close', () => {
this.sseConnections.delete(sessionId);
// Update session but don't remove it immediately - allow for reconnection
const currentSession = this.sessionManager.getSession(sessionId);
if (currentSession) {
currentSession.connection = null;
currentSession.lastActivity = Date.now();
Logger.info('SSE connection closed, session preserved for reconnection', { sessionId });
// Set a timeout to remove session if no reconnection within timeout period
setTimeout(() => {
const stillExists = this.sessionManager.getSession(sessionId);
if (stillExists && !this.sseConnections.has(sessionId)) {
this.sessionManager.removeSession(sessionId);
Logger.info('SSE session expired after no reconnection', { sessionId });
}
}, CONFIG.sessionTimeout * 1000 / 4); // 1/4 of normal timeout for reconnection
} else {
Logger.info('SSE connection closed', { sessionId });
}
});
// Keep alive
const keepAlive = setInterval(() => {
if (this.sseConnections.has(sessionId)) {
this.sendSSEEvent(sessionId, 'ping', { timestamp: new Date().toISOString() });
} else {
clearInterval(keepAlive);
}
}, 30000);
// Handle incoming data for bidirectional communication
let buffer = '';
req.on('data', chunk => {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop(); // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
try {
const jsonRpcRequest = JSON.parse(line);
this.handleSSERequest(jsonRpcRequest, sessionId);
} catch (error) {
Logger.error('Invalid SSE request', { error: error.message, sessionId });
}
}
}
});
}
async handleSSERequest(jsonRpcRequest, sessionId) {
try {
// Update session activity
const session = this.sessionManager.getSession(sessionId);
if (session) {
session.lastActivity = Date.now();
}
const response = await this.handleMCPRequest(jsonRpcRequest, sessionId);
// Only send response for non-notification requests
if (response !== null) {
this.sendSSEEvent(sessionId, 'response', response);
}
} catch (error) {
Logger.error('SSE request handling error', {
error: error.message,
method: jsonRpcRequest.method,
sessionId
});
this.sendSSEEvent(sessionId, 'error', {
error: error.message,
id: jsonRpcRequest.id,
method: jsonRpcRequest.method
});
}
}
sendSSEEvent(sessionId, type, data) {
const connection = this.sseConnections.get(sessionId);
if (connection) {
const eventData = `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`;
connection.write(eventData);
}
}
// Send notifications to subscribed clients
sendNotification(sessionId, method, params) {
this.sendSSEEvent(sessionId, 'notification', {
jsonrpc: '2.0',
method,
params
});
}
}
// STDIO Transport
class STDIOTransport extends Transport {
constructor(mcpServer, sessionManager) {
super(mcpServer, sessionManager);
this.sessionId = null;
}
start() {
Logger.info('STDIO transport started', { transport: 'stdio' });
// Create a single session for STDIO
const session = this.sessionManager.createSession({ transport: 'stdio' });
this.sessionId = session.id;
// Read from stdin
process.stdin.setEncoding('utf8');
process.stdin.on('readable', () => {
this.handleSTDIOInput();
});
process.stdin.on('end', () => {
Logger.info('STDIO input ended');
process.exit(0);
});
// Handle signals properly
process.on('SIGTERM', () => this.shutdown());
process.on('SIGINT', () => this.shutdown());
}
async handleSTDIOInput() {
let chunk = process.stdin.read();
if (chunk !== null) {
const lines = chunk.split('\n').filter(line => line.trim());
for (const line of lines) {
try {
const jsonRpcRequest = JSON.parse(line);
const response = await this.handleMCPRequest(jsonRpcRequest, this.sessionId);
this.sendSTDIOResponse(response);
} catch (error) {
Logger.error('STDIO request processing error', { error: error.message });
this.sendSTDIOResponse({
jsonrpc: '2.0',
id: null,
error: {
code: -32700,
message: 'Parse error'
}
});
}
}
}
}
sendSTDIOResponse(response) {
process.stdout.write(JSON.stringify(response) + '\n');
}
shutdown() {
if (this.sessionId) {
this.sessionManager.removeSession(this.sessionId);
}
Logger.info('STDIO transport shutdown');
process.exit(0);
}
}
// Transport factory
function createTransport(type, mcpServer, sessionManager) {
switch (type) {
case 'http':
return new HTTPTransport(mcpServer, sessionManager);
case 'streamable-http':
return new StreamableHTTPTransport(mcpServer, sessionManager);
case 'stdio':
return new STDIOTransport(mcpServer, sessionManager);
default:
throw new Error(`Unknown transport type: ${type}`);
}
}
// Graceful shutdown handler
function setupGracefulShutdown(server, sessionManager) {
let shutdownInProgress = false;
const shutdown = (signal) => {
if (shutdownInProgress) return;
shutdownInProgress = true;
Logger.info('Graceful shutdown initiated', { signal });
if (server && server.close) {
server.close(() => {
Logger.info('Server closed successfully');
process.exit(0);
});
} else {
Logger.info('STDIO transport shutdown');
process.exit(0);
}
setTimeout(() => {
Logger.error('Forced shutdown after timeout');
process.exit(1);
}, CONFIG.shutdownTimeout);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('uncaughtException', (error) => {
Logger.error('Uncaught exception', { error: error.message, stack: error.stack });
process.exit(1);
});
process.on('unhandledRejection', (reason) => {
Logger.error('Unhandled rejection', { reason: reason?.toString() });
process.exit(1);
});
}
// Main application entry point
async function main() {
try {
Logger.info('Starting HubSpot MCP Server', {
version: CONFIG.appVersion,
environment: CONFIG.nodeEnv,
transport: CONFIG.transport,
protocolVersion: CONFIG.mcpProtocolVersion
});
validateConfig();
// Initialize components
const hubspotClient = new HubSpotClient(CONFIG.hubspotToken, CONFIG.hubspotApiUrl);
const sessionManager = new SessionManager();
const mcpServer = new MCPServer(hubspotClient, sessionManager);
// Create and start transport
const transport = createTransport(CONFIG.transport, mcpServer, sessionManager);
const server = transport.start();
Logger.info('🚀 HubSpot MCP Server operational', {
transport: CONFIG.transport,
address: CONFIG.transport !== 'stdio' ? `http://${CONFIG.host}:${CONFIG.port}` : 'stdio',
health: CONFIG.transport !== 'stdio' ? `http://${CONFIG.host}:${CONFIG.port}/health` : undefined,
mcp: CONFIG.transport !== 'stdio' ? `http://${CONFIG.host}:${CONFIG.port}/mcp/` : undefined,
stream: CONFIG.transport === 'streamable-http' ? `http://${CONFIG.host}:${CONFIG.port}/mcp/stream` : undefined,
endpoints: '21 MCP protocol endpoints available',
features: [
'Multi-transport support',
'Session management',
'Rate limiting',
'Real-time subscriptions',
'Comprehensive monitoring'
]
});
setupGracefulShutdown(server, sessionManager);
} catch (error) {
Logger.error('Failed to start server', { error: error.message, stack: error.stack });
process.exit(1);
}
}
// Export for testing
module.exports = {
CONFIG,
Logger,
Metrics,
SessionManager,
HubSpotClient,
MCPServer,
HTTPTransport,
StreamableHTTPTransport,
STDIOTransport,
createTransport,
main
};
// Start server if run directly
if (require.main === module) {
main();
}