learner.ts
/**
 * @fileoverview Learner Service - Online Learning Loop for Titan Memory
 *
 * This service implements a ring-buffer replay system with mini-batch updates
 * that mix contrastive, next-token prediction, and masked language modeling.
 * It includes gradient accumulation with clipping and NaN guards, runs on a
 * configurable interval, and is exposed through MCP tools for pause/resume.
 */

import * as tf from '@tensorflow/tfjs-node';
import { z } from 'zod';
import type { IMemoryModel, IMemoryState, ITensor } from './types.js';
import type { AdvancedTokenizer } from './tokenizer/index.js';
import { wrapTensor, unwrapTensor } from './types.js';

/**
 * Configuration schema for the learner service
 */
const LearnerConfigSchema = z.object({
  bufferSize: z.number().int().positive().default(10000),
  batchSize: z.number().int().positive().default(32),
  updateInterval: z.number().int().positive().default(1000), // ms
  gradientClipValue: z.number().positive().default(1.0),
  contrastiveWeight: z.number().min(0).max(1).default(0.2),
  nextTokenWeight: z.number().min(0).max(1).default(0.4),
  mlmWeight: z.number().min(0).max(1).default(0.4),
  accumulationSteps: z.number().int().positive().default(4),
  learningRate: z.number().positive().default(0.0001),
  nanGuardThreshold: z.number().positive().default(1e-6)
});

export type LearnerConfig = z.infer<typeof LearnerConfigSchema>;

/**
 * Training sample interface for the replay buffer
 */
interface TrainingSample {
  input: tf.Tensor;
  target: tf.Tensor;
  positive?: tf.Tensor; // For contrastive learning
  negative?: tf.Tensor; // For contrastive learning
  maskIndices?: number[]; // For MLM
  timestamp: number;
}

/**
 * Ring buffer implementation for replay
 */
class RingBuffer {
  private buffer: TrainingSample[];
  private capacity: number;
  private position: number = 0;
  private size: number = 0;

  constructor(capacity: number) {
    this.capacity = capacity;
    this.buffer = new Array(capacity);
  }

  add(sample: TrainingSample): void {
    // Dispose old tensors if replacing an existing slot
    if (this.buffer[this.position]) {
      this.disposeSample(this.buffer[this.position]);
    }
    this.buffer[this.position] = sample;
    this.position = (this.position + 1) % this.capacity;
    this.size = Math.min(this.size + 1, this.capacity);
  }

  sample(batchSize: number): TrainingSample[] {
    if (this.size === 0) return [];
    const samples: TrainingSample[] = [];
    const availableIndices = Array.from({ length: this.size }, (_, i) => i);
    for (let i = 0; i < Math.min(batchSize, this.size); i++) {
      const randomIndex = Math.floor(Math.random() * availableIndices.length);
      const bufferIndex = availableIndices.splice(randomIndex, 1)[0];
      samples.push(this.buffer[bufferIndex]);
    }
    return samples;
  }

  private disposeSample(sample: TrainingSample): void {
    sample.input.dispose();
    sample.target.dispose();
    sample.positive?.dispose();
    sample.negative?.dispose();
  }

  getSize(): number {
    return this.size;
  }

  clear(): void {
    for (let i = 0; i < this.size; i++) {
      this.disposeSample(this.buffer[i]);
    }
    this.size = 0;
    this.position = 0;
  }
}

/**
 * Gradient accumulation helper
 */
class GradientAccumulator {
  private accumulatedGradients: Map<string, tf.Tensor> = new Map();
  private step: number = 0;
  private readonly accumulationSteps: number;

  constructor(accumulationSteps: number) {
    this.accumulationSteps = accumulationSteps;
  }

  accumulate(gradients: Map<string, tf.Tensor>): boolean {
    for (const [key, gradient] of gradients) {
      if (this.accumulatedGradients.has(key)) {
        const accumulated = this.accumulatedGradients.get(key)!;
        // tf.keep prevents the running sum from being disposed by the
        // tf.tidy that wraps each training step, since accumulation
        // spans multiple steps.
        this.accumulatedGradients.set(key, tf.keep(tf.add(accumulated, gradient)));
        accumulated.dispose();
      } else {
        // Clone (and keep) so the accumulator owns its copy across tidy scopes.
        this.accumulatedGradients.set(key, tf.keep(gradient.clone()));
      }
    }
    this.step++;
    return this.step >= this.accumulationSteps;
  }

  getAverageGradients(): Map<string, tf.Tensor> {
    const avgGradients = new Map<string, tf.Tensor>();
    const scale = tf.scalar(this.accumulationSteps);
    for (const [key, gradient] of this.accumulatedGradients) {
      avgGradients.set(key, tf.div(gradient, scale));
    }
    scale.dispose();
    return avgGradients;
  }

  reset(): void {
    for (const gradient of this.accumulatedGradients.values()) {
      gradient.dispose();
    }
    this.accumulatedGradients.clear();
    this.step = 0;
  }
}

/**
 * Main Learner service class
 */
export class LearnerService {
  private config: LearnerConfig;
  private replayBuffer: RingBuffer;
  private model: IMemoryModel;
  private tokenizer: AdvancedTokenizer;
  private gradientAccumulator: GradientAccumulator;
  private optimizer: tf.Optimizer;
  private trainingInterval: NodeJS.Timeout | null = null;
  private isRunning: boolean = false;
  private stepCount: number = 0;
  private lastLossValues: number[] = [];

  constructor(
    model: IMemoryModel,
    tokenizer: AdvancedTokenizer,
    config: Partial<LearnerConfig> = {}
  ) {
    this.config = LearnerConfigSchema.parse(config);
    this.model = model;
    this.tokenizer = tokenizer;
    this.replayBuffer = new RingBuffer(this.config.bufferSize);
    this.gradientAccumulator = new GradientAccumulator(this.config.accumulationSteps);
    this.optimizer = tf.train.adam(this.config.learningRate);
  }

  /**
   * Add a training sample to the replay buffer
   */
  async addTrainingSample(
    input: string | tf.Tensor,
    target: string | tf.Tensor,
    positive?: string | tf.Tensor,
    negative?: string | tf.Tensor
  ): Promise<void> {
    const inputInfo = typeof input === 'string'
      ? await this.normalizeSample(input)
      : this.normalizeTensorSample(input);
    const targetInfo = typeof target === 'string'
      ? await this.normalizeSample(target)
      : this.normalizeTensorSample(target);
    const positiveInfo = typeof positive === 'string'
      ? await this.normalizeSample(positive)
      : (positive ? this.normalizeTensorSample(positive) : undefined);
    const negativeInfo = typeof negative === 'string'
      ? await this.normalizeSample(negative)
      : (negative ? this.normalizeTensorSample(negative) : undefined);

    const sample: TrainingSample = {
      input: inputInfo.tensor,
      target: targetInfo.tensor,
      positive: positiveInfo?.tensor,
      negative: negativeInfo?.tensor,
      maskIndices: this.generateMaskIndices(inputInfo.sequenceLength),
      timestamp: Date.now()
    };

    this.replayBuffer.add(sample);
  }

  private async normalizeSample(value: string): Promise<{ tensor: tf.Tensor; sequenceLength: number }> {
    const encodeResult = await (this.tokenizer.encode as any)(value);
    if (encodeResult == null) {
      throw new Error('Tokenizer returned no result for provided text input.');
    }
    if (encodeResult instanceof tf.Tensor) {
      const tensor = this.ensure1DTensor(encodeResult);
      const sequenceLength = encodeResult.shape[0] ?? encodeResult.size;
      encodeResult.dispose();
      return { tensor, sequenceLength };
    }
    if ('embeddings' in encodeResult) {
      const embeddings = encodeResult.embeddings as tf.Tensor;
      const sequenceLength = embeddings.shape[0] ?? embeddings.size;
      const averaged = tf.tidy(() => embeddings.mean(0));
      embeddings.dispose();
      return { tensor: averaged, sequenceLength };
    }
    throw new Error('Unsupported tokenizer encode result. Expected tensor embeddings.');
  }

  private normalizeTensorSample(value: tf.Tensor): { tensor: tf.Tensor; sequenceLength: number } {
    const tensorInput = value as tf.Tensor;
    // isDisposed is a boolean getter on tf.Tensor, not a method.
    if (tensorInput.isDisposed) {
      throw new Error('Cannot normalize a disposed tensor input.');
    }
    // Clone so the learner owns its own copy; the caller's tensor is left intact.
    const tensor = tensorInput.clone();
    const sequenceLength = tensor.shape[0] ?? tensor.size;
    return { tensor, sequenceLength };
  }

  private ensure1DTensor(tensor: tf.Tensor): tf.Tensor {
    if (tensor.rank === 1) {
      return tensor.clone();
    }
    return tf.tidy(() => tensor.mean(0));
  }

  /**
   * Start the training loop
   */
  startTraining(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    this.trainingInterval = setInterval(() => {
      this.performTrainingStep();
    }, this.config.updateInterval);
  }

  /**
   * Pause the training loop
   */
  pauseTraining(): void {
    if (this.trainingInterval) {
      clearInterval(this.trainingInterval);
      this.trainingInterval = null;
    }
    this.isRunning = false;
  }

  /**
   * Resume the training loop
   */
  resumeTraining(): void {
    if (!this.isRunning) {
      this.startTraining();
    }
  }

  /**
   * Check if the learner is currently running
   */
  isTraining(): boolean {
    return this.isRunning;
  }

  /**
   * Get current training statistics
   */
  getTrainingStats(): {
    bufferSize: number;
    stepCount: number;
    isRunning: boolean;
    averageLoss: number;
    lastLoss: number;
  } {
    const averageLoss = this.lastLossValues.length > 0
      ? this.lastLossValues.reduce((a, b) => a + b, 0) / this.lastLossValues.length
      : 0;
    return {
      bufferSize: this.replayBuffer.getSize(),
      stepCount: this.stepCount,
      isRunning: this.isRunning,
      averageLoss,
      lastLoss: this.lastLossValues[this.lastLossValues.length - 1] || 0
    };
  }

  /**
   * Perform a single training step
   */
  private performTrainingStep(): void {
    if (this.replayBuffer.getSize() < this.config.batchSize) {
      return; // Not enough samples
    }

    try {
      tf.tidy(() => {
        const batch = this.replayBuffer.sample(this.config.batchSize);
        const { loss, gradients } = this.computeMixedLoss(batch);

        // NaN guard
        if (this.hasNaNGradients(gradients)) {
          console.warn('NaN gradients detected, skipping step');
          return;
        }

        // Gradient clipping
        const clippedGradients = this.clipGradients(gradients);
        for (const gradient of gradients.values()) {
          gradient.dispose();
        }
        gradients.clear();

        // Accumulate gradients
        const shouldUpdate = this.gradientAccumulator.accumulate(clippedGradients);

        if (shouldUpdate) {
          // Apply accumulated gradients
          const avgGradients = this.gradientAccumulator.getAverageGradients();
          this.applyGradients(avgGradients);
          this.gradientAccumulator.reset();

          // Dispose average gradients
          for (const gradient of avgGradients.values()) {
            gradient.dispose();
          }
        }

        // Track loss
        const lossValue = loss.dataSync()[0];
        this.lastLossValues.push(lossValue);
        if (this.lastLossValues.length > 100) {
          this.lastLossValues.shift();
        }

        this.stepCount++;

        // Dispose clipped gradients
        for (const gradient of clippedGradients.values()) {
          gradient.dispose();
        }
      });
    } catch (error) {
      console.error('Training step error:', error);
    }
  }

  /**
   * Compute mixed loss combining contrastive, next-token, and MLM losses
   */
  private computeMixedLoss(batch: TrainingSample[]): {
    loss: tf.Scalar;
    gradients: Map<string, tf.Tensor>;
  } {
    const trainableVars = this.model.getTrainableVariables();

    const lossFn = () => {
      const lossTerms: tf.Tensor[] = [];

      if (this.config.nextTokenWeight > 0) {
        const nextTokenLoss = this.computeNextTokenLoss(batch);
        lossTerms.push(tf.mul(nextTokenLoss, tf.scalar(this.config.nextTokenWeight)));
      }

      if (this.config.contrastiveWeight > 0) {
        const contrastiveLoss = this.computeContrastiveLoss(batch);
        lossTerms.push(tf.mul(contrastiveLoss, tf.scalar(this.config.contrastiveWeight)));
      }

      if (this.config.mlmWeight > 0) {
        const mlmLoss = this.computeMLMLoss(batch);
        lossTerms.push(tf.mul(mlmLoss, tf.scalar(this.config.mlmWeight)));
      }

      if (lossTerms.length === 0) {
        return tf.scalar(0);
      }

      return lossTerms.length === 1
        ? (lossTerms[0] as tf.Scalar)
        : (tf.addN(lossTerms) as tf.Scalar);
    };

    if (trainableVars.length === 0) {
      return { loss: lossFn(), gradients: new Map<string, tf.Tensor>() };
    }

    const { value: mixedLoss, grads } = tf.variableGrads(lossFn, trainableVars);

    const gradients = new Map<string, tf.Tensor>();
    for (const [name, gradient] of Object.entries(grads)) {
      gradients.set(name, gradient as tf.Tensor);
    }

    return { loss: mixedLoss as tf.Scalar, gradients };
  }

  /**
   * Compute next-token prediction loss
   */
  private computeNextTokenLoss(batch: TrainingSample[]): tf.Scalar {
    const inputs = batch.map(s => s.input);
    const targets = batch.map(s => s.target);

    const inputStack = tf.stack(inputs);
    const targetStack = tf.stack(targets);

    // Use model's prediction capabilities
    const memoryState = this.model.getMemoryState();
    const predictions = inputStack.split(inputStack.shape[0]).map(input => {
      const squeezed = input.squeeze([0]);
      const result = this.model.forward(wrapTensor(squeezed), memoryState);
      return unwrapTensor(result.predicted);
    });

    const predictionStack = tf.stack(predictions);
    const loss = tf.losses.softmaxCrossEntropy(targetStack, predictionStack) as tf.Scalar;

    // Clean up
    for (const pred of predictions) {
      pred.dispose();
    }
    predictionStack.dispose();
    inputStack.dispose();
    targetStack.dispose();

    return loss;
  }

  /**
   * Compute contrastive learning loss
   */
  private computeContrastiveLoss(batch: TrainingSample[]): tf.Scalar {
    const validSamples = batch.filter(s => s.positive && s.negative);
    if (validSamples.length === 0) {
      return tf.scalar(0);
    }

    let totalLoss = tf.scalar(0);
    let count = 0;

    for (const sample of validSamples) {
      if (!sample.positive || !sample.negative) continue;

      const anchorEmbed = this.getEmbedding(sample.input);
      const positiveEmbed = this.getEmbedding(sample.positive);
      const negativeEmbed = this.getEmbedding(sample.negative);

      // Compute cosine similarities
      const positiveSim = tf.sum(tf.mul(anchorEmbed, positiveEmbed));
      const negativeSim = tf.sum(tf.mul(anchorEmbed, negativeEmbed));

      // Contrastive loss: maximize positive similarity, minimize negative
      const margin = tf.scalar(0.5);
      const loss = tf.maximum(
        tf.scalar(0),
        tf.add(tf.sub(negativeSim, positiveSim), margin)
      );

      totalLoss = tf.add(totalLoss, loss);
      count++;

      // Clean up
      anchorEmbed.dispose();
      positiveEmbed.dispose();
      negativeEmbed.dispose();
      positiveSim.dispose();
      negativeSim.dispose();
      loss.dispose();
      margin.dispose();
    }

    if (count > 0) {
      const avgLoss = tf.div(totalLoss, tf.scalar(count)) as tf.Scalar;
      totalLoss.dispose();
      return avgLoss;
    }

    return totalLoss;
  }

  /**
   * Compute masked language modeling loss
   */
  private computeMLMLoss(batch: TrainingSample[]): tf.Scalar {
    const validSamples = batch.filter(s => s.maskIndices && s.maskIndices.length > 0);
    if (validSamples.length === 0) {
      return tf.scalar(0);
    }

    let totalLoss = tf.scalar(0);
    let count = 0;

    for (const sample of validSamples) {
      if (!sample.maskIndices || sample.maskIndices.length === 0) continue;

      const maskToken = (this.tokenizer as any).getSpecialTokens?.().mask || 103; // Default mask token

      // Create a masked copy of the input. Tensors are immutable, so copy the
      // underlying values, overwrite the masked positions with the mask token,
      // and rebuild the tensor.
      const maskedValues = Array.from(sample.input.dataSync());
      for (const maskIndex of sample.maskIndices) {
        if (maskIndex < maskedValues.length) {
          maskedValues[maskIndex] = maskToken;
        }
      }
      const maskedInput = tf.tensor(maskedValues, sample.input.shape, sample.input.dtype);

      // Get predictions for masked positions
      const memoryState = this.model.getMemoryState();
      const { predicted } = this.model.forward(wrapTensor(maskedInput), memoryState);

      // Compute loss only for masked positions
      const maskedLoss = tf.losses.softmaxCrossEntropy(
        sample.target.slice([0], [sample.maskIndices.length]),
        unwrapTensor(predicted).slice([0], [sample.maskIndices.length])
      );

      totalLoss = tf.add(totalLoss, maskedLoss);
      count++;

      // Clean up
      maskedInput.dispose();
      maskedLoss.dispose();
    }

    if (count > 0) {
      const avgLoss = tf.div(totalLoss, tf.scalar(count)) as tf.Scalar;
      totalLoss.dispose();
      return avgLoss;
    }

    return totalLoss;
  }

  /**
   * Get embedding representation of a tensor
   */
  private getEmbedding(tensor: tf.Tensor): tf.Tensor {
    // This should use the model's embedding layer.
    // For now, L2-normalize inside a tidy so the intermediate norm is disposed.
    return tf.tidy(() => tf.div(tensor, tf.norm(tensor)));
  }

  /**
   * Generate random mask indices for MLM
   */
  private generateMaskIndices(sequenceLength: number): number[] {
    const maskRate = 0.15; // Standard BERT masking rate
    const numMasks = Math.floor(sequenceLength * maskRate);
    const indices: number[] = [];

    for (let i = 0; i < numMasks; i++) {
      const randomIndex = Math.floor(Math.random() * sequenceLength);
      if (!indices.includes(randomIndex)) {
        indices.push(randomIndex);
      }
    }

    return indices;
  }

  /**
   * Check for NaN gradients
   */
  private hasNaNGradients(gradients: Map<string, tf.Tensor>): boolean {
    for (const gradient of gradients.values()) {
      const hasNaN = tf.any(tf.isNaN(gradient)).dataSync()[0] > 0;
      if (hasNaN) return true;
      const maxValue = tf.max(tf.abs(gradient)).dataSync()[0];
      if (maxValue > 1e6) return true; // Treat very large gradients as problematic
    }
    return false;
  }

  /**
   * Clip gradients to prevent exploding gradients
   */
  private clipGradients(gradients: Map<string, tf.Tensor>): Map<string, tf.Tensor> {
    const clippedGradients = new Map<string, tf.Tensor>();
    for (const [key, gradient] of gradients) {
      const clipped = tf.clipByValue(
        gradient,
        -this.config.gradientClipValue,
        this.config.gradientClipValue
      );
      clippedGradients.set(key, clipped);
    }
    return clippedGradients;
  }

  /**
   * Apply gradients to the model
   */
  private applyGradients(gradients: Map<string, tf.Tensor>): void {
    try {
      if (typeof this.model.applyGradients === 'function') {
        this.model.applyGradients(gradients);
        return;
      }
      const namedGradients: tf.NamedTensorMap = {};
      const variableMap = new Map(
        this.model.getTrainableVariables().map(variable => [variable.name, variable])
      );
      for (const [name, gradient] of gradients) {
        if (variableMap.has(name)) {
          namedGradients[name] = gradient;
        }
      }
      if (Object.keys(namedGradients).length > 0) {
        this.optimizer.applyGradients(namedGradients);
      }
    } finally {
      for (const gradient of gradients.values()) {
        gradient.dispose();
      }
      gradients.clear();
    }
  }

  /**
   * Dispose of all resources
   */
  dispose(): void {
    this.pauseTraining();
    this.replayBuffer.clear();
    this.gradientAccumulator.reset();
    this.optimizer.dispose();
  }
}
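
A minimal usage sketch of the service above. This is not part of learner.ts: the `model` and `tokenizer` values are assumed to be concrete `IMemoryModel` and `AdvancedTokenizer` instances obtained elsewhere in the package, and the sample strings are purely illustrative.

import { LearnerService } from './learner.js';

// Hypothetical wiring: model and tokenizer come from the rest of mcp-titan.
const learner = new LearnerService(model, tokenizer, {
  batchSize: 16,        // override defaults as needed
  updateInterval: 2000, // run a training step every 2 s
  accumulationSteps: 2
});

// Strings are tokenized via normalizeSample; tensors are cloned as-is.
await learner.addTrainingSample(
  'the cat sat on the', // input
  'mat',                // next-token target
  'a cat rested on a',  // positive example for the contrastive term
  'bond yields rose'    // negative example for the contrastive term
);

learner.startTraining();                 // begin the interval loop
console.log(learner.getTrainingStats()); // { bufferSize, stepCount, isRunning, averageLoss, lastLoss }
learner.pauseTraining();                 // stop the loop; the replay buffer is kept
learner.resumeTraining();
learner.dispose();                       // stop the loop and free all tensors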
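
The fileoverview mentions MCP tools for pause/resume, but the server wiring lives outside this file. Below is a sketch of how those tools could be registered, assuming the TypeScript MCP SDK's `McpServer.tool` API and a `learner` instance as above; the tool names and server metadata are hypothetical.

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';

const server = new McpServer({ name: 'titan-learner', version: '1.0.0' });

// Pause the online learning loop without clearing the replay buffer.
server.tool('learner_pause', async () => {
  learner.pauseTraining();
  return { content: [{ type: 'text', text: 'Learner paused.' }] };
});

// Resume the interval-driven training loop.
server.tool('learner_resume', async () => {
  learner.resumeTraining();
  return { content: [{ type: 'text', text: 'Learner resumed.' }] };
});

// Report buffer size, step count, and recent loss values.
server.tool('learner_stats', async () => ({
  content: [{ type: 'text', text: JSON.stringify(learner.getTrainingStats()) }]
}));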
