file_store_test.ts•14.1 kB
/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import type { TraceData, TraceQueryFilter } from '@genkit-ai/tools-common';
import * as assert from 'assert';
import fs from 'fs';
import getPort from 'get-port';
import { afterEach, beforeEach, describe, it } from 'node:test';
import os from 'os';
import path from 'path';
import { Index } from '../src/file-trace-store';
import {
  LocalFileTraceStore,
  startTelemetryServer,
  stopTelemetryApi,
} from '../src/index';
import { sleep, span } from './utils';
const TRACE_ID = '1234';
const TRACE_ID_1 = '1234';
const TRACE_ID_2 = '2345';
const TRACE_ID_3 = '3456';
const SPAN_A = 'abc';
const SPAN_B = 'bcd';
const SPAN_C = 'cde';
describe('local-file-store', () => {
  let port: number;
  let storeRoot: string;
  let indexRoot: string;
  let url: string;
  beforeEach(async () => {
    port = await getPort();
    url = `http://localhost:${port}`;
    storeRoot = path.resolve(
      os.tmpdir(),
      `./telemetry-server-api-test-${Date.now()}/traces`
    );
    indexRoot = path.resolve(
      os.tmpdir(),
      `./telemetry-server-api-test-${Date.now()}/traces_idx`
    );
    await startTelemetryServer({
      port,
      traceStore: new LocalFileTraceStore({
        storeRoot,
        indexRoot,
      }),
    });
  });
  afterEach(async () => {
    await stopTelemetryApi();
  });
  it('writes traces', async () => {
    const traceData = {
      traceId: TRACE_ID,
      displayName: 'trace',
      spans: { [SPAN_A]: span(TRACE_ID, SPAN_A, 100, 100) },
    } as TraceData;
    const res = await fetch(`${url}/api/traces`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(traceData),
    });
    assert.strictEqual(res.status, 200);
    await assertTraceData(TRACE_ID, traceData);
  });
  it('uppends spans to the trace', async () => {
    const spanA = span(TRACE_ID, SPAN_A, 100, 100);
    const spanB = span(TRACE_ID, SPAN_B, 200, 200);
    const res1 = await fetch(`${url}/api/traces`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        traceId: TRACE_ID,
        spans: { [SPAN_A]: spanA },
      } as TraceData),
    });
    assert.strictEqual(res1.status, 200);
    const res2 = await fetch(`${url}/api/traces`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        traceId: TRACE_ID,
        spans: { [SPAN_B]: spanB },
      } as TraceData),
    });
    assert.strictEqual(res2.status, 200);
    await assertTraceData(TRACE_ID, {
      traceId: TRACE_ID,
      spans: {
        [SPAN_A]: spanA,
        [SPAN_B]: spanB,
      },
    });
  });
  it('updated final trace data', async () => {
    const spanA = span(TRACE_ID, SPAN_A, 100, 100);
    const spanB = span(TRACE_ID, SPAN_B, 200, 200);
    const res1 = await fetch(`${url}/api/traces`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        traceId: TRACE_ID,
        spans: { [SPAN_A]: spanA },
      }),
    });
    assert.strictEqual(res1.status, 200);
    await assertTraceData(TRACE_ID, {
      traceId: TRACE_ID,
      spans: {
        [SPAN_A]: spanA,
      },
    } as TraceData);
    const res2 = await fetch(`${url}/api/traces`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        traceId: TRACE_ID,
        displayName: 'final display name',
        startTime: 111,
        endTime: 999,
        spans: { [SPAN_B]: spanB },
      }),
    });
    assert.strictEqual(res2.status, 200);
    await assertTraceData(TRACE_ID, {
      traceId: TRACE_ID,
      displayName: 'final display name',
      startTime: 111,
      endTime: 999,
      spans: {
        [SPAN_A]: spanA,
        [SPAN_B]: spanB,
      },
    });
  });
  it('lists trace data', async () => {
    const wantTraces = [] as TraceData[];
    for (let i = 0; i < 3; i++) {
      const spanId = `abc_${i}`;
      const traceId = TRACE_ID + `_${i}`;
      const spanData = span(traceId, spanId, 100 + i, 100 + i);
      const trace = {
        traceId: traceId,
        displayName: 'trace',
        spans: { [spanId]: spanData },
      };
      wantTraces.push(trace);
      const res = await fetch(`${url}/api/traces`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(trace),
      });
      assert.strictEqual(res.status, 200);
      await sleep(1); // sleep a little to ensure consistent chronological ordering
    }
    const getResp = await fetch(`${url}/api/traces`);
    assert.strictEqual(getResp.status, 200);
    const tracesResponse = await getResp.json();
    assert.deepStrictEqual(tracesResponse.traces, wantTraces.reverse());
  });
  it('lists filtered trace data', async () => {
    const wantTraces = [] as TraceData[];
    for (let i = 0; i < 3; i++) {
      const spanId = `abc_${i}`;
      const traceId = TRACE_ID + `_${i}`;
      const spanData = span(traceId, spanId, 100 + i, 100 + i);
      if (i % 2 == 0) {
        spanData.attributes['genkit:type'] = 'banana';
      }
      const trace = {
        traceId: traceId,
        displayName: 'trace',
        spans: { [spanId]: spanData },
      };
      if (i % 2 != 0) {
        wantTraces.push(trace);
      }
      const res = await fetch(`${url}/api/traces`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(trace),
      });
      assert.strictEqual(res.status, 200);
      await sleep(1); // sleep a little to ensure consistent chronological ordering
    }
    const filter = { neq: { type: 'banana' } } as TraceQueryFilter;
    const getResp = await fetch(
      `${url}/api/traces?filter=${encodeURI(JSON.stringify(filter))}`
    );
    assert.strictEqual(getResp.status, 200);
    const tracesResponse = await getResp.json();
    assert.deepStrictEqual(tracesResponse.traces, wantTraces.reverse());
  });
  it('lists trace data with pagination', async () => {
    const wantTraces = [] as TraceData[];
    for (let i = 0; i < 20; i++) {
      const spanId = `abc_${i}`;
      const traceId = TRACE_ID + `_${i}`;
      const spanData = span(traceId, spanId, 100 + i, 100 + i);
      const trace = {
        traceId: traceId,
        displayName: 'trace',
        spans: { [spanId]: spanData },
      };
      wantTraces.push(trace);
      const res = await fetch(`${url}/api/traces`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(trace),
      });
      assert.strictEqual(res.status, 200);
      await sleep(1); // sleep a little to ensure consistent chronological ordering
    }
    const pageSize = 4;
    const respP1 = await fetch(`${url}/api/traces?limit=${pageSize}`);
    assert.strictEqual(respP1.status, 200);
    const tracesResponseP1 = await respP1.json();
    assert.deepStrictEqual(
      tracesResponseP1.traces,
      wantTraces.slice(wantTraces.length - pageSize).reverse()
    );
    // continue to page 2
    const respP2 = await fetch(
      `${url}/api/traces?limit=${pageSize}&continuationToken=${tracesResponseP1.continuationToken}`
    );
    assert.strictEqual(respP2.status, 200);
    const tracesResponseP2 = await respP2.json();
    assert.deepStrictEqual(
      tracesResponseP2.traces,
      wantTraces
        .slice(wantTraces.length - pageSize * 2, wantTraces.length - pageSize)
        .reverse()
    );
  });
  async function assertTraceData(traceId: string, traceData: TraceData) {
    const getResp = await fetch(`${url}/api/traces/${traceId}`);
    assert.strictEqual(getResp.status, 200);
    assert.deepStrictEqual(await getResp.json(), traceData);
  }
});
describe('index', () => {
  let indexRoot: string;
  let index: Index;
  beforeEach(async () => {
    indexRoot = path.resolve(
      os.tmpdir(),
      `./telemetry-server-api-test-${Date.now()}-${Math.floor(Math.random() * 1000)}/traces_idx`
    );
    index = new Index(indexRoot);
  });
  afterEach(() => {
    fs.rmSync(indexRoot, { recursive: true, force: true });
  });
  it('should index and search spans', () => {
    const spanA = span(TRACE_ID_1, SPAN_A, 100, 100);
    spanA.displayName = 'spanA';
    spanA.startTime = 1234;
    spanA.endTime = 2345;
    const spanB = span(TRACE_ID_2, SPAN_B, 200, 200);
    spanB.displayName = 'spanB';
    spanB.startTime = 2345;
    spanB.endTime = 3456;
    index.add({
      traceId: TRACE_ID_1,
      spans: {
        [SPAN_A]: spanA,
      },
    } as TraceData);
    index.add({
      traceId: TRACE_ID_2,
      spans: {
        [SPAN_B]: spanB,
      },
    } as TraceData);
    assert.deepStrictEqual(index.search({ limit: 5 }).data, [
      {
        id: TRACE_ID_2,
        type: 'flow',
        name: 'spanB',
        start: 2345,
        end: 3456,
        status: 0,
      },
      {
        id: TRACE_ID_1,
        type: 'flow',
        name: 'spanA',
        start: 1234,
        end: 2345,
        status: 0,
      },
    ]);
  });
  it('should apply search filters', () => {
    const spanA = span(TRACE_ID_1, SPAN_A, 100, 100);
    spanA.displayName = 'flowA';
    spanA.attributes['genkit:type'] = 'banana';
    const spanB = span(TRACE_ID_2, SPAN_B, 200, 200);
    spanB.displayName = 'flowA';
    const spanC = span(TRACE_ID_3, SPAN_C, 200, 200);
    spanC.displayName = 'flowB';
    index.add({
      traceId: TRACE_ID_1,
      spans: {
        [SPAN_A]: spanA,
      },
    } as TraceData);
    index.add({
      traceId: TRACE_ID_2,
      spans: {
        [SPAN_B]: spanB,
      },
    } as TraceData);
    index.add({
      traceId: TRACE_ID_3,
      spans: {
        [SPAN_C]: spanC,
      },
    } as TraceData);
    assert.deepStrictEqual(
      index.search({
        limit: 5,
        filter: {
          eq: { name: 'flowA' },
        },
      }).data,
      [
        {
          id: TRACE_ID_2,
          type: 'flow',
          name: 'flowA',
          start: 1,
          end: 2,
          status: 0,
        },
        {
          id: TRACE_ID_1,
          type: 'banana',
          name: 'flowA',
          start: 1,
          end: 2,
          status: 0,
        },
      ]
    );
    assert.deepStrictEqual(
      index.search({
        limit: 5,
        filter: {
          eq: { name: 'flowA', type: 'banana' },
        },
      }).data,
      [
        {
          id: TRACE_ID_1,
          type: 'banana',
          name: 'flowA',
          start: 1,
          end: 2,
          status: 0,
        },
      ]
    );
    assert.deepStrictEqual(
      index.search({
        limit: 5,
        filter: {
          eq: { name: 'flowA' },
          neq: { type: 'banana' },
        },
      }).data,
      [
        {
          id: TRACE_ID_2,
          type: 'flow',
          name: 'flowA',
          start: 1,
          end: 2,
          status: 0,
        },
      ]
    );
  });
  it('can filter out unknown types', () => {
    const spanA = span(TRACE_ID_1, SPAN_A, 100, 100);
    spanA.displayName = 'flowA';
    spanA.attributes['genkit:type'] = 'banana';
    const spanB = span(TRACE_ID_2, SPAN_B, 200, 200);
    spanB.displayName = 'flowB';
    spanB.attributes['genkit:type'] = undefined;
    const spanC = span(TRACE_ID_3, SPAN_C, 200, 200);
    spanC.displayName = 'flowC';
    spanC.attributes['genkit:type'] = undefined;
    index.add({
      traceId: TRACE_ID_1,
      spans: {
        [SPAN_A]: spanA,
      },
    } as TraceData);
    index.add({
      traceId: TRACE_ID_2,
      spans: {
        [SPAN_B]: spanB,
      },
    } as TraceData);
    index.add({
      traceId: TRACE_ID_3,
      spans: {
        [SPAN_C]: spanC,
      },
    } as TraceData);
    assert.deepStrictEqual(
      index.search({
        limit: 5,
        filter: {
          neq: { type: 'UNKNOWN' },
        },
      }).data,
      [
        {
          id: TRACE_ID_1,
          type: 'banana',
          name: 'flowA',
          start: 1,
          end: 2,
          status: 0,
        },
      ]
    );
  });
  it('should paginate search', () => {
    for (let i = 0; i < 20; i++) {
      const traceId = 'trace_' + i;
      const spanId = 'span_' + i;
      const s = span(traceId, spanId, 100 + i, 200 + i);
      s.displayName = 'spanA';
      s.startTime = 1234 + i;
      s.endTime = 2345 + i;
      index.add({
        traceId,
        spans: {
          [spanId]: s,
        },
      } as TraceData);
    }
    const result1 = index.search({ limit: 3 });
    assert.deepStrictEqual(
      result1.data.map((d) => d.id),
      ['trace_19', 'trace_18', 'trace_17']
    );
    assert.strictEqual(result1.pageLastIndex, 3);
    const result2 = index.search({
      limit: 3,
      startFromIndex: result1.pageLastIndex,
    });
    assert.deepStrictEqual(
      result2.data.map((d) => d.id),
      ['trace_16', 'trace_15', 'trace_14']
    );
    assert.strictEqual(result2.pageLastIndex, 6);
    // check edge conditions
    const result3 = index.search({ limit: 3, startFromIndex: 17 });
    assert.deepStrictEqual(
      result3.data.map((d) => d.id),
      ['trace_2', 'trace_1', 'trace_0']
    );
    assert.strictEqual(result3.pageLastIndex, undefined);
    const result4 = index.search({ limit: 10, startFromIndex: 18 });
    assert.deepStrictEqual(
      result4.data.map((d) => d.id),
      ['trace_1', 'trace_0']
    );
    assert.strictEqual(result4.pageLastIndex, undefined);
  });
});