Skip to main content
Glama
streaming-execution.test.ts (6.89 kB)
import { describe, expect, it, beforeAll, afterAll } from 'vitest';
import * as grpc from '@grpc/grpc-js';
import { createSandboxService } from '../server/sandbox-service.js';
import { SandboxServiceService, ExecutionState } from '../generated/sandbox.js';
import { SandboxClient } from '../client/index.js';
import { existsSync, rmSync } from 'node:fs';

// Hardcoded ports - src uses 59xxx, dist uses 59500+
const isDistBuild = import.meta.url.includes('/dist/');
const PORT_BASE = isDistBuild ? 59500 : 59000;

/**
 * Reports whether a thrown value looks like a cancelled stream.
 *
 * Accepts either an `AbortError` (raised by the client code) or a gRPC
 * cancellation, recognised by a message containing `CANCELLED` or by the
 * `CANCELLED` status code.
 */
function isCancellation(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const { name, message, code } = error as {
    name?: unknown;
    message?: unknown;
    code?: unknown;
  };
  return (
    name === 'AbortError' ||
    (typeof message === 'string' && message.includes('CANCELLED')) ||
    code === grpc.status.CANCELLED
  );
}

/**
 * Integration tests for the streaming execution API.
 *
 * A real gRPC server backed by the sandbox service is bound to a local port
 * and exercised through `SandboxClient`. The `result` locals stay typed as
 * `any` because the chunk payload type is declared in the client module,
 * which is not visible from this file.
 */
