/**
* MCP Server
*
* Orchestrates MCP SDK server with modular components.
* Thin layer that delegates to specialized modules for actual work.
*/
import { randomUUID } from 'crypto';
// Application layer
import { setupMcpHandlers } from './application/mcp-handlers.js';
import { createToolRegistry } from './application/tools/tool-registry.js';
// Twenty CRM Tools
import { createPersonTool, getCreatePersonToolDefinition, getGetPersonToolDefinition, getUpdatePersonToolDefinition, getDeletePersonToolDefinition, getListPersonsToolDefinition, } from './application/tools/person-tool.js';
import { createCompanyTool, getCreateCompanyToolDefinition, getGetCompanyToolDefinition, getListCompaniesToolDefinition, } from './application/tools/company-tool.js';
import { createOpportunityTool, getCreateOpportunityToolDefinition, getGetOpportunityToolDefinition, getListOpportunitiesToolDefinition, } from './application/tools/opportunity-tool.js';
// Infrastructure layer
import { createLogger } from './infrastructure/logging/logger.js';
import { createSessionManager, } from './infrastructure/http/session-manager.js';
import { createRequestValidator, } from './infrastructure/http/request-validator.js';
import { createTwentyGraphQLClient, } from './infrastructure/clients/twenty-graphql-client.js';
import { createTwentyRESTClient, } from './infrastructure/clients/twenty-rest-client.js';
import { createAgentGatewayClient, } from './infrastructure/gateway/agent-gateway-client.js';
/**
 * Module-level constants
 */
/** Name of the HTTP header this file reads the session ID from and writes it to. */
const SESSION_ID_HEADER_NAME = 'mcp-session-id';
/** Value of the `jsonrpc` member on the JSON-RPC payloads built in this file. */
const JSON_RPC = '2.0';
/** Delay, in milliseconds, between periodic tool-list-changed notifications. */
const TOOL_REFRESH_INTERVAL_MS = 5 * 1000;
/**
 * MCP Server orchestrator
 *
 * Coordinates all modular components to provide MCP server functionality:
 * it registers the person/company/opportunity tools, installs the MCP request
 * handlers on the supplied server, routes HTTP GET/POST requests to
 * per-session transports, and periodically tells every known session that the
 * tool list changed.
 */
export class MCPServer {
  // Core MCP SDK server
  server;
  // Application layer components
  toolRegistry;
  personTool;
  companyTool;
  opportunityTool;
  // Infrastructure layer components
  logger;
  sessionManager;
  requestValidator;
  graphqlClient;
  restClient;
  gatewayClient;
  // Runtime state: handle of the periodic tool-refresh timer, undefined when stopped
  toolRefreshInterval;

  /**
   * Build every component, register the tools and handlers, start the
   * tool-refresh timer and kick off gateway registration.
   *
   * @param server - Server object the protocol handlers are installed on and
   *   that new session transports are connected to.
   */
  constructor(server) {
    this.server = server;
    // Initialize infrastructure
    this.logger = createLogger('mcp-server');
    this.sessionManager = createSessionManager(this.logger);
    this.requestValidator = createRequestValidator(this.logger);
    this.graphqlClient = createTwentyGraphQLClient(this.logger);
    this.restClient = createTwentyRESTClient(this.logger);
    this.gatewayClient = createAgentGatewayClient(this.logger);
    // Initialize application layer
    this.toolRegistry = createToolRegistry(this.logger);
    this.personTool = createPersonTool(this.graphqlClient, this.restClient, this.logger);
    this.companyTool = createCompanyTool(this.graphqlClient, this.restClient, this.logger);
    this.opportunityTool = createOpportunityTool(this.graphqlClient, this.restClient, this.logger);
    // Setup MCP protocol handlers
    this.initialize();
    this.logger.info('Twenty CRM MCP Server initialized successfully');
    // A constructor cannot await; registration is deliberately fire-and-forget
    // and registerWithGateway() logs its own failures.
    void this.registerWithGateway();
  }

