import { timingSafeEqual } from 'crypto';
import type { Server } from 'http';
import type { Socket } from 'net';
import express from 'express';
import helmet from 'helmet';
import compression from 'compression';
import cors from 'cors';
import rateLimit from 'express-rate-limit';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { log } from 'apify';
import { MCPProxy } from './openapi-mcp-server/mcp/proxy.js';
/**
 * Express front end that exposes an MCP proxy over Server-Sent Events.
 *
 * Routes:
 *  - `GET /`         plain liveness probe
 *  - `GET /health`   JSON status including the number of active sessions
 *  - `GET /sse`      opens an SSE stream (server -> client); one transport per connection
 *  - `POST /message` delivers a client message to the transport of `?sessionId=...`
 *
 * `/sse` and `/message` are rate limited and require the shared secret token.
 */
export class ApifyServer {
  /** Socket timeout applied to every request by the global middleware. */
  private static readonly REQUEST_TIMEOUT_MS = 30_000;
  /** How long `stop()` waits before destroying sockets that are still open. */
  private static readonly FORCE_CLOSE_DELAY_MS = 5_000;

  private readonly app: express.Application;
  private readonly port: number;
  private readonly proxy: MCPProxy;
  private readonly secretToken: string;
  private httpServer?: Server;
  private readonly sockets: Set<Socket> = new Set();
  // Active sessions: sessionId -> transport
  private readonly sessions: Map<string, SSEServerTransport> = new Map();

  /**
   * Builds the Express app, installs the middleware and registers the routes.
   * The server does not listen until `start()` is called.
   *
   * @param proxy MCP proxy that every SSE transport is connected to.
   * @param port TCP port to listen on.
   * @param secretToken Token clients must present in the `Authorization` header.
   * @param notionApiKey Accepted for interface compatibility; not used by this class.
   * @param notionApiVersion Accepted for interface compatibility; not used by this class.
   */
  constructor(
    proxy: MCPProxy,
    port: number = 8080,
    secretToken: string,
    notionApiKey: string,
    notionApiVersion: string = '2022-06-28'
  ) {
    this.app = express();
    this.port = port;
    this.proxy = proxy;
    this.secretToken = secretToken;

    // Middleware
    this.app.set('trust proxy', 1);
    this.app.use(helmet());
    this.app.use(compression());
    this.app.use(cors());

    // NOTE(review): no body parser is registered here. POST bodies are handed
    // untouched to transport.handlePostMessage(req, res), which presumably
    // reads the request stream itself - confirm against the SDK version in use
    // before adding express.json().

    // Global timeout middleware to prevent hanging requests. The callback only
    // answers when nothing has been sent yet, so an already-started response
    // (e.g. an SSE stream) is left alone.
    this.app.use((req, res, next) => {
      res.setTimeout(ApifyServer.REQUEST_TIMEOUT_MS, () => {
        if (!res.headersSent) {
          res.status(408).json({ error: 'Request Timeout' });
        }
      });
      next();
    });

    this.setupRoutes();
  }

  /** Registers all HTTP routes on the Express app. */
  private setupRoutes(): void {
    const sseLimiter = rateLimit({
      windowMs: 60_000,
      max: 30,
      standardHeaders: true,
      legacyHeaders: false,
    });
    const messageLimiter = rateLimit({
      windowMs: 60_000,
      max: 120,
      standardHeaders: true,
      legacyHeaders: false,
    });
    // Bind once instead of per route.
    const requireAuth = this.authenticate.bind(this);

    this.app.get('/', (_req, res) => {
      res.status(200).send('OK');
    });

    // 1. Health check
    this.app.get('/health', (_req, res) => {
      res.json({
        status: 'healthy',
        sessions: this.sessions.size,
        timestamp: new Date().toISOString(),
        uptime: process.uptime(),
      });
    });

    // 2. SSE endpoint (server -> client channel)
    this.app.get('/sse', sseLimiter, requireAuth, async (req, res) => {
      log.info(`[SSE] New connection request from ${req.ip}`);
      try {
        // One transport per client; clients post to /message?sessionId=...
        const transport = new SSEServerTransport('/message', res);
        const sessionId = transport.sessionId;
        if (!sessionId) {
          throw new Error('SSE transport did not provide a session ID');
        }

        // Drops the session and closes its transport, but only while this
        // transport is still the registered one, so it runs at most once.
        const releaseSession = (): void => {
          if (this.sessions.get(sessionId) !== transport) return;
          this.sessions.delete(sessionId);
          void this.closeTransport(sessionId, transport);
        };

        // Register the session and the disconnect handler BEFORE connecting:
        // a client that drops while connect() is pending must still be cleaned
        // up, and a POST arriving right after the stream opens must find it.
        this.sessions.set(sessionId, transport);
        req.on('close', () => {
          log.info(`[SSE] Closed connection for session: ${sessionId}`);
          releaseSession();
        });

        try {
          // NOTE(review): connecting is expected to start the transport, which
          // announces the POST endpoint (including the session ID) to the
          // client - see the SDK's SSEServerTransport docs.
          await this.proxy.connect(transport);
        } catch (err) {
          releaseSession();
          throw err;
        }
        log.info(`[SSE] Session created: ${sessionId}`);
      } catch (err) {
        log.error(`[SSE] Error: ${err}`);
        if (!res.headersSent) {
          res.status(500).end();
        } else if (!res.writableEnded) {
          // The stream was already opened; end it rather than leave it hanging.
          res.end();
        }
      }
    });

    // 3. Message endpoint (client -> server channel)
    this.app.post('/message', messageLimiter, requireAuth, async (req, res) => {
      const { sessionId } = req.query;
      if (sessionId === undefined || sessionId === '') {
        res.status(400).json({ error: 'Missing sessionId' });
        return;
      }
      // Repeated or nested query parameters parse to arrays/objects.
      if (typeof sessionId !== 'string') {
        res.status(400).json({ error: 'Invalid sessionId' });
        return;
      }
      log.debug(`[POST] Received message for session: ${sessionId}`);

      const transport = this.sessions.get(sessionId);
      if (!transport) {
        log.warning(`[POST] Session not found: ${sessionId}`);
        res.status(404).json({ error: 'Session not found' });
        return;
      }

      try {
        // Hand the message over to the SDK transport
        await transport.handlePostMessage(req, res);
      } catch (error) {
        log.error(`[POST] Error handling message: ${error}`);
        if (!res.headersSent) {
          res.status(500).json({ error: 'Internal server error' });
        }
      }
    });
  }

