file-trace-store.ts•13.5 kB
/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import {
  SpanData,
  TraceDataSchema,
  type TraceData,
  type TraceQueryFilter,
} from '@genkit-ai/tools-common';
import { logger } from '@genkit-ai/tools-common/utils';
import { Mutex } from 'async-mutex';
import fs from 'fs';
import lockfile from 'lockfile';
import path from 'path';
import type { TraceQuery, TraceQueryResponse, TraceStore } from './types';
import { version as currentVersion } from './utils/version';
/** Number of most recent trace files considered when rebuilding the index. */
const MAX_TRACES = 1_000;
/** Rebuild the index on init when more than this many index files exist. */
const MAX_INDEX_FILES = 10;
/** Default max string attribute length kept when summarizing traces for list views. */
const MAX_LIST_ATTR_LENGTH = 1_000;
/**
 * Implementation of trace store that persists traces on local disk.
 *
 * Each trace is stored as one JSON file, named by its trace id, under
 * `<storeRoot>/.genkit/traces`. Summaries of traces that have a root span are
 * appended to a simple search index under `<indexRoot>/.genkit/traces_idx`,
 * which backs `list()`.
 */
export class LocalFileTraceStore implements TraceStore {
  private readonly storeRoot: string;
  private readonly indexRoot: string;
  /**
   * One mutex per trace id, serializing the read-merge-write cycle in `save`.
   * NOTE(review): entries are never removed, so this map grows with the number
   * of distinct trace ids saved during the process lifetime.
   */
  private mutexes: Record<string, Mutex> = {};
  /** Attribute key/value pairs; spans with any matching attribute are dropped on save. */
  private filters: Record<string, string>;
  private readonly index: Index;

  static defaultFilters: Record<string, string> = {
    // Prevent prompt rendering from spamming local trace store
    'genkit:metadata:subtype': 'prompt',
  };

  /**
   * Creates the trace and index directories (recursively) if missing.
   *
   * @param options.filters Span attribute filters; defaults to `defaultFilters`.
   * @param options.storeRoot Base dir; traces are written to `<storeRoot>/.genkit/traces`.
   * @param options.indexRoot Base dir; the index lives in `<indexRoot>/.genkit/traces_idx`.
   */
  constructor(options: {
    filters?: Record<string, string>;
    storeRoot: string;
    indexRoot: string;
  }) {
    this.storeRoot = path.resolve(options.storeRoot, `.genkit/traces`);
    fs.mkdirSync(this.storeRoot, { recursive: true });
    this.indexRoot = path.resolve(options.indexRoot, `.genkit/traces_idx`);
    fs.mkdirSync(this.indexRoot, { recursive: true });
    logger.debug(
      `[Telemetry Server] initialized local file trace store at root: ${this.storeRoot}`
    );
    this.filters = options.filters ?? LocalFileTraceStore.defaultFilters;
    this.index = new Index(this.indexRoot);
  }

  /**
   * Rebuilds the index when its metadata is missing, was written by a
   * different version, or when too many index files have accumulated.
   */
  async init() {
    const metadata = this.index.getMetadata();
    if (
      !metadata ||
      metadata.version !== currentVersion ||
      this.index.listIndexFiles().length > MAX_INDEX_FILES
    ) {
      await this.reIndex();
    }
  }

  /** Clears the index and re-adds the newest `MAX_TRACES` traces that have a root span. */
  private async reIndex() {
    this.index.clear();
    const time = Date.now();
    // Index only the last MAX_TRACES traces. `listFromFiles` returns newest
    // first; reverse so entries are appended oldest to newest.
    const list = await this.listFromFiles({ limit: MAX_TRACES });
    let indexedCount = 0;
    for (const trace of list.traces.reverse()) {
      if (!this.hasRootSpan(trace)) continue;
      this.index.add(trace);
      indexedCount++;
    }
    logger.info(
      `Indexed ${indexedCount} traces in ${Date.now() - time}ms in ${this.indexRoot}`
    );
  }

  /**
   * Reads the trace stored under `id`.
   *
   * @returns The parsed trace, or `undefined` if no file exists for `id`.
   */
  async load(id: string): Promise<TraceData | undefined> {
    const filePath = path.resolve(this.storeRoot, `${id}`);
    if (!fs.existsSync(filePath)) {
      return undefined;
    }
    return this.readTraceFile(filePath, id);
  }

  /** Parses and validates one trace file, back-filling `traceId` for old files. */
  private readTraceFile(filePath: string, id: string): TraceData {
    const parsed = JSON.parse(fs.readFileSync(filePath, 'utf8'));
    // For backwards compatibility, new field.
    if (!parsed.traceId) {
      parsed.traceId = id;
    }
    return TraceDataSchema.parse(parsed);
  }