  /**
   * Register with Agent Gateway.
   *
   * Does nothing when no gateway client was created. A failure to start is
   * logged and not rethrown.
   *
   * @returns {Promise<void>}
   */
  async registerWithGateway() {
    if (!this.gatewayClient) {
      return;
    }
    try {
      await this.gatewayClient.start();
      this.logger.info('Agent Gateway registration started');
    } catch (error) {
      this.logger.error('Failed to start Agent Gateway registration', this.toError(error));
    }
  }

  /**
   * Initialize MCP server with handlers and tools, then start the periodic
   * tool-refresh notifications.
   */
  initialize() {
    // Register Person tools
    this.toolRegistry.register(getCreatePersonToolDefinition(), (args) => this.personTool.createPerson(args));
    this.toolRegistry.register(getGetPersonToolDefinition(), (args) => this.personTool.getPerson(args));
    this.toolRegistry.register(getUpdatePersonToolDefinition(), (args) => this.personTool.updatePerson(args));
    this.toolRegistry.register(getDeletePersonToolDefinition(), (args) => this.personTool.deletePerson(args));
    this.toolRegistry.register(getListPersonsToolDefinition(), (args) => this.personTool.listPersons(args));
    // Register Company tools
    this.toolRegistry.register(getCreateCompanyToolDefinition(), (args) => this.companyTool.createCompany(args));
    this.toolRegistry.register(getGetCompanyToolDefinition(), (args) => this.companyTool.getCompany(args));
    this.toolRegistry.register(getListCompaniesToolDefinition(), (args) => this.companyTool.listCompanies(args));
    // Register Opportunity tools
    this.toolRegistry.register(getCreateOpportunityToolDefinition(), (args) => this.opportunityTool.createOpportunity(args));
    this.toolRegistry.register(getGetOpportunityToolDefinition(), (args) => this.opportunityTool.getOpportunity(args));
    this.toolRegistry.register(getListOpportunitiesToolDefinition(), (args) => this.opportunityTool.listOpportunities(args));
    // Setup MCP request handlers
    setupMcpHandlers(this.server, this.toolRegistry, this.logger);
    // Start tool refresh interval
    this.startToolRefresh();
  }

  /**
   * Start periodic tool refresh notifications.
   *
   * Calling this again replaces the running timer instead of leaking it.
   */
  startToolRefresh() {
    if (this.toolRefreshInterval !== undefined) {
      clearInterval(this.toolRefreshInterval);
    }
    this.toolRefreshInterval = setInterval(() => {
      this.notifyToolListChanged();
    }, TOOL_REFRESH_INTERVAL_MS);
    this.logger.debug('Tool refresh interval started', {
      intervalMs: TOOL_REFRESH_INTERVAL_MS,
    });
  }

  /**
   * Notify all sessions that tool list has changed.
   *
   * Sends are not awaited; a failed send is logged per transport and does not
   * affect the others.
   */
  notifyToolListChanged() {
    const transports = this.sessionManager.getAllTransports();
    if (transports.length === 0) {
      return;
    }
    const notification = {
      method: 'notifications/tools/list_changed',
    };
    for (const transport of transports) {
      this.sendNotification(transport, notification).catch((error) => {
        this.logger.error('Failed to send tool list changed notification', this.toError(error));
      });
    }
    this.logger.debug('Tool list changed notification sent', {
      sessionCount: transports.length,
    });
  }

