/**
* Iris MCP - Process Pool Manager
* Manages a pool of Claude Code processes with LRU eviction
*/
import { EventEmitter } from "events";
import { Subscription } from "rxjs";
import { filter } from "rxjs/operators";
import { ClaudeProcess, ProcessStatus } from "./claude-process.js";
import type { ProcessPoolStatus, ProcessPoolConfig } from "./types.js";
import { PoolEvent } from "./types.js";
import { TeamsConfigManager } from "../config/iris-config.js";
import { getChildLogger } from "../utils/logger.js";
import { TeamNotFoundError, ProcessPoolLimitError } from "../utils/errors.js";
import { CacheEntryImpl } from "../cache/cache-entry.js";
import { CacheEntryType } from "../cache/types.js";
/**
 * Pool of Claude Code processes, one per (fromTeam -> toTeam) pair, bounded by
 * `config.maxProcesses` with least-recently-used eviction.
 *
 * Bookkeeping invariants:
 * - All per-process removals go through `unregisterProcess`, which only acts
 *   when the given process is still the one registered under its pool key, so
 *   a late notification from a replaced process never disturbs its successor.
 * - A process's RxJS subscriptions are disposed once it stops, is pruned from
 *   the pool, or fails to spawn.
 */
export class ClaudeProcessPool extends EventEmitter {
  /** Registered processes keyed by pool key ("fromTeam->toTeam"). */
  private readonly processes = new Map<string, ClaudeProcess>();
  /** sessionId -> poolKey, so session-addressed requests can be routed. */
  private readonly sessionToProcess = new Map<string, string>();
  /** Pool keys ordered least- to most-recently used (LRU tracking). */
  private accessOrder: string[] = [];
  private healthCheckInterval: NodeJS.Timeout | null = null;
  private readonly logger = getChildLogger("pool:manager");
  // RxJS subscriptions per process instance. Keyed by the process object (not
  // the pool key) so a stopped process and a successor registered under the
  // same pool key can never dispose each other's subscriptions.
  private readonly processSubscriptions = new Map<
    ClaudeProcess,
    Subscription[]
  >();

  constructor(
    private configManager: TeamsConfigManager,
    private config: ProcessPoolConfig,
  ) {
    super();
    this.startHealthCheck();
  }

  /**
   * Get the current configuration
   */
  getConfig() {
    return this.configManager.getConfig();
  }

  /**
   * Generate pool key for team pair
   * Format: "fromTeam->toTeam"
   * This maintains conversation isolation between different team pairs
   */
  private getPoolKey(fromTeam: string, toTeam: string): string {
    return `${fromTeam}->${toTeam}`;
  }

  /**
   * Subscribe to a process's status and error streams.
   *
   * On STOPPED the process's subscriptions are disposed, the process is
   * unregistered (only if it is still the one registered under `poolKey`),
   * and PROCESS_TERMINATED is emitted. Errors are re-emitted as PROCESS_ERROR.
   */
  private setupProcessSubscriptions(
    process: ClaudeProcess,
    poolKey: string,
    sessionId: string,
  ): void {
    const statusSub = process.status$.subscribe((status) => {
      this.logger.debug(
        {
          poolKey,
          sessionId,
          status,
        },
        "Process status changed",
      );
      if (status === ProcessStatus.STOPPED) {
        this.logger.info(
          {
            poolKey,
            sessionId,
          },
          "Process stopped, cleaning up from pool",
        );
        this.disposeSubscriptions(process);
        // Identity-guarded: a no-op if poolKey now belongs to a newer process.
        this.unregisterProcess(poolKey, process);
        // Emit event for backward compatibility
        this.emit(PoolEvent.PROCESS_TERMINATED, { teamName: process.teamName });
      }
    });

    const errorsSub = process.errors$.subscribe((error) => {
      this.logger.error(
        {
          err: error,
          poolKey,
          sessionId,
        },
        "Process error received",
      );
      // Emit error event for backward compatibility
      this.emit(PoolEvent.PROCESS_ERROR, {
        teamName: process.teamName,
        error,
      });
    });

    this.processSubscriptions.set(process, [statusSub, errorsSub]);
  }

