
MCP Console Automation Server

DataPipelineManager.ts (28.3 kB)
/**
 * Data Pipeline Integration Manager
 * Handles data flow, transformations, and integration with external data sources
 */

import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import {
  DataFlowDefinition,
  DataInput,
  DataOutput,
  DataTransformation,
  DataValidation,
  ValidationRule,
  DataSource,
  DataDestination,
  WorkflowExecution,
} from '../types/workflow.js';
import { Logger } from '../utils/logger.js';
import * as fs from 'fs/promises';
import * as path from 'path';

export interface DataPipelineExecution {
  id: string;
  pipelineId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  startTime: Date;
  endTime?: Date;
  inputs: Record<string, any>;
  outputs: Record<string, any>;
  metrics: DataPipelineMetrics;
  errors: DataError[];
  logs: DataLog[];
}

export interface DataPipelineMetrics {
  recordsProcessed: number;
  recordsFiltered: number;
  recordsTransformed: number;
  recordsValidated: number;
  recordsFailed: number;
  processingTimeMs: number;
  throughputRecordsPerSecond: number;
  memoryUsageMB: number;
}

export interface DataError {
  timestamp: Date;
  stage: string;
  recordId?: string;
  error: string;
  data?: any;
}

export interface DataLog {
  timestamp: Date;
  level: 'debug' | 'info' | 'warn' | 'error';
  stage: string;
  message: string;
  recordCount?: number;
}

export class DataPipelineManager extends EventEmitter {
  private pipelines: Map<string, DataFlowDefinition>;
  private executions: Map<string, DataPipelineExecution>;
  private dataConnectors: Map<string, DataConnector>;
  private transformationFunctions: Map<string, TransformationFunction>;
  private logger: Logger;

  constructor() {
    super();
    this.pipelines = new Map();
    this.executions = new Map();
    this.dataConnectors = new Map();
    this.transformationFunctions = new Map();
    this.logger = new Logger('DataPipelineManager');
    this.registerBuiltInTransformations();
    this.registerBuiltInConnectors();
  }

  /**
   * Register a data pipeline definition
   */
  registerPipeline(id: string, definition: DataFlowDefinition): void {
    this.pipelines.set(id, definition);
    this.logger.info(`Data pipeline registered: ${id}`);
    this.emit('pipeline-registered', id, definition);
  }

  /**
   * Execute a data pipeline
   */
  async executePipeline(
    pipelineId: string,
    inputs: Record<string, any>,
    context?: any
  ): Promise<string> {
    const pipeline = this.pipelines.get(pipelineId);
    if (!pipeline) {
      throw new Error(`Pipeline not found: ${pipelineId}`);
    }

    const executionId = uuidv4();
    const execution: DataPipelineExecution = {
      id: executionId,
      pipelineId,
      status: 'pending',
      startTime: new Date(),
      inputs,
      outputs: {},
      metrics: {
        recordsProcessed: 0,
        recordsFiltered: 0,
        recordsTransformed: 0,
        recordsValidated: 0,
        recordsFailed: 0,
        processingTimeMs: 0,
        throughputRecordsPerSecond: 0,
        memoryUsageMB: 0,
      },
      errors: [],
      logs: [],
    };

    this.executions.set(executionId, execution);

    try {
      execution.status = 'running';
      this.emit('pipeline-started', executionId);

      // Load data from inputs
      const data = await this.loadInputData(pipeline, inputs, execution, context);

      // Apply validations
      const validatedData = await this.validateData(pipeline, data, execution);

      // Apply transformations
      const transformedData = await this.transformData(
        pipeline,
        validatedData,
        execution
      );

      // Save outputs
      await this.saveOutputData(pipeline, transformedData, execution, context);

      execution.status = 'completed';
      execution.endTime = new Date();
      execution.metrics.processingTimeMs =
        execution.endTime.getTime() - execution.startTime.getTime();
      if (execution.metrics.processingTimeMs > 0) {
        execution.metrics.throughputRecordsPerSecond =
          (execution.metrics.recordsProcessed /
            execution.metrics.processingTimeMs) *
          1000;
      }

      this.emit('pipeline-completed', executionId);
      return executionId;
    } catch (error: any) {
      execution.status = 'failed';
      execution.endTime = new Date();
      execution.errors.push({
        timestamp: new Date(),
        stage: 'pipeline',
        error: error.message,
        data: { pipelineId, inputs },
      });
      this.emit('pipeline-failed', executionId, error);
      throw error;
    }
  }

