Skip to main content
Glama
IBM
by IBM
toolProcessor.ts (20.8 kB)
/**
 * @fileoverview YAML Tool Processor - Simplified pipeline for YAML tool processing
 * Consolidates YAML parsing, tool configuration creation, and dependency management
 * into a single, cohesive class with a simple 3-step workflow: parse → create → cache
 *
 * @module src/ibmi-mcp-server/utils/yaml/yamlToolProcessor
 */

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { watch, FSWatcher } from "fs";
import { config } from "@/config/index.js";
import { SourceManager } from "../../services/sourceManager.js";
import { SQLToolFactory } from "./toolFactory.js";
import { ToolsetManager } from "./toolsetManager.js";
import { ErrorHandler, logger } from "@/utils/internal/index.js";
import {
  requestContextService,
  RequestContext,
} from "@/utils/internal/requestContext.js";
import { JsonRpcErrorCode, McpError } from "@/types-global/errors.js";
import {
  ProcessedSQLTool,
  CachedToolConfig,
  ConfigSource,
  ConfigBuildResult,
} from "./types.js";
import { SqlToolsConfig } from "../../schemas/index.js";
import { SourceConfig } from "../../schemas/index.js";
import { resolve } from "path";
import { existsSync, statSync } from "fs";
import { ToolConfigBuilder } from "./toolConfigBuilder.js";
import { createHandlerFromDefinition } from "./toolDefinitions.js";

/**
 * Result of tool processing operation
 */
export interface ToolProcessingResult {
  /** Whether processing was successful */
  success: boolean;
  /** Number of tools processed */
  toolCount: number;
  /** Number of sources registered */
  sourceCount: number;
  /** Error message if processing failed */
  error?: string;
  /** Processing statistics */
  stats?: {
    toolsProcessed: number;
    sourcesRegistered: number;
    toolsetMappings: number;
  };
}

/**
 * YAML Tool Processor - Simplified pipeline for YAML tool processing
 *
 * Provides a clean, simple workflow:
 * 1. Parse YAML tools from configuration
 * 2. Create standardized tool configurations
 * 3. Cache configurations for fast registration
 *
 * Internally manages all dependencies and complexity, exposing only a simple API.
 *
 * Typical call order: `initialize()` → `processTools()` → `registerWithServer()`.
 * File-watching state (callbacks, watchers, watched paths) is static and therefore
 * shared by every instance of this class.
 */
export class ToolProcessor {
  private sourceManager: SourceManager;
  private toolsetManager: ToolsetManager;
  private isInitialized: boolean = false;
  private yamlConfig: SqlToolsConfig | null = null;
  private processedTools: ProcessedSQLTool[] = [];
  private toolConfigs: CachedToolConfig[] = [];

  // File watching capabilities
  private static changeCallbacks: Map<
    string,
    (filePath: string, eventType: string, context?: RequestContext) => void
  > = new Map();
  private static nextCallbackId = 1;
  private static watchers: Map<string, FSWatcher> = new Map();
  private static watchedPaths: Set<string> = new Set();

  constructor() {
    // Initialize internal dependencies
    this.sourceManager = SourceManager.getInstance();
    this.toolsetManager = ToolsetManager.getInstance();
  }