  /**
   * Unsubscribe and forget all subscriptions held for `process`.
   * Safe to call repeatedly.
   */
  private disposeSubscriptions(process: ClaudeProcess): void {
    const subs = this.processSubscriptions.get(process);
    if (!subs) return;
    // Forget first so a re-entrant call during unsubscribe is a no-op.
    this.processSubscriptions.delete(process);
    for (const sub of subs) {
      sub.unsubscribe();
    }
  }

  /**
   * Remove `process` from the pool maps and LRU order, together with every
   * session routed to its pool key.
   *
   * Does nothing when a different process (or none) is registered under
   * `poolKey`.
   *
   * @returns true if the process was registered and has been removed
   */
  private unregisterProcess(poolKey: string, process: ClaudeProcess): boolean {
    if (this.processes.get(poolKey) !== process) {
      return false;
    }
    this.processes.delete(poolKey);
    this.removeFromAccessOrder(poolKey);
    // Deleting the current entry while iterating a Map is well-defined.
    for (const [sid, key] of this.sessionToProcess) {
      if (key === poolKey) {
        this.sessionToProcess.delete(sid);
      }
    }
    return true;
  }

  /**
   * Return the process registered under `poolKey` if it is still usable.
   *
   * A registered process that already reports STOPPED (for example because
   * its STOPPED notification has not reached the pool) is pruned here. This
   * keeps it from counting against `maxProcesses` and releases its
   * subscriptions.
   */
  private resolveLiveProcess(poolKey: string): ClaudeProcess | undefined {
    const registered = this.processes.get(poolKey);
    if (!registered) {
      return undefined;
    }
    if (registered.getBasicMetrics().status !== ProcessStatus.STOPPED) {
      return registered;
    }
    this.disposeSubscriptions(registered);
    this.unregisterProcess(poolKey, registered);
    return undefined;
  }

  /**
   * Get process by session ID
   */
  getProcessBySessionId(sessionId: string): ClaudeProcess | undefined {
    const poolKey = this.sessionToProcess.get(sessionId);
    if (!poolKey) return undefined;
    return this.processes.get(poolKey);
  }

