MCP BatchIt
by ryanjoachim
- src
#!/usr/bin/env node
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import { WebSocketClientTransport } from "@modelcontextprotocol/sdk/client/websocket.js"
import { z } from "zod"
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js"
import { ChildProcess } from "child_process"
import { existsSync } from "fs"
import { isAbsolute } from "path"
// Array of patterns that indicate self-referential usage
const SELF_REFERENCE_PATTERNS = [
// Direct file path references
"mcp-batchit/build/index.js",
"mcp-batchit/dist/index.js",
"mcp-batchit/lib/index.js",
// NPM package references
"@modelcontextprotocol/batchit",
"@modelcontextprotocol/server-batchit",
// Common variations
"mcp-batchit",
"batchit",
"server-batchit",
]
// Transport error handling
enum TransportErrorType {
CommandNotFound = "CommandNotFound",
ConnectionFailed = "ConnectionFailed",
ValidationFailed = "ValidationFailed",
ConfigurationInvalid = "ConfigurationInvalid",
}
class TransportError extends Error {
constructor(
public type: TransportErrorType,
message: string,
public cause?: Error
) {
super(message)
this.name = "TransportError"
Error.captureStackTrace(this, TransportError)
}
}
// Server Type Definitions
interface FilesystemServerConfig {
rootDirectory?: string
permissions?: string
watchMode?: boolean
}
interface DatabaseServerConfig {
database: string
readOnly?: boolean
poolSize?: number
}
interface GenericServerConfig {
[key: string]: unknown
}
type ServerType =
| { type: "filesystem"; config: FilesystemServerConfig }
| { type: "database"; config: DatabaseServerConfig }
| { type: "generic"; config: GenericServerConfig }
// Transport Configuration
type TransportConfig =
| {
type: "stdio"
command: string
args?: string[]
env?: Record<string, string>
}
| {
type: "websocket"
url: string
options?: Record<string, unknown>
}
interface ServerIdentity {
name: string
serverType: ServerType
transport: TransportConfig
maxIdleTimeMs?: number
}
interface ServerConnection {
client: Client
transport: WebSocketClientTransport | StdioClientTransport
childProcess?: ChildProcess
lastUsed: number
identity: ServerIdentity
}
interface HPCContentItem {
type: string
text?: string
}
interface HPCErrorResponse {
isError: true
error?: string
message?: string
content?: HPCContentItem[]
}
function isHPCErrorResponse(value: unknown): value is HPCErrorResponse {
return (
value !== null &&
typeof value === "object" &&
"isError" in value &&
value.isError === true
)
}
// Type guard for StdioClientTransport
function isStdioTransport(transport: any): transport is StdioClientTransport {
return "start" in transport
}
// Schema Definitions
const ServerTypeSchema = z.discriminatedUnion("type", [
z.object({
type: z.literal("filesystem"),
config: z.object({
rootDirectory: z.string().optional(),
permissions: z.string().optional(),
watchMode: z.boolean().optional(),
}),
}),
z.object({
type: z.literal("database"),
config: z.object({
database: z.string(),
readOnly: z.boolean().optional(),
poolSize: z.number().optional(),
}),
}),
z.object({
type: z.literal("generic"),
config: z.record(z.unknown()),
}),
])
const TransportConfigSchema = z.discriminatedUnion("type", [
z.object({
type: z.literal("stdio"),
command: z.string(),
args: z.array(z.string()).optional(),
env: z.record(z.string()).optional(),
}),
z.object({
type: z.literal("websocket"),
url: z.string(),
options: z.record(z.unknown()).optional(),
}),
])
const BatchArgsSchema = z.object({
targetServer: z.object({
name: z.string(),
serverType: ServerTypeSchema,
transport: TransportConfigSchema,
maxIdleTimeMs: z.number().optional(),
}),
operations: z.array(
z.object({
tool: z.string(),
arguments: z.record(z.unknown()).default({}),
})
),
options: z
.object({
maxConcurrent: z.number().default(10),
timeoutMs: z.number().default(30000),
stopOnError: z.boolean().default(false),
keepAlive: z.boolean().default(false),
})
.default({
maxConcurrent: 10,
timeoutMs: 30000,
stopOnError: false,
keepAlive: false,
}),
})
// Connection Management
class ConnectionManager {
private connections = new Map<string, ServerConnection>()
private cleanupIntervals = new Map<string, NodeJS.Timeout>()
createKeyForIdentity(identity: ServerIdentity): string {
return JSON.stringify({
name: identity.name,
serverType: identity.serverType,
transport: identity.transport,
})
}
private validateStdioConfig(
config: Extract<TransportConfig, { type: "stdio" }>
) {
if (!config.command) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Command is required for stdio transport"
)
}
if (!config.args?.length) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"At least one argument (server file path) is required"
)
}
// For node commands, validate the file exists
if (config.command === "node") {
const serverFile = config.args[0]
if (!isAbsolute(serverFile)) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Server file path must be absolute when using node command"
)
}
if (!existsSync(serverFile)) {
throw new TransportError(
TransportErrorType.ValidationFailed,
`Server file not found: ${serverFile}`
)
}
// Prevent the BatchIt aggregator from spawning itself
const fullCommand = [config.command, ...(config.args || [])].join(" ")
if (
SELF_REFERENCE_PATTERNS.some((pattern) =>
fullCommand.toLowerCase().includes(pattern.toLowerCase())
)
) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Cannot spawn the BatchIt aggregator itself. Provide a valid MCP server file instead."
)
}
}
}
private validateWebSocketConfig(
config: Extract<TransportConfig, { type: "websocket" }>
) {
try {
const url = new URL(config.url)
if (url.protocol !== "ws:" && url.protocol !== "wss:") {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"WebSocket URL must use ws:// or wss:// protocol"
)
}
} catch (error) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Invalid WebSocket URL",
error instanceof Error ? error : undefined
)
}
}
async getOrCreateConnection(
identity: ServerIdentity
): Promise<ServerConnection> {
const serverKey = this.createKeyForIdentity(identity)
if (this.connections.has(serverKey)) {
const conn = this.connections.get(serverKey)!
conn.lastUsed = Date.now()
return conn
}
const transport = await this.createTransport(identity.transport)
const client = new Client(
{ name: "mcp-batchit", version: "1.0.0" },
{ capabilities: {} }
)
await client.connect(transport)
const connection: ServerConnection = {
client,
transport,
lastUsed: Date.now(),
identity,
}
this.connections.set(serverKey, connection)
this.setupMonitoring(serverKey, connection)
this.setupCleanupInterval(serverKey)
return connection
}
private async createTransport(
config: TransportConfig
): Promise<WebSocketClientTransport | StdioClientTransport> {
switch (config.type) {
case "stdio": {
try {
this.validateStdioConfig(config)
try {
const transport = new StdioClientTransport({
command: config.command,
args: config.args,
env: config.env,
stderr: "pipe",
})
// Test the transport
return transport
} catch (error) {
if (
error &&
typeof error === "object" &&
"code" in error &&
error.code === "ENOENT"
) {
throw new TransportError(
TransportErrorType.CommandNotFound,
`Command '${config.command}' not found in PATH. If using 'npx', ensure it's installed globally. Consider using 'node' with direct path to server JS file instead.`
)
}
throw error
}
} catch (error) {
if (error instanceof TransportError) {
throw new McpError(ErrorCode.InvalidParams, error.message)
} else {
throw new McpError(
ErrorCode.InternalError,
`Failed to create stdio transport: ${
error instanceof Error ? error.message : String(error)
}`
)
}
}
}
case "websocket": {
try {
this.validateWebSocketConfig(config)
const wsUrl =
config.url.startsWith("ws://") || config.url.startsWith("wss://")
? config.url
: `ws://${config.url}`
const transport = new WebSocketClientTransport(new URL(wsUrl))
return transport
} catch (error) {
if (error instanceof TransportError) {
throw new McpError(ErrorCode.InvalidParams, error.message)
} else {
throw new McpError(
ErrorCode.InternalError,
`Failed to create WebSocket transport: ${
error instanceof Error ? error.message : String(error)
}`
)
}
}
}
}
}
private setupMonitoring(
serverKey: string,
connection: ServerConnection
): void {
if (isStdioTransport(connection.transport)) {
// For stdio transports, we can monitor stderr
const stderr = connection.transport.stderr
if (stderr) {
stderr.on("data", (data: Buffer) => {
console.error(`[${connection.identity.name}] ${data.toString()}`)
})
}
}
// Monitor transport errors
connection.transport.onerror = (error: Error) => {
console.error(`Transport error:`, error)
this.closeConnection(serverKey)
}
}
private setupCleanupInterval(serverKey: string): void {
const interval = setInterval(() => {
const conn = this.connections.get(serverKey)
if (!conn) return
const idleTime = Date.now() - conn.lastUsed
if (idleTime > (conn.identity.maxIdleTimeMs ?? 300000)) {
// 5min default
this.closeConnection(serverKey)
}
}, 60000) // Check every minute
this.cleanupIntervals.set(serverKey, interval)
}
async closeConnection(serverKey: string): Promise<void> {
const conn = this.connections.get(serverKey)
if (!conn) return
try {
await conn.client.close()
await conn.transport.close()
} catch (error) {
console.error(`Error closing connection for ${serverKey}:`, error)
}
this.connections.delete(serverKey)
const interval = this.cleanupIntervals.get(serverKey)
if (interval) {
clearInterval(interval)
this.cleanupIntervals.delete(serverKey)
}
}
async closeAll(): Promise<void> {
for (const serverKey of this.connections.keys()) {
await this.closeConnection(serverKey)
}
}
}
// Batch Execution
interface Operation {
tool: string
arguments: Record<string, unknown>
}
interface OperationResult {
tool: string
success: boolean
result?: unknown
error?: string
durationMs: number
}
class BatchExecutor {
constructor(private connectionManager: ConnectionManager) {}
async executeBatch(
identity: ServerIdentity,
operations: Operation[],
options: {
maxConcurrent: number
timeoutMs: number
stopOnError: boolean
keepAlive?: boolean
}
): Promise<OperationResult[]> {
const connection = await this.connectionManager.getOrCreateConnection(
identity
)
const results: OperationResult[] = []
const pending = [...operations]
const running = new Set<Promise<OperationResult>>()
try {
while (pending.length > 0 || running.size > 0) {
while (pending.length > 0 && running.size < options.maxConcurrent) {
const op = pending.shift()!
const promise = this.executeOperation(
connection,
op,
options.timeoutMs
)
running.add(promise)
promise.then((res) => {
running.delete(promise)
results.push(res)
if (!res.success && options.stopOnError) {
pending.length = 0
}
})
}
if (running.size > 0) {
await Promise.race(running)
}
}
} finally {
if (!options.keepAlive) {
await this.connectionManager.closeConnection(
this.connectionManager.createKeyForIdentity(identity)
)
}
}
return results
}
private getErrorMessage(result: HPCErrorResponse): string {
// Direct error/message properties
if (result.error || result.message) {
return result.error ?? result.message ?? "Unknown HPC error"
}
// Look for error in content array
if (result.content?.length) {
const textContent = result.content
.filter((item) => item.type === "text" && item.text)
.map((item) => item.text)
.filter((text): text is string => text !== undefined)
.join(" ")
if (textContent) {
return textContent
}
}
return "Unknown HPC error"
}
private async executeOperation(
connection: ServerConnection,
operation: Operation,
timeoutMs: number
): Promise<OperationResult> {
const start = Date.now()
try {
const result = await Promise.race([
connection.client.callTool({
name: operation.tool,
arguments: operation.arguments,
}),
new Promise<never>((_, reject) =>
setTimeout(
() =>
reject(
new McpError(ErrorCode.RequestTimeout, "Operation timed out")
),
timeoutMs
)
),
])
if (isHPCErrorResponse(result)) {
return {
tool: operation.tool,
success: false,
error: this.getErrorMessage(result),
durationMs: Date.now() - start,
}
}
return {
tool: operation.tool,
success: true,
result,
durationMs: Date.now() - start,
}
} catch (error) {
return {
tool: operation.tool,
success: false,
error: error instanceof Error ? error.message : String(error),
durationMs: Date.now() - start,
}
}
}
}
// Server Setup
const connectionManager = new ConnectionManager()
const batchExecutor = new BatchExecutor(connectionManager)
const server = new McpServer({
name: "mcp-batchit",
version: "1.0.0",
})
// Define the tool's schema shape (required properties for tool registration)
const toolSchema = {
targetServer: BatchArgsSchema.shape.targetServer,
operations: BatchArgsSchema.shape.operations,
options: BatchArgsSchema.shape.options,
}
server.tool(
"batch_execute",
`
Execute multiple operations in batch on a specified MCP server. You must provide a real MCP server (like @modelcontextprotocol/server-filesystem). The aggregator will reject any attempt to spawn itself.
Transport Configuration:
1. For stdio transport (recommended for local servers):
Using node with direct file path (preferred):
{
"transport": {
"type": "stdio",
"command": "node",
"args": ["C:/path/to/server.js"]
}
}
Using npx (requires global npx installation):
{
"transport": {
"type": "stdio",
"command": "npx",
"args": ["@modelcontextprotocol/server-filesystem"]
}
}
2. For WebSocket transport (for connecting to running servers):
{
"transport": {
"type": "websocket",
"url": "ws://localhost:3000"
}
}
Usage:
- Provide "targetServer" configuration with:
- name: Unique identifier for the server
- serverType: Type and configuration of the server (filesystem, database, or generic)
- transport: Connection method (stdio or websocket) and its configuration
- Provide "operations" as an array of objects with:
- tool: The tool name on the target server
- arguments: The JSON arguments to pass
- Options:
- maxConcurrent: Maximum concurrent operations (default: 10)
- timeoutMs: Timeout per operation in milliseconds (default: 30000)
- stopOnError: Whether to stop on first error (default: false)
- keepAlive: Keep connection after batch completion (default: false)
Complete Example:
{
"targetServer": {
"name": "local-fs",
"serverType": {
"type": "filesystem",
"config": {
"rootDirectory": "C:/data",
"watchMode": true
}
},
"transport": {
"type": "stdio",
"command": "node",
"args": ["C:/path/to/filesystem-server.js"]
}
},
"operations": [
{ "tool": "createFile", "arguments": { "path": "test1.txt", "content": "Hello" } },
{ "tool": "createFile", "arguments": { "path": "test2.txt", "content": "World" } }
],
"options": {
"maxConcurrent": 3,
"stopOnError": true
}
}`,
toolSchema,
async (args) => {
const parsed = BatchArgsSchema.safeParse(args)
if (!parsed.success) {
throw new McpError(ErrorCode.InvalidParams, parsed.error.message)
}
const { targetServer, operations, options } = parsed.data
const results = await batchExecutor.executeBatch(
targetServer,
operations,
options
)
return {
content: [
{
type: "text",
text: JSON.stringify(
{
targetServer: targetServer.name,
summary: {
successCount: results.filter((r) => r.success).length,
failCount: results.filter((r) => !r.success).length,
totalDurationMs: results.reduce(
(sum, r) => sum + r.durationMs,
0
),
},
operations: results,
},
null,
2
),
},
],
}
}
)
// Startup
;(async function main() {
const transport = new StdioServerTransport()
await server.connect(transport)
console.error("mcp-batchit is running on stdio. Ready to batch-execute!")
process.on("SIGINT", cleanup)
process.on("SIGTERM", cleanup)
})().catch((err) => {
console.error("Fatal error in aggregator server:", err)
process.exit(1)
})
async function cleanup() {
console.error("Shutting down, closing all connections...")
await connectionManager.closeAll()
await server.close()
process.exit(0)
}