  /**
   * Turn an arbitrary thrown value into a printable message.
   * @param error - Value caught in a `catch` clause or delivered by an error event
   * @returns `error.message` for `Error` instances, otherwise `String(error)`
   * @private
   */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Initialize the processor and all internal dependencies.
   *
   * Parses the YAML configuration, registers its sources, initializes the SQL
   * tool factory and the toolset manager. Calling it again after a successful
   * initialization is a no-op.
   *
   * @param context - Request context for logging
   * @throws McpError (InvalidParams) when `config.toolsYamlPath` is not set
   */
  async initialize(context?: RequestContext): Promise<void> {
    const operationContext =
      context ||
      requestContextService.createRequestContext({
        operation: "ToolProcessor.initialize",
      });

    return ErrorHandler.tryCatch(
      async () => {
        if (this.isInitialized) {
          logger.debug(operationContext, "ToolProcessor already initialized");
          return;
        }

        logger.info(operationContext, "Initializing YAML Tool Processor");

        // Validate configuration
        if (!config.toolsYamlPath) {
          throw new McpError(
            JsonRpcErrorCode.InvalidParams,
            "YAML tools path not configured. Please set TOOLS_YAML_PATH.",
            { toolsYamlPath: config.toolsYamlPath },
          );
        }

        // Parse YAML configuration
        this.yamlConfig = await this.parseYamlConfig(operationContext);

        // Register sources with source manager
        if (this.yamlConfig.sources) {
          await this.registerSources(this.yamlConfig.sources, operationContext);
        }

        // Initialize SQL executor
        SQLToolFactory.initialize(this.sourceManager);

        // Initialize toolset manager
        await this.toolsetManager.initialize(this.yamlConfig, operationContext);

        this.isInitialized = true;
        logger.info(
          operationContext,
          "YAML Tool Processor initialized successfully",
        );
      },
      {
        operation: "ToolProcessor.initialize",
        context: operationContext,
        errorCode: JsonRpcErrorCode.InternalError,
      },
    );
  }

  /**
   * Process YAML tools into standardized configurations.
   *
   * Resolves every tool in the parsed YAML, optionally narrows the list to the
   * toolsets named in `config.selectedToolsets`, and builds the cached tool
   * configurations that `registerWithServer()` later registers.
   *
   * @param context - Request context for logging
   * @returns Tool processing result
   * @throws McpError (InternalError) when called before `initialize()`
   */
  async processTools(context?: RequestContext): Promise<ToolProcessingResult> {
    const operationContext =
      context ||
      requestContextService.createRequestContext({
        operation: "ToolProcessor.processTools",
      });

    return ErrorHandler.tryCatch(
      async () => {
        if (!this.isInitialized) {
          throw new McpError(
            JsonRpcErrorCode.InternalError,
            "ToolProcessor not initialized. Call initialize() first.",
          );
        }

        logger.info(
          operationContext,
          "Processing YAML tools into configurations",
        );

        // Process tools from YAML config
        this.processedTools = await this.processYamlTools(operationContext);

        // Apply toolsets filtering if specified
        let filteredTools = this.processedTools;
        const selectedToolsets = config.selectedToolsets;
        if (Array.isArray(selectedToolsets) && selectedToolsets.length > 0) {
          const configBuilder = ToolConfigBuilder.getInstance();
          filteredTools = configBuilder.filterToolsByToolsets(
            this.processedTools,
            selectedToolsets,
          );
          logger.info(
            operationContext,
            `Filtered tools by toolsets [${selectedToolsets.join(", ")}]: ${filteredTools.length}/${this.processedTools.length} tools selected`,
          );
        }

        // Create tool configurations
        this.toolConfigs = await this.createToolConfigurations(
          filteredTools,
          operationContext,
        );

        const result: ToolProcessingResult = {
          success: true,
          toolCount: this.toolConfigs.length,
          sourceCount: Object.keys(this.yamlConfig?.sources || {}).length,
          stats: {
            toolsProcessed: this.processedTools.length,
            sourcesRegistered: Object.keys(
              this.sourceManager.getSourcesStatus(),
            ).length,
            toolsetMappings: this.toolsetManager.getToolsetStats().totalTools,
          },
        };

        logger.info(
          { ...operationContext, result },
          `YAML tools processed successfully: ${result.toolCount} tools, ${result.sourceCount} sources`,
        );

        return result;
      },
      {
        operation: "ToolProcessor.processTools",
        context: operationContext,
        errorCode: JsonRpcErrorCode.InternalError,
      },
    );
  }