  /**
   * Handle HTTP GET requests (SSE streams).
   *
   * Responds 400 when the session ID header fails validation or no transport
   * is stored for it. Otherwise hands the request to the session's transport
   * and then starts the demo message stream. A transport failure is logged and
   * answered with 500 when the response has not been started yet.
   *
   * @param req - Incoming request; `headers` is read.
   * @param res - Outgoing response; `status`, `json` and `headersSent` are used.
   * @returns {Promise<void>}
   */
  async handleGetRequest(req, res) {
    const sessionId = req.headers[SESSION_ID_HEADER_NAME];
    this.logger.debug('Handling GET request', { sessionId });
    // Validate session
    const validation = this.requestValidator.validateSessionId(sessionId);
    if (!validation.valid) {
      this.logger.warn('Invalid session ID in GET request', {
        error: validation.error,
      });
      res
        .status(400)
        .json(this.createErrorResponse('Bad Request: invalid session ID'));
      return;
    }
    const transport = this.sessionManager.get(sessionId);
    if (!transport) {
      this.logger.warn('Session not found', { sessionId });
      res
        .status(400)
        .json(this.createErrorResponse('Bad Request: session not found'));
      return;
    }
    this.logger.info('Establishing SSE stream', { sessionId });
    try {
      await transport.handleRequest(req, res);
    } catch (error) {
      // Mirror handlePostRequest: never let a transport failure escape as an
      // unhandled rejection with no HTTP response.
      this.logger.error('Error handling GET request', this.toError(error));
      this.sendInternalError(res);
      return;
    }
    await this.streamMessages(transport);
  }

  /**
   * Handle HTTP POST requests (initialize and standard requests).
   *
   * - Known session ID: the request is forwarded to that session's transport.
   * - No session ID and an initialize request: a new transport is created,
   *   connected to the server, and stored once it exposes a session ID.
   * - Anything else: 400.
   *
   * Errors are logged and answered with 500 when the response has not been
   * started yet.
   *
   * @param req - Incoming request; `headers` and `body` are read.
   * @param res - Outgoing response.
   * @returns {Promise<void>}
   */
  async handlePostRequest(req, res) {
    const sessionId = req.headers[SESSION_ID_HEADER_NAME];
    this.logger.debug('Handling POST request', {
      sessionId,
      hasBody: !!req.body,
    });
    try {
      // Handle existing session
      if (sessionId && this.sessionManager.has(sessionId)) {
        const transport = this.sessionManager.get(sessionId);
        await transport.handleRequest(req, res, req.body);
        return;
      }
      // Handle new session (initialize request)
      if (!sessionId && this.requestValidator.isInitializeRequest(req.body)) {
        this.logger.info('Creating new session');
        const transport = this.sessionManager.createTransport();
        await this.server.connect(transport);
        // Set session ID header before handling request, if one is already known
        if (transport.sessionId) {
          res.setHeader(SESSION_ID_HEADER_NAME, transport.sessionId);
        }
        await transport.handleRequest(req, res, req.body);
        // Store transport if session was created
        if (transport.sessionId) {
          this.sessionManager.store(transport.sessionId, transport);
          this.logger.info('New session created', {
            sessionId: transport.sessionId,
          });
        }
        return;
      }
      // Invalid request
      this.logger.warn('Invalid POST request', {
        sessionId,
        isInitialize: this.requestValidator.isInitializeRequest(req.body),
      });
      res
        .status(400)
        .json(this.createErrorResponse('Bad Request: invalid session ID or method'));
    } catch (error) {
      this.logger.error('Error handling POST request', this.toError(error));
      this.sendInternalError(res);
    }
  }

  /**
   * Check if server is healthy.
   *
   * @returns {boolean} true while the tool-refresh timer is running.
   */
  isHealthy() {
    return this.toolRefreshInterval !== undefined;
  }

  /**
   * Get all registered tool definitions.
   *
   * @returns Whatever the tool registry reports as its definitions.
   */
  getToolDefinitions() {
    return this.toolRegistry.getToolDefinitions();
  }

  /**
   * Cleanup server resources and graceful shutdown.
   *
   * Stops the refresh timer, stops the gateway client (a failure there is
   * logged and not rethrown), clears all sessions and closes the server.
   *
   * @returns {Promise<void>}
   */
  async cleanup() {
    this.logger.info('Cleaning up MCP server');
    // Stop tool refresh interval
    if (this.toolRefreshInterval !== undefined) {
      clearInterval(this.toolRefreshInterval);
      this.toolRefreshInterval = undefined;
    }
    // Unregister from gateway
    if (this.gatewayClient) {
      try {
        await this.gatewayClient.stop();
        this.logger.info('Agent Gateway unregistration completed');
      } catch (error) {
        this.logger.error('Error during Agent Gateway unregistration', this.toError(error));
      }
    }
    // Clear sessions and close server
    this.sessionManager.clear();
    await this.server.close();
    this.logger.info('MCP server cleanup complete');
  }

