Skip to main content
Glama
claude-process.ts (14.3 kB)
/**
 * Claude Process - Dumb Pipe for stdio communication
 *
 * Phase 1 Refactor: Now uses Transport abstraction for local/remote execution.
 *
 * This is a SIMPLIFIED process wrapper that:
 * - Delegates to Transport for actual execution (local or remote)
 * - Provides consistent interface regardless of execution method
 * - Does NOT handle completion detection (that's Iris's job)
 * - Does NOT manage timeouts (that's Iris's job)
 * - Does NOT queue messages (return "busy" instead)
 *
 * Business logic lives in Iris, NOT here.
 */

import { EventEmitter } from "events";
import { existsSync } from "fs";
import { BehaviorSubject, Observable, Subject, Subscription } from "rxjs";
import type { IrisConfig } from "./types.js";
import { getChildLogger } from "../utils/logger.js";
import { ProcessError } from "../utils/errors.js";
import { CacheEntry, CacheEntryType, CacheEntryStatus } from "../cache/types.js";
import { TransportFactory } from "../transport/transport-factory.js";
import type { Transport } from "../transport/transport.interface.js";
import { TransportStatus } from "../transport/transport.interface.js";
import { ClaudeCommandBuilder } from "../utils/command-builder.js";
import { ProcessBusyError } from "../transport/local-transport.js";
import { ClaudePrintExecutor } from "../utils/claude-print.js";
import { CacheEntryImpl } from "../cache/cache-entry.js";

// ProcessBusyError now exported from local-transport.ts
export { ProcessBusyError };

/**
 * Process status enum
 */
export enum ProcessStatus {
  STOPPED = "stopped",
  SPAWNING = "spawning",
  IDLE = "idle",
  PROCESSING = "processing",
}

/**
 * Basic process metrics - compatible with ProcessMetrics interface
 */
export interface BasicProcessMetrics {
  teamName: string;
  pid: number | null;
  status: ProcessStatus;
  messagesProcessed: number;
  lastUsed: number;
  uptime: number;
  idleTimeRemaining: number;
  queueLength: number;
  sessionId: string;
  messageCount: number;
  lastActivity: number;

  // Helper properties derived from status
  isReady: boolean;
  isSpawning: boolean;
  isBusy: boolean;
}

/**
 * Claude Process - Minimal wrapper delegating to Transport
 *
 * Fully reactive with RxJS observables + EventEmitter bridge for backward compatibility.
 *
 * Events emitted on the EventEmitter side:
 * - "process-error": `{ teamName, error }` for every error pushed by the transport
 * - "message-response": `{ teamName, success, message }` when a `tell()` receives its result
 */
export class ClaudeProcess extends EventEmitter {
  private transport: Transport;
  private logger: ReturnType<typeof getChildLogger>;
  private spawnTime = 0;

  // Metrics tracking (for compatibility with existing code)
  private messageCount = 0;
  private lastUsed = 0;

  // RxJS reactive status tracking
  private statusSubject = new BehaviorSubject<ProcessStatus>(ProcessStatus.STOPPED);
  public status$: Observable<ProcessStatus>;

  // RxJS reactive error tracking
  private errorsSubject = new Subject<Error>();
  public errors$: Observable<Error>;

  // Subscriptions for cleanup
  private subscriptions: Subscription[] = [];

  /**
   * Creates the transport through TransportFactory and wires its observables
   * into this process's status/error streams. Nothing is spawned here; call
   * `spawn()` for that.
   *
   * @param teamName - Team identifier; also used to name the child logger
   * @param irisConfig - Team configuration handed to the transport factory and command builder
   * @param sessionId - Session identifier handed to the transport factory and command builder
   */
  constructor(
    public readonly teamName: string,
    private irisConfig: IrisConfig,
    public readonly sessionId: string,
  ) {
    super();
    this.logger = getChildLogger(`pool:process:${teamName}`);

    // Expose status and errors observables
    this.status$ = this.statusSubject.asObservable();
    this.errors$ = this.errorsSubject.asObservable();

    // Create transport using factory
    this.transport = TransportFactory.create(teamName, irisConfig, sessionId);

    // Subscribe to transport observables (replaces event forwarding)
    this.setupTransportSubscriptions();

    this.logger.debug(
      {
        teamName,
        sessionId,
        transportType: this.transport.constructor.name,
      },
      "PLACEHOLDER",
    );
  }

