// trajectory-store.ts (19.4 kB)
/**
* TrajectoryStore - Implementation of trajectory storage system for GEPA
* Provides persistent storage for execution trajectories with efficient querying and archiving
*/
import { promises as fs } from 'fs';
import { join } from 'path';
import type { ExecutionTrajectory, TrajectoryFilter } from '../types/gepa';
import {
type ITrajectoryStore,
type TrajectoryStoreConfig,
type SaveResult,
type ArchiveResult,
type IndexRebuildResult,
type TrajectoryIndexEntry,
DEFAULT_TRAJECTORY_STORE_CONFIG,
FILE_PATTERNS,
VALIDATION_CONSTRAINTS,
TrajectoryValidationError,
TrajectoryIndexError,
} from './trajectory-store-types';
/**
* Concrete implementation of trajectory storage with JSON file-based persistence
*/
export class TrajectoryStore implements ITrajectoryStore {
public readonly config: TrajectoryStoreConfig;
private indexCache: Map<string, TrajectoryIndexEntry> = new Map();
private indexLoaded = false;
/**
 * Build a store whose configuration is the default configuration overlaid
 * with any caller-supplied overrides.
 *
 * @param config - Partial configuration; keys present here replace the
 *   corresponding entries of DEFAULT_TRAJECTORY_STORE_CONFIG.
 */
constructor(config: Partial<TrajectoryStoreConfig> = {}) {
  // Later sources win, so caller overrides take precedence over defaults.
  const merged: TrajectoryStoreConfig = Object.assign({}, DEFAULT_TRAJECTORY_STORE_CONFIG, config);
  this.config = merged;
}
/**
 * Save a trajectory to persistent storage.
 *
 * The trajectory is validated, serialized to pretty-printed JSON, checked
 * against `config.maxFileSize`, written to its file under `config.dataDir`,
 * and (when indexing is enabled) recorded in the index.
 *
 * Failures are reported through the returned result rather than thrown:
 * every error is logged and converted into `{ success: false, error }`.
 *
 * @param trajectory - The trajectory to persist; its `id` determines the file name.
 * @returns `{ success: true, id, filePath }` on success, otherwise
 *   `{ success: false, error }`.
 */
async save(trajectory: ExecutionTrajectory): Promise<SaveResult> {
  try {
    // Validate trajectory data
    this.validateTrajectory(trajectory);

    const alreadyExistsMessage = `Trajectory with ID '${trajectory.id}' already exists`;

    // Cheap early exit so a duplicate is reported before any serialization work.
    if (await this.trajectoryExists(trajectory.id)) {
      return {
        success: false,
        error: alreadyExistsMessage,
      };
    }

    // Ensure directories exist
    await this.ensureDirectoriesExist();

    // Serialize trajectory (handle Maps and other complex types)
    const serializedTrajectory = this.serializeTrajectory(trajectory);
    const content = JSON.stringify(serializedTrajectory, null, 2);

    // Check file size constraint
    const sizeInBytes = Buffer.byteLength(content, 'utf-8');
    if (sizeInBytes > this.config.maxFileSize) {
      return {
        success: false,
        error: `Trajectory exceeds maximum file size of ${this.config.maxFileSize} bytes (${sizeInBytes} bytes)`,
      };
    }

    const filePath = this.getTrajectoryFilePath(trajectory.id);

    // The early check above is not atomic: another save of the same ID may
    // create the file between that check and this write. The exclusive 'wx'
    // flag makes the write fail instead of silently overwriting.
    try {
      await fs.writeFile(filePath, content, { encoding: 'utf-8', flag: 'wx' });
    } catch (writeError) {
      const lostRace =
        writeError instanceof Error &&
        (writeError as NodeJS.ErrnoException).code === 'EEXIST';
      if (lostRace) {
        return {
          success: false,
          error: alreadyExistsMessage,
        };
      }
      throw writeError;
    }

    // Update index if enabled
    if (this.config.indexingEnabled) {
      await this.updateIndex(trajectory, filePath, sizeInBytes);
    }

    return {
      success: true,
      id: trajectory.id,
      filePath,
    };
  } catch (error) {
    // eslint-disable-next-line no-console
    console.error('Failed to save trajectory:', error);
    return {
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error occurred',
    };
  }
}
/**
 * Load a single trajectory by ID.
 *
 * @param id - Identifier used to derive the trajectory's file path.
 * @returns The deserialized trajectory (Date fields revived), or `null`
 *   when the file is not accessible, cannot be read, or cannot be parsed.
 *   Read and parse failures are logged; an inaccessible file is not.
 */
async load(id: string): Promise<ExecutionTrajectory | null> {
  try {
    const targetPath = this.getTrajectoryFilePath(id);

    // An inaccessible file is treated as a plain "not found" and is not logged.
    const reachable = await fs.access(targetPath).then(
      () => true,
      () => false,
    );
    if (!reachable) {
      return null;
    }

    const raw = await fs.readFile(targetPath, 'utf-8');
    return this.deserializeTrajectory(JSON.parse(raw));
  } catch (error) {
    // Malformed JSON gets its own message so it can be told apart from I/O failures.
    const label = error instanceof SyntaxError
      ? 'Failed to parse trajectory JSON:'
      : 'Failed to load trajectory:';
    // eslint-disable-next-line no-console
    console.error(label, error);
    return null;
  }
}
/**
 * Query stored trajectories, newest first, with optional filtering and pagination.
 *
 * With indexing enabled the index selects candidate IDs and each candidate is
 * then loaded from disk (entries that fail to load are skipped). With indexing
 * disabled the data directory is scanned instead.
 *
 * @param filter - Optional criteria plus `offset` / `limit` pagination.
 * @returns Matching trajectories; an empty array if the query fails (the
 *   failure is logged).
 */
async query(filter: TrajectoryFilter = {}): Promise<ExecutionTrajectory[]> {
  try {
    if (!this.config.indexingEnabled) {
      // Fallback to file system scan when indexing is disabled
      return await this.queryFromFileSystem(filter);
    }

    await this.ensureIndexLoaded();

    // Resolve each matching index entry to its full trajectory.
    const loaded: ExecutionTrajectory[] = [];
    for (const candidate of this.filterIndexEntries(filter)) {
      const item = await this.load(candidate.id);
      if (item) {
        loaded.push(item);
      }
    }

    // Most recent first.
    const epoch = (t: ExecutionTrajectory): number => new Date(t.timestamp).getTime();
    loaded.sort((a, b) => epoch(b) - epoch(a));

    // Pagination: drop `offset` items, then cap at `limit` when one is given.
    const start = filter.offset === undefined ? 0 : filter.offset;
    const afterOffset = loaded.slice(start);
    return filter.limit === undefined ? afterOffset : afterOffset.slice(0, filter.limit);
  } catch (error) {
    // eslint-disable-next-line no-console
    console.error('Failed to query trajectories:', error);
    return [];
  }
}
/**
 * Move trajectories older than `config.archiveAfterDays` into the archive
 * directory and persist the updated index.
 *
 * A failure to archive one trajectory is logged and does not stop the run.
 *
 * @returns On success, the number and IDs of trajectories archived by this
 *   call. If setup or the final index save fails, `success: false` with a
 *   zero count and the error message.
 */
async archiveOld(): Promise<ArchiveResult> {
  try {
    await this.ensureIndexLoaded();
    await this.ensureArchiveDirectoryExists();

    const retentionMs = this.config.archiveAfterDays * 24 * 60 * 60 * 1000;
    const threshold = new Date(Date.now() - retentionMs);
    const moved: string[] = [];

    for (const [id, entry] of this.indexCache) {
      // Only entries that are not yet archived and are older than the threshold qualify.
      const due = !entry.archived && entry.timestamp < threshold;
      if (!due) {
        continue;
      }
      try {
        await this.archiveTrajectory(id, entry);
        moved.push(id);
      } catch (error) {
        // eslint-disable-next-line no-console
        console.error(`Failed to archive trajectory ${id}:`, error);
        // Keep going; one bad entry should not block the rest.
      }
    }

    // Persist the archived flags and new file paths.
    await this.saveIndex();

    return {
      success: true,
      archivedCount: moved.length,
      archivedIds: moved,
    };
  } catch (error) {
    // eslint-disable-next-line no-console
    console.error('Failed to archive old trajectories:', error);
    const message = error instanceof Error ? error.message : 'Unknown error occurred';
    return {
      success: false,
      archivedCount: 0,
      archivedIds: [],
      error: message,
    };
  }
}
/**
 * Rebuild the in-memory index from the `.json` files in the data directory
 * and write it to disk.
 *
 * Only the top level of `config.dataDir` is scanned, dot-files are ignored,
 * and every rebuilt entry is marked `archived: false`. Files that cannot be
 * read or parsed are logged and skipped.
 *
 * @returns The number of files indexed, or `success: false` with the error
 *   message if the rebuild as a whole fails.
 */
async rebuildIndex(): Promise<IndexRebuildResult> {
  try {
    await this.ensureDirectoriesExist();

    // Start from an empty index.
    this.indexCache.clear();

    const names = await fs.readdir(this.config.dataDir);
    let indexedCount = 0;

    for (const name of names) {
      if (name.startsWith('.') || !name.endsWith('.json')) {
        continue;
      }
      try {
        const filePath = join(this.config.dataDir, name);
        const { size } = await fs.stat(filePath);
        const parsed = JSON.parse(await fs.readFile(filePath, 'utf-8'));

        const rebuilt: TrajectoryIndexEntry = {
          id: parsed.id,
          promptId: parsed.promptId,
          taskId: parsed.taskId,
          timestamp: new Date(parsed.timestamp),
          success: parsed.finalResult.success,
          score: parsed.finalResult.score,
          filePath,
          fileSize: size,
          archived: false,
        };
        this.indexCache.set(parsed.id, rebuilt);
        indexedCount += 1;
      } catch (error) {
        // eslint-disable-next-line no-console
        console.error(`Failed to index file ${name}:`, error);
        // Skip this file and continue with the others.
      }
    }

    // The index counts as loaded only once it has been persisted.
    await this.saveIndex();
    this.indexLoaded = true;

    return { success: true, indexedCount };
  } catch (error) {
    // eslint-disable-next-line no-console
    console.error('Failed to rebuild index:', error);
    const message = error instanceof Error ? error.message : 'Unknown error occurred';
    return {
      success: false,
      indexedCount: 0,
      error: message,
    };
  }
}
/**
 * Validate trajectory data before saving.
 *
 * Checks, in order: the value is an object; `id` is a non-empty string within
 * the configured length bounds and free of path separators (the ID becomes
 * part of a file name); `promptId` and `taskId` are non-empty strings; `steps`
 * is an array within the step limit; `finalResult` is an object with a boolean
 * `success` and a finite numeric `score`; and `timestamp` is a valid date.
 *
 * @param trajectory - Candidate trajectory.
 * @throws TrajectoryValidationError describing the first failed check.
 */
private validateTrajectory(trajectory: ExecutionTrajectory): void {
  if (!trajectory || typeof trajectory !== 'object') {
    throw new TrajectoryValidationError('Invalid trajectory data: must be an object');
  }
  if (!trajectory.id || typeof trajectory.id !== 'string') {
    throw new TrajectoryValidationError('Invalid trajectory data: ID is required and must be a string');
  }
  if (trajectory.id.length < VALIDATION_CONSTRAINTS.MIN_ID_LENGTH ||
    trajectory.id.length > VALIDATION_CONSTRAINTS.MAX_ID_LENGTH) {
    throw new TrajectoryValidationError(
      `Trajectory ID must be between ${VALIDATION_CONSTRAINTS.MIN_ID_LENGTH} and ${VALIDATION_CONSTRAINTS.MAX_ID_LENGTH} characters`
    );
  }
  // The ID is interpolated into a file path, so separators (or NUL) would let
  // it point outside the data directory or produce an invalid path.
  if (/[\\\/\0]/.test(trajectory.id)) {
    throw new TrajectoryValidationError('Invalid trajectory data: ID must not contain path separators');
  }
  if (!trajectory.promptId || typeof trajectory.promptId !== 'string') {
    throw new TrajectoryValidationError('Invalid trajectory validation: Prompt ID is required and must be a string');
  }
  if (!trajectory.taskId || typeof trajectory.taskId !== 'string') {
    throw new TrajectoryValidationError('Invalid trajectory validation: Task ID is required and must be a string');
  }
  if (!Array.isArray(trajectory.steps)) {
    throw new TrajectoryValidationError('Invalid trajectory validation: Steps must be an array');
  }
  if (trajectory.steps.length > VALIDATION_CONSTRAINTS.MAX_STEPS) {
    throw new TrajectoryValidationError(
      `Trajectory cannot have more than ${VALIDATION_CONSTRAINTS.MAX_STEPS} steps`
    );
  }
  if (!trajectory.finalResult || typeof trajectory.finalResult !== 'object') {
    throw new TrajectoryValidationError('Invalid trajectory validation: Final result is required and must be an object');
  }
  if (typeof trajectory.finalResult.success !== 'boolean') {
    throw new TrajectoryValidationError('Invalid trajectory validation: Final result success must be a boolean');
  }
  if (typeof trajectory.finalResult.score !== 'number') {
    throw new TrajectoryValidationError('Invalid trajectory validation: Final result score must be a number');
  }
  // NaN and +/-Infinity pass the typeof check but JSON-serialize to null,
  // which would silently corrupt the stored score.
  if (!Number.isFinite(trajectory.finalResult.score)) {
    throw new TrajectoryValidationError('Invalid trajectory validation: Final result score must be a finite number');
  }
  // The timestamp drives index sorting, date-range filtering and archiving.
  // Date instances and date-like strings/numbers are accepted, as long as
  // they resolve to a real point in time.
  const rawTimestamp: unknown = trajectory.timestamp;
  let timestampMs = Number.NaN;
  if (rawTimestamp instanceof Date) {
    timestampMs = rawTimestamp.getTime();
  } else if (typeof rawTimestamp === 'string' || typeof rawTimestamp === 'number') {
    timestampMs = new Date(rawTimestamp).getTime();
  }
  if (Number.isNaN(timestampMs)) {
    throw new TrajectoryValidationError('Invalid trajectory validation: Timestamp is required and must be a valid date');
  }
}
/**
 * Report whether a trajectory file for the given ID is accessible in the
 * data directory.
 *
 * @param id - Trajectory identifier.
 * @returns `true` when the file can be accessed; `false` on any failure.
 */
private async trajectoryExists(id: string): Promise<boolean> {
  let found = true;
  try {
    await fs.access(this.getTrajectoryFilePath(id));
  } catch {
    // Any failure (missing file, permissions, bad path) counts as "not present".
    found = false;
  }
  return found;
}
/**
 * Build the path of the file that stores the trajectory with the given ID.
 *
 * The file name is produced by substituting the ID for the `{id}` placeholder
 * in `FILE_PATTERNS.TRAJECTORY`; the file lives directly under `config.dataDir`.
 *
 * @param id - Trajectory identifier.
 * @returns Path to the trajectory file.
 */
private getTrajectoryFilePath(id: string): string {
  // Use a replacer function so the ID is inserted verbatim. With a plain
  // string replacement, sequences such as `$$`, `$&` or `` $` `` inside the ID
  // are interpreted as replacement patterns, so distinct IDs (e.g. `a$$b` and
  // `a$b`) could collapse onto the same file name.
  const fileName = FILE_PATTERNS.TRAJECTORY.replace('{id}', () => id);
  return join(this.config.dataDir, fileName);
}
/**
 * Create the data directory and, when indexing is enabled, the index
 * directory beneath it. Directories that already exist are left alone.
 */
private async ensureDirectoriesExist(): Promise<void> {
  const mkdirOptions = { recursive: true };

  await fs.mkdir(this.config.dataDir, mkdirOptions);

  if (!this.config.indexingEnabled) {
    return;
  }
  await fs.mkdir(join(this.config.dataDir, FILE_PATTERNS.INDEX_DIR), mkdirOptions);
}
/**
 * Create the archive directory under the data directory if it is missing.
 */
private async ensureArchiveDirectoryExists(): Promise<void> {
  await fs.mkdir(join(this.config.dataDir, FILE_PATTERNS.ARCHIVE_DIR), { recursive: true });
}
/**
 * Convert a trajectory into a plain JSON-compatible object.
 *
 * The value is round-tripped through JSON, with every `Map` encountered
 * replaced by a plain object built from its entries.
 *
 * @param trajectory - Trajectory to convert.
 * @returns A plain-object copy suitable for `JSON.stringify`.
 */
private serializeTrajectory(trajectory: ExecutionTrajectory): Record<string, unknown> {
  // JSON.stringify would otherwise emit `{}` for a Map, dropping its entries.
  const mapsToObjects = (_key: string, value: unknown): unknown =>
    value instanceof Map ? Object.fromEntries(value) : value;

  const json = JSON.stringify(trajectory, mapsToObjects);
  return JSON.parse(json);
}
/**
 * Turn parsed trajectory JSON back into an `ExecutionTrajectory`.
 *
 * Revives `timestamp` as a `Date` on the trajectory itself and on each element
 * of `steps` and `llmCalls` (when those are arrays). The input object is
 * modified in place and returned. Maps that were flattened to plain objects
 * during serialization are not converted back, and no other shape validation
 * is performed.
 *
 * @param data - Parsed JSON for one trajectory.
 * @returns The same object, typed as `ExecutionTrajectory`.
 */
private deserializeTrajectory(data: Record<string, unknown>): ExecutionTrajectory {
  const reviveTimestamp = (holder: Record<string, unknown>): void => {
    if (holder.timestamp) {
      holder.timestamp = new Date(holder.timestamp as string);
    }
  };

  reviveTimestamp(data);

  // Both collections carry per-item timestamps that need the same treatment.
  const steps = data.steps;
  if (steps && Array.isArray(steps)) {
    steps.forEach((step: Record<string, unknown>) => reviveTimestamp(step));
  }
  const llmCalls = data.llmCalls;
  if (llmCalls && Array.isArray(llmCalls)) {
    llmCalls.forEach((call: Record<string, unknown>) => reviveTimestamp(call));
  }

  return data as unknown as ExecutionTrajectory;
}
/**
 * Record a newly saved trajectory in the index and persist the index.
 *
 * @param trajectory - The trajectory that was written.
 * @param filePath - Where the trajectory file was written.
 * @param fileSize - Size of the written file in bytes.
 * @throws TrajectoryIndexError (from `saveIndex`) if the index cannot be written.
 */
private async updateIndex(trajectory: ExecutionTrajectory, filePath: string, fileSize: number): Promise<void> {
  await this.ensureIndexLoaded();
  const indexEntry: TrajectoryIndexEntry = {
    id: trajectory.id,
    promptId: trajectory.promptId,
    taskId: trajectory.taskId,
    // Normalize to a fresh Date, as rebuildIndex and ensureIndexLoaded do.
    // Index consumers call `timestamp.getTime()`, which throws if a string
    // timestamp slipped through; copying also decouples the index entry from
    // later mutation of the caller's Date object.
    timestamp: new Date(trajectory.timestamp),
    success: trajectory.finalResult.success,
    score: trajectory.finalResult.score,
    filePath,
    fileSize,
    archived: false,
  };
  this.indexCache.set(trajectory.id, indexEntry);
  await this.saveIndex();
}
/**
 * Load the index file into the in-memory cache the first time it is needed.
 *
 * Does nothing when indexing is disabled or the index is already loaded.
 * If the index file cannot be read or parsed, the index is rebuilt from the
 * trajectory files instead.
 */
private async ensureIndexLoaded(): Promise<void> {
  if (!this.config.indexingEnabled || this.indexLoaded) {
    return;
  }

  try {
    const raw = await fs.readFile(this.getIndexFilePath(), 'utf-8');
    const stored = JSON.parse(raw);

    this.indexCache.clear();
    Object.entries(stored).forEach(([id, value]) => {
      const revived = value as TrajectoryIndexEntry;
      // Timestamps are stored as strings in JSON; turn them back into Dates.
      revived.timestamp = new Date(revived.timestamp);
      this.indexCache.set(id, revived);
    });
    this.indexLoaded = true;
  } catch {
    // Index doesn't exist or is corrupted, rebuild it
    if (this.config.indexingEnabled) {
      await this.rebuildIndex();
    }
  }
}
/**
 * Write the in-memory index to the index file as pretty-printed JSON.
 * Does nothing when indexing is disabled.
 *
 * @throws TrajectoryIndexError wrapping the underlying failure.
 */
private async saveIndex(): Promise<void> {
  if (this.config.indexingEnabled) {
    try {
      const target = this.getIndexFilePath();
      const snapshot = JSON.stringify(Object.fromEntries(this.indexCache), null, 2);
      await fs.writeFile(target, snapshot, 'utf-8');
    } catch (error) {
      throw new TrajectoryIndexError('Failed to save index', error);
    }
  }
}
/**
 * Build the path of the index file: the index directory under the data
 * directory, followed by the index file name.
 *
 * @returns Path to the index file.
 */
private getIndexFilePath(): string {
  const segments = [this.config.dataDir, FILE_PATTERNS.INDEX_DIR, FILE_PATTERNS.INDEX];
  return join(...segments);
}
/**
 * Select the cached index entries that satisfy a filter, newest first.
 *
 * Archived entries are always excluded. `promptId`, `taskId` and
 * `successOnly` apply only when truthy; `minScore` / `maxScore` apply when
 * defined; `dateRange` bounds are inclusive. Pagination is not applied here.
 *
 * @param filter - Query criteria.
 * @returns Matching entries sorted by timestamp, most recent first.
 */
private filterIndexEntries(filter: TrajectoryFilter): TrajectoryIndexEntry[] {
  const { promptId, taskId, successOnly, minScore, maxScore, dateRange } = filter;

  // One predicate covering every criterion; an entry is kept only if nothing rejects it.
  const accepts = (entry: TrajectoryIndexEntry): boolean => {
    if (entry.archived) return false;
    if (promptId && !(entry.promptId === promptId)) return false;
    if (taskId && !(entry.taskId === taskId)) return false;
    if (successOnly && !(entry.success === true)) return false;
    if (minScore !== undefined && !(entry.score >= minScore)) return false;
    if (maxScore !== undefined && !(entry.score <= maxScore)) return false;
    if (dateRange && !(entry.timestamp >= dateRange.start && entry.timestamp <= dateRange.end)) {
      return false;
    }
    return true;
  };

  const selected: TrajectoryIndexEntry[] = [];
  for (const entry of this.indexCache.values()) {
    if (accepts(entry)) {
      selected.push(entry);
    }
  }

  // Most recent first.
  selected.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
  return selected;
}
/**
 * Query trajectories by scanning the data directory; used when indexing is
 * disabled.
 *
 * Every non-hidden `.json` file at the top level of `config.dataDir` is read,
 * deserialized and tested against the filter. Unreadable or malformed files
 * are logged and skipped. Results are sorted newest first, then paginated.
 *
 * @param filter - Optional criteria plus `offset` / `limit` pagination.
 * @returns Matching trajectories; an empty array if the scan itself fails.
 */
private async queryFromFileSystem(filter: TrajectoryFilter = {}): Promise<ExecutionTrajectory[]> {
  try {
    await this.ensureDirectoriesExist();

    const names = await fs.readdir(this.config.dataDir);
    const matched: ExecutionTrajectory[] = [];

    for (const name of names) {
      if (name.startsWith('.') || !name.endsWith('.json')) {
        continue;
      }
      try {
        const raw = await fs.readFile(join(this.config.dataDir, name), 'utf-8');
        const candidate = this.deserializeTrajectory(JSON.parse(raw));
        if (this.matchesFilter(candidate, filter)) {
          matched.push(candidate);
        }
      } catch (error) {
        // A corrupted file should not abort the whole scan.
        // eslint-disable-next-line no-console
        console.error(`Failed to load trajectory file ${name}:`, error);
      }
    }

    // Most recent first.
    const epoch = (t: ExecutionTrajectory): number => new Date(t.timestamp).getTime();
    matched.sort((a, b) => epoch(b) - epoch(a));

    // Pagination: drop `offset` items, then cap at `limit` when one is given.
    const start = filter.offset === undefined ? 0 : filter.offset;
    const afterOffset = matched.slice(start);
    return filter.limit === undefined ? afterOffset : afterOffset.slice(0, filter.limit);
  } catch (error) {
    // eslint-disable-next-line no-console
    console.error('Failed to query from filesystem:', error);
    return [];
  }
}
/**
 * Decide whether a fully loaded trajectory satisfies a filter.
 *
 * `promptId`, `taskId` and `successOnly` apply only when truthy; `minScore`
 * and `maxScore` apply when defined; `dateRange` bounds are inclusive.
 *
 * @param trajectory - Trajectory to test.
 * @param filter - Query criteria (pagination fields are ignored here).
 * @returns `true` if no criterion rejects the trajectory.
 */
private matchesFilter(trajectory: ExecutionTrajectory, filter: TrajectoryFilter): boolean {
  // Checks are chained with && so they run in order and stop at the first failure.
  const passesFieldChecks =
    (!filter.promptId || trajectory.promptId === filter.promptId) &&
    (!filter.taskId || trajectory.taskId === filter.taskId) &&
    (!filter.successOnly || !!trajectory.finalResult.success) &&
    (filter.minScore === undefined || !(trajectory.finalResult.score < filter.minScore)) &&
    (filter.maxScore === undefined || !(trajectory.finalResult.score > filter.maxScore));
  if (!passesFieldChecks) {
    return false;
  }

  const range = filter.dateRange;
  if (!range) {
    return true;
  }
  const when = new Date(trajectory.timestamp);
  return !(when < range.start || when > range.end);
}
/**
 * Move one trajectory file into the archive directory and mark its index
 * entry as archived.
 *
 * The entry is updated only after the move succeeds, so a failed rename
 * leaves the entry untouched. The index is not written to disk here.
 *
 * @param id - Trajectory identifier; the archived file is named `<id>.json`.
 * @param entry - Index entry for the trajectory; modified in place.
 */
private async archiveTrajectory(id: string, entry: TrajectoryIndexEntry): Promise<void> {
  const archiveRoot = join(this.config.dataDir, FILE_PATTERNS.ARCHIVE_DIR);
  const destination = join(archiveRoot, `${id}.json`);

  await fs.rename(entry.filePath, destination);

  Object.assign(entry, { archived: true, filePath: destination });
  this.indexCache.set(id, entry);
}
}