Skip to main content
Glama
WorkingMemoryModule.ts17.4 kB
/** * Working Memory Module Implementation * * Implements Miller's 7±2 capacity limitation with phonological and visuospatial buffers, * information chunking, rehearsal mechanisms, and decay processes. */ import { ComponentStatus, IWorkingMemory } from "../interfaces/cognitive.js"; import { MemoryChunk } from "../types/core.js"; // Buffer types for different information modalities export enum BufferType { PHONOLOGICAL = "phonological", VISUOSPATIAL = "visuospatial", EPISODIC = "episodic", } // Working memory buffer interface export interface WorkingMemoryBuffer { type: BufferType; capacity: number; chunks: MemoryChunk[]; decay_rate: number; rehearsal_threshold: number; } // Chunking strategy interface export interface ChunkingStrategy { name: string; chunk(items: unknown[]): MemoryChunk[]; canChunk(items: unknown[]): boolean; } // Working memory state export interface WorkingMemoryState { active_chunks: MemoryChunk[]; cognitive_load: number; rehearsal_queue: string[]; buffer_states: Map<BufferType, WorkingMemoryBuffer>; last_decay: number; } export class WorkingMemoryModule implements IWorkingMemory { private capacity: number = 7; // Miller's magic number private decay_rate: number = 0.1; // Per second private rehearsal_threshold: number = 0.5; // Note: chunk_similarity_threshold available for future similarity-based chunking // Buffers for different types of information (initialized in constructor) private phonological_buffer!: WorkingMemoryBuffer; private visuospatial_buffer!: WorkingMemoryBuffer; private episodic_buffer!: WorkingMemoryBuffer; // Chunking strategies (initialized in constructor) private chunking_strategies!: ChunkingStrategy[]; // Component state private initialized: boolean = false; private active: boolean = false; private last_activity: number = 0; private error?: string; // Rehearsal mechanism private rehearsal_interval: NodeJS.Timeout | null = null; private rehearsal_frequency: number = 2000; // ms constructor() { this.initializeBuffers(); this.initializeChunkingStrategies(); } async initialize(config: Record<string, unknown>): Promise<void> { try { // Validate configuration if (!config) { throw new Error("Configuration is required"); } if ( config.working_memory_capacity && (config.working_memory_capacity as number) < 1 ) { throw new Error("Working memory capacity must be at least 1"); } // Configure capacity and parameters from config this.capacity = (config.working_memory_capacity as number) ?? 7; this.decay_rate = (config.noise_level as number) ?? 0.1; this.rehearsal_threshold = (config.confidence_threshold as number) ?? 0.5; // Initialize buffers with configuration this.initializeBuffers(); // Start rehearsal mechanism this.startRehearsalProcess(); this.initialized = true; this.active = true; this.last_activity = Date.now(); } catch (error) { this.error = `Failed to initialize WorkingMemoryModule: ${error}`; throw error; } } private initializeBuffers(): void { // Phonological buffer for verbal/auditory information this.phonological_buffer = { type: BufferType.PHONOLOGICAL, capacity: Math.ceil(this.capacity * 0.4), // ~40% of total capacity chunks: [], decay_rate: this.decay_rate * 1.2, // Faster decay for phonological rehearsal_threshold: this.rehearsal_threshold, }; // Visuospatial buffer for visual/spatial information this.visuospatial_buffer = { type: BufferType.VISUOSPATIAL, capacity: Math.ceil(this.capacity * 0.4), // ~40% of total capacity chunks: [], decay_rate: this.decay_rate * 0.8, // Slower decay for visuospatial rehearsal_threshold: this.rehearsal_threshold, }; // Episodic buffer for integrated information this.episodic_buffer = { type: BufferType.EPISODIC, capacity: Math.ceil(this.capacity * 0.2), // ~20% of total capacity chunks: [], decay_rate: this.decay_rate, rehearsal_threshold: this.rehearsal_threshold * 0.8, // Lower threshold }; } private initializeChunkingStrategies(): void { this.chunking_strategies = [ new SemanticChunkingStrategy(), new SequentialChunkingStrategy(), new CategoryChunkingStrategy(), new SpatialChunkingStrategy(), ]; } async process(input: unknown): Promise<WorkingMemoryState> { this.last_activity = Date.now(); // Apply decay before processing this.decay(); // Process input and create memory chunks const chunks = this.createMemoryChunks(input); // Add chunks to appropriate buffers for (const chunk of chunks) { this.addChunk(chunk); } // Perform rehearsal this.rehearse(); return this.getCurrentState(); } addChunk(chunk: MemoryChunk): boolean { // Validate chunk if (!chunk?.content) { return false; } // Determine appropriate buffer based on content type const buffer = this.selectBuffer(chunk); // Check if buffer has capacity if (buffer.chunks.length >= buffer.capacity) { // Try to make space by removing least active chunks if (!this.makeSpace(buffer)) { return false; // Could not add chunk } } // Try to chunk with existing information const chunked = this.attemptChunking(chunk, buffer); if (chunked) { return true; } // Add as new chunk buffer.chunks.push({ ...chunk, timestamp: Date.now(), activation: Math.max(0.1, chunk.activation ?? 1.0), // Ensure minimum activation }); return true; } getActiveChunks(): MemoryChunk[] { const allChunks: MemoryChunk[] = []; // Collect chunks from all buffers allChunks.push(...this.phonological_buffer.chunks); allChunks.push(...this.visuospatial_buffer.chunks); allChunks.push(...this.episodic_buffer.chunks); // Filter by activation threshold and sort by activation return allChunks .filter((chunk) => chunk.activation > 0.1) .sort((a, b) => b.activation - a.activation); } rehearse(): void { // Rehearse chunks that are above threshold but below full activation const rehearsalCandidates = this.getActiveChunks().filter( (chunk) => chunk.activation >= this.rehearsal_threshold && chunk.activation < 0.9 ); // Boost activation for rehearsed chunks for (const chunk of rehearsalCandidates) { chunk.activation = Math.min(1.0, chunk.activation + 0.1); chunk.timestamp = Date.now(); // Reset decay timer } } decay(): void { const now = Date.now(); // Apply decay to all buffers this.applyDecayToBuffer(this.phonological_buffer, now); this.applyDecayToBuffer(this.visuospatial_buffer, now); this.applyDecayToBuffer(this.episodic_buffer, now); this.last_activity = now; } private applyDecayToBuffer( buffer: WorkingMemoryBuffer, currentTime: number ): void { buffer.chunks = buffer.chunks .map((chunk) => { const timeDelta = (currentTime - chunk.timestamp) / 1000; // Convert to seconds const decayAmount = buffer.decay_rate * timeDelta; return { ...chunk, activation: Math.max(0, chunk.activation - decayAmount), }; }) .filter((chunk) => chunk.activation > 0.05); // Remove very weak chunks } getCapacity(): number { return this.capacity; } getCurrentLoad(): number { const totalChunks = this.phonological_buffer.chunks.length + this.visuospatial_buffer.chunks.length + this.episodic_buffer.chunks.length; return totalChunks / this.capacity; } reset(): void { this.phonological_buffer.chunks = []; this.visuospatial_buffer.chunks = []; this.episodic_buffer.chunks = []; if (this.rehearsal_interval) { clearInterval(this.rehearsal_interval); this.rehearsal_interval = null; } this.active = false; } getStatus(): ComponentStatus { return { name: "WorkingMemoryModule", initialized: this.initialized, active: this.active, last_activity: this.last_activity, error: this.error ?? "", }; } private createMemoryChunks(input: unknown): MemoryChunk[] { // Convert input to memory chunks based on content type if (typeof input === "string") { return this.createTextChunks(input); } else if (Array.isArray(input)) { return this.createArrayChunks(input); } else if (typeof input === "object") { return this.createObjectChunks(input as Record<string, unknown>); } // Default single chunk return [ { content: input, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.5, context_tags: [], }, ]; } private createTextChunks(text: string): MemoryChunk[] { // Split text into semantic chunks const sentences = text.split(/[.!?]+/).filter((s) => s.trim().length > 0); return sentences.map((sentence) => ({ content: sentence.trim(), activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.5, context_tags: ["text", "phonological"], })); } private createArrayChunks(array: unknown[]): MemoryChunk[] { // Group array items into chunks of manageable size const chunkSize = 3; // Optimal chunk size for arrays const chunks: MemoryChunk[] = []; // Only chunk if array is large enough to benefit from chunking if (array.length > chunkSize) { for (let i = 0; i < array.length; i += chunkSize) { const chunk = array.slice(i, i + chunkSize); chunks.push({ content: chunk, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.5, context_tags: ["array", "sequential"], }); } } else { // Small arrays stay as single chunks chunks.push({ content: array, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.5, context_tags: ["array", "sequential"], }); } return chunks; } private createObjectChunks(obj: Record<string, unknown>): MemoryChunk[] { // Create a single chunk for the entire object to avoid overwhelming working memory return [ { content: obj, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(Object.keys(obj)), emotional_valence: 0, importance: 0.5, context_tags: ["object", "structured"], }, ]; } private selectBuffer(chunk: MemoryChunk): WorkingMemoryBuffer { // Select appropriate buffer based on content type and context tags const contextTags = chunk.context_tags ?? []; if ( contextTags.includes("phonological") ?? (contextTags.includes("text") || typeof chunk.content === "string") ) { return this.phonological_buffer; } if ( contextTags.includes("visuospatial") ?? contextTags.includes("spatial") ?? contextTags.includes("visual") ) { return this.visuospatial_buffer; } // Default to episodic buffer for complex or integrated information return this.episodic_buffer; } private makeSpace(buffer: WorkingMemoryBuffer): boolean { // Remove the chunk with lowest activation if (buffer.chunks.length === 0) return true; const lowestActivation = Math.min( ...buffer.chunks.map((c) => c.activation) ); const indexToRemove = buffer.chunks.findIndex( (c) => c.activation === lowestActivation ); if (indexToRemove !== -1) { buffer.chunks.splice(indexToRemove, 1); return true; } return false; } private attemptChunking( newChunk: MemoryChunk, buffer: WorkingMemoryBuffer ): boolean { // Try each chunking strategy for (const strategy of this.chunking_strategies) { for (const existingChunk of buffer.chunks) { if (strategy.canChunk([existingChunk.content, newChunk.content])) { const chunkedResult = strategy.chunk([ existingChunk.content, newChunk.content, ]); if (chunkedResult.length === 1) { // Successfully chunked - replace existing chunk const index = buffer.chunks.indexOf(existingChunk); buffer.chunks[index] = { ...chunkedResult[0], activation: Math.max( existingChunk.activation, newChunk.activation ), timestamp: Date.now(), associations: new Set([ ...existingChunk.associations, ...newChunk.associations, ]), }; return true; } } } } return false; } private startRehearsalProcess(): void { this.rehearsal_interval = setInterval(() => { if (this.active) { this.rehearse(); this.decay(); } }, this.rehearsal_frequency); } private getRehearsalQueue(): string[] { return this.getActiveChunks() .filter((chunk) => chunk.activation >= this.rehearsal_threshold) .map((chunk) => JSON.stringify(chunk.content)); } getCurrentState(): WorkingMemoryState { const bufferStates = new Map<BufferType, WorkingMemoryBuffer>(); bufferStates.set(BufferType.PHONOLOGICAL, this.phonological_buffer); bufferStates.set(BufferType.VISUOSPATIAL, this.visuospatial_buffer); bufferStates.set(BufferType.EPISODIC, this.episodic_buffer); return { active_chunks: this.getActiveChunks(), cognitive_load: this.getCurrentLoad(), rehearsal_queue: this.getRehearsalQueue(), buffer_states: bufferStates, last_decay: this.last_activity, }; } } // Chunking strategy implementations class SemanticChunkingStrategy implements ChunkingStrategy { name = "semantic"; canChunk(items: unknown[]): boolean { // Check if items are semantically related (simplified) if (items.length !== 2) return false; const [item1, item2] = items; if (typeof item1 === "string" && typeof item2 === "string") { // Simple semantic similarity check const words1 = item1.toLowerCase().split(/\s+/); const words2 = item2.toLowerCase().split(/\s+/); const commonWords = words1.filter((w) => words2.includes(w)); return commonWords.length > 0; } return false; } chunk(items: unknown[]): MemoryChunk[] { return [ { content: Array.isArray(items) ? items.join(" ") : String(items), activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.6, // Slightly higher importance for chunked items context_tags: ["semantic", "chunked"], }, ]; } } class SequentialChunkingStrategy implements ChunkingStrategy { name = "sequential"; canChunk(items: unknown[]): boolean { // Check if items can be sequentially chunked return Array.isArray(items) && items.length <= 4; // Max chunk size } chunk(items: unknown[]): MemoryChunk[] { return [ { content: items, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.6, context_tags: ["sequential", "chunked"], }, ]; } } class CategoryChunkingStrategy implements ChunkingStrategy { name = "category"; canChunk(items: unknown[]): boolean { // Check if items belong to the same category (simplified) if (items.length !== 2) return false; const [item1, item2] = items; if (typeof item1 === "object" && typeof item2 === "object") { // Check if objects have similar structure const keys1 = Object.keys(item1 as object); const keys2 = Object.keys(item2 as object); const commonKeys = keys1.filter((k) => keys2.includes(k)); return commonKeys.length > keys1.length * 0.5; } return false; } chunk(items: unknown[]): MemoryChunk[] { return [ { content: items, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.6, context_tags: ["category", "chunked"], }, ]; } } class SpatialChunkingStrategy implements ChunkingStrategy { name = "spatial"; canChunk(items: unknown[]): boolean { // Check if items have spatial relationships return items.every( (item) => typeof item === "object" && item !== null && ("x" in item || "y" in item || "position" in item) ); } chunk(items: unknown[]): MemoryChunk[] { return [ { content: items, activation: 1.0, timestamp: Date.now(), associations: new Set<string>(), emotional_valence: 0, importance: 0.6, context_tags: ["spatial", "chunked", "visuospatial"], }, ]; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server