  /**
   * Register processed tools with MCP server.
   *
   * A failure to register one tool is logged and does not stop the remaining
   * tools from being registered.
   *
   * @param server - MCP server instance
   * @param context - Request context for logging
   */
  async registerWithServer(
    server: McpServer,
    context?: RequestContext,
  ): Promise<void> {
    const operationContext =
      context ||
      requestContextService.createRequestContext({
        operation: "ToolProcessor.registerWithServer",
      });

    return ErrorHandler.tryCatch(
      async () => {
        // Counted outside the branch so the final total reflects registrations
        // that actually succeeded, not merely the number of cached configs.
        let registeredCount = 0;

        // Register YAML-generated tools
        if (this.toolConfigs.length > 0) {
          logger.info(
            operationContext,
            `Registering ${this.toolConfigs.length} YAML-generated tools with MCP server`,
          );

          // Named `toolConfig` to avoid shadowing the imported `config` module.
          for (const toolConfig of this.toolConfigs) {
            try {
              const handler = createHandlerFromDefinition(toolConfig);
              server.registerTool(
                toolConfig.name,
                {
                  title: toolConfig.title,
                  description: toolConfig.description,
                  inputSchema: toolConfig.inputSchema.shape,
                  outputSchema: toolConfig.outputSchema.shape,
                  annotations: toolConfig.annotations,
                },
                handler,
              );
              registeredCount++;
            } catch (error) {
              logger.error(
                operationContext,
                `Failed to register YAML tool ${toolConfig.name}: ${ToolProcessor.describeError(error)}`,
              );
            }
          }

          logger.info(
            operationContext,
            `Successfully registered ${registeredCount} YAML-generated tools`,
          );
        }

        logger.info(
          operationContext,
          `Total tools registered with MCP server: ${registeredCount}`,
        );
      },
      {
        operation: "ToolProcessor.registerWithServer",
        context: operationContext,
        errorCode: JsonRpcErrorCode.InternalError,
      },
    );
  }

  /**
   * Get processed tool configurations (for caching)
   * @returns A shallow copy of the cached tool configurations
   */
  getToolConfigurations(): CachedToolConfig[] {
    return [...this.toolConfigs];
  }

  /**
   * Get the YAML configuration
   * @returns The parsed YAML configuration, or null before initialization
   */
  getYamlConfig(): SqlToolsConfig | null {
    return this.yamlConfig;
  }

  /**
   * Check if a tool should be registered based on toolsets filtering
   * @param toolName - Name of the tool to check
   * @returns Whether the tool should be registered
   */
  public shouldRegisterToolPublic(toolName: string): boolean {
    return this.shouldRegisterTool(toolName);
  }

  /**
   * Check if a tool should be registered based on toolsets filtering.
   *
   * With no toolset filter configured every tool is accepted. With a filter,
   * a tool is accepted only if at least one selected toolset lists it.
   *
   * @param toolName - Name of the tool to check
   * @returns Whether the tool should be registered
   * @private
   */
  private shouldRegisterTool(toolName: string): boolean {
    const selectedToolsets = config.selectedToolsets;

    // If no toolsets filter is applied, register all tools
    if (!selectedToolsets || selectedToolsets.length === 0) {
      return true;
    }

    // A filter is active but the YAML defines no toolsets: nothing can match
    if (!this.yamlConfig?.toolsets) {
      return false;
    }

    // Find which toolsets contain this tool
    return Object.entries(this.yamlConfig.toolsets).some(
      ([toolsetName, toolsetConfig]) =>
        toolsetConfig.tools.includes(toolName) &&
        selectedToolsets.includes(toolsetName),
    );
  }

  /**
   * Get processing statistics
   * @returns Processing statistics
   */
  getStats(): {
    initialized: boolean;
    toolCount: number;
    sourceCount: number;
    toolsetCount: number;
  } {
    return {
      initialized: this.isInitialized,
      toolCount: this.toolConfigs.length,
      sourceCount: Object.keys(this.yamlConfig?.sources || {}).length,
      toolsetCount: Object.keys(this.toolsetManager.getSummary()).length,
    };
  }

