Skip to main content
Glama
firebase
by firebase
firestore_test.ts6.17 kB
/** * Copyright 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { afterEach, beforeEach, describe, it } from '@jest/globals'; import * as assert from 'assert'; import { deleteApp, initializeApp, type App } from 'firebase-admin/app'; import { getFirestore } from 'firebase-admin/firestore'; import { FirestoreStreamManager } from '../src/stream-manager/firestore'; describe('FirestoreStreamManager', () => { let app: App; let streamManager: FirestoreStreamManager; beforeEach(() => { process.env.FIRESTORE_EMULATOR_HOST = '127.0.0.1:8080'; app = initializeApp({ projectId: 'genkit-test', }); streamManager = new FirestoreStreamManager({ firebaseApp: app, collection: 'genkit-streams', }); }); afterEach(async () => { const db = getFirestore(app); const collections = await db.listCollections(); for (const collection of collections) { await db.recursiveDelete(collection); } await deleteApp(app); }); it('should open a stream and write chunks', async () => { const streamId = 'test-stream-1'; const stream = await streamManager.open(streamId); await stream.write({ foo: 'bar' }); await stream.write({ bar: 'baz' }); const db = getFirestore(app); const snapshot = await db.collection('genkit-streams').doc(streamId).get(); const data = snapshot.data(); assert.ok(data); assert.strictEqual(data.stream.length, 2); assert.strictEqual(data.stream[0].type, 'chunk'); assert.deepStrictEqual(data.stream[0].chunk, { foo: 'bar' }); assert.ok(data.stream[0].uuid); assert.strictEqual(data.stream[1].type, 'chunk'); assert.deepStrictEqual(data.stream[1].chunk, { bar: 'baz' }); assert.ok(data.stream[1].uuid); assert.ok(data.createdAt); assert.ok(data.updatedAt); }); it('should preserve duplicate chunks', async () => { const streamId = 'test-stream-dupes'; const stream = await streamManager.open(streamId); await stream.write({ foo: 'bar' }); await stream.write({ foo: 'bar' }); const db = getFirestore(app); const snapshot = await db.collection('genkit-streams').doc(streamId).get(); const data = snapshot.data(); assert.ok(data); assert.strictEqual(data.stream.length, 2); assert.deepStrictEqual(data.stream[0].chunk, { foo: 'bar' }); assert.deepStrictEqual(data.stream[1].chunk, { foo: 'bar' }); assert.notStrictEqual(data.stream[0].uuid, data.stream[1].uuid); }); it('should open a stream and write done', async () => { const streamId = 'test-stream-2'; const stream = await streamManager.open(streamId); await stream.done({ result: 'success' }); const db = getFirestore(app); const snapshot = await db.collection('genkit-streams').doc(streamId).get(); const data = snapshot.data(); assert.ok(data); assert.deepStrictEqual(data.stream, [ { type: 'done', output: { result: 'success' } }, ]); assert.ok(data.createdAt); assert.ok(data.updatedAt); }); it('should open a stream and write error', async () => { const streamId = 'test-stream-3'; const stream = await streamManager.open(streamId); await stream.error(new Error('test error')); const db = getFirestore(app); const snapshot = await db.collection('genkit-streams').doc(streamId).get(); const data = snapshot.data(); assert.ok(data); assert.strictEqual(data.stream.length, 1); assert.strictEqual(data.stream[0].type, 'error'); assert.strictEqual(data.stream[0].err.message, 'test error'); assert.ok(data.createdAt); assert.ok(data.updatedAt); }); it('should subscribe to a stream', (done) => { const streamId = 'test-stream-4'; const chunks: any[] = []; let finalOutput: any; const db = getFirestore(app); const streamDoc = db.collection('genkit-streams').doc(streamId); streamDoc .set({ stream: [ { type: 'chunk', chunk: { foo: 'bar' } }, { type: 'chunk', chunk: { bar: 'baz' } }, { type: 'done', output: { result: 'success' } }, ], }) .then(() => { streamManager.subscribe(streamId, { onChunk: (chunk) => { chunks.push(chunk); }, onDone: (output) => { finalOutput = output; assert.deepStrictEqual(chunks, [{ foo: 'bar' }, { bar: 'baz' }]); assert.deepStrictEqual(finalOutput, { result: 'success' }); done(); }, onError(error) { console.log(error); }, }); }); }); it('should throw an error when subscribing to a non-existent stream', async () => { await assert.rejects( streamManager.subscribe('non-existent-stream', { onChunk: () => {}, onDone: () => {}, onError: () => {}, }), (err: any) => { assert.strictEqual(err.name, 'StreamNotFoundError'); return true; } ); }); it('should time out when no chunks are received', (done) => { const streamId = 'test-stream-5'; streamManager = new FirestoreStreamManager({ firebaseApp: app, collection: 'genkit-streams', timeout: 100, }); streamManager.open(streamId).then(() => { streamManager.subscribe(streamId, { onChunk: () => { assert.fail('should not have received a chunk'); }, onDone: () => { assert.fail('should not have received done'); }, onError: (err) => { assert.strictEqual(err.status, 'DEADLINE_EXCEEDED'); done(); }, }); }); }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/firebase/genkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server