  /**
   * Express middleware that lets the request through only when the
   * `Authorization` header carries the secret token, with or without a
   * leading `Bearer ` prefix. Otherwise responds with 401.
   */
  private authenticate(req: express.Request, res: express.Response, next: express.NextFunction): void {
    const authHeader = req.headers['authorization'];
    const provided = typeof authHeader === 'string' ? authHeader.replace(/^Bearer\s+/i, '') : undefined;
    if (provided && this.isValidToken(provided)) {
      next();
      return;
    }
    res.status(401).json({ error: 'Unauthorized', message: 'Invalid or missing secret token' });
  }

  /**
   * Compares a candidate token with the configured secret without
   * short-circuiting on the first differing byte.
   *
   * @returns `false` when no secret is configured or the tokens differ.
   */
  private isValidToken(candidate: string): boolean {
    const expected: unknown = this.secretToken;
    if (typeof expected !== 'string' || expected.length === 0) {
      return false;
    }
    const candidateBytes = Buffer.from(candidate, 'utf8');
    const expectedBytes = Buffer.from(expected, 'utf8');
    // timingSafeEqual throws on length mismatch, so check the length first.
    return candidateBytes.length === expectedBytes.length && timingSafeEqual(candidateBytes, expectedBytes);
  }

  /**
   * Closes a transport and logs, rather than propagates, any failure -
   * including an asynchronous rejection. The returned promise never rejects.
   */
  private async closeTransport(sessionId: string, transport: SSEServerTransport): Promise<void> {
    try {
      await transport.close();
    } catch (error) {
      log.error(`Error closing session ${sessionId}: ${error}`);
    }
  }

  /**
   * Starts listening on `0.0.0.0:<port>`.
   *
   * @returns A promise that resolves once the server is listening and rejects
   *          if the server emits `error` first.
   */
  public async start(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const server = this.app.listen(this.port, '0.0.0.0', () => {
        log.info(`=== SERVER STARTED on port ${this.port} ===`);
        resolve();
      });
      this.httpServer = server;

      // Track open sockets so stop() can force-close stragglers.
      server.on('connection', (socket: Socket) => {
        this.sockets.add(socket);
        socket.on('close', () => this.sockets.delete(socket));
      });

      server.on('error', (error: unknown) => {
        log.error(`Server failed to start: ${error}`);
        // Do not keep a reference to a server that never started listening.
        if (!server.listening && this.httpServer === server) {
          this.httpServer = undefined;
        }
        reject(error);
      });
    });
  }

  /**
   * Closes every active session and then the HTTP server. Sockets that are
   * still open after the force-close delay are destroyed so the returned
   * promise can settle. Errors are logged; the promise does not reject.
   */
  public async stop(): Promise<void> {
    log.info('=== SERVER STOPPING ===');

    // Unregister everything first so the per-connection 'close' handlers do
    // not close the same transports a second time.
    const activeSessions = [...this.sessions];
    this.sessions.clear();
    await Promise.all(
      activeSessions.map(([sessionId, transport]) => {
        log.debug(`Closing session: ${sessionId}`);
        return this.closeTransport(sessionId, transport);
      })
    );

    const server = this.httpServer;
    if (!server) {
      return;
    }
    this.httpServer = undefined;

    await new Promise<void>((resolve) => {
      const forceCloseTimeout = setTimeout(() => {
        for (const socket of this.sockets) {
          // Best effort: a socket may already be gone.
          try { socket.destroy(); } catch {}
        }
      }, ApifyServer.FORCE_CLOSE_DELAY_MS);

      server.close((err?: Error) => {
        clearTimeout(forceCloseTimeout);
        if (err) {
          log.error(`Error closing HTTP server: ${err}`);
        } else {
          log.info('HTTP server closed successfully');
        }
        this.sockets.clear();
        resolve();
      });
    });
  }

  /**
   * Constructs a server from an options bag and starts it.
   *
   * @param proxy MCP proxy to expose.
   * @param options Reads `port`, `secretToken`, `notionApiKey` and
   *                `notionApiVersion`, forwarded to the constructor in that order.
   * @returns The started server.
   */
  static async create(proxy: MCPProxy, options: any): Promise<ApifyServer> {
    const server = new ApifyServer(
      proxy,
      options.port,
      options.secretToken,
      options.notionApiKey,
      options.notionApiVersion
    );
    await server.start();
    return server;
  }
}