  /**
   * Parse YAML configuration from configured path.
   *
   * On success, also starts watching the resolved files for changes
   * (a no-op unless `config.yamlAutoReload` is enabled).
   *
   * @param context - Request context for logging
   * @returns Parsed YAML configuration
   * @throws McpError (InvalidParams) when the configuration cannot be built
   * @private
   */
  private async parseYamlConfig(
    context: RequestContext,
  ): Promise<SqlToolsConfig> {
    const configBuilder = ToolConfigBuilder.getInstance();

    // Configure sources based on configured path.
    // The caller (initialize) has already verified toolsYamlPath is set.
    const sources = this.buildConfigSources(config.toolsYamlPath!, context);

    // Build configuration using ToolConfigBuilder
    const configResult = await configBuilder.buildFromSources(
      sources,
      undefined,
      context,
    );

    if (!configResult.success || !configResult.config) {
      const errorMessage =
        configResult.errors?.join(", ") ?? "Unknown build error";
      throw new McpError(
        JsonRpcErrorCode.InvalidParams,
        `Failed to build YAML configuration: ${errorMessage}`,
        {
          toolsYamlPath: config.toolsYamlPath,
          errors: configResult.errors,
        },
      );
    }

    if (configResult.resolvedFilePaths?.length) {
      ToolProcessor.startWatching(configResult.resolvedFilePaths, context);
    }

    return configResult.config;
  }

  /**
   * Build configuration sources based on YAML tools path type.
   *
   * An array is treated as a list of files. A single path is classified as a
   * directory or a file by inspecting the filesystem; a path that does not
   * exist is passed through as a file.
   *
   * @param yamlToolsPath - Path to YAML tools
   * @param context - Request context
   * @returns Config sources, all marked as required
   * @private
   */
  private buildConfigSources(
    yamlToolsPath: string | string[],
    context: RequestContext,
  ): ConfigSource[] {
    if (Array.isArray(yamlToolsPath)) {
      return yamlToolsPath.map((path) => ({
        type: "file" as const,
        path,
        required: true,
      }));
    }

    const resolvedPath = resolve(yamlToolsPath);

    if (!existsSync(resolvedPath)) {
      // Path doesn't exist, treat as file (ToolConfigBuilder will validate appropriately)
      return [{ type: "file", path: yamlToolsPath, required: true }];
    }

    if (statSync(resolvedPath).isDirectory()) {
      logger.debug(context, `Detected directory path: ${yamlToolsPath}`);
      return [{ type: "directory", path: yamlToolsPath, required: true }];
    }

    logger.debug(context, `Detected file path: ${yamlToolsPath}`);
    return [{ type: "file", path: yamlToolsPath, required: true }];
  }

  /**
   * Register sources from YAML configuration.
   * Sources are registered one at a time, in the iteration order of `sources`.
   *
   * @param sources - Source configurations
   * @param context - Request context
   * @private
   */
  private async registerSources(
    sources: Record<string, SourceConfig>,
    context: RequestContext,
  ): Promise<void> {
    logger.info(context, `Registering ${Object.keys(sources).length} sources`);

    for (const [sourceName, sourceConfig] of Object.entries(sources)) {
      await this.sourceManager.registerSource(
        sourceName,
        sourceConfig,
        context,
      );
    }
  }

