Skip to main content
Glama
CACHE.md30.8 kB
# Cache Subsystem Documentation **Location:** `src/cache/` **Purpose:** Event-driven message storage with RxJS observables **Pattern:** Hierarchical storage with reactive notifications --- ## Table of Contents 1. [Overview](#overview) 2. [Architecture](#architecture) 3. [Component Details](#component-details) 4. [Message Flow](#message-flow) 5. [RxJS Integration](#rxjs-integration) 6. [Lifecycle Management](#lifecycle-management) 7. [API Reference](#api-reference) --- ## Overview The Cache subsystem provides **persistent, event-driven storage** for Claude protocol messages. It implements a **hierarchical structure** that mirrors the system's organizational model: - **CacheManager**: Singleton coordinator for all message caches - **MessageCache**: One per team pair (fromTeam→toTeam) - **CacheEntry**: One per operation (spawn ping or tell) - **CacheMessage**: Individual protocol messages from Claude **Key Innovation:** Uses **RxJS observables** to emit events when new messages arrive, enabling Iris to react in real-time without polling. **Survivability:** Message caches **survive process crashes**, preserving partial responses for retrieval even after process recreation. --- ## Architecture ### Component Hierarchy ``` ┌──────────────────────────────────────────────────────────────┐ │ CacheManager │ │ (cache-manager.ts) │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ caches: Map<sessionId, MessageCache> │ │ │ │ │ │ │ │ Methods: │ │ │ │ • getOrCreateCache(sessionId, fromTeam, toTeam) │ │ │ │ • getCache(sessionId) │ │ │ │ • getAllCaches() │ │ │ │ • destroyAll() │ │ │ └────────────────────────────────────────────────────────┘ │ └────────────────────┬─────────────────────────────────────────┘ │ manages ▼ ┌──────────────────────────────────────────────────────────────┐ │ MessageCache │ │ (message-cache.ts) │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ sessionId: string │ │ │ │ fromTeam: string │ │ │ │ toTeam: string │ │ │ │ entries: CacheEntry[] │ │ │ │ │ │ │ │ Methods: │ │ │ │ • createEntry(type, tellString) → CacheEntry │ │ │ │ • getAllEntries() │ │ │ │ • getEntriesByType(type) │ │ │ │ • getEntriesByStatus(status) │ │ │ │ • getStats() │ │ │ │ • destroy() │ │ │ └────────────────────────────────────────────────────────┘ │ └────────────────────┬─────────────────────────────────────────┘ │ contains ▼ ┌──────────────────────────────────────────────────────────────┐ │ CacheEntry │ │ (cache-entry.ts) │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ cacheEntryType: SPAWN | TELL │ │ │ │ tellString: string │ │ │ │ status: active | completed | terminated │ │ │ │ messages: CacheMessage[] │ │ │ │ terminationReason?: TerminationReason │ │ │ │ │ │ │ │ RxJS Observable: │ │ │ │ • messages$: Observable<CacheMessage> │ │ │ │ │ │ │ │ Write Methods (ClaudeProcess): │ │ │ │ • addMessage(data) │ │ │ │ │ │ │ │ Read Methods (Iris): │ │ │ │ • getMessages() │ │ │ │ • getLatestMessage() │ │ │ │ │ │ │ │ Lifecycle (Iris): │ │ │ │ • complete() │ │ │ │ • terminate(reason) │ │ │ └────────────────────────────────────────────────────────┘ │ └────────────────────┬─────────────────────────────────────────┘ │ contains ▼ ┌──────────────────────────────────────────────────────────────┐ │ CacheMessage │ │ (types.ts) │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ timestamp: number │ │ │ │ type: "system" | "user" | "assistant" | │ │ │ │ "stream_event" | "result" | "unknown" │ │ │ │ data: any (raw Claude protocol message) │ │ │ └────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────┘ ``` ### File Structure ``` src/cache/ ├── types.ts # TypeScript interfaces, enums, types ├── cache-manager.ts # Top-level manager (singleton) ├── message-cache.ts # Per-team-pair cache ├── cache-entry.ts # Per-operation cache with RxJS └── README.md # Future phase placeholder ``` --- ## Component Details ### 1. CacheManager (cache-manager.ts) **Responsibility:** Coordinate all cache sessions **State:** ```typescript class CacheManager { private caches = new Map<string, MessageCache>(); } ``` **Key Methods:** ```typescript // Get or create message cache (called by Iris) getOrCreateCache( sessionId: string, fromTeam: string, toTeam: string ): MessageCache // Get existing cache getCache(sessionId: string): MessageCache | null // Get all caches (for monitoring) getAllCaches(): MessageCache[] // Aggregate statistics getStats(): { totalSessions: number; totalEntries: number; sessionStats: Array<{ sessionId: string; fromTeam: string; toTeam: string; entryCount: number; activeEntries: number; completedEntries: number; }>; } // Cleanup (shutdown) destroyAll(): void ``` **Lifetime:** Lives for entire Iris process lifetime **Thread Safety:** Not thread-safe (single-threaded Node.js) --- ### 2. MessageCache (message-cache.ts) **Responsibility:** Manage cache entries for a specific team pair **Identity:** One cache per `(fromTeam, toTeam)` pair - `iris → alpha` = one cache - `alpha → beta` = different cache - Links to SessionInfo (persistent metadata) via sessionId **State:** ```typescript class MessageCache { readonly sessionId: string; readonly fromTeam: string; readonly toTeam: string; readonly createdAt: number; private entries: CacheEntry[] = []; } ``` **Key Methods:** ```typescript // Create new entry (for spawn or tell) createEntry( cacheEntryType: CacheEntryType, tellString: string ): CacheEntry // Get all entries (chronological) getAllEntries(): CacheEntry[] // Filter by type getEntriesByType(type: CacheEntryType): CacheEntry[] // Filter by status getEntriesByStatus(status: CacheEntryStatus): CacheEntry[] // Get latest entry getLatestEntry(): CacheEntry | null // Statistics getStats(): { totalEntries: number; activeEntries: number; completedEntries: number; terminatedEntries: number; spawnEntries: number; tellEntries: number; } // Cleanup (called by CacheManager) destroy(): void ``` **Example Usage:** ```typescript // Iris creates entry for spawn const spawnEntry = messageCache.createEntry( CacheEntryType.SPAWN, "ping" ); // Pass to ClaudeProcess await process.spawn(spawnEntry); // Iris creates entry for tell const tellEntry = messageCache.createEntry( CacheEntryType.TELL, "What is 2+2?" ); // Subscribe to messages tellEntry.messages$.subscribe(msg => { console.log('New message:', msg.type); }); // Execute process.executeTell(tellEntry); ``` --- ### 3. CacheEntry (cache-entry.ts) **Responsibility:** Store messages for a single operation with RxJS notifications **Entry Types:** - **SPAWN**: Initial ping to warm up process - **TELL**: Actual user message **State:** ```typescript class CacheEntryImpl implements CacheEntry { readonly cacheEntryType: CacheEntryType; readonly tellString: string; readonly createdAt: number; status: CacheEntryStatus = CacheEntryStatus.ACTIVE; completedAt: number | null = null; terminationReason?: TerminationReason; private messages: CacheMessage[] = []; private messagesSubject = new Subject<CacheMessage>(); public messages$: Observable<CacheMessage>; } ``` **Write Methods (Called by ClaudeProcess - DUMB PIPE):** ```typescript addMessage(data: any): void { // Only accept messages while active if (this.status !== CacheEntryStatus.ACTIVE) return; // Create message const message: CacheMessage = { timestamp: Date.now(), type: data.type || "unknown", data, // Raw JSON from Claude }; // Store this.messages.push(message); // 🔔 EMIT to RxJS subscribers this.messagesSubject.next(message); } ``` **Read Methods (Called by Iris - BRAIN):** ```typescript // Get all messages getMessages(): CacheMessage[] // Get most recent message getLatestMessage(): CacheMessage | null ``` **Lifecycle Methods (Called by Iris - BRAIN):** ```typescript // Mark as successfully completed complete(): void { this.status = CacheEntryStatus.COMPLETED; this.completedAt = Date.now(); this.messagesSubject.complete(); // Complete RxJS observable } // Mark as terminated (error/timeout) terminate(reason: TerminationReason): void { this.status = CacheEntryStatus.TERMINATED; this.terminationReason = reason; this.completedAt = Date.now(); this.messagesSubject.complete(); // Complete RxJS observable } ``` **Observable:** ```typescript messages$: Observable<CacheMessage> ``` **Example Subscription:** ```typescript const subscription = cacheEntry.messages$.subscribe({ next: (message) => { console.log(`Received: ${message.type}`); if (message.type === 'result') { console.log('Completed!'); } }, complete: () => { console.log('Observable completed (entry finished)'); } }); // Cleanup subscription.unsubscribe(); ``` --- ### 4. CacheMessage (types.ts) **Responsibility:** Immutable snapshot of a Claude protocol message **Structure:** ```typescript interface CacheMessage { timestamp: number; // When message was added to cache type: "system" | "user" | "assistant" | "stream_event" | "result" | "unknown"; data: any; // Raw JSON from Claude (not parsed) } ``` **Message Types:** - `system`: System messages (e.g., `subtype: "init"`) - `user`: User messages sent to Claude - `assistant`: Assistant responses (text content) - `stream_event`: Streaming progress events - `result`: Final result with success/error - `unknown`: Unrecognized message type **Example Message Data:** ```json { "timestamp": 1697567890123, "type": "assistant", "data": { "type": "assistant", "message": { "role": "assistant", "content": [ { "type": "text", "text": "The answer is 4." } ] } } } ``` --- ## Message Flow ### Write Path (ClaudeProcess → Cache) ``` ┌─────────────────────────────────────────────────────────────┐ │ Claude Code Process (External) │ │ Writes newline-delimited JSON to stdout │ └────────────────────┬────────────────────────────────────────┘ │ │ stdout stream ▼ ┌─────────────────────────────────────────────────────────────┐ │ ClaudeProcess.handleStdoutData() (DUMB PIPE) │ │ 1. Parse JSON line │ │ 2. currentCacheEntry.addMessage(json) ← THAT'S IT! │ └────────────────────┬────────────────────────────────────────┘ │ │ addMessage(json) ▼ ┌─────────────────────────────────────────────────────────────┐ │ CacheEntry.addMessage(data) │ │ 1. Create CacheMessage: │ │ { timestamp: Date.now(), type: data.type, data } │ │ 2. messages.push(message) │ │ 3. messagesSubject.next(message) ← Emit to RxJS │ └─────────────────────────────────────────────────────────────┘ ``` **Key Point:** ClaudeProcess has **zero business logic** - it just pipes JSON to cache. ### Read Path (Iris → Cache) ``` ┌─────────────────────────────────────────────────────────────┐ │ Iris Orchestrator │ │ Needs to react to new messages │ └────────────────────┬────────────────────────────────────────┘ │ │ Subscribe to observable ▼ ┌─────────────────────────────────────────────────────────────┐ │ cacheEntry.messages$ (RxJS Observable) │ │ Stream of CacheMessage objects │ └────────────────────┬────────────────────────────────────────┘ │ │ Emit on each addMessage() ▼ ┌─────────────────────────────────────────────────────────────┐ │ Iris RxJS Subscription │ │ cacheEntry.messages$.subscribe(message => { │ │ // Reset responseTimeout │ │ // Check for completion │ │ // Update session state │ │ }); │ └─────────────────────────────────────────────────────────────┘ ``` **Alternative Read Path (Polling):** ```typescript // Get all messages at once (for team_cache_read tool) const messages = cacheEntry.getMessages(); // Get latest message const latest = cacheEntry.getLatestMessage(); ``` --- ## RxJS Integration ### Observable Pattern **Why RxJS?** - **Event-driven**: Iris reacts immediately to new messages - **Composable**: Use RxJS operators (filter, map, debounce) - **Cancellable**: Unsubscribe to stop listening - **Foundation for Phase 5**: Intelligence layer needs event streams **Implementation:** ```typescript class CacheEntryImpl { // Private subject (write-only from inside) private messagesSubject = new Subject<CacheMessage>(); // Public observable (read-only from outside) public messages$: Observable<CacheMessage>; constructor() { // Convert subject to observable this.messages$ = this.messagesSubject.asObservable(); } addMessage(data: any): void { const message = { timestamp, type, data }; this.messages.push(message); this.messagesSubject.next(message); // Emit! } complete(): void { this.messagesSubject.complete(); // Complete observable } } ``` ### Subscription Management (Iris Responsibility) **Iris tracks subscriptions to prevent memory leaks:** ```typescript class IrisOrchestrator { private responseSubscriptions = new Map<string, Subscription>(); private startResponseTimeout(sessionId: string, cacheEntry: CacheEntry) { // Subscribe to messages const subscription = cacheEntry.messages$.subscribe(msg => { // Handle message }); // Store for cleanup this.responseSubscriptions.set(sessionId, subscription); } private cleanupTell(sessionId: string) { // Unsubscribe to prevent memory leak const subscription = this.responseSubscriptions.get(sessionId); if (subscription) { subscription.unsubscribe(); this.responseSubscriptions.delete(sessionId); } } } ``` ### RxJS Operators in Iris **Filter for specific message types:** ```typescript import { filter } from 'rxjs/operators'; cacheEntry.messages$ .pipe(filter(msg => msg.type === 'result')) .subscribe(() => { console.log('Completed!'); }); ``` **Timeout handling:** ```typescript import { timeout } from 'rxjs/operators'; cacheEntry.messages$ .pipe( filter(msg => msg.type === 'result'), timeout(30000) // 30s timeout ) .subscribe({ next: () => console.log('Completed'), error: (err) => console.log('Timeout!') }); ``` --- ## Lifecycle Management ### CacheEntry Lifecycle ``` ┌──────────┐ │ CREATE │ ← cacheSession.createEntry(type, tellString) └────┬─────┘ │ ▼ ┌──────────┐ │ ACTIVE │ ← Receiving messages, observable emitting └────┬─────┘ │ ├─────────────┬──────────────────┐ │ │ │ │ │ │ ▼ ▼ ▼ ┌───────────┐ ┌─────────────┐ ┌────────────┐ │ COMPLETED │ │ TERMINATED │ │ DESTROY │ └───────────┘ └─────────────┘ └────────────┘ │ │ │ │ │ │ └─────────────┴──────────────────┘ │ ▼ [Observable completed] [Subscribers notified] ``` **State Transitions:** 1. **CREATE → ACTIVE** - `cacheSession.createEntry(type, tellString)` - Observable created and ready - Status = `ACTIVE` 2. **ACTIVE → COMPLETED** - `cacheEntry.complete()` called by Iris - Received `result` message with success - Observable completed - Status = `COMPLETED` 3. **ACTIVE → TERMINATED** - `cacheEntry.terminate(reason)` called by Iris - Reasons: RESPONSE_TIMEOUT, PROCESS_CRASHED, MANUAL_TERMINATION - Observable completed - Status = `TERMINATED` 4. **COMPLETED/TERMINATED → DESTROY** - `messageCache.destroy()` called by CacheManager - Entry removed from cache - Memory freed ### MessageCache Lifetime **Creation:** ```typescript // Iris gets or creates cache first time team pair communicates const cache = cacheManager.getOrCreateCache( sessionId, fromTeam, toTeam ); ``` **Survival:** - MessageCache **survives** process crashes - MessageCache **survives** process recreation - MessageCache **preserves** all entries and messages **Example:** ``` 1. Tell sent to alpha team 2. Process responding... 3. ⚠️ Process crashes (responseTimeout) 4. ✅ MessageCache STILL EXISTS in CacheManager 5. ✅ All messages preserved in cache entries 6. 📖 Caller can retrieve partial results via team_cache_read 7. Next tell creates NEW process 8. ✅ SAME MessageCache reused 9. New cache entry added to existing cache ``` **Destruction:** ```typescript // Only destroyed on explicit cleanup cacheManager.destroyAll(); // Shutdown cacheManager.deleteCache(sessionId); // Manual cleanup ``` --- ## API Reference ### CacheManager ```typescript class CacheManager { // Get or create message cache (idempotent) getOrCreateCache( sessionId: string, fromTeam: string, toTeam: string ): MessageCache; // Get existing cache (returns null if not found) getCache(sessionId: string): MessageCache | null; // Get all caches getAllCaches(): MessageCache[]; // Delete specific cache deleteCache(sessionId: string): void; // Get aggregate statistics getStats(): { totalSessions: number; totalEntries: number; sessionStats: Array<{...}>; }; // Destroy all caches (shutdown) destroyAll(): void; } ``` ### MessageCache ```typescript class MessageCache { readonly sessionId: string; readonly fromTeam: string; readonly toTeam: string; readonly createdAt: number; // Create new entry createEntry( cacheEntryType: CacheEntryType, tellString: string ): CacheEntry; // Get all entries (chronological) getAllEntries(): CacheEntry[]; // Filter by type (SPAWN or TELL) getEntriesByType(type: CacheEntryType): CacheEntry[]; // Filter by status (active, completed, terminated) getEntriesByStatus(status: CacheEntryStatus): CacheEntry[]; // Get latest entry getLatestEntry(): CacheEntry | null; // Statistics getStats(): { totalEntries: number; activeEntries: number; completedEntries: number; terminatedEntries: number; spawnEntries: number; tellEntries: number; }; // Cleanup destroy(): void; } ``` ### CacheEntry ```typescript interface CacheEntry { // Metadata readonly cacheEntryType: CacheEntryType; // SPAWN or TELL readonly tellString: string; // Original message readonly createdAt: number; status: CacheEntryStatus; // active, completed, terminated completedAt: number | null; terminationReason?: TerminationReason; // Data messages: CacheMessage[]; // All messages messages$: Observable<CacheMessage>; // RxJS observable // Write methods (ClaudeProcess) addMessage(data: any): void; // Read methods (Iris) getMessages(): CacheMessage[]; getLatestMessage(): CacheMessage | null; // Lifecycle (Iris) complete(): void; terminate(reason: TerminationReason): void; } ``` ### CacheMessage ```typescript interface CacheMessage { timestamp: number; // When added to cache type: "system" | "user" | "assistant" | "stream_event" | "result" | "unknown"; data: any; // Raw Claude protocol message } ``` ### Enums ```typescript enum CacheEntryType { SPAWN = "spawn", // Process initialization ping TELL = "tell" // Actual tell message } enum CacheEntryStatus { ACTIVE = "active", COMPLETED = "completed", TERMINATED = "terminated" } enum TerminationReason { RESPONSE_TIMEOUT = "response_timeout", PROCESS_CRASHED = "process_crashed", MANUAL_TERMINATION = "manual_termination" } ``` --- ## Integration Points ### With Iris Orchestrator **Iris creates and manages cache:** ```typescript class IrisOrchestrator { private cacheManager: CacheManager; async sendMessage(fromTeam, toTeam, message) { // Get or create message cache const messageCache = this.cacheManager.getOrCreateCache( sessionId, fromTeam, toTeam ); // Create cache entry const tellEntry = messageCache.createEntry( CacheEntryType.TELL, message ); // Subscribe to messages this.startResponseTimeout(sessionId, tellEntry); // Execute process.executeTell(tellEntry); } } ``` ### With ClaudeProcess **ClaudeProcess writes to cache:** ```typescript class ClaudeProcess { private currentCacheEntry: CacheEntry | null = null; async spawn(spawnCacheEntry: CacheEntry) { this.currentCacheEntry = spawnCacheEntry; // ... spawn process // stdout piped to cache via handleStdoutData } executeTell(cacheEntry: CacheEntry) { this.currentCacheEntry = cacheEntry; this.writeToStdin(cacheEntry.tellString); } private handleStdoutData(data: Buffer) { const json = JSON.parse(line); // DUMB PIPE: Just write to cache if (this.currentCacheEntry) { this.currentCacheEntry.addMessage(json); } } } ``` ### With MCP Tools **team_cache_read tool:** ```typescript async function team_cache_read(sessionId: string) { const messageCache = iris.cacheManager.getCache(sessionId); if (!messageCache) return { error: "Cache not found" }; const entries = messageCache.getAllEntries(); return { sessionId, entries: entries.map(entry => ({ type: entry.cacheEntryType, status: entry.status, messageCount: entry.getMessages().length, messages: entry.getMessages() })) }; } ``` **team_cache_clear tool:** ```typescript async function team_cache_clear(sessionId: string) { iris.cacheManager.deleteCache(sessionId); return { success: true }; } ``` --- ## Performance Characteristics **Memory:** - CacheMessage: ~200 bytes (average) - CacheEntry: ~1-5 KB (10-50 messages) - MessageCache: ~10-100 KB (10-20 entries) - CacheManager: ~1-10 MB (100 caches) **Typical Sizes:** - Short tell: 10-20 messages (~2 KB) - Long tell: 50-200 messages (~10-40 KB) - Session with 20 tells: ~200-400 KB **Observable Overhead:** - RxJS Subject: ~1 KB per entry - Subscription: ~100 bytes per subscriber - Negligible compared to message data **Scalability:** - 1000 message caches: ~100 MB RAM - 10,000 messages: ~2 MB RAM - Observable cleanup prevents memory leaks --- ## Future Enhancements ### Phase 2: Dashboard Integration ```typescript // Real-time cache monitoring cacheEntry.messages$.subscribe(msg => { dashboard.emit('cache-message', { sessionId, entryId, message: msg }); }); ``` ### Phase 3: API Integration ```typescript // WebSocket streaming app.ws('/cache/:sessionId/stream', (ws) => { const cache = cacheManager.getCache(sessionId); const sub = cache?.getLatestEntry()?.messages$.subscribe(msg => { ws.send(JSON.stringify(msg)); }); }); ``` ### Phase 5: Intelligence Layer ```typescript // Pattern recognition on cache streams cacheEntry.messages$ .pipe( map(msg => extractPatterns(msg)), buffer(time(5000)), filter(patterns => patterns.length > threshold) ) .subscribe(patterns => { intelligence.learnFromPatterns(patterns); }); ``` --- ## Testing Strategy **Unit Tests:** - CacheEntry message addition and observable emission - MessageCache entry management and filtering - CacheManager cache CRUD operations - Observable completion on entry lifecycle **Integration Tests:** - Cache survives process crashes - Multiple subscribers receive all messages - Memory leak prevention (subscription cleanup) - Large message volumes (1000+ messages) --- **Document Version:** 1.0 **Last Updated:** October 2025

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jenova-marie/iris-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server