Skip to main content
Glama
firebase
by firebase
streaming.ts6.32 kB
/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { GenkitError } from './error'; /** * Error thrown when a stream cannot be found. */ export class StreamNotFoundError extends GenkitError { constructor(message: string) { super({ status: 'NOT_FOUND', message }); this.name = 'StreamNotFoundError'; } } /** * Interface for writing content to a stream. * @template S The type of the stream chunks. * @template O The type of the final output. */ export interface ActionStreamInput<S, O> { /** * Writes a chunk to the stream. * @param chunk The chunk data to write. */ write(chunk: S): Promise<void>; /** * Closes the stream with a final output. * @param output The final output data. */ done(output: O): Promise<void>; /** * Closes the stream with an error. * @param err The error that occurred. */ error(err: any): Promise<void>; } /** * Subscriber callbacks for receiving stream events. * @template S The type of the stream chunks. * @template O The type of the final output. */ export type ActionStreamSubscriber<S, O> = { /** Called when a chunk is received. */ onChunk: (chunk: S) => void; /** Called when the stream completes successfully. */ onDone: (output: O) => void; /** Called when the stream encounters an error. */ onError: (error: any) => void; }; /** * Interface for managing streaming actions, allowing creation and subscription to streams. * Implementations can provide different storage backends (e.g., in-memory, database, cache). */ export interface StreamManager { /** * Opens a stream for writing. * @param streamId The unique identifier for the stream. * @returns An object to write to the stream. */ open<S, O>(streamId: string): Promise<ActionStreamInput<S, O>>; /** * Subscribes to a stream to receive its events. * @param streamId The unique identifier for the stream. * @param options The subscriber callbacks. * @returns A promise resolving to an object containing an unsubscribe function. */ subscribe<S, O>( streamId: string, options: ActionStreamSubscriber<S, O> ): Promise<{ unsubscribe: () => void }>; } type StreamState<S, O> = | { status: 'open'; chunks: S[]; subscribers: ActionStreamSubscriber<S, O>[]; lastTouched: number; } | { status: 'done'; chunks: S[]; output: O; lastTouched: number } | { status: 'error'; chunks: S[]; error: any; lastTouched: number }; /** * An in-memory implementation of StreamManager. * Useful for testing or single-instance deployments where persistence is not required. */ export class InMemoryStreamManager implements StreamManager { private streams: Map<string, StreamState<any, any>> = new Map(); /** * @param options Configuration options. * @param options.ttlSeconds Time-to-live for streams in seconds. Defaults to 5 minutes. */ constructor(private options: { ttlSeconds?: number } = {}) {} private _cleanup() { const ttl = (this.options.ttlSeconds ?? 5 * 60) * 1000; const now = Date.now(); for (const [streamId, stream] of this.streams.entries()) { if (stream.status !== 'open' && now - stream.lastTouched > ttl) { this.streams.delete(streamId); } } } async open<S, O>(streamId: string): Promise<ActionStreamInput<S, O>> { this._cleanup(); if (this.streams.has(streamId)) { throw new Error(`Stream with id ${streamId} already exists.`); } this.streams.set(streamId, { status: 'open', chunks: [], subscribers: [], lastTouched: Date.now(), }); return { write: async (chunk: S) => { const stream = this.streams.get(streamId); if (stream?.status === 'open') { stream.chunks.push(chunk); stream.subscribers.forEach((s) => s.onChunk(chunk)); stream.lastTouched = Date.now(); } }, done: async (output: O) => { const stream = this.streams.get(streamId); if (stream?.status === 'open') { this.streams.set(streamId, { status: 'done', chunks: stream.chunks, output, lastTouched: Date.now(), }); stream.subscribers.forEach((s) => s.onDone(output)); } }, error: async (err: any) => { const stream = this.streams.get(streamId); if (stream?.status === 'open') { stream.subscribers.forEach((s) => s.onError(err)); this.streams.set(streamId, { status: 'error', chunks: stream.chunks, error: err, lastTouched: Date.now(), }); } }, }; } async subscribe<S, O>( streamId: string, subscriber: ActionStreamSubscriber<S, O> ): Promise<{ unsubscribe: () => void }> { const stream = this.streams.get(streamId); if (!stream) { throw new StreamNotFoundError(`Stream with id ${streamId} not found.`); } if (stream.status === 'done') { for (const chunk of stream.chunks) { subscriber.onChunk(chunk); } subscriber.onDone(stream.output); } else if (stream.status === 'error') { for (const chunk of stream.chunks) { subscriber.onChunk(chunk); } subscriber.onError(stream.error); } else { stream.chunks.forEach((chunk) => subscriber.onChunk(chunk)); stream.subscribers.push(subscriber); } return { unsubscribe: () => { const currentStream = this.streams.get(streamId); if (currentStream?.status === 'open') { const index = currentStream.subscribers.indexOf(subscriber); if (index > -1) { currentStream.subscribers.splice(index, 1); } } }, }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/firebase/genkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server