  /** Returns the mutex for trace `id`, creating it on first use. */
  getMutex(id: string): Mutex {
    return (this.mutexes[id] ??= new Mutex());
  }

  /** Returns true when at least one span in `trace` has no `parentSpanId`. */
  private hasRootSpan(trace: TraceData): boolean {
    return Object.values(trace.spans).some((s) => !s.parentSpanId);
  }

  /**
   * Merges `rawTrace` into the trace stored under `id` and writes it back.
   *
   * Spans matching `filters` are removed first (note: this modifies `rawTrace`
   * itself). Incoming spans overwrite stored spans with the same span id, and
   * the trace-level `displayName`, `startTime` and `endTime` are taken from the
   * incoming batch. When the (filtered) batch contains a root span, the merged
   * trace is added to the search index.
   *
   * @throws Error if the trace cannot be read back right after being written.
   */
  async save(id: string, rawTrace: TraceData): Promise<void> {
    const incoming = this.filter(rawTrace);
    // if everything is filtered, it's probably the root.
    const possibleRoot = Object.keys(incoming.spans).length === 0;
    const batchHasRoot = this.hasRootSpan(incoming);
    await this.getMutex(id).runExclusive(async () => {
      const merged = (await this.load(id)) ?? incoming;
      for (const [spanId, span] of Object.entries(incoming.spans)) {
        merged.spans[spanId] = span;
      }
      if (possibleRoot) {
        // If it's one of those weird roots (internal span that we filter) we try to fix
        // whoever was referencing it by making them root.
        for (const span of Object.values(merged.spans)) {
          if (span.parentSpanId && !merged.spans[span.parentSpanId]) {
            delete span.parentSpanId;
          }
        }
      }
      merged.displayName = incoming.displayName;
      merged.startTime = incoming.startTime;
      merged.endTime = incoming.endTime;
      fs.writeFileSync(
        path.resolve(this.storeRoot, `${id}`),
        JSON.stringify(merged)
      );
      if (batchHasRoot) {
        // Re-load the full trace from the file just written (same `id` key),
        // there are likely spans written there previously.
        const fullTrace = await this.load(id);
        if (!fullTrace) {
          throw new Error(
            'unable to read the trace that was just written... "this should never happen"'
          );
        }
        this.index.add(fullTrace);
      }
    });
  }

  /**
   * Lists traces via the search index, newest first, with each trace reduced
   * to its root span (see `trucateTraceDetails`).
   *
   * @param query Optional page size (default 10), continuation token, and filter.
   * @returns The page of traces plus a continuation token when more remain.
   */
  async list(query?: TraceQuery): Promise<TraceQueryResponse> {
    const searchResult = this.index.search({
      limit: query?.limit ?? 10,
      startFromIndex: query?.continuationToken
        ? Number.parseInt(query.continuationToken, 10)
        : undefined,
      filter: query?.filter,
    });
    const loadedTraces = await Promise.all(
      searchResult.data.map((d) => this.load(d['id']).then(trucateTraceDetails))
    );
    return {
      // Traces referenced by the index but missing on disk are dropped.
      traces: loadedTraces.filter((t): t is TraceData => !!t),
      continuationToken: searchResult.pageLastIndex
        ? `${searchResult.pageLastIndex}`
        : undefined,
    };
  }

  /**
   * Lists traces straight from the trace files, newest (by mtime) first,
   * bypassing the index. Used to rebuild the index.
   */
  private async listFromFiles(query?: TraceQuery): Promise<TraceQueryResponse> {
    // Stat each file once rather than inside the comparator: avoids O(n log n)
    // stat calls and an inconsistent ordering if an mtime changes mid-sort.
    const files = fs
      .readdirSync(this.storeRoot)
      .map((name) => ({
        name,
        mtime: fs.statSync(path.resolve(this.storeRoot, `${name}`)).mtime.getTime(),
      }))
      .sort((a, b) => b.mtime - a.mtime)
      .map((f) => f.name);
    const startFrom = query?.continuationToken
      ? Number.parseInt(query.continuationToken, 10)
      : 0;
    const stopAt = startFrom + (query?.limit || 10);
    const traces = files
      .slice(startFrom, stopAt)
      .map((id) => this.readTraceFile(path.resolve(this.storeRoot, `${id}`), id));
    return {
      traces,
      continuationToken: files.length > stopAt ? stopAt.toString() : undefined,
    };
  }

