http-server.js•11.1 kB
/**
* Google Cloud MCP HTTP Server
*
* This is an HTTP-based version of the Google Cloud MCP server that supports
* SSE transport for web-based integrations and remote access.
*/
import express from 'express';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { randomUUID } from 'crypto';
import dotenv from 'dotenv';
import cors from 'cors';
// Import service modules (same as stdio version)
import { registerLoggingResources, registerLoggingTools } from './services/logging/index.js';
import { registerSpannerResources, registerSpannerTools, registerSpannerQueryCountTool } from './services/spanner/index.js';
import { registerMonitoringResources, registerMonitoringTools } from './services/monitoring/index.js';
import { registerTraceService } from './services/trace/index.js';
import { registerPrompts } from './prompts/index.js';
import { initGoogleAuth } from './utils/auth.js';
import { registerResourceDiscovery } from './utils/resource-discovery.js';
import { registerProjectTools } from './utils/project-tools.js';
// Load environment variables
dotenv.config();
const SESSION_ID_HEADER_NAME = 'x-mcp-session-id';
/**
* Custom logger that writes to stderr (won't interfere with HTTP responses)
*/
const logger = {
debug: (message, ...args) => {
if (process.env.DEBUG) {
console.error(`[DEBUG] ${message}`, ...args);
}
},
info: (message, ...args) => {
console.error(`[INFO] ${message}`, ...args);
},
warn: (message, ...args) => {
console.error(`[WARN] ${message}`, ...args);
},
error: (message, ...args) => {
console.error(`[ERROR] ${message}`, ...args);
}
};
/**
* MCP HTTP Server class to handle multiple concurrent connections
*/
class MCPHttpServer {
transports = {};
app;
constructor() {
this.app = express();
this.setupMiddleware();
this.setupRoutes();
}
setupMiddleware() {
// Enable CORS for web clients
this.app.use(cors({
origin: process.env.ALLOWED_ORIGINS ? process.env.ALLOWED_ORIGINS.split(',') : '*',
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', SESSION_ID_HEADER_NAME],
credentials: true
}));
// Parse JSON bodies
this.app.use(express.json({ limit: '10mb' }));
// Request logging
this.app.use((req, res, next) => {
if (process.env.DEBUG) {
logger.debug(`${req.method} ${req.path} - ${req.get('User-Agent')}`);
}
next();
});
}
setupRoutes() {
const MCP_ENDPOINT = '/mcp';
// Handle GET requests (for SSE streaming)
this.app.get(MCP_ENDPOINT, async (req, res) => {
await this.handleGetRequest(req, res);
});
// Handle POST requests (main MCP communication)
this.app.post(MCP_ENDPOINT, async (req, res) => {
await this.handlePostRequest(req, res);
});
// Health check endpoint
this.app.get('/health', (req, res) => {
res.json({ status: 'healthy', timestamp: new Date().toISOString() });
});
// Capabilities endpoint
this.app.get('/capabilities', (req, res) => {
res.json({
name: 'Google Cloud MCP HTTP',
version: '0.1.0',
description: 'Model Context Protocol HTTP server for Google Cloud services',
capabilities: {
prompts: {},
resources: {},
tools: {},
logging: {}
},
transports: ['sse'],
endpoints: {
mcp: MCP_ENDPOINT,
health: '/health',
capabilities: '/capabilities'
}
});
});
}
async handleGetRequest(req, res) {
logger.debug('GET request received - setting up SSE connection');
try {
// Create a new session
const sessionId = randomUUID();
// Create a new MCP server instance for this session
const mcpServer = new McpServer({
name: 'Google Cloud MCP HTTP',
version: '0.1.0',
description: 'Model Context Protocol HTTP server for Google Cloud services'
}, {
capabilities: {
prompts: {},
resources: {},
tools: {},
logging: {}
}
});
// Register services for this server instance
await this.registerServices(mcpServer);
// Create SSE transport - it handles the SSE setup internally
const transport = new SSEServerTransport('/mcp', res);
// Store session with the transport's own session ID
this.transports[transport.sessionId] = { transport, mcpServer };
// Connect server to transport BEFORE starting
await mcpServer.connect(transport);
// Start SSE stream - this handles headers and initial setup
await transport.start();
logger.info(`New SSE session created: ${transport.sessionId}`);
// Clean up on close
transport.onclose = () => {
delete this.transports[transport.sessionId];
logger.info(`SSE session closed: ${transport.sessionId}`);
};
}
catch (error) {
logger.error('Error setting up SSE connection:', error);
// Only send error response if headers haven't been sent yet
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal error setting up SSE connection'
},
id: null
});
}
}
}
async handlePostRequest(req, res) {
const sessionId = req.headers[SESSION_ID_HEADER_NAME];
logger.debug('POST request received', { sessionId, method: req.body?.method });
if (!sessionId) {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32600,
message: 'Invalid Request: missing session ID header'
},
id: null
});
return;
}
const session = this.transports[sessionId];
if (!session) {
res.status(404).json({
jsonrpc: '2.0',
error: {
code: -32601,
message: 'Session not found'
},
id: null
});
return;
}
try {
// Handle the POST message through the transport
// The transport expects Node.js IncomingMessage and ServerResponse
await session.transport.handlePostMessage(req, res, req.body);
}
catch (error) {
logger.error('Error handling POST message:', error);
// Only send error if headers haven't been sent by transport
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal error'
},
id: null
});
}
}
}
async registerServices(mcpServer) {
try {
// Initialize Google Cloud authentication in lazy mode
const lazyAuth = process.env.LAZY_AUTH !== 'false';
logger.debug(`Initializing Google Cloud authentication in lazy loading mode: ${lazyAuth}`);
if (!lazyAuth) {
const auth = await initGoogleAuth(false);
if (auth) {
logger.debug('Google Cloud authentication initialized successfully');
}
}
// Register all services (same as stdio version)
logger.debug('Registering Google Cloud services...');
// Logging
registerLoggingResources(mcpServer);
registerLoggingTools(mcpServer);
// Spanner
registerSpannerResources(mcpServer);
registerSpannerTools(mcpServer);
registerSpannerQueryCountTool(mcpServer);
// Monitoring
registerMonitoringResources(mcpServer);
await registerMonitoringTools(mcpServer);
// Trace
await registerTraceService(mcpServer);
// Prompts
registerPrompts(mcpServer);
// Project tools
registerProjectTools(mcpServer);
// Resource discovery
await registerResourceDiscovery(mcpServer);
logger.debug('All Google Cloud services registered successfully');
}
catch (error) {
logger.error('Error registering services:', error);
// Continue running even if some services fail to register
}
}
start(port = 3001) {
this.app.listen(port, () => {
logger.info(`Google Cloud MCP HTTP Server listening on port ${port}`);
logger.info(`SSE endpoint: http://localhost:${port}/mcp (GET to establish connection)`);
logger.info(`POST endpoint: http://localhost:${port}/mcp (POST with ${SESSION_ID_HEADER_NAME} header)`);
logger.info(`Health check: http://localhost:${port}/health`);
logger.info(`Capabilities: http://localhost:${port}/capabilities`);
});
}
getApp() {
return this.app;
}
}
/**
* Main function to start the HTTP server
*/
async function main() {
// Set up error handlers
process.on('uncaughtException', (error) => {
logger.error(`Uncaught exception: ${error.message}`, error.stack);
});
process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
});
// Graceful shutdown
process.on('SIGINT', () => {
logger.info('Received SIGINT signal, shutting down gracefully');
process.exit(0);
});
process.on('SIGTERM', () => {
logger.info('Received SIGTERM signal, shutting down gracefully');
process.exit(0);
});
try {
const server = new MCPHttpServer();
const port = parseInt(process.env.PORT || '3001', 10);
server.start(port);
}
catch (error) {
logger.error('Failed to start HTTP server:', error);
process.exit(1);
}
}
// Export for use as a module
export { MCPHttpServer };
// Start the server if this file is run directly
if (import.meta.url === `file://${process.argv[1]}`) {
main();
}
//# sourceMappingURL=http-server.js.map