  /**
   * Load data from input sources
   */
  private async loadInputData(
    pipeline: DataFlowDefinition,
    inputs: Record<string, any>,
    execution: DataPipelineExecution,
    context?: any
  ): Promise<Record<string, any>> {
    const data: Record<string, any> = {};

    this.addLog(
      execution,
      'info',
      'input',
      `Loading data from ${pipeline.inputs.length} input sources`
    );

    for (const input of pipeline.inputs) {
      try {
        const inputData = await this.loadFromSource(input, inputs, context);
        data[input.name] = inputData;

        const recordCount = Array.isArray(inputData) ? inputData.length : 1;
        execution.metrics.recordsProcessed += recordCount;

        this.addLog(
          execution,
          'info',
          'input',
          `Loaded ${recordCount} records from ${input.name}`
        );
      } catch (error: any) {
        execution.errors.push({
          timestamp: new Date(),
          stage: 'input',
          error: `Failed to load input ${input.name}: ${error.message}`,
        });

        if (input.required) {
          throw error;
        } else {
          data[input.name] = input.defaultValue;
          this.addLog(
            execution,
            'warn',
            'input',
            `Using default value for ${input.name}: ${error.message}`
          );
        }
      }
    }

    return data;
  }

  /**
   * Load data from a specific source
   */
  private async loadFromSource(
    input: DataInput,
    inputs: Record<string, any>,
    context?: any
  ): Promise<any> {
    switch (input.source.type) {
      case 'variable':
        return (
          inputs[input.source.config.name] ||
          context?.[input.source.config.name]
        );

      case 'file':
        return await this.loadFromFile(input.source.config);

      case 'api':
        return await this.loadFromApi(input.source.config);

      case 'database':
        return await this.loadFromDatabase(input.source.config);

      case 'environment':
        return process.env[input.source.config.name];

      case 'user_input':
        return inputs[input.name];

      default:
        throw new Error(`Unsupported input source type: ${input.source.type}`);
    }
  }

  /**
   * Load data from file
   */
  private async loadFromFile(config: any): Promise<any> {
    const filePath = config.path;
    const format = config.format || path.extname(filePath).toLowerCase();

    const content = await fs.readFile(filePath, 'utf8');

    switch (format) {
      case '.json':
        return JSON.parse(content);
      case '.csv':
        return this.parseCsv(content, config.options || {});
      case '.txt':
        return content.split('\n').filter((line) => line.trim());
      case '.xml':
        return this.parseXml(content);
      default:
        return content;
    }
  }