  /**
   * Process tools from YAML configuration.
   *
   * Pairs each tool with the source it references and with the names of all
   * toolsets that list it.
   *
   * @param _context - Request context
   * @returns Array of processed tools
   * @throws McpError (InvalidParams) when a tool references an undefined source
   * @private
   */
  private async processYamlTools(
    _context: RequestContext,
  ): Promise<ProcessedSQLTool[]> {
    const processedTools: ProcessedSQLTool[] = [];

    if (!this.yamlConfig?.tools) {
      return processedTools;
    }

    const toolsetEntries = Object.entries(this.yamlConfig.toolsets ?? {});

    for (const [toolName, toolConfig] of Object.entries(
      this.yamlConfig.tools,
    )) {
      const sourceConfig = this.yamlConfig.sources?.[toolConfig.source];
      if (!sourceConfig) {
        throw new McpError(
          JsonRpcErrorCode.InvalidParams,
          `Tool '${toolName}' references non-existent source '${toolConfig.source}'`,
        );
      }

      // Find toolsets that contain this tool
      const toolsets = toolsetEntries
        .filter(([, toolsetConfig]) => toolsetConfig.tools.includes(toolName))
        .map(([toolsetName]) => toolsetName);

      processedTools.push({
        name: toolName,
        config: toolConfig,
        source: sourceConfig,
        toolsets,
        metadata: {
          name: toolName,
          description: toolConfig.description,
          domain: toolConfig.domain,
          category: toolConfig.category,
          toolsets,
        },
      });
    }

    return processedTools;
  }

  /**
   * Create tool configurations from processed tools using standardized builder.
   * Configurations are built sequentially, preserving the input order.
   *
   * @param processedTools - Array of processed tools
   * @param context - Request context
   * @returns Array of cached tool configurations
   * @private
   */
  private async createToolConfigurations(
    processedTools: ProcessedSQLTool[],
    context: RequestContext,
  ): Promise<CachedToolConfig[]> {
    // ToolConfigBuilder is already imported statically at the top of the file,
    // so no dynamic import is needed here.
    const configBuilder = ToolConfigBuilder.getInstance();
    const toolConfigs: CachedToolConfig[] = [];

    for (const processedTool of processedTools) {
      const toolConfig = await configBuilder.buildToolConfig(
        processedTool.name,
        processedTool.config,
        processedTool.toolsets,
        context,
      );
      toolConfigs.push(toolConfig);
    }

    return toolConfigs;
  }

  /**
   * Clear all data and reset (for testing).
   * Does not touch the static file watchers; use `clearWatchers()` for those.
   */
  clearAll(): void {
    this.sourceManager.clearAllSources();
    this.toolsetManager.clearAll();
    this.isInitialized = false;
    this.yamlConfig = null;
    this.processedTools = [];
    this.toolConfigs = [];
  }

  // ============================================================================
  // Simplified File Watching (moved from YamlConfigBuilder)
  // ============================================================================

  /**
   * Register a callback to be called when YAML configuration files change
   * @param callback - Function to call when files change
   * @returns Callback ID for unregistering
   */
  static registerChangeCallback(
    callback: (
      filePath: string,
      eventType: string,
      context?: RequestContext,
    ) => void,
  ): string {
    const id = `callback_${ToolProcessor.nextCallbackId++}`;
    ToolProcessor.changeCallbacks.set(id, callback);
    return id;
  }

  /**
   * Unregister a change callback
   * @param callbackId - ID returned from registerChangeCallback
   * @returns True if callback was found and removed
   */
  static unregisterChangeCallback(callbackId: string): boolean {
    return ToolProcessor.changeCallbacks.delete(callbackId);
  }

  /**
   * Get the number of registered change callbacks
   * @returns Number of registered callbacks
   */
  static getCallbackCount(): number {
    return ToolProcessor.changeCallbacks.size;
  }