  /**
   * Stream messages to a transport (demo functionality).
   *
   * Sends a greeting, then one message per second for two ticks, then a
   * closing message. Errors are logged and stop the stream; nothing is thrown.
   *
   * @param transport - Object with an async `send(message)` method.
   * @returns {Promise<void>} Resolves once the greeting is sent and the timer
   *   is scheduled, not when the stream finishes.
   */
  async streamMessages(transport) {
    const lastTick = 2;
    try {
      const initialMessage = {
        method: 'notifications/message',
        params: { level: 'info', data: 'SSE Connection established' },
      };
      await this.sendNotification(transport, initialMessage);
      let messageCount = 0;
      const interval = setInterval(async () => {
        // Capture this tick's own number and stop the timer BEFORE awaiting,
        // so a slow send cannot let extra ticks run or make two overlapping
        // ticks both believe they are the final one.
        messageCount++;
        const sequence = messageCount;
        const isLastTick = sequence >= lastTick;
        if (isLastTick) {
          clearInterval(interval);
        }
        const message = {
          method: 'notifications/message',
          params: {
            level: 'info',
            data: `Message ${sequence} at ${new Date().toISOString()}`,
          },
        };
        try {
          await this.sendNotification(transport, message);
          if (isLastTick) {
            const finalMessage = {
              method: 'notifications/message',
              params: { level: 'info', data: 'Streaming complete!' },
            };
            await this.sendNotification(transport, finalMessage);
          }
        } catch (error) {
          this.logger.error('Error streaming message', this.toError(error));
          clearInterval(interval);
        }
      }, 1000);
    } catch (error) {
      this.logger.error('Error in streamMessages', this.toError(error));
    }
  }

  /**
   * Send notification to transport.
   *
   * @param transport - Object with an async `send(message)` method.
   * @param notification - Notification fields; `jsonrpc` is added/overwritten.
   * @returns {Promise<void>} Rejects if `transport.send` rejects.
   */
  async sendNotification(transport, notification) {
    const rpcNotification = {
      ...notification,
      jsonrpc: JSON_RPC,
    };
    await transport.send(rpcNotification);
  }

  /**
   * Create JSON-RPC error response.
   *
   * The `id` is `null` because these errors are produced before any request
   * id has been correlated; JSON-RPC 2.0 requires `null` in that case rather
   * than an invented identifier.
   *
   * @param {string} message - Human-readable error message.
   * @returns {{jsonrpc: string, error: {code: number, message: string}, id: null}}
   */
  createErrorResponse(message) {
    return {
      jsonrpc: JSON_RPC,
      error: {
        code: -32000,
        message,
      },
      id: null,
    };
  }

  /**
   * Get gateway registration status.
   *
   * @returns The gateway client's status, or null when there is no client.
   */
  getGatewayStatus() {
    if (!this.gatewayClient) {
      return null;
    }
    return this.gatewayClient.getStatus();
  }

  /**
   * Normalize any thrown value into an Error so the logger always receives
   * the same shape.
   *
   * @param {unknown} error - Value caught from a throw or rejection.
   * @returns {Error}
   */
  toError(error) {
    return error instanceof Error ? error : new Error(String(error));
  }

  /**
   * Answer with a 500 JSON-RPC error unless the response has already been
   * started, in which case writing a status would itself throw.
   *
   * @param res - Outgoing response; `headersSent`, `status` and `json` are used.
   */
  sendInternalError(res) {
    if (res.headersSent) {
      return;
    }
    res.status(500).json(this.createErrorResponse('Internal server error'));
  }
}