  /**
   * Load data from API
   */
  private async loadFromApi(config: any): Promise<any> {
    const response = await fetch(config.url, {
      method: config.method || 'GET',
      headers: config.headers || {},
      body: config.body ? JSON.stringify(config.body) : undefined,
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const contentType = response.headers.get('content-type') || '';
    if (contentType.includes('application/json')) {
      return await response.json();
    } else if (contentType.includes('text/')) {
      return await response.text();
    } else {
      return await response.arrayBuffer();
    }
  }

  /**
   * Load data from database
   */
  private async loadFromDatabase(config: any): Promise<any> {
    const connector = this.dataConnectors.get(config.connector);
    if (!connector) {
      throw new Error(`Database connector not found: ${config.connector}`);
    }

    return await connector.query(config.query, config.parameters);
  }

  /**
   * Validate data against pipeline validation rules
   */
  private async validateData(
    pipeline: DataFlowDefinition,
    data: Record<string, any>,
    execution: DataPipelineExecution
  ): Promise<Record<string, any>> {
    if (!pipeline.validations || pipeline.validations.length === 0) {
      return data;
    }

    this.addLog(
      execution,
      'info',
      'validation',
      `Validating data with ${pipeline.validations.length} validation rules`
    );

    const validatedData = { ...data };
    let validationErrors = 0;

    for (const validation of pipeline.validations) {
      try {
        const fieldData = this.getNestedValue(validatedData, validation.field);

        if (Array.isArray(fieldData)) {
          // Validate each item in array
          for (let i = 0; i < fieldData.length; i++) {
            const isValid = await this.validateValue(
              fieldData[i],
              validation.rules,
              execution
            );
            if (!isValid) {
              validationErrors++;
              execution.errors.push({
                timestamp: new Date(),
                stage: 'validation',
                recordId: `${validation.field}[${i}]`,
                error: `Validation failed for field ${validation.field}[${i}]`,
                data: fieldData[i],
              });
            }
          }
        } else {
          // Validate single value
          const isValid = await this.validateValue(
            fieldData,
            validation.rules,
            execution
          );
          if (!isValid) {
            validationErrors++;
            execution.errors.push({
              timestamp: new Date(),
              stage: 'validation',
              recordId: validation.field,
              error: `Validation failed for field ${validation.field}`,
              data: fieldData,
            });
          }
        }

        execution.metrics.recordsValidated++;
      } catch (error: any) {
        validationErrors++;
        execution.errors.push({
          timestamp: new Date(),
          stage: 'validation',
          error: `Validation error for ${validation.field}: ${error.message}`,
        });
      }
    }

    execution.metrics.recordsFailed += validationErrors;

    if (validationErrors > 0) {
      this.addLog(
        execution,
        'warn',
        'validation',
        `${validationErrors} validation errors found`
      );
    } else {
      this.addLog(execution, 'info', 'validation', 'All validations passed');
    }

    return validatedData;
  }

  /**
   * Validate a single value against rules
   */
  private async validateValue(
    value: any,
    rules: ValidationRule[],
    execution: DataPipelineExecution
  ): Promise<boolean> {
    for (const rule of rules) {
      const isValid = this.applyValidationRule(value, rule);
      if (!isValid) {
        this.addLog(
          execution,
          'debug',
          'validation',
          `Validation rule failed: ${rule.type} - ${rule.message || 'No message'}`
        );
        return false;
      }
    }
    return true;
  }

  /**
   * Apply a single validation rule
   */
  private applyValidationRule(value: any, rule: ValidationRule): boolean {
    switch (rule.type) {
      case 'required':
        return value !== undefined && value !== null && value !== '';

      case 'type':
        return typeof value === rule.value;

      case 'range': {
        if (typeof value !== 'number') return false;
        const [min, max] = rule.value as [number, number];
        return value >= min && value <= max;
      }

      case 'pattern':
        if (typeof value !== 'string') return false;
        return new RegExp(rule.value as string).test(value);

      case 'custom':
        // Custom validation function
        try {
          const func = new Function('value', rule.value as string);
          return Boolean(func(value));
        } catch {
          return false;
        }

      default:
        return true;
    }
  }

  /**
   * Transform data through the pipeline
   */
  private async transformData(
    pipeline: DataFlowDefinition,
    data: Record<string, any>,
    execution: DataPipelineExecution
  ): Promise<Record<string, any>> {
    if (!pipeline.transformations || pipeline.transformations.length === 0) {
      return data;
    }

    this.addLog(
      execution,
      'info',
      'transformation',
      `Applying ${pipeline.transformations.length} transformations`
    );

    let transformedData = { ...data };

    for (const transformation of pipeline.transformations) {
      try {
        transformedData = await this.applyTransformation(
          transformation,
          transformedData,
          execution
        );
        execution.metrics.recordsTransformed++;

        this.addLog(
          execution,
          'debug',
          'transformation',
          `Applied transformation: ${transformation.type} -> ${transformation.output}`
        );
      } catch (error: any) {
        execution.errors.push({
          timestamp: new Date(),
          stage: 'transformation',
          error: `Transformation failed (${transformation.id}): ${error.message}`,
          data: transformation,
        });
        throw error;
      }
    }

    return transformedData;
  }

  /**
   * Apply a single transformation
   */
  private async applyTransformation(
    transformation: DataTransformation,
    data: Record<string, any>,
    execution: DataPipelineExecution
  ): Promise<Record<string, any>> {
    const inputData = transformation.input.map((inputName) =>
      this.getNestedValue(data, inputName)
    );

    let result: any;

    switch (transformation.type) {
      case 'map':
        result = await this.applyMapTransformation(
          inputData[0],
          transformation.config
        );
        break;

      case 'filter':
        result = await this.applyFilterTransformation(
          inputData[0],
          transformation.config
        );
        execution.metrics.recordsFiltered += Array.isArray(inputData[0])
          ? inputData[0].length - (Array.isArray(result) ? result.length : 0)
          : 0;
        break;

      case 'reduce':
        result = await this.applyReduceTransformation(
          inputData[0],
          transformation.config
        );
        break;

      case 'merge':
        result = await this.applyMergeTransformation(
          inputData,
          transformation.config
        );
        break;

      case 'split':
        result = await this.applySplitTransformation(
          inputData[0],
          transformation.config
        );
        break;

      case 'validate':
        result = await this.applyValidateTransformation(
          inputData[0],
          transformation.config
        );
        break;

      case 'format':
        result = await this.applyFormatTransformation(
          inputData[0],
          transformation.config
        );
        break;

      case 'encrypt':
        result = await this.applyEncryptTransformation(
          inputData[0],
          transformation.config
        );
        break;

      default: {
        // Check for custom transformation function
        const customFunction = this.transformationFunctions.get(
          transformation.type
        );
        if (customFunction) {
          result = await customFunction.apply(
            inputData,
            transformation.config,
            execution
          );
        } else {
          throw new Error(
            `Unsupported transformation type: ${transformation.type}`
          );
        }
        break;
      }
    }

    // Set the result in the output field
    return this.setNestedValue({ ...data }, transformation.output, result);
  }

  /**
   * Save outputs to destinations
   */
  private async saveOutputData(
    pipeline: DataFlowDefinition,
    data: Record<string, any>,
    execution: DataPipelineExecution,
    context?: any
  ): Promise<void> {
    if (!pipeline.outputs || pipeline.outputs.length === 0) {
      execution.outputs = data;
      return;
    }

    this.addLog(
      execution,
      'info',
      'output',
      `Saving data to ${pipeline.outputs.length} output destinations`
    );

    for (const output of pipeline.outputs) {
      try {
        const outputData = this.getNestedValue(data, output.name);
        await this.saveToDestination(output, outputData, context);
        execution.outputs[output.name] = outputData;

        this.addLog(
          execution,
          'info',
          'output',
          `Saved output: ${output.name}`
        );
      } catch (error: any) {
        execution.errors.push({
          timestamp: new Date(),
          stage: 'output',
          error: `Failed to save output ${output.name}: ${error.message}`,
        });
        throw error;
      }
    }
  }

  /**
   * Save data to a destination
   */
  private async saveToDestination(
    output: DataOutput,
    data: any,
    context?: any
  ): Promise<void> {
    switch (output.destination.type) {
      case 'variable':
        // Data is already in execution outputs
        break;

      case 'file':
        await this.saveToFile(data, output.destination.config, output.format);
        break;

      case 'api':
        await this.saveToApi(data, output.destination.config);
        break;

      case 'database':
        await this.saveToDatabase(data, output.destination.config);
        break;

      case 'notification':
        await this.sendNotification(data, output.destination.config);
        break;

      default:
        throw new Error(
          `Unsupported output destination type: ${output.destination.type}`
        );
    }
  }

  // Built-in transformation implementations

  private async applyMapTransformation(
    data: any[],
    config: any
  ): Promise<any[]> {
    if (!Array.isArray(data)) {
      throw new Error('Map transformation requires array input');
    }

    const mappingFunction = config.function || config.expression;
    const func = new Function('item', 'index', `return ${mappingFunction}`);

    return data.map((item, index) => func(item, index));
  }

  private async applyFilterTransformation(
    data: any[],
    config: any
  ): Promise<any[]> {
    if (!Array.isArray(data)) {
      throw new Error('Filter transformation requires array input');
    }

    const filterFunction = config.function || config.expression;
    const func = new Function('item', 'index', `return ${filterFunction}`);

    return data.filter((item, index) => func(item, index));
  }

  private async applyReduceTransformation(
    data: any[],
    config: any
  ): Promise<any> {
    if (!Array.isArray(data)) {
      throw new Error('Reduce transformation requires array input');
    }

    const reduceFunction = config.function || config.expression;
    const func = new Function(
      'accumulator',
      'item',
      'index',
      `return ${reduceFunction}`
    );

    return data.reduce(
      (acc, item, index) => func(acc, item, index),
      config.initialValue
    );
  }

  private async applyMergeTransformation(
    dataArrays: any[],
    config: any
  ): Promise<any> {
    if (config.strategy === 'concat' && dataArrays.every(Array.isArray)) {
      return dataArrays.flat();
    } else if (config.strategy === 'object') {
      return Object.assign({}, ...dataArrays);
    } else {
      return dataArrays;
    }
  }

  private async applySplitTransformation(
    data: any,
    config: any
  ): Promise<any[]> {
    if (typeof data === 'string') {
      return data.split(config.delimiter || '\n');
    } else if (Array.isArray(data)) {
      const chunkSize = config.chunkSize || 1;
      const result = [];
      for (let i = 0; i < data.length; i += chunkSize) {
        result.push(data.slice(i, i + chunkSize));
      }
      return result;
    }

    throw new Error('Split transformation requires string or array input');
  }

  private async applyValidateTransformation(
    data: any,
    config: any
  ): Promise<any> {
    // Re-validate data with additional rules
    return data; // Implement validation logic
  }

  private async applyFormatTransformation(
    data: any,
    config: any
  ): Promise<any> {
    if (config.type === 'json') {
      return JSON.stringify(data, null, config.indent || 2);
    } else if (config.type === 'csv') {
      return this.formatAsCsv(data, config.options || {});
    } else if (config.type === 'template') {
      return this.applyTemplate(data, config.template);
    }

    return data;
  }

  private async applyEncryptTransformation(
    data: any,
    config: any
  ): Promise<any> {
    // Implement encryption logic
    return data; // Placeholder
  }

  // File I/O helpers

  private async saveToFile(
    data: any,
    config: any,
    format?: string
  ): Promise<void> {
    let content: string;

    if (format === 'json' || config.format === 'json') {
      content = JSON.stringify(data, null, 2);
    } else if (format === 'csv' || config.format === 'csv') {
      content = this.formatAsCsv(data, config.csvOptions || {});
    } else if (format === 'txt' || config.format === 'txt') {
      content = Array.isArray(data) ? data.join('\n') : String(data);
    } else {
      content = String(data);
    }

    await fs.writeFile(config.path, content, config.encoding || 'utf8');
  }

  private async saveToApi(data: any, config: any): Promise<void> {
    const response = await fetch(config.url, {
      method: config.method || 'POST',
      headers: { 'Content-Type': 'application/json', ...config.headers },
      body: JSON.stringify(data),
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
  }

  private async saveToDatabase(data: any, config: any): Promise<void> {
    const connector = this.dataConnectors.get(config.connector);
    if (!connector) {
      throw new Error(`Database connector not found: ${config.connector}`);
    }

    await connector.insert(config.table, data);
  }

  private async sendNotification(data: any, config: any): Promise<void> {
    this.emit('notification', {
      type: config.type || 'info',
      message: config.message || 'Data pipeline notification',
      data,
    });
  }

  // Utility methods

  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((current, key) => current?.[key], obj);
  }

  private setNestedValue(obj: any, path: string, value: any): any {
    const keys = path.split('.');
    const lastKey = keys.pop()!;
    const target = keys.reduce((current, key) => {
      if (!(key in current)) current[key] = {};
      return current[key];
    }, obj);
    target[lastKey] = value;
    return obj;
  }

  private addLog(
    execution: DataPipelineExecution,
    level: 'debug' | 'info' | 'warn' | 'error',
    stage: string,
    message: string,
    recordCount?: number
  ): void {
    const log: DataLog = {
      timestamp: new Date(),
      level,
      stage,
      message,
      recordCount,
    };

    execution.logs.push(log);

    // Log with appropriate level
    switch (level) {
      case 'debug':
        this.logger.debug(`[${execution.id}] ${stage}: ${message}`);
        break;
      case 'info':
        this.logger.info(`[${execution.id}] ${stage}: ${message}`);
        break;
      case 'warn':
        this.logger.warn(`[${execution.id}] ${stage}: ${message}`);
        break;
      case 'error':
        this.logger.error(`[${execution.id}] ${stage}: ${message}`);
        break;
    }
  }

  private parseCsv(content: string, options: any): any[] {
    const lines = content.split('\n').filter((line) => line.trim());
    const delimiter = options.delimiter || ',';
    const hasHeader = options.hasHeader !== false;

    if (lines.length === 0) return [];

    const headers = hasHeader ? lines[0].split(delimiter) : null;
    const dataLines = hasHeader ? lines.slice(1) : lines;

    return dataLines.map((line, index) => {
      const values = line.split(delimiter);
      if (headers) {
        const obj: any = {};
        headers.forEach((header, i) => {
          obj[header.trim()] = values[i]?.trim() || '';
        });
        return obj;
      } else {
        return values.map((v) => v.trim());
      }
    });
  }

  private formatAsCsv(data: any[], options: any): string {
    if (!Array.isArray(data) || data.length === 0) return '';

    const delimiter = options.delimiter || ',';
    const includeHeader = options.includeHeader !== false;

    const firstItem = data[0];
    const headers = Object.keys(firstItem);

    let csv = '';
    if (includeHeader) {
      csv += headers.join(delimiter) + '\n';
    }

    data.forEach((item) => {
      const values = headers.map((header) => {
        const value = item[header];
        return typeof value === 'string' && value.includes(delimiter)
          ? `"${value}"`
          : String(value || '');
      });
      csv += values.join(delimiter) + '\n';
    });

    return csv;
  }

  private parseXml(content: string): any {
    // Simple XML parsing - in production, use a proper XML parser
    return { xml: content };
  }

  private applyTemplate(data: any, template: string): string {
    return template.replace(/\{\{(\w+(?:\.\w+)*)\}\}/g, (match, path) => {
      const value = this.getNestedValue(data, path);
      return value !== undefined ? String(value) : match;
    });
  }

  // Built-in connectors and transformations registration

  private registerBuiltInConnectors(): void {
    // Register built-in database connectors
    // Implementation would include SQLite, PostgreSQL, MySQL, etc.
  }

  private registerBuiltInTransformations(): void {
    // Register custom transformation functions
    // Users can register their own transformations
  }

  /**
   * Register a custom data connector
   */
  registerConnector(name: string, connector: DataConnector): void {
    this.dataConnectors.set(name, connector);
    this.logger.info(`Data connector registered: ${name}`);
  }

  /**
   * Register a custom transformation function
   */
  registerTransformation(name: string, func: TransformationFunction): void {
    this.transformationFunctions.set(name, func);
    this.logger.info(`Transformation function registered: ${name}`);
  }

  /**
   * Get pipeline execution by ID
   */
  getExecution(executionId: string): DataPipelineExecution | undefined {
    return this.executions.get(executionId);
  }

  /**
   * Get all pipeline executions
   */
  getAllExecutions(): DataPipelineExecution[] {
    return Array.from(this.executions.values());
  }

  /**
   * Cancel a running pipeline execution
   */
  cancelExecution(executionId: string): void {
    const execution = this.executions.get(executionId);
    if (execution && execution.status === 'running') {
      execution.status = 'failed';
      execution.endTime = new Date();
      execution.errors.push({
        timestamp: new Date(),
        stage: 'pipeline',
        error: 'Pipeline execution cancelled by user',
      });
      this.emit('pipeline-cancelled', executionId);
    }
  }
}

// Interface for data connectors
export interface DataConnector {
  name: string;
  type: string;
  query(sql: string, parameters?: any[]): Promise<any>;
  insert(table: string, data: any): Promise<void>;
  update(table: string, data: any, conditions: any): Promise<void>;
  delete(table: string, conditions: any): Promise<void>;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

// Interface for transformation functions
export interface TransformationFunction {
  apply(
    inputs: any[],
    config: any,
    execution: DataPipelineExecution
  ): Promise<any>;
}
