Skip to main content
Glama
OBSERVABILITY.md21.6 kB
# Observability Architecture: RxJS Migration This document describes the reactive observability architecture in Iris MCP, the RxJS migration strategy, and the rationale for maintaining EventEmitter bridges. ## Table of Contents - [Overview](#overview) - [Architecture Layers](#architecture-layers) - [RxJS Implementation](#rxjs-implementation) - [Event Name Type Safety](#event-name-type-safety) - [EventEmitter Bridges](#eventemitter-bridges) - [Migration Strategy](#migration-strategy) - [Status Observable Flow](#status-observable-flow) - [Error Observable Flow](#error-observable-flow) - [Iris Orchestrator Integration](#iris-orchestrator-integration) ## Overview Iris MCP uses a **hybrid reactive architecture** combining RxJS observables at the core layers with EventEmitter bridges for consumer compatibility. This approach provides: - **Type-safe reactive streams** for status and error propagation - **Single source of truth** via BehaviorSubjects - **Backward compatibility** via EventEmitter bridges - **Memory safety** through proper subscription management - **Progressive enhancement** without breaking changes ## Architecture Layers The codebase uses a **three-tier reactive architecture**: ``` ┌─────────────────────────────────────────────────────────┐ │ Layer 1: Transport (Pure RxJS) │ │ - LocalTransport, SSHTransport │ │ - Emits: status$, errors$ │ │ - NO EventEmitter │ └──────────────────┬──────────────────────────────────────┘ │ subscribes to observables ▼ ┌─────────────────────────────────────────────────────────┐ │ Layer 2: ClaudeProcess (RxJS + EventEmitter bridge) │ │ - Subscribes to transport.status$, transport.errors$ │ │ - Emits: status$, errors$ │ │ - ALSO emits EventEmitter events for backward compat │ └──────────────────┬──────────────────────────────────────┘ │ subscribes to observables ▼ ┌─────────────────────────────────────────────────────────┐ │ Layer 3: ProcessPool (RxJS + EventEmitter bridge) │ │ - Subscribes to process.status$, process.errors$ │ │ - Maps ProcessStatus → cleanup actions │ │ - ALSO emits EventEmitter events for consumers │ └──────────────────┬──────────────────────────────────────┘ │ .on("process-terminated"), etc. ▼ ┌─────────────────────────────────────────────────────────┐ │ Consumers: mcp_server, Dashboard, Iris │ │ - Listen to EventEmitter events │ │ - Simple event-based logging/forwarding │ └─────────────────────────────────────────────────────────┘ ``` ## RxJS Implementation ### Transport Layer (Pure RxJS) All transports expose two observables: ```typescript export interface Transport { /** * Observable stream of transport status changes * BehaviorSubject - emits current status immediately on subscription */ status$: Observable<TransportStatus>; /** * Observable stream of transport errors * Subject - emits errors as they occur (no initial value) */ errors$: Observable<Error>; } ``` **TransportStatus Lifecycle:** ``` STOPPED → CONNECTING → SPAWNING → READY → BUSY → READY → TERMINATING → STOPPED ``` **Example: LocalTransport** ```typescript export class LocalTransport implements Transport { private statusSubject = new BehaviorSubject<TransportStatus>(Status.STOPPED); public status$: Observable<TransportStatus>; private errorsSubject = new Subject<Error>(); public errors$: Observable<Error>; constructor(...) { this.status$ = this.statusSubject.asObservable(); this.errors$ = this.errorsSubject.asObservable(); } async spawn(...) { this.statusSubject.next(Status.SPAWNING); // ... spawn logic ... this.statusSubject.next(Status.READY); } executeTell(...) { this.statusSubject.next(Status.BUSY); // ... execution logic ... // When result received: this.statusSubject.next(Status.READY); } async terminate() { this.statusSubject.next(Status.TERMINATING); // ... cleanup ... this.statusSubject.next(Status.STOPPED); this.statusSubject.complete(); this.errorsSubject.complete(); } } ``` ### ClaudeProcess Layer (RxJS + Bridge) ClaudeProcess subscribes to Transport observables and: 1. Maps `TransportStatus` → `ProcessStatus` 2. Exposes its own observables (`status$`, `errors$`) 3. Maintains EventEmitter bridge for backward compatibility ```typescript export class ClaudeProcess extends EventEmitter { private statusSubject = new BehaviorSubject<ProcessStatus>(ProcessStatus.STOPPED); public status$: Observable<ProcessStatus>; private errorsSubject = new Subject<Error>(); public errors$: Observable<Error>; private subscriptions: Subscription[] = []; constructor(...) { super(); this.status$ = this.statusSubject.asObservable(); this.errors$ = this.errorsSubject.asObservable(); this.transport = TransportFactory.create(...); this.setupTransportSubscriptions(); } private setupTransportSubscriptions(): void { // Subscribe to transport status and map to process status const statusSub = this.transport.status$.subscribe((transportStatus) => { switch (transportStatus) { case TransportStatus.STOPPED: this.statusSubject.next(ProcessStatus.STOPPED); break; case TransportStatus.CONNECTING: case TransportStatus.SPAWNING: this.statusSubject.next(ProcessStatus.SPAWNING); break; case TransportStatus.READY: this.statusSubject.next(ProcessStatus.IDLE); break; case TransportStatus.BUSY: this.statusSubject.next(ProcessStatus.PROCESSING); break; } }); // Subscribe to transport errors and forward const errorsSub = this.transport.errors$.subscribe((error) => { this.errorsSubject.next(error); // EventEmitter bridge for backward compatibility this.emit("process-error", { teamName: this.teamName, error }); }); this.subscriptions.push(statusSub, errorsSub); } async terminate(): Promise<void> { // Cleanup subscriptions this.subscriptions.forEach(sub => sub.unsubscribe()); this.subscriptions = []; await this.transport.terminate(); // Complete observables this.statusSubject.complete(); this.errorsSubject.complete(); } } ``` ### ProcessPool Layer (RxJS + Bridge) ProcessPool subscribes to ClaudeProcess observables and: 1. Manages process lifecycle based on status changes 2. Cleans up on `ProcessStatus.STOPPED` 3. Emits EventEmitter events for consumers ```typescript export class ClaudeProcessPool extends EventEmitter { private processSubscriptions = new Map<string, Subscription[]>(); private setupProcessSubscriptions( process: ClaudeProcess, poolKey: string, sessionId: string, ): void { const subscriptions: Subscription[] = []; // Subscribe to process status changes const statusSub = process.status$.subscribe((status) => { if (status === ProcessStatus.STOPPED) { // Clean up subscriptions const subs = this.processSubscriptions.get(poolKey); if (subs) { subs.forEach(sub => sub.unsubscribe()); this.processSubscriptions.delete(poolKey); } // Clean up from pool this.processes.delete(poolKey); this.sessionToProcess.delete(sessionId); this.removeFromAccessOrder(poolKey); // EventEmitter bridge for backward compatibility this.emit(PoolEvent.PROCESS_TERMINATED, { teamName: process.teamName }); } }); // Subscribe to process errors const errorsSub = process.errors$.subscribe((error) => { // EventEmitter bridge for backward compatibility this.emit(PoolEvent.PROCESS_ERROR, { teamName: process.teamName, error, }); }); subscriptions.push(statusSub, errorsSub); this.processSubscriptions.set(poolKey, subscriptions); } } ``` ## Event Name Type Safety To prevent typos and improve type safety, all ProcessPool events use an enum: ```typescript // src/process-pool/types.ts export enum PoolEvent { PROCESS_TERMINATED = "process-terminated", PROCESS_ERROR = "process-error", HEALTH_CHECK = "health-check", } ``` **Usage:** ```typescript // Emitter (type-safe) this.emit(PoolEvent.PROCESS_TERMINATED, data); // Listener (type-safe) pool.on(PoolEvent.PROCESS_TERMINATED, handler); // Compiler prevents typos pool.on("process-spawned", handler); // Won't happen - we use enum! ``` **Why not full type safety?** EventEmitter itself doesn't enforce enum types at the TypeScript level (it accepts any string). However, by consistently using the enum in our codebase: - IDE autocomplete suggests valid event names - Refactoring is safe (rename enum value once) - Code review catches string literals - Convention prevents typos ## EventEmitter Bridges ### Why Keep EventEmitter Bridges? The codebase maintains EventEmitter bridges at ProcessPool and ClaudeProcess levels for several reasons: **1. Consumer Simplicity** - `mcp_server` uses events for **simple logging only** - `DashboardStateBridge` uses events for **WebSocket forwarding** - Neither consumer needs reactive complexity **2. Minimal Overhead** - EventEmitter forwarding adds negligible performance cost - Events are only emitted on significant state changes (terminate, error) - No memory leaks - subscriptions properly cleaned up **3. Isolation & Flexibility** - Core layers (Transport, ClaudeProcess, ProcessPool) are fully reactive - Consumers can be migrated to RxJS incrementally if needed - Bridges provide clean abstraction boundary **4. No Breaking Changes** - mcp_server and Dashboard continue working unchanged - Tests continue passing without modification - Safe, progressive migration strategy ### What Events Are Bridged? **ProcessPool emits:** - `PoolEvent.PROCESS_TERMINATED` - When process stops (any reason) - `PoolEvent.PROCESS_ERROR` - When process encounters error - `PoolEvent.HEALTH_CHECK` - Periodic health check results **ClaudeProcess emits (legacy, should be removed eventually):** - `"process-spawned"` - When process starts (forwarded from Transport) - `"process-exited"` - When process exits (forwarded from Transport) - `"process-error"` - When process errors (forwarded from Transport) - `"process-terminated"` - When process terminates (forwarded from Transport) - `"message-response"` - When message completes (for tests) **Note:** ClaudeProcess EventEmitter forwarding from Transport is deprecated since Transport no longer emits EventEmitter events. These forwarders should be removed in a future cleanup. ### Consumer Usage Examples **mcp_server (simple logging):** ```typescript private setupEventListeners(): void { this.processPool.on(PoolEvent.PROCESS_TERMINATED, (data) => { logger.info(data, "Process terminated"); }); this.processPool.on(PoolEvent.PROCESS_ERROR, (data) => { logger.error({ err: data.error }, "Process error"); }); } ``` **DashboardStateBridge (WebSocket forwarding):** ```typescript private setupEventForwarding(): void { this.pool.on(PoolEvent.PROCESS_TERMINATED, (data) => { this.emit("ws:process-status", { teamName: data.teamName, status: "stopped", }); }); this.pool.on(PoolEvent.PROCESS_ERROR, (data) => { this.emit("ws:process-error", { teamName: data.teamName, error: data.error.message, }); }); } ``` ## Migration Strategy The RxJS migration followed a **progressive enhancement strategy** with backward compatibility at each layer: ### Phase 1: Transport Layer 1. ✅ Added `status$` and `errors$` observables to Transport interface 2. ✅ Implemented observables in LocalTransport, SSHTransport 3. ✅ Removed all EventEmitter code from transports 4. ✅ Kept Transport interface stable ### Phase 2: ClaudeProcess & ProcessPool 1. ✅ Added `status$` and `errors$` to ClaudeProcess 2. ✅ ClaudeProcess subscribes to Transport observables 3. ✅ ProcessPool subscribes to ClaudeProcess observables 4. ✅ Maintained EventEmitter bridges for consumers 5. ✅ Fixed event name bug (mcp_server listening to non-existent events) ### Phase 3: Type Safety (Current) 1. ✅ Created `PoolEvent` enum for event names 2. ✅ Updated all emit/on calls to use enum 3. ✅ Removed broken event listeners (non-existent events) ### Phase 4: Future (Optional) - Convert mcp_server to use observables (low priority - logging is simple) - Convert DashboardStateBridge to use observables (low priority - forwarding is simple) - Remove EventEmitter bridges from ProcessPool - Remove EventEmitter forwarding from ClaudeProcess - Pure RxJS stack end-to-end **Decision:** Keep bridges indefinitely. The reactive core provides all benefits, and bridges have no downsides. ## Status Observable Flow ### Complete Lifecycle Example ``` User calls send_message → Iris orchestrator → ProcessPool → ClaudeProcess → Transport ``` **Status propagation (reactive):** ``` 1. Transport: STOPPED 2. User action: send_message("frontend", "backend", "hello") 3. Transport: SPAWNING ↓ observable 4. ClaudeProcess: SPAWNING ↓ observable 5. ProcessPool: (observes SPAWNING, no action) 6. Transport: READY (after init message) ↓ observable 7. ClaudeProcess: IDLE ↓ observable 8. ProcessPool: (observes IDLE, no action) 9. Transport: BUSY (executeTell called) ↓ observable 10. ClaudeProcess: PROCESSING ↓ observable 11. ProcessPool: (observes PROCESSING, no action) 12. Transport: READY (result message received) ↓ observable 13. ClaudeProcess: IDLE ↓ observable 14. ProcessPool: (observes IDLE, process available) 15. User calls: team_sleep("backend") 16. Transport: TERMINATING ↓ observable 17. ClaudeProcess: (no status change during termination) ↓ observable 18. ProcessPool: (observes TERMINATING, no action) 19. Transport: STOPPED ↓ observable 20. ClaudeProcess: STOPPED ↓ observable 21. ProcessPool: CLEANUP (unsubscribe, delete from pool, emit event) ↓ EventEmitter 22. mcp_server: Log "Process terminated" ``` ### Key Points - **Status flows via observables** through each layer - **ProcessPool only acts on STOPPED** (cleanup trigger) - **EventEmitter used only for final consumer notification** - **Single source of truth**: BehaviorSubject at each layer ## Error Observable Flow ### Error Propagation Example ``` 1. Transport: Error occurs (SSH connection failed) ↓ errors$ observable 2. ClaudeProcess: Receives error via transport.errors$ subscription - Emits to errors$ observable - ALSO emits "process-error" EventEmitter event (bridge) ↓ errors$ observable 3. ProcessPool: Receives error via process.errors$ subscription - Logs error - Emits PoolEvent.PROCESS_ERROR (bridge) ↓ EventEmitter 4. mcp_server: Logs error to console 5. Dashboard: Forwards error to WebSocket clients ``` ### Error Handling Best Practices ```typescript // Transport layer - emit to errors$ this.errorsSubject.next(new ProcessError("SSH connection failed", this.teamName)); // ClaudeProcess - subscribe and forward this.transport.errors$.subscribe((error) => { this.errorsSubject.next(error); // Observable this.emit("process-error", { teamName: this.teamName, error }); // Bridge }); // ProcessPool - subscribe and forward process.errors$.subscribe((error) => { this.logger.error({ err: error }, "Process error"); this.emit(PoolEvent.PROCESS_ERROR, { teamName: process.teamName, error }); // Bridge }); // Consumer - simple EventEmitter listener pool.on(PoolEvent.PROCESS_ERROR, (data) => { logger.error({ err: data.error }, "Process error"); }); ``` ## Iris Orchestrator Integration Iris Orchestrator (`src/iris.ts`) is the **business logic brain** that: - Orchestrates process lifecycle - Manages completion detection - Handles timeout logic (two-timeout architecture) - Coordinates cache and sessions ### Iris Uses RxJS Internally Iris uses RxJS extensively for reactive coordination: ```typescript export class IrisOrchestrator { private responseSubscriptions = new Map<string, Subscription>(); async sendMessage(...): Promise<string | object> { // Create cache entry const tellEntry = messageCache.createEntry(CacheEntryType.TELL, message); // Subscribe to cache messages for completion detection const subscription = tellEntry.messages$ .pipe(filter((msg) => msg.type === "result")) .subscribe(() => { this.handleTellCompletion(sessionId, tellEntry); subscription.unsubscribe(); }); this.responseSubscriptions.set(sessionId, subscription); // Execute tell (non-blocking) process.executeTell(tellEntry); // Wait for completion or timeout return this.waitForCompletion(sessionId, tellEntry, timeout); } private async waitForCompletion(...): Promise<string | object> { return new Promise((resolve) => { // Subscribe to result message const subscription = cacheEntry.messages$ .pipe(filter((msg) => msg.type === "result")) .subscribe(() => { const response = this.extractFullResponse(cacheEntry); resolve(response); }); }); } } ``` ### Iris Does NOT Use ProcessPool Events **Important:** Iris does **NOT** listen to ProcessPool events. Instead, Iris: 1. **Calls methods directly** on ProcessPool and ClaudeProcess 2. **Subscribes to cache observables** (`tellEntry.messages$`) 3. **Manages timeouts independently** via internal timers 4. **Updates session state** via SessionManager **Why?** Iris is the orchestrator - it drives behavior, not reacts to events. The event-based approach is for **observational consumers** (logging, monitoring, dashboards), not for **business logic**. ### Two-Timeout Architecture Iris implements a sophisticated two-timeout system: **1. Response Timeout (from config, default 120s)** - Detects stalled Claude responses - Resets on each message received - Triggers process recreation on timeout - Iris's responsibility, NOT ClaudeProcess **2. MCP Timeout (from caller)** - Controls how long caller waits - `-1`: Async mode (return immediately) - `0`: Wait indefinitely - `N`: Wait N ms, then return partial results ```typescript // Response timeout - resets on each message const subscription = cacheEntry.messages$.subscribe((msg) => { resetTimer(); // Reset timeout on each message if (msg.type === "result") { this.handleTellCompletion(sessionId, cacheEntry); } }); // MCP timeout - controls caller wait if (mcpTimeout > 0) { setTimeout(() => { resolve({ status: "mcp_timeout", partialResponse: this.extractPartialResponse(cacheEntry), }); }, mcpTimeout); } ``` **Key insight:** Both timeouts use **RxJS subscriptions** to `cacheEntry.messages$`, NOT ProcessPool events. This keeps business logic decoupled from event system. ## Summary ### Architecture Benefits ✅ **Reactive Core**: Transport, ClaudeProcess, ProcessPool use RxJS observables ✅ **Type Safety**: PoolEvent enum prevents event name typos ✅ **Single Source of Truth**: BehaviorSubjects hold canonical state ✅ **Memory Safe**: Proper subscription cleanup prevents leaks ✅ **Backward Compatible**: EventEmitter bridges maintain existing consumers ✅ **Progressive**: Can migrate consumers incrementally (or never) ✅ **Decoupled**: Business logic (Iris) uses observables, monitoring uses events ### Design Decisions 1. **Keep EventEmitter bridges** - No performance cost, maintains simplicity for consumers 2. **Use enums for event names** - Type safety via conventions and IDE support 3. **RxJS at core, events at edges** - Reactive where it matters, simple where it doesn't 4. **Iris uses observables directly** - Business logic decoupled from event system 5. **Cleanup on STOPPED** - ProcessPool acts only on terminal status ### Migration Complete The RxJS migration is **complete and production-ready**. The hybrid architecture provides all benefits of reactive programming while maintaining pragmatic bridges for consumers that don't need reactive complexity. **No further migration needed** unless there's a specific requirement to convert mcp_server or Dashboard to RxJS (currently: no benefit, added complexity).

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jenova-marie/iris-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server