  /**
   * Removes (in place) spans whose attributes match any configured filter, then
   * drops a lone remaining `dev-run-action-wrapper` span.
   *
   * @returns The same `trace` object, mutated.
   */
  private filter(trace: TraceData): TraceData {
    // Delete any spans that match the filter criteria
    for (const [spanId, span] of Object.entries(trace.spans)) {
      const matchesFilter = Object.entries(this.filters).some(
        ([key, value]) => span.attributes[key] === value
      );
      if (matchesFilter) {
        delete trace.spans[spanId];
      }
    }
    // Delete the root wrapper if it's the only span left
    const remaining = Object.entries(trace.spans);
    if (remaining.length === 1) {
      const [spanId, span] = remaining[0];
      if (span.attributes['genkit:name'] === 'dev-run-action-wrapper') {
        delete trace.spans[spanId];
      }
    }
    return trace;
  }
}
/**
 * Produces a lightweight copy of a trace for list views: only the first root
 * span (a span without `parentSpanId`, in key order) is kept, with its long
 * string attributes shortened by `trucateLargeAttrs`. Every other trace-level
 * field is copied as-is. A missing trace is passed through unchanged.
 */
function trucateTraceDetails(t?: TraceData): TraceData | undefined {
  if (!t) {
    return t;
  }
  const { spans: allSpans, ...traceFields } = t;
  const keptSpans: Record<string, SpanData> = {};
  const rootEntry = Object.entries(allSpans).find(
    ([, candidate]) => !candidate.parentSpanId
  );
  if (rootEntry !== undefined) {
    const [rootId, { attributes, ...spanFields }] = rootEntry;
    keptSpans[rootId] = {
      attributes: trucateLargeAttrs(attributes),
      ...spanFields,
    } as SpanData;
  }
  return { spans: keptSpans, ...traceFields };
}
/**
 * Shortens every string-valued own property of `input` that is longer than
 * `maxLength` characters to its first `maxLength` characters followed by
 * `'...'`. Non-string values are left untouched, and non-object inputs
 * (`null`, `undefined`, primitives, arrays) are returned unchanged.
 *
 * The object is modified in place and also returned for convenience.
 *
 * @param input The attributes object to truncate.
 * @param maxLength Number of characters kept before appending `'...'`;
 *   defaults to `MAX_LIST_ATTR_LENGTH`.
 * @returns The same `input` reference.
 */
export function trucateLargeAttrs<T>(
  input: T,
  maxLength: number = MAX_LIST_ATTR_LENGTH
): T {
  if (
    input === undefined ||
    input === null ||
    Array.isArray(input) ||
    typeof input !== 'object'
  ) {
    return input;
  }
  const record = input as Record<string, unknown>;
  // Own keys only: `for...in` would also visit inherited enumerable
  // properties and write truncated copies of them onto the object.
  for (const key of Object.keys(record)) {
    const value = record[key];
    if (typeof value === 'string' && value.length > maxLength) {
      record[key] = `${value.substring(0, maxLength)}...`;
    }
  }
  return input;
}
/**
 * One page of results returned by `Index.search`.
 */
export interface IndexSearchResult {
  /**
   * Offset just past this page. Set only when more matching entries remain;
   * `LocalFileTraceStore.list` returns it as the continuation token, which is
   * parsed back into `startFromIndex` for the next page.
   */
  pageLastIndex?: number;
  /**
   * Parsed index entries for this page, newest `start` first.
   * NOTE(review): entries are written as `Record<string, string | number>`
   * (e.g. `start`/`end` are copied from span times), so values may be
   * non-strings despite the type declared here.
   */
  data: Record<string, string>[];
}
/**
 * A super basic searchable index implementation. It's not particularly efficient,
 * but should not be worse than reading individual trace files from disk.
 *
 * Entries are newline-delimited JSON objects (one per trace, summarizing its
 * root span) appended to `idx_<timestamp>.json` files under `indexRoot`.
 * Each `Index` instance appends to its own file, named when it is constructed.
 */
export class Index {
  private currentIndexFile: string;

  constructor(private indexRoot: string) {
    // TODO: do something about index getting too big. Delete/forget old stuff, etc.
    this.currentIndexFile = path.resolve(
      this.indexRoot,
      this.newIndexFileName()
    );
    fs.mkdirSync(this.indexRoot, { recursive: true });
  }

  /** Deletes the whole index directory and writes fresh metadata for the current version. */
  clear() {
    fs.rmSync(this.indexRoot, { recursive: true, force: true });
    fs.mkdirSync(this.indexRoot, { recursive: true });
    // The directory was just recreated empty, so this creates the file.
    fs.writeFileSync(
      this.metadataFileName(),
      JSON.stringify({ version: currentVersion })
    );
  }

  /** Absolute path of the index metadata file. */
  metadataFileName() {
    return path.resolve(this.indexRoot, 'genkit.metadata');
  }

  /** Reads the metadata file; returns `undefined` if it is missing or unparsable. */
  getMetadata(): { version: string } | undefined {
    try {
      return JSON.parse(
        fs.readFileSync(this.metadataFileName(), { encoding: 'utf8' })
      );
    } catch {
      return undefined;
    }
  }

