/**
* HybridHub MCP Server - Main Server Module
* Handles server initialization and MCP transport
*/
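// NOTE: all logging in this module goes through console.error so that stdout
// stays reserved for MCP protocol traffic when running over the stdio transport.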
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import express from "express";
import path from "path";
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { z } from "zod";
import type { Request, Response, NextFunction } from "express";
import {
resolveSourceConfigs,
resolveTransport,
resolvePort,
resolveApiKey,
getSourcesSummary,
} from "./config/env.js";
import type { DatabaseSourceConfig, StorageSourceConfig } from "./types/config.js";
// Create __dirname equivalent for ES modules
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Load package.json to get version
let packageJson: { version: string };
try {
const packageJsonPath = path.join(__dirname, "..", "package.json");
packageJson = JSON.parse(readFileSync(packageJsonPath, "utf8"));
} catch {
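// Fall back to a static version when package.json cannot be read (e.g. in bundled builds)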
packageJson = { version: "0.1.0" };
}
// Server info
export const SERVER_NAME = "HybridHub MCP Server";
export const SERVER_VERSION = packageJson.version;
/**
* Generate ASCII art banner with version information
*/
export function generateBanner(version: string): string {
return `
 _   _       _          _     _ _   _       _
| | | |_   _| |__  _ __(_) __| | | | |_   _| |__
| |_| | | | | '_ \\| '__| |/ _\` | |_| | | | | '_ \\
|  _  | |_| | |_) | |  | | (_| |  _  | |_| | |_) |
|_| |_|\\__, |_.__/|_|  |_|\\__,_|_| |_|\\__,_|_.__/
       |___/
v${version} - Universal Hybrid Data MCP Server
`;
}
// Store source configurations globally for tool access
let globalDatabases: DatabaseSourceConfig[] = [];
let globalStorages: StorageSourceConfig[] = [];
/**
* Get all configured database sources
*/
export function getDatabases() {
return globalDatabases;
}
/**
* Get all configured storage sources
*/
export function getStorages() {
return globalStorages;
}
/**
* Create success response for tools
*/
function createToolSuccessResponse(data: any) {
return {
content: [
{
type: "text" as const,
text: JSON.stringify(data, null, 2),
},
],
};
}
/**
* Create error response for tools
*/
function createToolErrorResponse(message: string, code: string) {
return {
content: [
{
type: "text" as const,
text: JSON.stringify({ error: code, message }, null, 2),
},
],
isError: true,
};
}
/**
* Register all tools with the MCP server.
* The first source of each kind (or one whose id is "default") keeps unsuffixed
* tool names; every additional source gets a `_<sourceId>` suffix so names stay unique.
*/
function registerTools(server: McpServer): void {
// Register database tools for each database source
for (const db of globalDatabases) {
const sourceId = db.id;
const isDefault = sourceId === "default" || globalDatabases[0] === db;
const toolSuffix = isDefault ? "" : `_${sourceId}`;
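// e.g. (hypothetical ids) the first database keeps plain "execute_sql", while a second
// source with id "analytics" is exposed as "execute_sql_analytics"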
// execute_sql tool
server.tool(
`execute_sql${toolSuffix}`,
`Execute a SQL query or multiple SQL statements on the '${sourceId}' database. Returns query results as JSON.`,
{
sql: z.string().describe("SQL query or multiple SQL statements to execute"),
},
async (args) => {
// TODO: Implement actual SQL execution with database connector
return createToolSuccessResponse({
message: `SQL execution on '${sourceId}' not yet implemented`,
sql: args.sql,
source_id: sourceId,
note: "Database connector integration pending",
});
}
);
// search_objects (database) tool - search database schema objects
server.tool(
`search_db_objects${toolSuffix}`,
`Search database objects (schemas, tables, columns, procedures) on '${sourceId}'. Supports SQL LIKE patterns.`,
{
object_type: z.enum(["schema", "table", "column", "procedure", "index"]).optional()
.describe("Type of database object to search for"),
pattern: z.string().optional().describe("Search pattern (SQL LIKE syntax, default: '%' for all)"),
schema: z.string().optional().describe("Filter by schema name"),
table: z.string().optional().describe("Filter by table name (for columns)"),
},
async (args) => {
// TODO: Implement actual database object search
return createToolSuccessResponse({
message: `Database object search on '${sourceId}' not yet implemented`,
filter: args,
source_id: sourceId,
note: "Database connector integration pending",
});
}
);
console.error(` - execute_sql${toolSuffix} registered`);
console.error(` - search_db_objects${toolSuffix} registered`);
}
// Register storage tools for each storage source
for (const storage of globalStorages) {
const sourceId = storage.id;
const isDefault = sourceId === "default" || globalStorages[0] === storage;
const toolSuffix = isDefault ? "" : `_${sourceId}`;
// list_buckets tool
server.tool(
`list_buckets${toolSuffix}`,
`List all storage buckets available in the '${sourceId}' source. Returns bucket names, creation dates, and locations.`,
{},
async () => {
// TODO: Implement actual bucket listing with storage provider
return createToolSuccessResponse({
message: `Bucket listing from '${sourceId}' not yet implemented`,
source_id: sourceId,
endpoint: storage.endpoint,
note: "Storage provider integration pending",
});
}
);
// list_objects tool
server.tool(
`list_objects${toolSuffix}`,
`List objects in a bucket from the '${sourceId}' source. Supports prefix filtering, delimiter for hierarchical listing, and pagination.`,
{
bucket: z.string().describe("The name of the bucket to list objects from"),
prefix: z.string().optional().describe("Filter objects by key prefix (e.g., 'folder/subfolder/')"),
delimiter: z.string().optional().describe("Character to use for grouping keys (e.g., '/' for folder-like listing)"),
max_keys: z.number().optional().describe("Maximum number of objects to return (default: 1000)"),
continuation_token: z.string().optional().describe("Token for pagination, returned from previous request"),
},
async (args) => {
// TODO: Implement actual object listing with storage provider
return createToolSuccessResponse({
message: `Object listing from '${sourceId}' not yet implemented`,
bucket: args.bucket,
filter: {
prefix: args.prefix,
delimiter: args.delimiter,
max_keys: args.max_keys,
},
source_id: sourceId,
note: "Storage provider integration pending",
});
}
);
// get_object tool
server.tool(
`get_object${toolSuffix}`,
`Retrieve the content of an object from the '${sourceId}' source. Text files are returned as plain text, binary files as base64. Default max size: 10MB`,
{
bucket: z.string().describe("The name of the bucket containing the object"),
key: z.string().describe("The object key (full path) to retrieve"),
max_size: z.number().optional().describe("Maximum content size to read in bytes (default: 10MB). Larger files will be truncated."),
},
async (args) => {
// TODO: Implement actual object retrieval with storage provider
return createToolSuccessResponse({
message: `Object retrieval from '${sourceId}' not yet implemented`,
bucket: args.bucket,
key: args.key,
max_size: args.max_size || 10485760, // default: 10 MB (10 * 1024 * 1024 bytes)
source_id: sourceId,
note: "Storage provider integration pending",
});
}
);
// get_object_metadata tool
server.tool(
`get_object_metadata${toolSuffix}`,
`Get metadata for an object from the '${sourceId}' source without downloading content. Returns content type, size, last modified date, ETag, storage class, and custom metadata.`,
{
bucket: z.string().describe("The name of the bucket containing the object"),
key: z.string().describe("The object key (full path) to get metadata for"),
},
async (args) => {
// TODO: Implement actual metadata retrieval with storage provider
return createToolSuccessResponse({
message: `Metadata retrieval from '${sourceId}' not yet implemented`,
bucket: args.bucket,
key: args.key,
source_id: sourceId,
note: "Storage provider integration pending",
});
}
);
// search_objects tool
server.tool(
`search_objects${toolSuffix}`,
`Search for objects in a bucket from the '${sourceId}' source. Supports filtering by prefix, suffix, extensions, glob patterns, file size, and modification date.`,
{
bucket: z.string().describe("The name of the bucket to search in"),
prefix: z.string().optional().describe("Filter objects by key prefix"),
suffix: z.string().optional().describe("Filter objects by key suffix"),
extensions: z.array(z.string()).optional().describe("Filter by file extensions (e.g., ['.jpg', '.png'])"),
pattern: z.string().optional().describe("Glob pattern to match object keys (e.g., '*.txt', 'folder/**/file.json')"),
min_size: z.number().optional().describe("Minimum file size in bytes"),
max_size: z.number().optional().describe("Maximum file size in bytes"),
modified_after: z.string().optional().describe("Filter objects modified after this date (ISO 8601 format)"),
modified_before: z.string().optional().describe("Filter objects modified before this date (ISO 8601 format)"),
max_results: z.number().optional().describe("Maximum number of results to return (default: 100)"),
},
async (args) => {
// TODO: Implement actual object search with storage provider
return createToolSuccessResponse({
message: `Object search in '${sourceId}' not yet implemented`,
bucket: args.bucket,
filter: {
prefix: args.prefix,
suffix: args.suffix,
extensions: args.extensions,
pattern: args.pattern,
min_size: args.min_size,
max_size: args.max_size,
modified_after: args.modified_after,
modified_before: args.modified_before,
max_results: args.max_results || 100,
},
source_id: sourceId,
note: "Storage provider integration pending",
});
}
);
console.error(` - list_buckets${toolSuffix} registered`);
console.error(` - list_objects${toolSuffix} registered`);
console.error(` - get_object${toolSuffix} registered`);
console.error(` - get_object_metadata${toolSuffix} registered`);
console.error(` - search_objects${toolSuffix} registered`);
}
}
/**
* Main entry point for the HybridHub server
*/
export async function main(): Promise<void> {
try {
// Load source configurations
const sourceConfigs = await resolveSourceConfigs();
if (!sourceConfigs) {
console.error(`
ERROR: No data sources configured!
Please provide configuration in one of these ways:
1. TOML config file: --config=path/to/hybridhub.toml or ./hybridhub.toml
2. Environment variables in .env file
Database configuration:
DSN=postgres://user:pass@localhost:5432/mydb
or
DB_TYPE=postgres DB_HOST=localhost DB_NAME=mydb DB_USER=user DB_PASSWORD=pass
Storage configuration:
STORAGE_TYPE=s3 STORAGE_ENDPOINT=http://localhost:9000 STORAGE_ACCESS_KEY=xxx STORAGE_SECRET_KEY=xxx
See documentation for more details.
`);
process.exit(1);
}
// Store configurations globally
globalDatabases = sourceConfigs.databases;
globalStorages = sourceConfigs.storages;
console.error(`Configuration source: ${sourceConfigs.source}`);
console.error("");
console.error(getSourcesSummary(sourceConfigs.databases, sourceConfigs.storages));
// Create MCP server factory function
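// A fresh instance is built per request in stateless HTTP mode; the stdio transport reuses a single long-lived instance.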
const createServer = () => {
const server = new McpServer({
name: SERVER_NAME,
version: SERVER_VERSION,
});
// Register all tools
console.error("");
console.error("Registering tools...");
registerTools(server);
return server;
};
// Resolve transport type
const transportData = resolveTransport();
const portData = transportData.type === "http" ? resolvePort() : null;
// Print banner
console.error(generateBanner(SERVER_VERSION));
// Print summary
const totalDbTools = globalDatabases.length * 2; // execute_sql + search_db_objects
const totalStorageTools = globalStorages.length * 5; // list_buckets + list_objects + get_object + get_object_metadata + search_objects
const totalTools = totalDbTools + totalStorageTools;
console.error("Summary:");
console.error(` - Database sources: ${sourceConfigs.databases.length}`);
console.error(` - Storage sources: ${sourceConfigs.storages.length}`);
console.error(` - Total tools: ${totalTools}`);
console.error(` - Transport: ${transportData.type}`);
if (portData) {
console.error(` - Port: ${portData.port}`);
}
console.error("");
// Set up transport-specific server
if (transportData.type === "http") {
const port = portData!.port;
const app = express();
// Enable JSON parsing
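// (transport.handleRequest() below expects the already-parsed request body for each POST /mcp call)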
app.use(express.json());
// Resolve API key configuration
const apiKeyData = resolveApiKey();
const configuredApiKey = apiKeyData?.apiKey || null;
if (configuredApiKey) {
console.error(`API key authentication: ENABLED`);
} else {
console.error(`API key authentication: DISABLED`);
}
// API Key authentication middleware
const apiKeyAuthMiddleware = (req: Request, res: Response, next: NextFunction): void => {
if (!configuredApiKey) {
return next();
}
// Express lower-cases incoming header names, so a single lookup also covers "X-API-Key"
const providedApiKey = req.headers["x-api-key"];
if (!providedApiKey || providedApiKey !== configuredApiKey) {
res.status(401).json({
error: "Unauthorized",
message: "Invalid or missing API key",
});
return;
}
next();
};
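// Example (hypothetical values): clients send the key in the "x-api-key" header, e.g.
//   curl -H "x-api-key: <your-key>" http://localhost:<port>/api/sources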
// CORS headers - only localhost origins are allowed, shielding the local server from cross-site access
app.use((req, res, next) => {
const origin = req.headers.origin;
// Match http(s)://localhost with an optional port; a plain startsWith check would also accept e.g. "localhost.example.com"
if (origin && !/^https?:\/\/localhost(:\d+)?$/.test(origin)) {
res.status(403).json({ error: "Forbidden origin" });
return;
}
res.header("Access-Control-Allow-Origin", origin || "http://localhost");
res.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
res.header("Access-Control-Allow-Headers", "Content-Type, Mcp-Session-Id, x-api-key, X-API-Key");
res.header("Access-Control-Allow-Credentials", "true");
if (req.method === "OPTIONS") {
res.sendStatus(200);
return;
}
next();
});
// Health check endpoint
app.get("/healthz", (req, res) => {
res.status(200).send("OK");
});
// Sources API endpoint
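// (returns a sanitized view only - connection credentials such as passwords and secret keys are never included)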
app.get("/api/sources", apiKeyAuthMiddleware, (req, res) => {
res.json({
databases: globalDatabases.map((db) => ({
id: db.id,
type: db.type,
host: db.host,
database: db.database,
})),
storages: globalStorages.map((s) => ({
id: s.id,
type: s.type,
endpoint: s.endpoint,
region: s.region,
})),
});
});
// MCP endpoint - GET not supported in stateless mode
app.get("/mcp", apiKeyAuthMiddleware, (req, res) => {
res.status(405).json({
error: "Method Not Allowed",
message: "SSE streaming is not supported. Use POST requests.",
});
});
// MCP endpoint - POST for stateless requests
app.post("/mcp", apiKeyAuthMiddleware, async (req, res) => {
try {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
enableJsonResponse: true,
});
const server = createServer();
// Release the per-request server and transport once the response is finished
res.on("close", () => {
void transport.close();
void server.close();
});
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error("Error handling request:", error);
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
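// Example stateless call (one JSON-RPC message per POST, sent with Accept: application/json, text/event-stream):
//   POST /mcp  {"jsonrpc":"2.0","id":1,"method":"tools/list"}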
// Start HTTP server
app.listen(port, "0.0.0.0", () => {
console.error(`MCP server endpoint at http://0.0.0.0:${port}/mcp`);
console.error(`Health check at http://0.0.0.0:${port}/healthz`);
console.error(`Sources API at http://0.0.0.0:${port}/api/sources`);
console.error("");
console.error("Server is running. Press Ctrl+C to stop.");
});
} else {
// STDIO transport
const server = createServer();
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("MCP server running on stdio");
process.on("SIGINT", async () => {
console.error("Shutting down...");
await transport.close();
process.exit(0);
});
}
} catch (err) {
console.error("Fatal error:", err);
process.exit(1);
}
}