/**
* MCP Server
*
* Orchestrates MCP SDK server with modular components.
* Thin layer that delegates to specialized modules for actual work.
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import {
Notification,
LoggingMessageNotification,
ToolListChangedNotification,
JSONRPCNotification,
JSONRPCError,
} from '@modelcontextprotocol/sdk/types.js';
import { Request, Response } from 'express';
import { randomUUID } from 'crypto';
// Application layer
import { setupMcpHandlers } from './application/mcp-handlers.js';
import { ToolRegistry, createToolRegistry } from './application/tools/tool-registry.js';
// Twenty CRM Tools
import {
PersonTool,
createPersonTool,
getCreatePersonToolDefinition,
getGetPersonToolDefinition,
getUpdatePersonToolDefinition,
getDeletePersonToolDefinition,
getListPersonsToolDefinition,
} from './application/tools/person-tool.js';
import {
CompanyTool,
createCompanyTool,
getCreateCompanyToolDefinition,
getGetCompanyToolDefinition,
getListCompaniesToolDefinition,
} from './application/tools/company-tool.js';
import {
OpportunityTool,
createOpportunityTool,
getCreateOpportunityToolDefinition,
getGetOpportunityToolDefinition,
getListOpportunitiesToolDefinition,
} from './application/tools/opportunity-tool.js';
// Infrastructure layer
import { ILogger, createLogger } from './infrastructure/logging/logger.js';
import {
SessionManager,
createSessionManager,
} from './infrastructure/http/session-manager.js';
import {
RequestValidator,
createRequestValidator,
} from './infrastructure/http/request-validator.js';
import {
ITwentyGraphQLClient,
createTwentyGraphQLClient,
} from './infrastructure/clients/twenty-graphql-client.js';
import {
ITwentyRESTClient,
createTwentyRESTClient,
} from './infrastructure/clients/twenty-rest-client.js';
import {
AgentGatewayClient,
createAgentGatewayClient,
} from './infrastructure/gateway/agent-gateway-client.js';
/**
* Constants
*/
const SESSION_ID_HEADER_NAME = 'mcp-session-id';
const JSON_RPC = '2.0';
const TOOL_REFRESH_INTERVAL_MS = 5000;
/**
* MCP Server orchestrator
*
* Coordinates all modular components to provide MCP server functionality.
*/
export class MCPServer {
// Core MCP SDK server
private readonly server: Server;
// Application layer components
private readonly toolRegistry: ToolRegistry;
private readonly personTool: PersonTool;
private readonly companyTool: CompanyTool;
private readonly opportunityTool: OpportunityTool;
// Infrastructure layer components
private readonly logger: ILogger;
private readonly sessionManager: SessionManager;
private readonly requestValidator: RequestValidator;
private readonly graphqlClient: ITwentyGraphQLClient;
private readonly restClient: ITwentyRESTClient;
private readonly gatewayClient: AgentGatewayClient | null;
// Runtime state
private toolRefreshInterval: NodeJS.Timeout | undefined;
constructor(server: Server) {
this.server = server;
// Initialize infrastructure
this.logger = createLogger('mcp-server');
this.sessionManager = createSessionManager(this.logger);
this.requestValidator = createRequestValidator(this.logger);
this.graphqlClient = createTwentyGraphQLClient(this.logger);
this.restClient = createTwentyRESTClient(this.logger);
this.gatewayClient = createAgentGatewayClient(this.logger);
// Initialize application layer
this.toolRegistry = createToolRegistry(this.logger);
this.personTool = createPersonTool(this.graphqlClient, this.restClient, this.logger);
this.companyTool = createCompanyTool(this.graphqlClient, this.restClient, this.logger);
this.opportunityTool = createOpportunityTool(this.graphqlClient, this.restClient, this.logger);
// Setup MCP protocol handlers
this.initialize();
this.logger.info('Twenty CRM MCP Server initialized successfully');
// Register with Agent Gateway if configured
this.registerWithGateway();
}
/**
* Register with Agent Gateway
*/
private async registerWithGateway(): Promise<void> {
if (this.gatewayClient) {
try {
await this.gatewayClient.start();
this.logger.info('Agent Gateway registration started');
} catch (error) {
this.logger.error(
'Failed to start Agent Gateway registration',
error instanceof Error ? error : new Error(String(error))
);
}
}
}
/**
* Initialize MCP server with handlers and tools
*/
private initialize(): void {
// Register Person tools
this.toolRegistry.register(getCreatePersonToolDefinition(), (args) => this.personTool.createPerson(args));
this.toolRegistry.register(getGetPersonToolDefinition(), (args) => this.personTool.getPerson(args));
this.toolRegistry.register(getUpdatePersonToolDefinition(), (args) => this.personTool.updatePerson(args));
this.toolRegistry.register(getDeletePersonToolDefinition(), (args) => this.personTool.deletePerson(args));
this.toolRegistry.register(getListPersonsToolDefinition(), (args) => this.personTool.listPersons(args));
// Register Company tools
this.toolRegistry.register(getCreateCompanyToolDefinition(), (args) => this.companyTool.createCompany(args));
this.toolRegistry.register(getGetCompanyToolDefinition(), (args) => this.companyTool.getCompany(args));
this.toolRegistry.register(getListCompaniesToolDefinition(), (args) => this.companyTool.listCompanies(args));
// Register Opportunity tools
this.toolRegistry.register(getCreateOpportunityToolDefinition(), (args) => this.opportunityTool.createOpportunity(args));
this.toolRegistry.register(getGetOpportunityToolDefinition(), (args) => this.opportunityTool.getOpportunity(args));
this.toolRegistry.register(getListOpportunitiesToolDefinition(), (args) => this.opportunityTool.listOpportunities(args));
// Setup MCP request handlers
setupMcpHandlers(this.server, this.toolRegistry, this.logger);
// Start tool refresh interval
this.startToolRefresh();
}
/**
* Start periodic tool refresh notifications
*/
private startToolRefresh(): void {
this.toolRefreshInterval = setInterval(() => {
this.notifyToolListChanged();
}, TOOL_REFRESH_INTERVAL_MS);
this.logger.debug('Tool refresh interval started', {
intervalMs: TOOL_REFRESH_INTERVAL_MS,
});
}
/**
* Notify all sessions that tool list has changed
*/
private notifyToolListChanged(): void {
const transports = this.sessionManager.getAllTransports();
if (transports.length === 0) {
return;
}
const notification: ToolListChangedNotification = {
method: 'notifications/tools/list_changed',
};
transports.forEach((transport) => {
this.sendNotification(transport, notification).catch((error) => {
this.logger.error('Failed to send tool list changed notification', error);
});
});
this.logger.debug('Tool list changed notification sent', {
sessionCount: transports.length,
});
}
/**
* Handle HTTP GET requests (SSE streams)
*/
async handleGetRequest(req: Request, res: Response): Promise<void> {
const sessionId = req.headers[SESSION_ID_HEADER_NAME] as string | undefined;
this.logger.debug('Handling GET request', { sessionId });
// Validate session
const validation = this.requestValidator.validateSessionId(sessionId);
if (!validation.valid) {
this.logger.warn('Invalid session ID in GET request', {
error: validation.error,
});
res
.status(400)
.json(this.createErrorResponse('Bad Request: invalid session ID'));
return;
}
const transport = this.sessionManager.get(sessionId!);
if (!transport) {
this.logger.warn('Session not found', { sessionId });
res
.status(400)
.json(this.createErrorResponse('Bad Request: session not found'));
return;
}
this.logger.info('Establishing SSE stream', { sessionId });
await transport.handleRequest(req, res);
await this.streamMessages(transport);
}
/**
* Handle HTTP POST requests (initialize and standard requests)
*/
async handlePostRequest(req: Request, res: Response): Promise<void> {
const sessionId = req.headers[SESSION_ID_HEADER_NAME] as string | undefined;
this.logger.debug('Handling POST request', {
sessionId,
hasBody: !!req.body,
});
try {
// Handle existing session
if (sessionId && this.sessionManager.has(sessionId)) {
const transport = this.sessionManager.get(sessionId)!;
await transport.handleRequest(req, res, req.body);
return;
}
// Handle new session (initialize request)
if (!sessionId && this.requestValidator.isInitializeRequest(req.body)) {
this.logger.info('Creating new session');
const transport = this.sessionManager.createTransport();
await this.server.connect(transport);
// Set session ID header before handling request
if (transport.sessionId) {
res.setHeader(SESSION_ID_HEADER_NAME, transport.sessionId);
}
await transport.handleRequest(req, res, req.body);
// Store transport if session was created
if (transport.sessionId) {
this.sessionManager.store(transport.sessionId, transport);
this.logger.info('New session created', {
sessionId: transport.sessionId,
});
}
return;
}
// Invalid request
this.logger.warn('Invalid POST request', {
sessionId,
isInitialize: this.requestValidator.isInitializeRequest(req.body),
});
res
.status(400)
.json(this.createErrorResponse('Bad Request: invalid session ID or method'));
} catch (error) {
this.logger.error(
'Error handling POST request',
error instanceof Error ? error : new Error(String(error))
);
res.status(500).json(this.createErrorResponse('Internal server error'));
}
}
/**
* Check if server is healthy
*/
isHealthy(): boolean {
return this.toolRefreshInterval !== undefined;
}
/**
* Get all registered tool definitions
*/
getToolDefinitions(): ReadonlyArray<any> {
return this.toolRegistry.getToolDefinitions();
}
/**
* Cleanup server resources and graceful shutdown
*/
async cleanup(): Promise<void> {
this.logger.info('Cleaning up MCP server');
// Stop tool refresh interval
if (this.toolRefreshInterval) {
clearInterval(this.toolRefreshInterval);
this.toolRefreshInterval = undefined;
}
// Unregister from gateway
if (this.gatewayClient) {
try {
await this.gatewayClient.stop();
this.logger.info('Agent Gateway unregistration completed');
} catch (error) {
this.logger.error(
'Error during Agent Gateway unregistration',
error instanceof Error ? error : new Error(String(error))
);
}
}
// Clear sessions and close server
this.sessionManager.clear();
await this.server.close();
this.logger.info('MCP server cleanup complete');
}
/**
* Stream messages to a transport (demo functionality)
*/
private async streamMessages(transport: any): Promise<void> {
try {
const initialMessage: LoggingMessageNotification = {
method: 'notifications/message',
params: { level: 'info', data: 'SSE Connection established' },
};
await this.sendNotification(transport, initialMessage);
let messageCount = 0;
const interval = setInterval(async () => {
messageCount++;
const message: LoggingMessageNotification = {
method: 'notifications/message',
params: {
level: 'info',
data: `Message ${messageCount} at ${new Date().toISOString()}`,
},
};
try {
await this.sendNotification(transport, message);
if (messageCount === 2) {
clearInterval(interval);
const finalMessage: LoggingMessageNotification = {
method: 'notifications/message',
params: { level: 'info', data: 'Streaming complete!' },
};
await this.sendNotification(transport, finalMessage);
}
} catch (error) {
this.logger.error('Error streaming message', error as Error);
clearInterval(interval);
}
}, 1000);
} catch (error) {
this.logger.error('Error in streamMessages', error as Error);
}
}
/**
* Send notification to transport
*/
private async sendNotification(
transport: any,
notification: Notification
): Promise<void> {
const rpcNotification: JSONRPCNotification = {
...notification,
jsonrpc: JSON_RPC,
};
await transport.send(rpcNotification);
}
/**
* Create JSON-RPC error response
*/
private createErrorResponse(message: string): JSONRPCError {
return {
jsonrpc: JSON_RPC,
error: {
code: -32000,
message,
},
id: randomUUID(),
};
}
/**
* Get gateway registration status
*/
getGatewayStatus(): { isRegistered: boolean; gateway?: string; agent?: string } | null {
if (!this.gatewayClient) {
return null;
}
return this.gatewayClient.getStatus();
}
}