Skip to main content
Glama

Genkit MCP

Official
by firebase
file-trace-store.ts13.5 kB
/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { SpanData, TraceDataSchema, type TraceData, type TraceQueryFilter, } from '@genkit-ai/tools-common'; import { logger } from '@genkit-ai/tools-common/utils'; import { Mutex } from 'async-mutex'; import fs from 'fs'; import lockfile from 'lockfile'; import path from 'path'; import type { TraceQuery, TraceQueryResponse, TraceStore } from './types'; import { version as currentVersion } from './utils/version'; const MAX_TRACES = 1000; const MAX_INDEX_FILES = 10; const MAX_LIST_ATTR_LENGTH = 1000; /** * Implementation of trace store that persists traces on local disk. */ export class LocalFileTraceStore implements TraceStore { private readonly storeRoot; private readonly indexRoot; private mutexes: Record<string, Mutex> = {}; private filters: Record<string, string>; private readonly index: Index; static defaultFilters: Record<string, string> = { // Prevent prompt rendering from spamming local trace store 'genkit:metadata:subtype': 'prompt', }; constructor(options: { filters?: Record<string, string>; storeRoot: string; indexRoot: string; }) { this.storeRoot = path.resolve(options.storeRoot, `.genkit/traces`); fs.mkdirSync(this.storeRoot, { recursive: true }); this.indexRoot = path.resolve(options.indexRoot, `.genkit/traces_idx`); fs.mkdirSync(this.indexRoot, { recursive: true }); logger.debug( `[Telemetry Server] initialized local file trace store at root: ${this.storeRoot}` ); this.filters = options?.filters ?? LocalFileTraceStore.defaultFilters; this.index = new Index(this.indexRoot); } async init() { const metadata = this.index.getMetadata(); // if the metadata file doesn't exist or it was for the older version or if // there are too many index files we recreate the index. if ( !metadata || metadata.version !== currentVersion || this.index.listIndexFiles().length > MAX_INDEX_FILES ) { await this.reIndex(); } } private async reIndex() { this.index.clear(); const time = Date.now(); // Index only the last MAX_TRACES traces. const list = await this.listFromFiles({ limit: MAX_TRACES }); for (const trace of list.traces.reverse()) { const hasRootSpan = !!Object.values(trace.spans).find( (s) => !s.parentSpanId ); if (!hasRootSpan) continue; this.index.add(trace); } logger.info( `Indexed ${list.traces.length} traces in ${Date.now() - time}ms in ${this.indexRoot}` ); } async load(id: string): Promise<TraceData | undefined> { const filePath = path.resolve(this.storeRoot, `${id}`); if (!fs.existsSync(filePath)) { return undefined; } const data = fs.readFileSync(filePath, 'utf8'); const parsed = JSON.parse(data); // For backwards compatibility, new field. if (!parsed.traceId) { parsed.traceId = id; } return TraceDataSchema.parse(parsed); } getMutex(id: string): Mutex { if (!this.mutexes[id]) { this.mutexes[id] = new Mutex(); } return this.mutexes[id]; } async save(id: string, rawTrace: TraceData): Promise<void> { let trace = this.filter(rawTrace); // if everything is filtered, it's probably the root. const possibleRoot = Object.keys(trace.spans).length === 0; const mutex = this.getMutex(id); await mutex.waitForUnlock(); const release = await mutex.acquire(); try { const existing = (await this.load(id)) || trace; if (existing) { Object.keys(trace.spans).forEach( (spanId) => (existing.spans[spanId] = trace.spans[spanId]) ); // If it's one of those weird roots (internal span that we filter) we try to fix // whoever was referencing it by making them root. if (possibleRoot) { Object.keys(existing.spans).forEach((spanId) => { const span = existing.spans[spanId]; if ( possibleRoot && span.parentSpanId && !existing.spans[span.parentSpanId] ) { delete span.parentSpanId; } }); } existing.displayName = trace.displayName; existing.startTime = trace.startTime; existing.endTime = trace.endTime; trace = existing; } fs.writeFileSync( path.resolve(this.storeRoot, `${id}`), JSON.stringify(trace) ); const hasRootSpan = !!Object.values(rawTrace.spans).find( (s) => !s.parentSpanId ); if (this.index && hasRootSpan) { // re-load the full trace, there are likely spans written there previously. const fullTrace = await this.load(rawTrace.traceId); if (!fullTrace) { throw new Error( 'unable to read the trace that was just written... "this should never happen"' ); } this.index.add(fullTrace); } } finally { release(); } } async list(query?: TraceQuery): Promise<TraceQueryResponse> { const searchResult = this.index.search({ limit: query?.limit ?? 10, startFromIndex: query?.continuationToken ? Number.parseInt(query?.continuationToken) : undefined, filter: query?.filter, }); const loadedTraces = await Promise.all( searchResult.data.map((d) => this.load(d['id']).then(trucateTraceDetails)) ); return { traces: loadedTraces.filter((t) => !!t) as TraceData[], continuationToken: searchResult.pageLastIndex ? `${searchResult.pageLastIndex}` : undefined, }; } private async listFromFiles(query?: TraceQuery): Promise<TraceQueryResponse> { const files = fs.readdirSync(this.storeRoot); files.sort((a, b) => { return ( fs.statSync(path.resolve(this.storeRoot, `${b}`)).mtime.getTime() - fs.statSync(path.resolve(this.storeRoot, `${a}`)).mtime.getTime() ); }); const startFrom = query?.continuationToken ? Number.parseInt(query?.continuationToken) : 0; const stopAt = startFrom + (query?.limit || 10); const traces = files.slice(startFrom, stopAt).map((id) => { const filePath = path.resolve(this.storeRoot, `${id}`); const data = fs.readFileSync(filePath, 'utf8'); const parsed = JSON.parse(data); // For backwards compatibility, new field. if (!parsed.traceId) { parsed.traceId = id; } return TraceDataSchema.parse(parsed); }); return { traces, continuationToken: files.length > stopAt ? stopAt.toString() : undefined, }; } private filter(trace: TraceData): TraceData { // Delete any spans that match the filter criteria Object.keys(trace.spans).forEach((spanId) => { const span = trace.spans[spanId]; Object.keys(this.filters).forEach((f) => { if (span.attributes[f] === this.filters[f]) { delete trace.spans[spanId]; } }); }); // Delete the root wrapper if it's the only span left if (Object.keys(trace.spans).length === 1) { Object.keys(trace.spans).forEach((spanId) => { const span = trace.spans[spanId]; if (span.attributes['genkit:name'] === 'dev-run-action-wrapper') { delete trace.spans[spanId]; } }); } return trace; } } function trucateTraceDetails(t?: TraceData): TraceData | undefined { if (!t) return t; const { spans: originalSpans, ...restOfTrace } = t; const spans = {} as Record<string, SpanData>; for (const spanId of Object.keys(originalSpans)) { if (!originalSpans[spanId].parentSpanId) { const { attributes: originalAttributes, ...restOfSpan } = originalSpans[spanId]; spans[spanId] = { attributes: trucateLargeAttrs(originalAttributes), ...restOfSpan, } as SpanData; break; } } return { spans, ...restOfTrace }; } export function trucateLargeAttrs<T>(input: T): T { if ( input === undefined || input === null || Array.isArray(input) || typeof input !== 'object' ) { return input; } for (const key in input) { if ( typeof input[key] === 'string' && (input[key] as string).length > MAX_LIST_ATTR_LENGTH ) { input[key] = ((input[key] as string).substring(0, MAX_LIST_ATTR_LENGTH) + '...') as any; } } return input; } export interface IndexSearchResult { pageLastIndex?: number; data: Record<string, string>[]; } /** * A super basic searchable index implementation. It's not particularly efficient, * but should not be worse than reading individual trace files from disk. */ export class Index { private currentIndexFile: string; constructor(private indexRoot: string) { // TODO: do something about index getting too big. Delete/forget old stuff, etc. this.currentIndexFile = path.resolve( this.indexRoot, this.newIndexFileName() ); fs.mkdirSync(this.indexRoot, { recursive: true }); } clear() { fs.rmSync(this.indexRoot, { recursive: true, force: true }); fs.mkdirSync(this.indexRoot, { recursive: true }); fs.appendFileSync( this.metadataFileName(), JSON.stringify({ version: currentVersion }) ); } metadataFileName() { return path.resolve(this.indexRoot, 'genkit.metadata'); } getMetadata(): { version: string } | undefined { try { return JSON.parse( fs.readFileSync(this.metadataFileName(), { encoding: 'utf8' }) ); } catch (e) { return undefined; } } private newIndexFileName() { return `idx_${(Date.now() + '').padStart(17, '0')}.json`; } listIndexFiles() { return fs.readdirSync(this.indexRoot).filter((f) => f.startsWith('idx_')); } add(traceData: TraceData) { const rootSpans = Object.values(traceData.spans).filter( (s) => !s.parentSpanId ); const rootSpan = rootSpans.length > 0 ? rootSpans[0] : undefined; const indexData = { id: traceData.traceId, } as Record<string, string | number>; indexData['type'] = `${rootSpan?.attributes?.['genkit:metadata:subtype'] || rootSpan?.attributes?.['genkit:type'] || 'UNKNOWN'}`; if (rootSpan?.startTime) { indexData['start'] = rootSpan.startTime; } if (rootSpan?.displayName) { indexData['name'] = rootSpan.displayName; } if (rootSpan?.endTime) { indexData['end'] = rootSpan.endTime; } if (rootSpan?.displayName) { indexData['status'] = rootSpan.status?.code ?? 'UNKNOWN'; } Object.keys(rootSpan?.attributes ?? {}) .filter((k) => k.startsWith('genkitx:')) .forEach((k) => { indexData[k] = `${rootSpan!.attributes[k]}`; }); try { lockfile.lockSync(lockFile(this.currentIndexFile)); fs.appendFileSync( this.currentIndexFile, JSON.stringify(indexData) + '\n' ); } finally { lockfile.unlockSync(lockFile(this.currentIndexFile)); } } search(query: { limit: number; startFromIndex?: number; filter?: TraceQueryFilter; }): IndexSearchResult { const startFromIndex = query.startFromIndex ?? 0; const fullData = [] as Record<string, string | number>[]; for (const idxFile of this.listIndexFiles()) { const idxTxt = fs.readFileSync( path.resolve(this.indexRoot, idxFile), 'utf8' ); const fileData = idxTxt .split('\n') .map((l) => { try { return JSON.parse(l) as Record<string, string | number>; } catch { return undefined; } }) .filter((d) => { if (!d) return false; if (!query?.filter) return true; if ( query.filter.eq && Object.keys(query.filter.eq).find( (k) => d[k] !== query.filter!.eq![k] ) ) { return false; } if ( query.filter.neq && Object.keys(query.filter.neq).find( (k) => d[k] === query.filter!.neq![k] ) ) { return false; } return true; }) .reverse() as Record<string, string | number>[]; fullData.push(...fileData); } fullData // We must sort the data as chronological ordering is not guaranteed between // different index files. .sort((a, b) => (b!['start'] as number) - (a!['start'] as number)); const result = { data: fullData.slice(startFromIndex, startFromIndex + query.limit), } as IndexSearchResult; // if there are more results, populate stop index. if (startFromIndex + query.limit < fullData.length) { result.pageLastIndex = startFromIndex + query.limit; } return result; } } function lockFile(file: string) { return `${file}.lock`; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/firebase/genkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server