Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
channel.ts10.9 kB
/** * A bounded, buffered CSP channel implementation for TypeScript. * * Implements the Producer-Consumer pattern with automatic backpressure via * blocking send/receive semantics. Based on Communicating Sequential Processes (Hoare, 1978). * * Properties: * - Bounded buffer: O(capacity) memory usage * - Blocking send: Blocks when buffer is full * - Blocking receive: Blocks when buffer is empty * - Graceful shutdown: Close drains buffer before terminating * * Performance Characteristics: * - send(): O(R) where R = pending receivers (typically 0-10) * - receive(): O(B + S) where B = buffer size, S = pending senders * - Uses Array.shift() which is O(n) but acceptable for small queues * - Same complexity trade-off as async.queue, p-limit, and similar libraries * - For typical usage (buffer < 100, queues < 10), overhead is negligible (<10ms per 5000 operations) * * Note: Could be optimized to O(1) with linked list or circular buffer, but current * implementation prioritizes simplicity and is comparable to standard JS libraries. * * Deadlock Prevention: * JavaScript channels use cooperative blocking via Promises, not true thread blocking. * Deadlocks are rare but possible in certain patterns: * * ❌ AVOID: Sequential operations on unbuffered channels * ```typescript * const ch = new Channel<number>(0); * await ch.send(1); // Blocks forever - no receiver started * await ch.receive(); // Never reached * ``` * * ❌ AVOID: Circular dependencies between channels * ```typescript * const ch1 = new Channel(0); * const ch2 = new Channel(0); * // Task 1: await ch1.send() → await ch2.receive() * // Task 2: await ch2.send() → await ch1.receive() * // Both block on send, never reach receive * ``` * * ✅ SAFE: Concurrent start with buffered channels (recommended pattern) * ```typescript * const ch = new Channel<number>(); // Default (10) is safe * * // Start producer immediately * const producer = (async () => { * for (let i = 0; i < 100; i++) { * await ch.send(i); * } * ch.close(); // Always close in finally block * })(); * * // Start consumers immediately * const consumers = Array.from({ length: 5 }, async () => { * for await (const value of ch) { * await processValue(value); * } * }); * * // Wait for all to complete * await Promise.all([producer, ...consumers]); * ``` * * Best Practices: * 1. Use default capacity or higher (10+) for production - provides safety and throughput * 2. Always close() channels in a finally block to prevent hanging operations * 3. Start producers and consumers concurrently, not sequentially * 4. Use for-await loops for automatic cleanup on close * 5. Avoid circular dependencies between channels * 6. Handle errors in workers so they don't crash and leave channel blocked * 7. Only use unbuffered (capacity=0) when you need strict happens-before guarantees * * @see https://en.wikipedia.org/wiki/Communicating_sequential_processes * * @template T The type of values sent through the channel * * @example Safe Producer-Consumer Pattern * ```typescript * // Default capacity (10) is safe for most cases * const ch = new Channel<number>(); // or explicit: new Channel<number>(50) * * // Producer with proper cleanup * const producer = (async () => { * try { * for (let i = 0; i < 100; i++) { * await ch.send(i); // Blocks if buffer full (backpressure) * } * } finally { * ch.close(); // Guaranteed cleanup * } * })(); * * // Multiple consumers * const consumers = Array.from({ length: 3 }, async () => { * for await (const value of ch) { * console.log(value); * } * }); * * await Promise.all([producer, ...consumers]); * ``` * * @example Unbuffered Channel (Rendezvous) * ```typescript * const ch = new Channel<number>(0); // Unbuffered - use with care! * * // Must start both operations before awaiting * const sendPromise = ch.send(42); // Starts but doesn't block caller yet * const value = await ch.receive(); // Unblocks the sender * await sendPromise; // Now safe to await * ``` */ /** * Internal type for blocked senders waiting to deliver values */ interface Sender<T> { readonly value: T; readonly resolve: () => void; readonly reject: (error: Error) => void; } /** * Internal type for blocked receivers waiting for values */ interface Receiver<T> { readonly resolve: (value: T | typeof CLOSED) => void; readonly reject: (error: Error) => void; } /** * Custom error class for channel operations */ export class ChannelError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); this.name = "ChannelError"; } } /** * Error messages for channel operations */ const ERRORS = { SEND_TO_CLOSED: "Cannot send to closed channel", CLOSED_WHILE_BLOCKED: "Channel closed while send was blocked", NEGATIVE_CAPACITY: "Channel capacity must be non-negative", } as const satisfies Record<string, string>; export class Channel<T> { #buffer: T[] = []; #sendQueue: Sender<T>[] = []; #receiveQueue: Receiver<T>[] = []; #closed = false; readonly #capacity: number; /** * Create a new channel with the specified buffer capacity. * * @param capacity - Buffer size (default: 10) * - 0: Unbuffered/rendezvous channel - strict synchronization, higher deadlock risk. * Use only when you need guaranteed happens-before ordering. * - 1-100: Buffered channel - recommended for production use. * - Higher values: Better throughput but more memory usage. * * @example * ```typescript * // Default buffered (safe for most cases) * const ch1 = new Channel<number>(); * * // Explicit buffer size (production pattern) * const ch2 = new Channel<number>(50); * * // Unbuffered (advanced - strict synchronization) * const ch3 = new Channel<number>(0); * ``` */ constructor(capacity: number = 10) { if (capacity < 0) { throw new ChannelError(ERRORS.NEGATIVE_CAPACITY); } this.#capacity = capacity; } /** * Send a value to the channel * Blocks if the buffer is full until space is available * * @param value - The value to send * @throws {ChannelError} If channel is closed */ async send(value: T): Promise<void> { if (this.#closed) { throw new ChannelError(ERRORS.SEND_TO_CLOSED); } // Direct delivery to waiting receiver const receiver = this.#receiveQueue.shift(); if (receiver) { receiver.resolve(value); return; } // Add to buffer if space available if (this.#buffer.length < this.#capacity) { this.#buffer.push(value); return; } // Block until space available return new Promise<void>((resolve, reject) => { this.#sendQueue.push({ value, resolve, reject }); }); } /** * Receive a value from the channel * Blocks if no value is available until one arrives * * @returns The received value, or CLOSED symbol if channel is closed and empty */ async receive(): Promise<T | typeof CLOSED> { // Drain buffer first if (this.#buffer.length > 0) { const value = this.#buffer.shift()!; // Unblock a waiting sender const sender = this.#sendQueue.shift(); if (sender) { this.#buffer.push(sender.value); sender.resolve(); } return value; } // Direct handoff from waiting sender (critical for unbuffered channels) const sender = this.#sendQueue.shift(); if (sender) { sender.resolve(); return sender.value; } // Channel closed and empty if (this.#closed) { return CLOSED; } // Block until value available return new Promise<T | typeof CLOSED>((resolve, reject) => { this.#receiveQueue.push({ resolve, reject }); }); } /** * Try to receive a value without blocking * Returns immediately with value or undefined if channel is empty * * @returns The received value, CLOSED if channel is closed, or undefined if empty * * @example * ```typescript * const ch = new Channel<number>(10); * await ch.send(42); * * const value = ch.tryReceive(); * if (value !== undefined && value !== CLOSED) { * console.log("Got:", value); * } * ``` */ tryReceive(): T | typeof CLOSED | undefined { // Drain buffer first if (this.#buffer.length > 0) { const value = this.#buffer.shift()!; // Unblock a waiting sender const sender = this.#sendQueue.shift(); if (sender) { this.#buffer.push(sender.value); sender.resolve(); } return value; } // Direct handoff from waiting sender const sender = this.#sendQueue.shift(); if (sender) { sender.resolve(); return sender.value; } // Channel closed and empty if (this.#closed) { return CLOSED; } // Channel empty but not closed return undefined; } /** * Close the channel * No more sends allowed, but remaining values can be received */ close(): void { if (this.#closed) return; this.#closed = true; // Resolve all blocked receivers for (const receiver of this.#receiveQueue) { receiver.resolve(CLOSED); } this.#receiveQueue = []; // Reject all blocked senders const error = new ChannelError(ERRORS.CLOSED_WHILE_BLOCKED); for (const sender of this.#sendQueue) { sender.reject(error); } this.#sendQueue = []; } /** * Check if channel is closed */ get isClosed(): boolean { return this.#closed; } /** * Get current buffer length */ get length(): number { return this.#buffer.length; } /** * Get the channel's capacity */ get capacity(): number { return this.#capacity; } /** * Get the number of blocked senders waiting */ get pendingSends(): number { return this.#sendQueue.length; } /** * Get the number of blocked receivers waiting */ get pendingReceives(): number { return this.#receiveQueue.length; } /** * Async iterator support for for-await-of loops */ async *[Symbol.asyncIterator](): AsyncIterableIterator<T> { while (true) { const value = await this.receive(); if (value === CLOSED) return; yield value; } } } /** * Special symbol to indicate channel is closed */ export const CLOSED = Symbol("CLOSED"); /** * Type guard to check if a value is the CLOSED symbol * * @param value - Value to check * @returns true if value is CLOSED symbol * * @example * ```typescript * const value = await ch.receive(); * if (isClosed(value)) { * console.log("Channel is closed"); * } else { * console.log("Got value:", value); * } * ``` */ export function isClosed<T>(value: T | typeof CLOSED): value is typeof CLOSED { return value === CLOSED; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server