  /**
   * Setup subscriptions to transport observables
   * Maps Transport status → ClaudeProcess status
   */
  private setupTransportSubscriptions(): void {
    // Subscribe to transport status changes
    const statusSub = this.transport.status$.subscribe((transportStatus) => {
      this.logger.debug(
        {
          teamName: this.teamName,
          transportStatus,
        },
        "PLACEHOLDER",
      );

      // Map TransportStatus → ProcessStatus
      switch (transportStatus) {
        case TransportStatus.STOPPED:
          this.statusSubject.next(ProcessStatus.STOPPED);
          break;
        case TransportStatus.CONNECTING:
        case TransportStatus.SPAWNING:
          this.statusSubject.next(ProcessStatus.SPAWNING);
          break;
        case TransportStatus.READY:
          this.statusSubject.next(ProcessStatus.IDLE);
          break;
        case TransportStatus.BUSY:
          this.statusSubject.next(ProcessStatus.PROCESSING);
          break;
        case TransportStatus.TERMINATING:
          // Keep current status during termination
          break;
        case TransportStatus.ERROR:
          // Nothing is emitted from this branch: the error itself is forwarded
          // by the errors$ subscription below. Status is left unchanged here.
          break;
      }
    });

    // Subscribe to transport errors
    const errorsSub = this.transport.errors$.subscribe((error) => {
      this.logger.error(
        {
          err: error,
          teamName: this.teamName,
        },
        "Transport error received",
      );

      // Emit to errors$ observable
      this.errorsSubject.next(error);

      // Emit process-error event for backward compatibility
      this.emit("process-error", {
        teamName: this.teamName,
        error,
      });
    });

    // Store subscriptions for cleanup
    this.subscriptions.push(statusSub, errorsSub);
  }

  /**
   * Static method: Initialize session file
   * Phase 3 Migration: Now uses ClaudePrintExecutor for local AND remote support
   *
   * Runs a "ping" through ClaudePrintExecutor and, for local teams (no
   * `irisConfig.remote`), verifies that the session file exists afterwards.
   *
   * @param irisConfig - Team configuration; `path` is used as the project path
   * @param sessionId - Session identifier to create
   * @param sessionInitTimeout - Timeout passed to the executor (default 30000)
   * @throws ProcessError when the executor reports failure or the local session file is missing
   */
  static async initializeSessionFile(
    irisConfig: IrisConfig,
    sessionId: string,
    sessionInitTimeout = 30000,
  ): Promise<void> {
    const logger = getChildLogger(`pool:session-init:${irisConfig.path}`);
    const projectPath = irisConfig.path;

    logger.info(
      {
        sessionId,
        projectPath,
        sessionInitTimeout,
        remote: !!irisConfig.remote,
      },
      "PLACEHOLDER",
    );

    try {
      // Use ClaudePrintExecutor for session initialization
      // This automatically handles both local and remote execution
      const executor = ClaudePrintExecutor.create(irisConfig, sessionId);

      const result = await executor.execute({
        command: "ping", // Required: creates session conversation
        resume: false, // Use --session-id (create new session)
        timeout: sessionInitTimeout,
      });

      // Log debug log path if captured
      if (result.debugLogPath) {
        logger.info(
          {
            sessionId,
            debugLogPath: result.debugLogPath,
          },
          "PLACEHOLDER",
        );
      }

      // Check if execution was successful
      if (!result.success) {
        const errorMsg = [
          `Session initialization failed with exit code ${result.exitCode}`,
          result.debugLogPath ? `Debug logs: ${result.debugLogPath}` : null,
          `stderr: ${result.stderr}`,
        ]
          .filter(Boolean)
          .join("\n");

        throw new ProcessError(errorMsg, projectPath);
      }

      // Log successful completion
      logger.info(
        {
          sessionId,
          exitCode: result.exitCode,
          stdoutLength: result.stdout.length,
          stderrLength: result.stderr.length,
          duration: result.duration,
          response: result.stdout.substring(0, 100),
          remote: !!irisConfig.remote,
        },
        "PLACEHOLDER",
      );

      // Verify session file was created (only for local teams)
      // For remote teams, the session file exists on the remote host
      if (!irisConfig.remote) {
        const sessionFilePath = ClaudeProcess.getSessionFilePath(
          projectPath,
          sessionId,
        );

        if (!existsSync(sessionFilePath)) {
          throw new ProcessError(
            `Session file was not created at ${sessionFilePath}`,
            projectPath,
          );
        }

        logger.info(
          { sessionId, filePath: sessionFilePath },
          "Session file initialized successfully",
        );
      } else {
        logger.info(
          { sessionId, remote: irisConfig.remote },
          "Remote session file initialized successfully",
        );
      }
    } catch (error) {
      // Log with a real Error instance, then rethrow the original value untouched
      logger.error(
        {
          err: error instanceof Error ? error : new Error(String(error)),
        },
        "Failed to initialize session file",
      );
      throw error;
    }
  }