  /**
   * Get or create a process for a team
   *
   * @param teamName - The team to get/create process for
   * @param sessionId - The session ID to use for this process
   * @param fromTeam - The requesting team
   * @throws TeamNotFoundError if `teamName` is not configured
   * @throws ProcessPoolLimitError if the pool is full and nothing can be evicted
   * @throws whatever `process.spawn` rejects with (the failed process is cleaned up first)
   */
  async getOrCreateProcess(
    teamName: string,
    sessionId: string,
    fromTeam: string,
  ): Promise<ClaudeProcess> {
    // Check if team exists in configuration
    const irisConfig = this.configManager.getIrisConfig(teamName);
    if (!irisConfig) {
      throw new TeamNotFoundError(teamName);
    }

    this.logger.debug(
      {
        fromTeam,
        toTeam: teamName,
        sessionId,
      },
      "Using session for team pair",
    );

    // Generate pool key for this team pair
    const poolKey = this.getPoolKey(fromTeam, teamName);

    let existing = this.resolveLiveProcess(poolKey);
    if (!existing) {
      // Make room first. Eviction awaits, so a concurrent call for the same
      // team pair may register a process meanwhile; re-check afterwards rather
      // than creating a duplicate that would orphan the other one.
      while (this.processes.size >= this.config.maxProcesses) {
        await this.evictLRU();
      }
      existing = this.resolveLiveProcess(poolKey);
    }
    if (existing) {
      this.updateAccessOrder(poolKey);
      this.logger.debug({ poolKey, sessionId }, "Using existing process");
      return existing;
    }

    // Create new process
    this.logger.info(
      {
        poolKey,
        teamName,
        sessionId,
        fromTeam,
        toTeam: teamName,
      },
      "Creating new process",
    );
    const process = new ClaudeProcess(teamName, irisConfig, sessionId);

    // Set up RxJS subscriptions to process observables
    this.setupProcessSubscriptions(process, poolKey, sessionId);

    // Register session BEFORE spawning so HTTP/MCP server can route requests
    // This is critical for remote teams with reverse tunnels - Claude tries to
    // connect immediately after spawn and needs the session to be registered
    this.processes.set(poolKey, process);
    this.sessionToProcess.set(sessionId, poolKey);
    this.updateAccessOrder(poolKey);

    // Spawn the process with a temporary cache entry for init ping
    try {
      const spawnCacheEntry = new CacheEntryImpl(CacheEntryType.SPAWN, "ping");
      // `||` (not `??`) on purpose: an unset or 0 timeout falls back to 20s.
      const spawnTimeout = this.config.spawnTimeout || 20000;
      await process.spawn(spawnCacheEntry, spawnTimeout);
      this.logger.info(
        {
          poolKey,
          teamName,
          sessionId,
          totalProcesses: this.processes.size,
        },
        "Process successfully added to pool",
      );
      return process;
    } catch (error) {
      // The process object exists but spawn failed, so it's in a zombie state
      this.logger.error(
        {
          err: ClaudeProcessPool.toError(error),
          poolKey,
          teamName,
          sessionId,
        },
        "Process spawn failed, cleaning up",
      );

      // Stop routing to it immediately (we registered before spawning).
      this.unregisterProcess(poolKey, process);

      // Terminate the zombie process to clean up any resources
      await process.terminate().catch((termError) => {
        this.logger.warn(
          {
            err: ClaudeProcessPool.toError(termError),
            poolKey,
          },
          "Failed to terminate zombie process",
        );
      });

      // terminate() normally drives the STOPPED handler, which disposes these
      // and emits PROCESS_TERMINATED; repeat in case STOPPED never arrives.
      this.disposeSubscriptions(process);

      // Re-throw the original error
      throw error;
    }
  }

  /**
   * Terminate a specific process
   *
   * Targets the first registered process whose pool key ends in
   * `->teamName`.
   * NOTE(review): if several teams talk to `teamName`, only one of those
   * processes is terminated; confirm that is intended.
   */
  async terminateProcess(teamName: string): Promise<void> {
    const entry = this.findEntryByTargetTeam(teamName);
    if (!entry) {
      return;
    }
    const [poolKey, process] = entry;
    await process.terminate();
    // The STOPPED handler normally does this; make sure the pool no longer
    // routes to the process even if that notification never arrives.
    this.unregisterProcess(poolKey, process);
  }

  /**
   * Terminate all processes
   *
   * Waits for every termination to settle and always clears pool state and
   * the health-check timer. If any termination failed, the first failure is
   * rethrown afterwards.
   */
  async terminateAll(): Promise<void> {
    this.logger.info("Terminating all processes");

    // Snapshot first: STOPPED handlers mutate `processes` while we await.
    const entries = Array.from(this.processes.entries());
    const failures: unknown[] = [];
    await Promise.all(
      entries.map(async ([poolKey, process]) => {
        try {
          await process.terminate();
        } catch (error) {
          failures.push(error);
          this.logger.warn(
            { err: ClaudeProcessPool.toError(error), poolKey },
            "Failed to terminate process",
          );
        }
      }),
    );

    // Handlers should have cleaned up, but ensure everything is released.
    for (const subs of this.processSubscriptions.values()) {
      for (const sub of subs) {
        sub.unsubscribe();
      }
    }
    this.processSubscriptions.clear();
    this.processes.clear();
    this.sessionToProcess.clear();
    this.accessOrder = [];

    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }

    if (failures.length > 0) {
      throw failures[0];
    }
  }

  /**
   * Get pool status with session information
   */
  getStatus(): ProcessPoolStatus {
    // Reverse index poolKey -> sessionId, built once rather than scanning the
    // session map per process. The first session seen for a key wins.
    const sessionByPoolKey = new Map<string, string>();
    for (const [sid, key] of this.sessionToProcess) {
      if (!sessionByPoolKey.has(key)) {
        sessionByPoolKey.set(key, sid);
      }
    }

    // Shape is the process metrics plus sessionId/poolKey; kept loosely typed
    // to match what ProcessPoolStatus accepts.
    const processes: Record<string, any> = {};
    for (const [poolKey, process] of this.processes) {
      processes[poolKey] = {
        ...process.getBasicMetrics(),
        sessionId: sessionByPoolKey.get(poolKey),
        poolKey,
      };
    }

    return {
      totalProcesses: this.processes.size,
      maxProcesses: this.config.maxProcesses,
      processes,
      activeSessions: this.sessionToProcess.size,
    };
  }

  /**
   * Log current pool state for debugging
   */
  logPoolState(context: string): void {
    const status = this.getStatus();
    this.logger.debug(
      {
        context,
        totalProcesses: status.totalProcesses,
        maxProcesses: status.maxProcesses,
        activeSessions: status.activeSessions,
        processes: Object.entries(status.processes).map(([key, proc]) => ({
          poolKey: key,
          status: proc.status,
          pid: proc.pid,
          sessionId: proc.sessionId,
          messageCount: proc.messageCount,
        })),
        accessOrder: this.accessOrder,
        sessionMappings: Array.from(this.sessionToProcess.entries()),
      },
      "Pool state snapshot",
    );
  }

  /**
   * Get process for a team (if exists)
   * @deprecated Use getProcessBySessionId or getOrCreateProcess instead
   */
  getProcess(teamName: string): ClaudeProcess | undefined {
    return this.findEntryByTargetTeam(teamName)?.[1];
  }

  /**
   * Find the first registered [poolKey, process] whose target team is
   * `teamName` (pool key format: "fromTeam->toTeam").
   */
  private findEntryByTargetTeam(
    teamName: string,
  ): [string, ClaudeProcess] | undefined {
    const suffix = `->${teamName}`;
    for (const entry of this.processes) {
      if (entry[0].endsWith(suffix)) {
        return entry;
      }
    }
    return undefined;
  }

  /**
   * Send a command to a process (for compaction, etc.)
   * Creates a temporary cache entry to send the command using the new architecture
   *
   * @param sessionId - Session whose process receives the command
   * @param command - Command text to send
   * @param timeoutMs - How long to wait for a "result" message (default 30000)
   * @returns The joined assistant text, or null if no process serves the session
   * @throws Error on timeout, or whatever `executeTell` throws synchronously
   */
  async sendCommandToSession(
    sessionId: string,
    command: string,
    timeoutMs = 30000,
  ): Promise<string | null> {
    const process = this.getProcessBySessionId(sessionId);
    if (!process) {
      this.logger.warn({ sessionId }, "No process found for session");
      return null;
    }

    try {
      // Create temporary cache entry for this command
      const commandEntry = new CacheEntryImpl(CacheEntryType.TELL, command);

      const response = await new Promise<string>((resolve, reject) => {
        let settled = false;
        let subscription: Subscription | undefined;

        // Settle exactly once, always releasing the timer and the
        // subscription, whichever of result / timeout / failure comes first.
        const settle = (finish: () => void): void => {
          if (settled) return;
          settled = true;
          clearTimeout(timer);
          subscription?.unsubscribe();
          finish();
        };

        const timer = setTimeout(() => {
          settle(() =>
            reject(
              new Error(`Command timeout after ${timeoutMs / 1000}s`),
            ),
          );
        }, timeoutMs);

        // Subscribe BEFORE executing so a fast result cannot be missed.
        subscription = commandEntry.messages$
          .pipe(filter((msg) => msg.type === "result"))
          .subscribe(() => {
            settle(() => {
              // Extract response from assistant messages
              const text = commandEntry
                .getMessages()
                .filter((m) => m.type === "assistant")
                .map((m) => m.data.message?.content?.[0]?.text || "")
                .join("\n");
              resolve(text);
            });
          });
        // A result delivered synchronously during subscribe() settled while
        // `subscription` was still unassigned; release it now.
        if (settled) {
          subscription.unsubscribe();
        }

        // Execute the command (non-blocking)
        try {
          process.executeTell(commandEntry);
        } catch (error) {
          settle(() => reject(error));
        }
      });

      this.logger.info({ sessionId, command }, "Command sent to session");
      return response;
    } catch (error) {
      this.logger.error(
        {
          err: ClaudeProcessPool.toError(error),
          sessionId,
          command,
        },
        "Failed to send command to session",
      );
      throw error;
    }
  }

  /**
   * Evict least recently used process
   *
   * Prefers the least recently used IDLE process; if every process is busy,
   * the least recently used one is evicted anyway.
   *
   * @throws ProcessPoolLimitError when no tracked process can be evicted
   */
  private async evictLRU(): Promise<void> {
    // Keys whose process is already gone cannot be evicted; drop them.
    this.accessOrder = this.accessOrder.filter((key) =>
      this.processes.has(key),
    );

    const idleKey = this.accessOrder.find(
      (key) =>
        this.processes.get(key)?.getBasicMetrics().status ===
        ProcessStatus.IDLE,
    );
    const victimPoolKey = idleKey ?? this.accessOrder[0];
    const victimProcess =
      victimPoolKey === undefined
        ? undefined
        : this.processes.get(victimPoolKey);
    if (victimPoolKey === undefined || victimProcess === undefined) {
      throw new ProcessPoolLimitError(this.config.maxProcesses);
    }

    this.logger.info({ poolKey: victimPoolKey }, "Evicting LRU process");
    await victimProcess.terminate();
    // The STOPPED handler normally does this; doing it here guarantees the
    // slot is freed before the caller re-checks the pool size.
    this.unregisterProcess(victimPoolKey, victimProcess);
  }

  /**
   * Mark a pool key as most recently used (moves it to the end of the order).
   */
  private updateAccessOrder(poolKey: string): void {
    this.removeFromAccessOrder(poolKey);
    this.accessOrder.push(poolKey);
  }

  /**
   * Remove a pool key from the LRU order, if present.
   */
  private removeFromAccessOrder(poolKey: string): void {
    const index = this.accessOrder.indexOf(poolKey);
    if (index > -1) {
      this.accessOrder.splice(index, 1);
    }
  }

  /**
   * Start health check interval
   */
  private startHealthCheck(): void {
    this.healthCheckInterval = setInterval(() => {
      try {
        this.performHealthCheck();
      } catch (error) {
        // An exception escaping a timer callback (e.g. from a throwing
        // HEALTH_CHECK listener) would be uncaught; log it and keep going.
        this.logger.error(
          { err: ClaudeProcessPool.toError(error) },
          "Health check failed",
        );
      }
    }, this.config.healthCheckInterval);
  }

  /**
   * Perform health check on all processes
   *
   * Logs metrics for running processes, fully removes any that report
   * STOPPED, then emits HEALTH_CHECK with the current status.
   */
  private performHealthCheck(): void {
    const stopped: Array<[string, ClaudeProcess]> = [];

    for (const [poolKey, process] of this.processes) {
      const metrics = process.getBasicMetrics();
      if (metrics.status === ProcessStatus.STOPPED) {
        stopped.push([poolKey, process]);
        continue;
      }
      this.logger.debug(
        {
          poolKey,
          status: metrics.status,
          messagesProcessed: metrics.messagesProcessed,
          uptime: metrics.uptime,
          queueLength: metrics.queueLength,
        },
        "Process health check",
      );
    }

    // Full cleanup (maps, sessions, LRU order, subscriptions), same as the
    // STOPPED handler; done after iteration to avoid mutating while iterating.
    for (const [poolKey, process] of stopped) {
      this.logger.info({ poolKey }, "Removing stopped process from pool");
      this.disposeSubscriptions(process);
      this.unregisterProcess(poolKey, process);
    }

    // Emit health check event
    this.emit(PoolEvent.HEALTH_CHECK, this.getStatus());
  }

  /** Normalise an unknown thrown value into an Error for structured logging. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }
}