describe('Streaming Execution API', () => {
  const testCacheDir = '/tmp/prodisco-stream-test-' + Date.now();
  let server: grpc.Server;
  let client: SandboxClient;
  const port = PORT_BASE + 1;

  beforeAll(async () => {
    // Start server
    server = new grpc.Server();
    const service = createSandboxService({ cacheDir: testCacheDir });
    server.addService(SandboxServiceService, service);

    // bindAsync is callback-based; wrap it so setup can await the bind.
    await new Promise<void>((resolve, reject) => {
      server.bindAsync(
        `localhost:${port}`,
        grpc.ServerCredentials.createInsecure(),
        (err) => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        }
      );
    });

    // Create client
    client = new SandboxClient({ tcpHost: 'localhost', tcpPort: port });

    // Wait for server (timeout presumably in milliseconds - confirm against client)
    const healthy = await client.waitForHealthy(5000);
    if (!healthy) {
      throw new Error('Server did not become healthy');
    }
  });

  afterAll(() => {
    // beforeAll may have failed before these were assigned; guard the calls so
    // cleanup does not raise a TypeError that hides the original setup error.
    client?.close();
    server?.forceShutdown();

    if (existsSync(testCacheDir)) {
      rmSync(testCacheDir, { recursive: true });
    }
  });

  describe('executeStream()', () => {
    it('streams output from execution', async () => {
      const code = `
        console.log("line 1");
        console.log("line 2");
        console.log("line 3");
      `;

      const chunks: string[] = [];
      let result: any = null;

      for await (const chunk of client.executeStream({ code })) {
        if (chunk.type === 'output') {
          chunks.push(chunk.data as string);
        } else if (chunk.type === 'result') {
          result = chunk.data;
        }
      }

      // Should have received output chunks
      expect(chunks.length).toBeGreaterThan(0);
      const allOutput = chunks.join('');
      expect(allOutput).toContain('line 1');
      expect(allOutput).toContain('line 2');
      expect(allOutput).toContain('line 3');

      // Should have received final result
      expect(result).not.toBeNull();
      expect(result.success).toBe(true);
      expect(result.executionId).toBeDefined();
    });

    it('streams error output', async () => {
      const code = `
        console.log("normal");
        console.error("error message");
        console.log("after error");
      `;

      const outputs: Array<{ type: string; data: string }> = [];
      let result: any = null;

      for await (const chunk of client.executeStream({ code })) {
        if (chunk.type === 'output' || chunk.type === 'error') {
          outputs.push({ type: chunk.type, data: chunk.data as string });
        } else if (chunk.type === 'result') {
          result = chunk.data;
        }
      }

      // Should have error output
      const errorChunks = outputs.filter((o) => o.type === 'error');
      expect(errorChunks.length).toBeGreaterThan(0);
      expect(errorChunks.some((c) => c.data.includes('[ERROR]'))).toBe(true);

      // console.error alone does not fail the execution.
      expect(result?.success).toBe(true);
    });

    it('returns execution ID in all chunks', async () => {
      const code = 'console.log("test");';
      const executionIds = new Set<string>();

      for await (const chunk of client.executeStream({ code })) {
        executionIds.add(chunk.executionId);
      }

      // All chunks should have the same execution ID
      expect(executionIds.size).toBe(1);
      const [executionId] = [...executionIds];
      expect(executionId).toMatch(/^[0-9a-f-]{36}$/);
    });

    it('handles errors in code', async () => {
      const code = 'throw new Error("test error");';
      let result: any = null;

      for await (const chunk of client.executeStream({ code })) {
        if (chunk.type === 'result') {
          result = chunk.data;
        }
      }

      expect(result).not.toBeNull();
      expect(result.success).toBe(false);
      expect(result.error).toContain('test error');
      expect(result.state).toBe(ExecutionState.EXECUTION_STATE_FAILED);
    });

    it('handles cached script not found', async () => {
      let result: any = null;

      for await (const chunk of client.executeStream({ cached: 'nonexistent' })) {
        if (chunk.type === 'result') {
          result = chunk.data;
        }
      }

      expect(result).not.toBeNull();
      expect(result.success).toBe(false);
      expect(result.error).toContain('not found');
    });

    it('includes timestamp in chunks', async () => {
      const code = 'console.log("test");';
      const timestamps: number[] = [];

      for await (const chunk of client.executeStream({ code })) {
        timestamps.push(chunk.timestampMs);
      }

      expect(timestamps.length).toBeGreaterThan(0);
      // Each timestamp must fall within the last minute, allowing one second
      // of slack into the future.
      for (const ts of timestamps) {
        expect(ts).toBeGreaterThan(Date.now() - 60000);
        expect(ts).toBeLessThanOrEqual(Date.now() + 1000);
      }
    });
  });

  describe('executeStreamWithAbort()', () => {
    it('can abort execution with AbortController', async () => {
      const code = `
        for (let i = 0; i < 100; i++) {
          console.log("iteration " + i);
          await new Promise(r => setTimeout(r, 50));
        }
      `;

      const controller = new AbortController();
      const chunks: string[] = [];
      let threw = false;
      let caught: unknown;

      // Abort after 200ms
      const abortTimer = setTimeout(() => controller.abort(), 200);

      try {
        for await (const chunk of client.executeStreamWithAbort({ code }, controller.signal)) {
          if (chunk.type === 'output') {
            chunks.push(chunk.data as string);
          }
        }
      } catch (error: unknown) {
        threw = true;
        caught = error;
      } finally {
        // Do not leave the timer pending if the stream ended before it fired.
        clearTimeout(abortTimer);
      }

      // Checked outside the try/catch so this failure is not swallowed by the
      // catch clause and reported as a confusing cancellation mismatch.
      if (!threw) {
        expect.fail('Should have thrown an error');
      }

      // Accept either AbortError (from our code) or CANCELLED (from gRPC)
      expect(isCancellation(caught)).toBe(true);

      // Should have received some output before abort
      expect(chunks.length).toBeGreaterThan(0);
      expect(chunks.length).toBeLessThan(100);
    });

    it('completes normally if not aborted', async () => {
      const code = 'console.log("complete");';
      const controller = new AbortController();
      let result: any = null;

      for await (const chunk of client.executeStreamWithAbort({ code }, controller.signal)) {
        if (chunk.type === 'result') {
          result = chunk.data;
        }
      }

      expect(result?.success).toBe(true);
    });
  });
});

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/harche/ProDisco'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.