/**
* Streaming HTTP Transport for Instantly MCP Server
* Implements native StreamableHTTPServerTransport with session management
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import express from 'express';
import cors from 'cors';
import { createServer, Server as HttpServer } from 'http';
import { readFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { randomUUID } from 'node:crypto';
import { TOOLS_DEFINITION } from './tools/index.js';
import { executeToolDirectly } from './handlers/tool-executor.js';
// Simple rate limiting interface
interface RateLimitEntry {
count: number;
resetTime: number;
}
// Instantly.ai custom icon (embedded for reliability)
const INSTANTLY_ICON = '';
// Load the Instantly.ai icon
function loadInstantlyIcon(): string {
return INSTANTLY_ICON;
}
export interface StreamingHttpConfig {
port: number;
host: string;
cors: {
origin: string | string[] | boolean;
credentials?: boolean;
};
auth?: {
apiKeyHeader: string;
requiredApiKey?: string;
};
}
export class StreamingHttpTransport {
private server: Server;
private config: StreamingHttpConfig;
private app: express.Application;
private httpServer?: HttpServer;
private activeSessions = new Map<string, any>();
private transports = new Map<string, StreamableHTTPServerTransport>(); // Store transports by session ID
private sseTransports = new Map<string, SSEServerTransport>(); // Store SSE transports by session ID
private sseSessionMetadata = new Map<string, { apiKey?: string }>(); // Store API key and other metadata per SSE session
private requestHandlers?: {
toolsList: (id: any) => Promise<any>;
toolCall: (params: any, id: any) => Promise<any>;
};
private rateLimitMap = new Map<string, RateLimitEntry>();
private readonly RATE_LIMIT_WINDOW = 60 * 1000; // 1 minute
private readonly RATE_LIMIT_MAX_REQUESTS = 100; // 100 requests per minute per IP
constructor(server: Server, config: StreamingHttpConfig) {
this.server = server;
this.config = config;
this.app = express();
this.setupMiddleware();
// NOTE: We create transport instances per-request in stateful mode
// This allows multiple concurrent sessions with different session IDs
this.setupRoutes();
}
/**
* Handle MCP request with OPTIONAL session-based transport management
* Falls back to stateless mode if sessions aren't supported/available
*/
private async handleMcpRequest(req: express.Request, res: express.Response): Promise<void> {
try {
// Check for existing session ID from request header
const sessionId = req.headers['mcp-session-id'] as string | undefined;
const isInitRequest = req.body?.method === 'initialize';
let transport: StreamableHTTPServerTransport;
let effectiveSessionId: string | undefined;
if (sessionId && this.transports.has(sessionId)) {
// Subsequent request with valid session: reuse existing transport
transport = this.transports.get(sessionId)!;
effectiveSessionId = sessionId;
console.error(`[HTTP] π Reusing existing session: ${sessionId}`);
// Return session ID in response header
res.setHeader('Mcp-Session-Id', sessionId);
} else if (sessionId && !this.transports.has(sessionId)) {
// Session ID provided but not found - fall back to stateless mode
console.error(`[HTTP] β οΈ Session not found: ${sessionId} - falling back to stateless mode`);
// Create new stateless transport for this request
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined, // Stateless mode
enableDnsRebindingProtection: false,
enableJsonResponse: true,
});
await this.server.connect(transport);
} else {
// No session ID - use stateless mode (backward compatible)
console.error(`[HTTP] π Stateless request (no session ID)`);
// Create new stateless transport for this request
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined, // Stateless mode
enableDnsRebindingProtection: false,
enableJsonResponse: true,
});
await this.server.connect(transport);
}
// Handle the request with the transport
await transport.handleRequest(req, res, req.body);
console.error(`[HTTP] β
Request handled successfully${effectiveSessionId ? ` for session: ${effectiveSessionId}` : ' (stateless)'}`);
} catch (error) {
console.error('[HTTP] β MCP request error:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
data: (error as Error).message
},
id: req.body?.id || null,
});
}
}
}
/**
* Setup Express middleware
*/
private setupMiddleware(): void {
// Claude Desktop compatibility: Inject Accept header if missing or incomplete
// The MCP SDK requires BOTH 'application/json' AND 'text/event-stream' in the Accept header
// Claude Desktop/Web may send only 'application/json', while iOS sends '*/*' or no header
// This middleware ensures all clients get the required header combination
// IMPORTANT: Only inject for POST requests to /mcp, not GET requests (which are for discovery)
this.app.use((req, res, next) => {
// Only inject Accept header for POST requests to MCP endpoints
if (req.method === 'POST' && req.path.startsWith('/mcp')) {
const accept = req.headers.accept || '';
const hasJson = accept.includes('application/json');
const hasEventStream = accept.includes('text/event-stream');
if (!hasJson || !hasEventStream) {
req.headers.accept = 'application/json, text/event-stream';
console.error('[HTTP] π§ Injected Accept header for MCP compatibility (was: %s)', accept || 'missing');
}
}
next();
});
// Security: Origin header validation (TEMPORARILY DISABLED FOR DEBUGGING)
// TODO: Re-enable after we capture what origin Claude Web actually sends
this.app.use((req, res, next) => {
const origin = req.headers.origin;
console.error(`[HTTP] οΏ½ DEBUG - Origin header received: ${origin || 'NO ORIGIN'}`);
next();
});
// Enhanced headers for Claude Desktop remote connector compatibility
this.app.use((req, res, next) => {
res.set({
'Connection': 'keep-alive',
'Keep-Alive': 'timeout=120, max=100', // Increased from 30s to 120s for large campaigns
'Cache-Control': 'no-cache, no-store, must-revalidate',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Server': 'instantly-mcp/1.2.0'
});
next();
});
// CORS configuration for instantly.ai domain
this.app.use(cors({
origin: this.config.cors.origin,
credentials: this.config.cors.credentials || true,
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: [
'Content-Type',
'Authorization',
'x-api-key',
'x-instantly-api-key',
'mcp-session-id',
'mcp-protocol-version',
'User-Agent',
'X-Requested-With'
]
}));
// JSON parsing with larger limit for complex requests
this.app.use(express.json({ limit: '10mb' }));
// Rate limiting middleware
this.app.use((req, res, next) => {
if (process.env.NODE_ENV === 'production') {
const clientIp = req.ip || req.connection.remoteAddress || 'unknown';
const now = Date.now();
const rateLimitEntry = this.rateLimitMap.get(clientIp);
if (rateLimitEntry) {
if (now < rateLimitEntry.resetTime) {
if (rateLimitEntry.count >= this.RATE_LIMIT_MAX_REQUESTS) {
res.status(429).json({
jsonrpc: '2.0',
id: req.body?.id || null,
error: {
code: -32000,
message: 'Rate limit exceeded',
data: {
limit: this.RATE_LIMIT_MAX_REQUESTS,
window: this.RATE_LIMIT_WINDOW / 1000,
resetTime: new Date(rateLimitEntry.resetTime).toISOString()
}
}
});
return;
}
rateLimitEntry.count++;
} else {
// Reset the rate limit window
this.rateLimitMap.set(clientIp, { count: 1, resetTime: now + this.RATE_LIMIT_WINDOW });
}
} else {
// First request from this IP
this.rateLimitMap.set(clientIp, { count: 1, resetTime: now + this.RATE_LIMIT_WINDOW });
}
}
next();
});
// Enhanced request logging with user agent and auth method detection
this.app.use((req, res, next) => {
const timestamp = new Date().toISOString();
const userAgent = req.headers['user-agent'] || 'unknown';
const hasAuth = !!(req.headers.authorization || req.headers['x-instantly-api-key'] || req.params?.apiKey);
const authMethod = req.params?.apiKey ? 'URL' : (req.headers.authorization ? 'Bearer' : (req.headers['x-instantly-api-key'] ? 'Header' : 'None'));
console.error(`[HTTP] ${timestamp} ${req.method} ${req.path} - ${req.ip} - Auth: ${authMethod} - UA: ${userAgent.substring(0, 50)}`);
// DETAILED LOGGING FOR CLAUDE DESKTOP DEBUGGING
console.error(`[HTTP] π FULL REQUEST:`);
console.error(`[HTTP] Origin: ${req.headers.origin || 'NONE'}`);
console.error(`[HTTP] Referer: ${req.headers.referer || 'NONE'}`);
console.error(`[HTTP] Accept: ${req.headers.accept || 'NONE'}`);
console.error(`[HTTP] Content-Type: ${req.headers['content-type'] || 'NONE'}`);
console.error(`[HTTP] All Headers: ${JSON.stringify(req.headers)}`);
// Update client detection with user agent for HTTP requests
if (req.method === 'POST' && (req.path === '/mcp' || req.path.startsWith('/mcp/'))) {
import('./client-detection.js').then(({ globalClientManager }) => {
globalClientManager.updateClientInfo(undefined, userAgent);
}).catch(err => {
console.error('[HTTP] Client detection update failed:', err);
});
}
next();
});
// MCP Protocol Version header (required by spec)
// Updated to 2025-06-18 for latest MCP protocol features
this.app.use((req, res, next) => {
res.setHeader('mcp-protocol-version', '2025-06-18');
next();
});
}
/**
* Setup routes
*/
private setupRoutes(): void {
// Fast ping endpoint for Claude Desktop connection testing
this.app.get('/ping', (req, res) => {
res.json({
pong: true,
timestamp: Date.now(),
server: 'instantly-mcp'
});
});
// Claude Desktop connection test endpoint
this.app.options('*', (req, res) => {
res.status(200).end();
});
// Health check endpoint
this.app.get('/health', (req, res) => {
res.json({
status: 'healthy',
service: 'instantly-mcp',
version: '1.2.0',
transport: 'dual-protocol',
protocols: {
streamable_http: '2025-06-18',
sse: '2024-11-05'
},
timestamp: new Date().toISOString(),
activeSessions: this.activeSessions.size,
sseSessions: this.sseTransports.size,
endpoints: {
mcp: '/mcp (GET for SSE, POST for Streamable HTTP)',
'mcp-with-key': '/mcp/:apiKey',
messages: '/messages (POST for SSE transport)',
health: '/health',
info: '/info',
ping: '/ping'
}
});
});
// Server info endpoint
this.app.get('/info', (req, res) => {
res.json({
name: 'Instantly MCP Server',
version: '1.2.0',
description: 'Official Instantly.ai MCP server with 34 email automation tools',
transport: 'streaming-http',
endpoint: 'https://mcp.instantly.ai/mcp',
protocol: '2025-06-18',
tools: 34,
capabilities: {
tools: true,
resources: false,
prompts: false,
logging: false
},
features: [
'Email campaign management',
'Lead management',
'Account management',
'Analytics and reporting',
'Email verification',
'Subsequence management'
],
authentication: {
methods: [
{
type: 'header-based',
description: 'API key in request headers (more secure)',
endpoint: '/mcp',
formats: [
'Authorization: Bearer [INSTANTLY_API_KEY]',
'x-instantly-api-key: [INSTANTLY_API_KEY]',
'x-api-key: [INSTANTLY_API_KEY] (legacy)'
]
},
{
type: 'url-based',
description: 'API key as path parameter',
endpoint: '/mcp/{API_KEY}',
example: '/mcp/your-instantly-api-key-here'
}
]
},
documentation: 'https://github.com/Instantly-ai/instantly-mcp'
});
});
// DELETE /mcp - Session cleanup endpoint
// MUST be registered BEFORE .all('/mcp') to avoid being caught by it
this.app.delete('/mcp', async (req, res) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
console.error('[HTTP] ποΈ DELETE request received');
console.error('[HTTP] Session ID from header:', sessionId);
if (!sessionId) {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Session ID required',
data: {
hint: 'Include Mcp-Session-Id header with the session ID to delete'
}
},
id: null,
});
return;
}
if (this.transports.has(sessionId)) {
// Clean up the session
this.transports.delete(sessionId);
console.error(`[HTTP] β
Session deleted: ${sessionId}`);
res.status(204).end();
} else {
console.error(`[HTTP] β Session not found: ${sessionId}`);
res.status(404).json({
jsonrpc: '2.0',
error: {
code: -32001,
message: 'Session not found',
data: {
sessionId,
hint: 'Session may have already been cleaned up'
}
},
id: null,
});
}
});
// Main MCP endpoint with header-based authentication
// Uses POST only - MCP protocol uses JSON-RPC over POST
// GET requests handled separately for discovery
this.app.post('/mcp', async (req, res) => {
// VERBOSE LOGGING FOR CLAUDE DESKTOP/WEB DEBUGGING
console.error('[HTTP] ========== INCOMING MCP REQUEST (HEADER AUTH) ==========');
console.error('[HTTP] π FULL REQUEST HEADERS:', JSON.stringify(req.headers, null, 2));
console.error('[HTTP] π REQUEST BODY:', JSON.stringify(req.body, null, 2));
console.error('[HTTP] π REQUEST METHOD:', req.body?.method || 'unknown');
console.error('[HTTP] =======================================');
// Try to extract API key from headers FIRST (for Claude Desktop compatibility)
let apiKey = req.headers.authorization?.replace('Bearer ', '') ||
req.headers['x-instantly-api-key'] as string ||
req.headers['x-api-key'] as string;
if (apiKey) {
// API key provided in header - store it and proceed WITHOUT auth middleware
console.error('[HTTP] π API key found in headers, bypassing auth middleware');
req.headers['x-instantly-api-key'] = apiKey;
(req as any).instantlyApiKey = apiKey;
} else {
// No API key in headers - this might be an initialize request
// Allow it through for protocol negotiation
console.error('[HTTP] β οΈ No API key in headers - allowing for initialize');
}
// Handle session-based transport management (stateful mode)
await this.handleMcpRequest(req, res);
});
// URL-based authentication endpoint: /mcp/{API_KEY}
// Uses POST only - MCP protocol uses JSON-RPC over POST
this.app.post('/mcp/:apiKey', async (req, res) => {
// Extract API key from URL path
let apiKey = req.params.apiKey;
if (!apiKey || apiKey.length < 10) {
res.status(401).json({
jsonrpc: '2.0',
id: req.body?.id || null,
error: {
code: -32001,
message: 'Invalid API key in URL path',
data: {
reason: 'API key too short or missing',
format: '/mcp/{INSTANTLY_API_KEY}',
example: '/mcp/your-instantly-api-key-here',
note: 'Provide your Instantly.ai API key as the path parameter'
}
}
});
return;
}
// VERBOSE LOGGING FOR CLAUDE DESKTOP/WEB DEBUGGING
console.error('[HTTP] ========== INCOMING MCP REQUEST (URL AUTH) ==========');
console.error('[HTTP] π API Key from URL:', apiKey.substring(0, 20) + '...');
console.error('[HTTP] π FULL REQUEST HEADERS:', JSON.stringify(req.headers, null, 2));
console.error('[HTTP] π REQUEST BODY:', JSON.stringify(req.body, null, 2));
console.error('[HTTP] π REQUEST METHOD:', req.body?.method || 'unknown');
console.error('[HTTP] =======================================');
// Store the API key in request headers for SDK to pass through via extra.requestInfo.headers
req.headers['x-instantly-api-key'] = apiKey;
// Also store in request object as backup
(req as any).instantlyApiKey = apiKey;
// Handle session-based transport management (stateful mode)
await this.handleMcpRequest(req, res);
});
// OAuth 2.1 Authorization Server Metadata (RFC 8414)
// Claude Desktop/Web checks this endpoint to discover auth capabilities
// Return 404 to indicate no OAuth is supported - API key only
this.app.get('/.well-known/oauth-authorization-server', (req, res) => {
console.error('[HTTP] π OAuth discovery: /.well-known/oauth-authorization-server - returning 404 (API key auth only)');
// Return 404 to clearly indicate OAuth is not supported
// This tells Claude Desktop/Web to skip OAuth and allow direct connection
res.status(404).json({
error: 'OAuth not supported',
message: 'This server uses API key authentication only. No OAuth flow required.',
authentication: {
type: 'api_key',
methods: ['url_path', 'header'],
url_format: '/mcp/{API_KEY}',
header_format: 'Authorization: Bearer {API_KEY} or x-instantly-api-key: {API_KEY}'
},
documentation: 'https://github.com/Instantly-ai/instantly-mcp'
});
});
// OAuth 2.0 Protected Resource Metadata (RFC 8414)
// Return 404 to indicate no OAuth is supported
this.app.get('/.well-known/oauth-protected-resource', (req, res) => {
console.error('[HTTP] π OAuth discovery: /.well-known/oauth-protected-resource - returning 404 (API key auth only)');
res.status(404).json({
error: 'OAuth not supported',
message: 'This server uses API key authentication only. No OAuth flow required.',
authentication: {
type: 'api_key',
methods: ['url_path', 'header']
},
documentation: 'https://github.com/Instantly-ai/instantly-mcp'
});
});
// OAuth 2.0 Protected Resource Metadata with path parameter
this.app.get('/.well-known/oauth-protected-resource/mcp/:apiKey', (req, res) => {
console.error('[HTTP] π OAuth discovery with API key in path');
res.json({
resource: 'https://mcp.instantly.ai',
authorization_servers: ['https://mcp.instantly.ai'],
bearer_methods_supported: ['header', 'url_path'],
resource_documentation: 'https://github.com/Instantly-ai/instantly-mcp',
api_key_provided: true,
ready: true
});
});
// OAuth 2.1 Authorization Server Metadata with path parameter
this.app.get('/.well-known/oauth-authorization-server/mcp/:apiKey', (req, res) => {
console.error('[HTTP] π OAuth authorization server discovery with API key in path');
res.json({
issuer: 'https://mcp.instantly.ai',
service_documentation: 'https://github.com/Instantly-ai/instantly-mcp',
api_key_provided: true,
ready: true,
// Indicate no OAuth flow needed - API key already provided
authorization_required: false
});
});
// OAuth Dynamic Client Registration (RFC 7591)
// Return 404 to indicate OAuth registration is not supported
this.app.post('/register', (req, res) => {
console.error('[HTTP] π OAuth client registration attempted - returning 404 (not supported)');
// Return 404 to clearly indicate OAuth registration is not supported
res.status(404).json({
error: 'OAuth registration not supported',
message: 'This server uses API key authentication only. No client registration required.',
authentication: {
type: 'api_key',
description: 'Provide your Instantly.ai API key via URL path or headers',
methods: ['url_path', 'header']
}
});
});
// Minimal /authorize endpoint for MCP clients that expect OAuth-style discovery
this.app.get('/authorize', (req, res) => {
console.error('[HTTP] π /authorize endpoint accessed - MCP client discovery');
// Return MCP server capabilities instead of OAuth flow
res.json({
server: 'instantly-mcp',
version: '1.2.0',
protocol: 'mcp',
transport: 'streamable-http',
auth: {
type: 'api_key',
methods: ['url_path', 'header'],
description: 'Provide API key via URL path or x-instantly-api-key header'
},
endpoints: {
mcp: '/mcp',
mcp_with_key: '/mcp/{API_KEY}',
health: '/health',
info: '/info'
},
capabilities: ['tools'],
ready: true
});
});
// GET endpoint for MCP clients
// IMPORTANT: Returns immediately - no blocking on SSE setup
// Cursor and most MCP clients use POST for actual communication
this.app.get('/mcp/:apiKey?', (req, res) => {
const apiKey = req.params.apiKey;
const acceptHeader = req.headers.accept || '';
console.error(`[HTTP] π GET /mcp request - API Key: ${apiKey ? 'β
Present' : 'β Missing'}`);
console.error(`[HTTP] π Accept: ${acceptHeader}`);
// For ALL GET requests, return server info immediately
// This ensures Cursor and other clients don't hang on discovery
// Actual MCP communication happens via POST
res.json({
server: 'instantly-mcp',
version: '1.2.0',
transport: 'streamable-http',
protocol: '2025-06-18',
tools: TOOLS_DEFINITION.length,
endpoints: {
'mcp_post': apiKey ? `/mcp/${apiKey}` : '/mcp',
'health': '/health',
'info': '/info'
},
auth: {
required: true,
methods: ['path_parameter', 'header'],
note: 'Use POST to /mcp/{API_KEY} for MCP communication'
},
status: 'ready'
});
});
// POST /messages endpoint for SSE transport (Claude.ai proxy compatibility)
// ENHANCED: Handle requests even without SSE session (SSE handshake sometimes fails)
this.app.post('/messages', async (req, res) => {
const sessionId = req.query.sessionId as string;
const apiKeyFromQuery = req.query.apiKey as string;
console.error(`[HTTP] π¨ POST /messages request - Session ID: ${sessionId || 'Missing'}`);
console.error(`[HTTP] π¨ Request body:`, JSON.stringify(req.body, null, 2));
// Try to get API key from multiple sources
let apiKey: string | undefined;
// Source 1: Query parameter (explicit)
if (apiKeyFromQuery) {
apiKey = apiKeyFromQuery;
console.error(`[HTTP] π API key from query parameter`);
}
// Source 2: Session metadata (if SSE session exists)
if (!apiKey && sessionId) {
const sessionMetadata = this.sseSessionMetadata.get(sessionId);
if (sessionMetadata?.apiKey) {
apiKey = sessionMetadata.apiKey;
console.error(`[HTTP] π API key from SSE session metadata`);
}
}
// Source 3: Extract from sessionId if it looks like an API key
if (!apiKey && sessionId && sessionId.length > 20) {
// SessionId might actually be the API key (Claude.ai sometimes does this)
// Accept any sessionId that's long enough (could be sk_xxx or base64 encoded)
apiKey = sessionId;
console.error(`[HTTP] π Using sessionId as API key (fallback)`);
}
if (!apiKey) {
console.error(`[HTTP] β No API key found in any source`);
return res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: API key required (provide via sessionId or apiKey query parameter)'
},
id: req.body?.id || null
});
}
// Inject API key into request headers for tool handlers
if (!req.headers) {
req.headers = {};
}
req.headers['x-instantly-api-key'] = apiKey;
console.error(`[HTTP] π Injected API key into request headers`);
// Try to use SSE transport if available, otherwise handle directly
const transport = this.sseTransports.get(sessionId);
if (transport) {
console.error(`[HTTP] π‘ Using SSE transport for session ${sessionId}`);
try {
await transport.handlePostMessage(req, res, req.body);
console.error(`[HTTP] β
Message handled via SSE transport`);
return;
} catch (error) {
console.error(`[HTTP] β SSE transport failed, falling back to direct handling:`, error);
}
}
// FALLBACK: Handle the request directly without SSE transport
// This allows tools to work even if SSE handshake failed
console.error(`[HTTP] π No SSE transport available - handling request directly`);
try {
const { method, params, id } = req.body;
if (method === 'tools/call') {
const { name, arguments: args } = params;
console.error(`[HTTP] π§ Executing tool directly: ${name}`);
// Use the static import from top of file (no dynamic import!)
const result = await executeToolDirectly(name, args, apiKey);
return res.status(200).json({
jsonrpc: '2.0',
result,
id
});
} else if (method === 'tools/list') {
// Return the tools list using static import
console.error(`[HTTP] π Returning tools list directly`);
return res.status(200).json({
jsonrpc: '2.0',
result: { tools: TOOLS_DEFINITION },
id
});
} else {
return res.status(200).json({
jsonrpc: '2.0',
error: {
code: -32601,
message: `Method not found: ${method}`
},
id
});
}
} catch (error) {
console.error(`[HTTP] β Error handling request directly:`, error);
return res.status(200).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: `Internal error: ${error instanceof Error ? error.message : String(error)}`
},
id: req.body?.id || null
});
}
});
// Server-Sent Events endpoint for streaming MCP clients
this.app.get('/sse', (req, res) => {
console.error('[HTTP] π‘ SSE connection request from MCP client');
// Setup SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control',
'X-Accel-Buffering': 'no' // Disable Nginx buffering
});
// Send initial connection message
res.write('data: {"type":"connection","status":"connected"}\n\n');
// Keep connection alive with periodic heartbeat
const keepAlive = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// Handle client disconnect
req.on('close', () => {
console.error('[HTTP] π‘ SSE connection closed by client');
clearInterval(keepAlive);
});
req.on('error', (error) => {
console.error('[HTTP] β SSE connection error:', error);
clearInterval(keepAlive);
});
});
// SSE endpoint with API key in URL for authenticated streaming
this.app.get('/sse/:apiKey', (req, res) => {
const apiKey = req.params.apiKey;
console.error(`[HTTP] π‘ Authenticated SSE connection request (API key: ${apiKey.substring(0, 8)}...)`);
// Setup SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control',
'X-Accel-Buffering': 'no'
});
// Send initial connection message with API key confirmation
res.write(`data: {"type":"connection","status":"connected","authenticated":true}\n\n`);
// Keep connection alive
const keepAlive = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// Handle client disconnect
req.on('close', () => {
console.error('[HTTP] π‘ Authenticated SSE connection closed');
clearInterval(keepAlive);
});
req.on('error', (error) => {
console.error('[HTTP] β Authenticated SSE connection error:', error);
clearInterval(keepAlive);
});
});
// CORS preflight
this.app.options('*', (req, res) => {
res.sendStatus(200);
});
// 404 handler
this.app.use((req, res) => {
res.status(404).json({
error: 'Not Found',
message: `Endpoint ${req.path} not found`,
availableEndpoints: ['/mcp', '/mcp/{API_KEY}', '/authorize', '/health', '/info'],
transport: 'streamable-http',
protocol: '2025-06-18'
});
});
}
/**
* Authentication middleware - Extracts per-request API keys for passthrough
*/
private authMiddleware(req: express.Request, res: express.Response, next: express.NextFunction): void {
// Extract Instantly API key from request headers
let instantlyApiKey = '';
// Try multiple header formats
const authHeader = req.headers.authorization as string;
if (authHeader) {
if (authHeader.startsWith('Bearer ')) {
instantlyApiKey = authHeader.substring(7);
} else {
instantlyApiKey = authHeader;
}
}
// Fallback to x-instantly-api-key header
if (!instantlyApiKey) {
instantlyApiKey = req.headers['x-instantly-api-key'] as string;
}
// Fallback to query parameter
if (!instantlyApiKey) {
instantlyApiKey = req.query.instantly_api_key as string;
}
// For backward compatibility, check old x-api-key header
if (!instantlyApiKey) {
instantlyApiKey = req.headers['x-api-key'] as string;
}
if (!instantlyApiKey) {
res.status(401).json({
jsonrpc: '2.0',
id: req.body?.id || null,
error: {
code: -32001,
message: 'Instantly API key required',
data: {
reason: 'Missing API key',
supportedFormats: [
'Authorization: Bearer [INSTANTLY_API_KEY]',
'x-instantly-api-key: [INSTANTLY_API_KEY]',
'x-api-key: [INSTANTLY_API_KEY] (legacy)',
'?instantly_api_key=[INSTANTLY_API_KEY]'
],
note: 'Provide your Instantly.ai API key, not a server authentication key'
}
}
});
return;
}
// Use API key as-is from headers (Instantly.ai expects base64-encoded format)
console.error(`[HTTP] π Using API key from headers as-is`);
// Store the API key in request headers for SDK to pass through via extra.requestInfo.headers
req.headers['x-instantly-api-key'] = instantlyApiKey;
// Also store in request object as backup
(req as any).instantlyApiKey = instantlyApiKey;
next();
}
// Deprecated placeholder handlers are no longer needed because we delegate
// to the official transport. setRequestHandlers is retained for backward
// compatibility but not used by the transport.
/**
* Start the HTTP server
*/
async start(): Promise<void> {
return new Promise((resolve, reject) => {
this.httpServer = createServer(this.app);
// Enhanced timeout and connection handling for Claude Desktop and Claude.ai
// INCREASED: Timeout from 30s to 120s to accommodate large campaigns with many sender emails
this.httpServer.timeout = 120000; // 120 second timeout (was 60s)
this.httpServer.keepAliveTimeout = 125000; // 125 second keep-alive
this.httpServer.headersTimeout = 126000; // 126 second headers timeout
// NOTE: In stateful mode, we don't pre-connect the server to a transport
// Instead, we create transport instances per-session in handleMcpRequest()
// This allows multiple concurrent sessions with different session IDs
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.httpServer!.listen(this.config.port, this.config.host, () => {
console.error(`[Instantly MCP] π Streaming HTTP server running at http://${this.config.host}:${this.config.port}`);
console.error(`[Instantly MCP] π Health check: http://${this.config.host}:${this.config.port}/health`);
console.error(`[Instantly MCP] π Ping endpoint: http://${this.config.host}:${this.config.port}/ping`);
console.error(`[Instantly MCP] π MCP endpoint: http://${this.config.host}:${this.config.port}/mcp`);
if (process.env.NODE_ENV === 'production') {
console.error(`[Instantly MCP] π’ Production endpoints:`);
console.error(`[Instantly MCP] π Header auth: https://mcp.instantly.ai/mcp`);
console.error(`[Instantly MCP] π URL auth: https://mcp.instantly.ai/mcp/{API_KEY}`);
}
resolve();
});
this.httpServer.on('error', (error) => {
console.error('[Instantly MCP] β HTTP server error:', error);
reject(error);
});
// Enhanced connection handling for Claude Desktop and Claude.ai
this.httpServer.on('connection', (socket) => {
socket.setKeepAlive(true, 30000);
socket.setTimeout(120000); // Increased from 30s to 120s for large campaigns
socket.on('timeout', () => {
console.error('[Instantly MCP] β οΈ Socket timeout (120s), closing connection');
socket.destroy();
});
});
// Session cleanup interval
setInterval(() => {
this.cleanupSessions();
}, 60000); // Clean up every minute
});
}
/**
* Clean up inactive sessions and rate limit entries
*/
private cleanupSessions(): void {
const now = Date.now();
const sessionTimeout = 30 * 60 * 1000; // 30 minutes
let cleanedSessions = 0;
let cleanedRateLimits = 0;
// Clean up inactive sessions
for (const [sessionId, session] of this.activeSessions.entries()) {
if (now - session.lastActivity > sessionTimeout) {
this.activeSessions.delete(sessionId);
cleanedSessions++;
}
}
// Clean up expired rate limit entries
for (const [ip, entry] of this.rateLimitMap.entries()) {
if (now > entry.resetTime) {
this.rateLimitMap.delete(ip);
cleanedRateLimits++;
}
}
if (cleanedSessions > 0 || cleanedRateLimits > 0) {
console.error(`[HTTP] π§Ή Cleanup: ${cleanedSessions} sessions, ${cleanedRateLimits} rate limits - Active: ${this.activeSessions.size} sessions, ${this.rateLimitMap.size} rate limits`);
}
}
/**
* Set request handlers (to be called by main server)
*/
setRequestHandlers(handlers: {
toolsList: (id: any) => Promise<any>;
toolCall: (params: any, id: any) => Promise<any>;
}): void {
this.requestHandlers = handlers;
}
/**
* Get the active transports map (for debugging/monitoring)
* In stateful mode, we maintain multiple transport instances per session
*/
getTransports() {
return this.transports;
}
/**
* Graceful shutdown
*/
async shutdown(): Promise<void> {
console.error('[Instantly MCP] π Shutting down HTTP server...');
if (this.httpServer) {
return new Promise((resolve) => {
this.httpServer!.close(() => {
console.error('[Instantly MCP] β
HTTP server shut down gracefully');
resolve();
});
});
}
}
}