  private newIndexFileName() {
    return `idx_${(Date.now() + '').padStart(17, '0')}.json`;
  }

  /** Names of the index data files in `indexRoot` (lock files excluded). */
  listIndexFiles() {
    return fs
      .readdirSync(this.indexRoot)
      // `<index file>.lock` files share the `idx_` prefix but hold no entries.
      .filter((f) => f.startsWith('idx_') && !f.endsWith('.lock'));
  }

  /**
   * Appends a summary of `traceData`'s first root span to the current index
   * file: id, type, start/end time, name, status, and any `genkitx:*` attributes.
   *
   * @throws if the index file lock cannot be acquired.
   */
  add(traceData: TraceData) {
    const rootSpan = Object.values(traceData.spans).find(
      (s) => !s.parentSpanId
    );
    const attrs: Record<string, unknown> = rootSpan?.attributes ?? {};
    const indexData: Record<string, string | number> = {
      id: traceData.traceId,
      type: `${attrs['genkit:metadata:subtype'] || attrs['genkit:type'] || 'UNKNOWN'}`,
    };
    if (rootSpan?.startTime) {
      indexData['start'] = rootSpan.startTime;
    }
    if (rootSpan?.displayName) {
      indexData['name'] = rootSpan.displayName;
    }
    if (rootSpan?.endTime) {
      indexData['end'] = rootSpan.endTime;
    }
    // Record status for any root span, not only ones that have a display name.
    if (rootSpan) {
      indexData['status'] = rootSpan.status?.code ?? 'UNKNOWN';
    }
    for (const [key, value] of Object.entries(attrs)) {
      if (key.startsWith('genkitx:')) {
        indexData[key] = `${value}`;
      }
    }
    const lock = lockFile(this.currentIndexFile);
    // Acquire before `try`: if acquisition fails we must not release a lock
    // that someone else holds.
    lockfile.lockSync(lock);
    try {
      fs.appendFileSync(
        this.currentIndexFile,
        JSON.stringify(indexData) + '\n'
      );
    } finally {
      lockfile.unlockSync(lock);
    }
  }

  /**
   * Returns one page of index entries matching `query.filter`, sorted by
   * `start` descending (entries without a numeric `start` sort last).
   *
   * @param query.limit Page size.
   * @param query.startFromIndex Offset of the first entry to return (default 0).
   * @param query.filter `eq`: every listed key must equal; `neq`: no listed key may equal.
   */
  search(query: {
    limit: number;
    startFromIndex?: number;
    filter?: TraceQueryFilter;
  }): IndexSearchResult {
    const startFromIndex = query.startFromIndex ?? 0;
    const eq = query.filter?.eq;
    const neq = query.filter?.neq;
    const matches = (d: Record<string, string | number>): boolean => {
      if (eq && Object.keys(eq).some((k) => d[k] !== eq[k])) return false;
      if (neq && Object.keys(neq).some((k) => d[k] === neq[k])) return false;
      return true;
    };
    const parseLine = (line: string) => {
      try {
        return JSON.parse(line) as Record<string, string | number>;
      } catch {
        // Blank or partially written lines are skipped.
        return undefined;
      }
    };

    const fullData: Record<string, string | number>[] = [];
    for (const idxFile of this.listIndexFiles()) {
      const idxTxt = fs.readFileSync(
        path.resolve(this.indexRoot, idxFile),
        'utf8'
      );
      const fileData: Record<string, string | number>[] = [];
      for (const line of idxTxt.split('\n')) {
        const entry = parseLine(line);
        if (entry && matches(entry)) fileData.push(entry);
      }
      // Newest-appended first within a file; keeps tie order under the stable sort.
      fullData.push(...fileData.reverse());
    }

    // A missing/non-numeric `start` would make the comparator return NaN and
    // leave the sort order unspecified; treat it as 0 instead.
    const startOf = (d: Record<string, string | number>): number => {
      const v = Number(d['start']);
      return Number.isFinite(v) ? v : 0;
    };
    // We must sort the data as chronological ordering is not guaranteed between
    // different index files.
    fullData.sort((a, b) => startOf(b) - startOf(a));

    const pageEnd = startFromIndex + query.limit;
    const result = {
      data: fullData.slice(startFromIndex, pageEnd),
    } as IndexSearchResult;
    // if there are more results, populate stop index.
    if (pageEnd < fullData.length) {
      result.pageLastIndex = pageEnd;
    }
    return result;
  }
}
/** Path of the sidecar lock file for `file`, i.e. `<file>.lock`. */
function lockFile(file: string): string {
  const suffix = '.lock';
  return file + suffix;
}