  /**
   * Get the path to a session file
   *
   * The home directory comes from HOME, then USERPROFILE, then an empty string.
   * Every "/" in the project path is replaced with "-".
   *
   * @param projectPath - Project path to encode into the directory name
   * @param sessionId - Session identifier used as the file's base name
   * @returns `<home>/.claude/projects/<escaped path>/<sessionId>.jsonl`
   */
  static getSessionFilePath(projectPath: string, sessionId: string): string {
    const homedir = process.env.HOME || process.env.USERPROFILE || "";
    const escapedPath = projectPath.replace(/\//g, "-");
    return `${homedir}/.claude/projects/${escapedPath}/${sessionId}.jsonl`;
  }

  /**
   * Spawn Claude process with spawn ping
   * @param spawnCacheEntry - CacheEntry with type=SPAWN, tellString='ping'
   * @param spawnTimeout - Timeout in ms for spawn init (from config)
   */
  async spawn(
    spawnCacheEntry: CacheEntry,
    spawnTimeout = 20000,
  ): Promise<void> {
    this.logger.info(
      {
        teamName: this.teamName,
        sessionId: this.sessionId,
        cacheEntryType: spawnCacheEntry.cacheEntryType,
        transportType: this.transport.constructor.name,
      },
      "PLACEHOLDER",
    );

    this.spawnTime = Date.now();

    // Build Claude command (process-pool level responsibility)
    const commandInfo = ClaudeCommandBuilder.build(
      this.teamName,
      this.irisConfig,
      this.sessionId,
    );

    this.logger.debug(
      {
        teamName: this.teamName,
        executable: commandInfo.executable,
        argsCount: commandInfo.args.length,
        cwd: commandInfo.cwd,
      },
      "PLACEHOLDER",
    );

    // Delegate to transport (status updates happen via transport.status$ subscription)
    await this.transport.spawn(spawnCacheEntry, commandInfo, spawnTimeout);

    this.logger.info(
      {
        teamName: this.teamName,
      },
      "PLACEHOLDER",
    );
  }

  /**
   * Execute tell
   * @param cacheEntry - CacheEntry with type=TELL, tellString=message
   */
  executeTell(cacheEntry: CacheEntry): void {
    this.logger.debug(
      {
        teamName: this.teamName,
        cacheEntryType: cacheEntry.cacheEntryType,
        tellStringLength: cacheEntry.tellString.length,
      },
      "PLACEHOLDER",
    );

    // Update metrics
    this.messageCount++;
    this.lastUsed = Date.now();

    // Delegate to transport (status updates happen via transport.status$ subscription)
    this.transport.executeTell(cacheEntry);
  }

  /**
   * Convenience method for tests: Send a tell message
   * Creates a CacheEntry, delegates to executeTell, and emits a
   * "message-response" event when the first "result" message arrives.
   *
   * The internal listener on the cache entry is released as soon as that
   * result is seen, or immediately if executeTell throws, so repeated calls
   * do not accumulate subscriptions.
   *
   * @param message - Message to send
   */
  tell(message: string): void {
    // Create a cache entry
    const cacheEntry = new CacheEntryImpl(CacheEntryType.TELL, message);

    // Parent subscription: lets the handler tear itself down safely even if a
    // result were delivered synchronously while subscribe() is still running.
    const completionSub = new Subscription();

    // Subscribe to messages to detect completion
    completionSub.add(
      cacheEntry.messages$.subscribe((msg) => {
        if (msg.type !== "result") {
          return;
        }

        // Tell completed - stop listening before notifying
        completionSub.unsubscribe();

        // Emit message-response event for tests
        this.emit("message-response", {
          teamName: this.teamName,
          success: msg.data.subtype === "success",
          message: msg.data,
        });
      }),
    );

    try {
      this.executeTell(cacheEntry);
    } catch (error) {
      // The tell never started, so no result will arrive for this entry
      completionSub.unsubscribe();
      throw error;
    }
  }

  // Private methods removed - now in LocalTransport (and SSH2Transport in Phase 2)

  /**
   * Get basic metrics - returns all ProcessMetrics properties
   */
  getBasicMetrics(): BasicProcessMetrics {
    // Get metrics from transport
    const transportMetrics = this.transport.getMetrics();
    const isReady = this.transport.isReady();
    const isBusy = this.transport.isBusy();
    const pid = this.transport.getPid();

    // Use the current status from the BehaviorSubject (single source of truth)
    const status = this.statusSubject.value;

    return {
      teamName: this.teamName,
      pid,
      status,
      messagesProcessed: transportMetrics.messagesProcessed,
      lastUsed: this.lastUsed || this.spawnTime,
      uptime: transportMetrics.uptime,
      idleTimeRemaining: 0, // Iris manages timeouts, not ClaudeProcess
      queueLength: 0, // No queue in dumb pipe model
      sessionId: this.sessionId,
      messageCount: this.messageCount,
      lastActivity: transportMetrics.lastResponseAt || this.spawnTime,

      // Helper properties
      isReady,
      isSpawning: status === ProcessStatus.SPAWNING,
      isBusy,
    };
  }

  /**
   * Check if spawning
   *
   * Reads the same status subject that getBasicMetrics() uses for its
   * `isSpawning` field, so both always agree.
   */
  isSpawning(): boolean {
    return this.statusSubject.value === ProcessStatus.SPAWNING;
  }

  /**
   * Send ESC character to stdin (attempt to cancel current operation)
   * This is experimental - may or may not work depending on Claude's headless mode implementation
   */
  cancel(): void {
    this.logger.info(
      {
        teamName: this.teamName,
        isBusy: this.transport.isBusy(),
      },
      "PLACEHOLDER",
    );

    // Delegate to transport (if supported)
    if (this.transport.cancel) {
      this.transport.cancel();
    } else {
      this.logger.warn(
        {
          transportType: this.transport.constructor.name,
        },
        "PLACEHOLDER",
      );
    }
  }

  /**
   * Get launch command for debugging (delegates to transport)
   *
   * @returns The transport's launch command, or null when the transport does
   *          not implement the hook or returns a falsy value
   */
  getLaunchCommand(): string | null {
    return this.transport.getLaunchCommand?.() || null;
  }

  /**
   * Get team config snapshot for debugging (delegates to transport)
   *
   * @returns The transport's config snapshot, or null when the transport does
   *          not implement the hook or returns a falsy value
   */
  getTeamConfigSnapshot(): string | null {
    return this.transport.getTeamConfigSnapshot?.() || null;
  }

  /**
   * Terminate process via transport
   *
   * After the transport has terminated, the transport subscriptions are
   * released, a final STOPPED status is emitted, and both subjects are
   * completed. If the transport's terminate() rejects, none of that cleanup runs.
   */
  async terminate(): Promise<void> {
    this.logger.info(
      {
        teamName: this.teamName,
        transportType: this.transport.constructor.name,
      },
      "PLACEHOLDER",
    );

    // Delegate to transport (status updates happen via transport.status$ subscription)
    await this.transport.terminate();

    // Unsubscribe from transport observables (AFTER terminate completes)
    this.subscriptions.forEach((sub) => sub.unsubscribe());
    this.subscriptions = [];

    // Emit STOPPED status to trigger pool cleanup
    this.statusSubject.next(ProcessStatus.STOPPED);

    // Complete observables (no more emissions after termination)
    this.statusSubject.complete();
    this.errorsSubject.complete();

    this.logger.info(
      {
        teamName: this.teamName,
      },
      "PLACEHOLDER",
    );
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jenova-marie/iris-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.