  /**
   * Start watching YAML configuration files for changes.
   *
   * Does nothing unless `config.yamlAutoReload` is enabled. Paths that are
   * already watched are skipped. A path that cannot be watched is logged as a
   * warning and does not prevent the remaining paths from being watched.
   *
   * @param filePaths - Array of file paths to watch
   * @param context - Request context for logging
   */
  static startWatching(filePaths: string[], context?: RequestContext): void {
    if (!config.yamlAutoReload) {
      return;
    }

    const logContext = context ?? {};

    for (const filePath of filePaths) {
      if (ToolProcessor.watchedPaths.has(filePath)) {
        continue; // Already watching this path
      }

      try {
        const watcher = watch(filePath, { persistent: false }, (eventType) => {
          logger.info(
            logContext,
            `YAML file changed (${eventType}): ${filePath}`,
          );

          // Call all registered callbacks; one failing callback must not
          // prevent the others from running.
          for (const callback of ToolProcessor.changeCallbacks.values()) {
            try {
              callback(filePath, eventType, context);
            } catch (error) {
              logger.error(
                logContext,
                `Error in YAML change callback: ${ToolProcessor.describeError(error)}`,
              );
            }
          }
        });

        // An 'error' event with no listener is rethrown by the EventEmitter as
        // an uncaught exception. Log it and drop the watcher so the path can be
        // watched again by a later startWatching() call.
        watcher.on("error", (error) => {
          logger.warning(
            logContext,
            `YAML file watcher failed: ${filePath}. ${ToolProcessor.describeError(error)}`,
          );
          try {
            watcher.close();
          } catch {
            // Ignore errors when closing a watcher that already failed
          }
          ToolProcessor.watchers.delete(filePath);
          ToolProcessor.watchedPaths.delete(filePath);
        });

        ToolProcessor.watchers.set(filePath, watcher);
        ToolProcessor.watchedPaths.add(filePath);
        logger.debug(logContext, `Started watching YAML file: ${filePath}`);
      } catch (error) {
        logger.warning(
          logContext,
          `Failed to watch YAML file: ${filePath}. ${ToolProcessor.describeError(error)}`,
        );
      }
    }
  }

  /**
   * Stop watching all files and clear watchers.
   * Also removes every registered change callback.
   */
  static clearWatchers(): void {
    for (const watcher of ToolProcessor.watchers.values()) {
      try {
        watcher.close();
      } catch {
        // Ignore errors when closing watchers
      }
    }
    ToolProcessor.watchers.clear();
    ToolProcessor.watchedPaths.clear();
    ToolProcessor.changeCallbacks.clear();
  }

  // Static factory methods for convenience (replaces YamlConfigBuilder static methods)

  /**
   * Build a configuration from a single file; delegates to ToolConfigBuilder.fromFile.
   * @param filePath - Path of the YAML file
   * @param context - Request context for logging
   */
  static async fromFile(
    filePath: string,
    context?: RequestContext,
  ): Promise<ConfigBuildResult> {
    return ToolConfigBuilder.fromFile(filePath, context);
  }

  /**
   * Build a configuration from several files; delegates to ToolConfigBuilder.fromFiles.
   * @param filePaths - Paths of the YAML files
   * @param context - Request context for logging
   */
  static async fromFiles(
    filePaths: string[],
    context?: RequestContext,
  ): Promise<ConfigBuildResult> {
    return ToolConfigBuilder.fromFiles(filePaths, context);
  }

  /**
   * Build a configuration from a directory; delegates to ToolConfigBuilder.fromDirectory.
   * @param directoryPath - Path of the directory
   * @param context - Request context for logging
   */
  static async fromDirectory(
    directoryPath: string,
    context?: RequestContext,
  ): Promise<ConfigBuildResult> {
    return ToolConfigBuilder.fromDirectory(directoryPath, context);
  }

  /**
   * Build a configuration from a glob pattern; delegates to ToolConfigBuilder.fromGlob.
   * @param pattern - Glob pattern
   * @param baseDir - Optional base directory, forwarded unchanged
   * @param context - Request context for logging
   */
  static async fromGlob(
    pattern: string,
    baseDir?: string,
    context?: RequestContext,
  ): Promise<ConfigBuildResult> {
    return ToolConfigBuilder.fromGlob(pattern, baseDir, context);
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/IBM/ibmi-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.