import { createClient, RedisClientType } from 'redis';
import {
ProjectState,
ProjectStateSchema,
CheckpointHistory,
CheckpointHistorySchema,
SessionLock,
SessionLockSchema,
createInitialState,
} from '../types/schema.js';
import { logger } from '../utils/logger.js';
import { getCurrentTimestamp } from '../utils/time.js';
import { validateProjectState } from '../utils/validation.js';
/** Maximum number of attempts for an optimistic-lock state update. */
const MAX_RETRIES = 3;

/** Base backoff delay in milliseconds; multiplied by 2^attempt between retries. */
const RETRY_BASE_DELAY_MS = 100;

/** Number of most-recent checkpoints retained in a session's history list. */
const MAX_CHECKPOINT_HISTORY = 5;

/** Lifetime of a session lock key in seconds (5 minutes). */
const SESSION_LOCK_TTL_SECONDS = 5 * 60;
/**
 * Redis-backed store for per-session project state, checkpoint history and
 * session locks.
 *
 * - Project state lives in a JSON document under `project:state:<sessionId>`
 *   and is updated with optimistic locking (WATCH/MULTI/EXEC) plus bounded
 *   retries with exponential backoff.
 * - Checkpoint history is a list under `project:history:<sessionId>`, trimmed
 *   to the newest `MAX_CHECKPOINT_HISTORY` entries.
 * - The session lock is a string key under `project:lock:<sessionId>` with a
 *   TTL of `SESSION_LOCK_TTL_SECONDS`.
 *
 * NOTE(review): WATCH state is scoped to a Redis connection and this class
 * uses a single shared connection. Overlapping `updateStateWithLock` calls on
 * the same instance can therefore clear each other's watches. If concurrent
 * in-process updates are expected, run each transaction on an isolated
 * connection — confirm against callers.
 */
export class RedisClient {
  private readonly client: RedisClientType;
  private connected = false;

  /**
   * Creates the underlying client (without connecting) and attaches logging
   * handlers for the `error` and `connect` events.
   *
   * @param redisUrl Connection URL passed to `createClient`.
   */
  constructor(redisUrl: string) {
    this.client = createClient({ url: redisUrl }) as RedisClientType;

    // An 'error' listener is registered so client errors are logged here.
    this.client.on('error', (err) => {
      logger.error('Redis client error', { error: err });
    });
    this.client.on('connect', () => {
      logger.info('Redis client connected');
    });
  }

  /**
   * Connects to Redis. A no-op when this instance is already connected.
   *
   * @throws Error when the underlying client fails to connect.
   */
  async connect(): Promise<void> {
    if (this.connected) {
      return;
    }
    try {
      await this.client.connect();
      this.connected = true;
      logger.info('Redis connection established');
    } catch (error) {
      logger.error('Failed to connect to Redis', { error });
      throw new Error(`Redis connection failed: ${error}`);
    }
  }

  /**
   * Disconnects from Redis. A no-op when not connected. Failures are logged
   * and swallowed; in that case the instance is still considered connected.
   */
  async disconnect(): Promise<void> {
    if (!this.connected) {
      return;
    }
    try {
      await this.client.quit();
      this.connected = false;
      logger.info('Redis connection closed');
    } catch (error) {
      logger.error('Error disconnecting from Redis', { error });
    }
  }

  /** Generates the Redis key for project state. */
  private getStateKey(sessionId: string): string {
    return `project:state:${sessionId}`;
  }

  /** Generates the Redis key for checkpoint history. */
  private getHistoryKey(sessionId: string): string {
    return `project:history:${sessionId}`;
  }

  /** Generates the Redis key for the session lock. */
  private getLockKey(sessionId: string): string {
    return `project:lock:${sessionId}`;
  }

  /**
   * Reports whether `error` is node-redis' `WatchError`, i.e. EXEC was aborted
   * because a watched key changed.
   *
   * The check goes by constructor name rather than `instanceof`, so this block
   * needs no extra import and still matches if more than one copy of the
   * client package is loaded.
   */
  private isWatchError(error: unknown): boolean {
    return error instanceof Error && error.constructor.name === 'WatchError';
  }

  /**
   * Best-effort UNWATCH. A failure here is logged but never thrown, so it
   * cannot mask the error that triggered the cleanup.
   */
  private async unwatchQuietly(sessionId: string): Promise<void> {
    try {
      await this.client.unwatch();
    } catch (unwatchError) {
      logger.warn('Failed to unwatch after aborted update', {
        error: unwatchError,
        sessionId,
      });
    }
  }

  /**
   * Reads and validates the current lock payload for diagnostics.
   *
   * @returns The parsed lock, or `undefined` when it is missing, has expired
   *          in the meantime, or cannot be read/parsed.
   */
  private async readLockHolder(key: string): Promise<SessionLock | undefined> {
    try {
      const raw = await this.client.get(key);
      return raw ? SessionLockSchema.parse(JSON.parse(raw)) : undefined;
    } catch {
      return undefined;
    }
  }

  /**
   * Retrieves the current project state.
   *
   * @returns The validated stored state, or a fresh initial state (which is
   *          NOT written to Redis here) when nothing is stored.
   * @throws Error when the read or the validation fails.
   */
  async getState(sessionId: string): Promise<ProjectState> {
    try {
      const key = this.getStateKey(sessionId);
      const data = await this.client.json.get(key);
      if (!data) {
        logger.info('No existing state found, creating initial state', { sessionId });
        return createInitialState(sessionId);
      }
      // Validate with centralized validator
      return validateProjectState(data);
    } catch (error) {
      logger.error('Failed to get state from Redis', { error, sessionId });
      throw new Error(`Failed to retrieve state: ${error}`);
    }
  }

  /**
   * Updates state with optimistic locking using WATCH/MULTI/EXEC.
   *
   * Each attempt watches the state key, loads the current state, applies
   * `updater`, bumps `meta.version` / `meta.last_access`, validates the result
   * and writes it inside a transaction. If the watched key changed, the
   * attempt is retried after an exponential backoff of
   * `RETRY_BASE_DELAY_MS * 2^attempt`. Any other failure is retried
   * immediately. At most `MAX_RETRIES` attempts are made, so `updater` may be
   * invoked more than once.
   *
   * @param sessionId Session whose state is updated.
   * @param updater   Produces the new state from the current one.
   * @returns The validated state that was written.
   * @throws Error when all attempts fail.
   */
  async updateStateWithLock(
    sessionId: string,
    updater: (state: ProjectState) => Promise<ProjectState>
  ): Promise<ProjectState> {
    const key = this.getStateKey(sessionId);

    for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
      const isLastAttempt = attempt === MAX_RETRIES - 1;

      try {
        // Start watching the key so EXEC aborts if it changes underneath us.
        await this.client.watch(key);

        const currentState = await this.getState(sessionId);
        const updatedState = await updater(currentState);

        // Increment version for optimistic locking
        updatedState.meta.version = currentState.meta.version + 1;
        updatedState.meta.last_access = getCurrentTimestamp();

        const validated = ProjectStateSchema.parse(updatedState);

        const multi = this.client.multi();
        // Cast kept from the original; presumably the schema type is not
        // assignable to the client's JSON value type.
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        multi.json.set(key, '$', validated as any);
        const result = await multi.exec();

        if (result !== null) {
          logger.info('State updated successfully', {
            sessionId,
            version: validated.meta.version,
          });
          return validated;
        }
        // A null reply is treated as a conflict too: fall through to the backoff below.
      } catch (error) {
        await this.unwatchQuietly(sessionId);

        if (!this.isWatchError(error)) {
          logger.error('Failed to update state', { error, attempt, sessionId });
          if (isLastAttempt) {
            throw new Error(`Failed to update state after ${MAX_RETRIES} attempts: ${error}`);
          }
          continue;
        }
        // WatchError: the watched key changed; handled as a conflict below.
      }

      // Only concurrent-modification conflicts reach this point.
      if (isLastAttempt) {
        throw new Error(
          `Failed to update state after ${MAX_RETRIES} attempts: concurrent modification`
        );
      }
      const delay = RETRY_BASE_DELAY_MS * Math.pow(2, attempt);
      logger.warn('Concurrent modification detected, retrying', {
        attempt: attempt + 1,
        delay,
        sessionId,
      });
      await new Promise((resolve) => setTimeout(resolve, delay));
    }

    // Every loop iteration returns, throws or continues; this satisfies the type checker.
    throw new Error('Unreachable code');
  }

  /**
   * Saves a checkpoint at the front of the session's history list and trims
   * the list to the newest `MAX_CHECKPOINT_HISTORY` entries.
   *
   * Failures are logged and swallowed because history is non-critical.
   *
   * The push and trim are intentionally two plain commands rather than a
   * MULTI/EXEC: an EXEC on the shared connection would clear any WATCH held
   * by an in-flight `updateStateWithLock`.
   */
  async saveCheckpointHistory(
    sessionId: string,
    checkpoint: CheckpointHistory
  ): Promise<void> {
    try {
      const key = this.getHistoryKey(sessionId);
      const validated = CheckpointHistorySchema.parse(checkpoint);

      // LPUSH adds to the front, so index 0 is always the newest entry.
      await this.client.lPush(key, JSON.stringify(validated));
      await this.client.lTrim(key, 0, MAX_CHECKPOINT_HISTORY - 1);

      logger.info('Checkpoint saved to history', {
        sessionId,
        version: validated.version,
      });
    } catch (error) {
      logger.error('Failed to save checkpoint history', { error, sessionId });
      // Don't throw - this is non-critical
    }
  }

  /**
   * Retrieves checkpoint history, newest first.
   *
   * Entries that fail to parse or validate are skipped with a warning.
   *
   * @returns The valid checkpoints, or an empty array when the read fails.
   */
  async getCheckpointHistory(sessionId: string): Promise<CheckpointHistory[]> {
    try {
      const key = this.getHistoryKey(sessionId);
      const items = await this.client.lRange(key, 0, -1);
      const checkpoints: CheckpointHistory[] = [];
      for (const item of items) {
        try {
          checkpoints.push(CheckpointHistorySchema.parse(JSON.parse(item)));
        } catch (error) {
          logger.warn('Invalid checkpoint in history, skipping', { error });
        }
      }
      return checkpoints;
    } catch (error) {
      logger.error('Failed to get checkpoint history', { error, sessionId });
      return [];
    }
  }

  /**
   * Acquires a session lock to detect concurrent sessions.
   *
   * The lock is taken with a single `SET ... NX EX` command, so checking for
   * an existing lock and creating a new one happen atomically.
   *
   * @returns `true` when the lock was acquired; `false` when it is already
   *          held or when Redis could not be reached.
   */
  async acquireSessionLock(sessionId: string): Promise<boolean> {
    try {
      const key = this.getLockKey(sessionId);
      const lock: SessionLock = {
        session_id: sessionId,
        locked_at: getCurrentTimestamp(),
        ttl_seconds: SESSION_LOCK_TTL_SECONDS,
      };

      const reply = await this.client.set(key, JSON.stringify(lock), {
        NX: true,
        EX: SESSION_LOCK_TTL_SECONDS,
      });

      if (reply === null) {
        // NX rejected the write: someone else holds the lock. The read below
        // is only for the log line and may miss a lock that just expired.
        const holder = await this.readLockHolder(key);
        logger.warn('Session already locked by another process', {
          lock: holder,
          sessionId,
        });
        return false;
      }

      logger.info('Session lock acquired', { sessionId });
      return true;
    } catch (error) {
      logger.error('Failed to acquire session lock', { error, sessionId });
      return false;
    }
  }

  /**
   * Releases the session lock by deleting its key. Failures are logged and
   * swallowed.
   *
   * NOTE(review): the key is deleted unconditionally — the lock payload
   * carries no owner token, so this cannot verify the caller holds the lock.
   */
  async releaseSessionLock(sessionId: string): Promise<void> {
    try {
      const key = this.getLockKey(sessionId);
      await this.client.del(key);
      logger.info('Session lock released', { sessionId });
    } catch (error) {
      logger.error('Failed to release session lock', { error, sessionId });
    }
  }

  /**
   * Refreshes the session lock TTL back to `SESSION_LOCK_TTL_SECONDS`.
   *
   * Logs a warning when the lock key no longer exists (nothing to refresh).
   * Failures are logged and swallowed.
   */
  async refreshSessionLock(sessionId: string): Promise<void> {
    try {
      const key = this.getLockKey(sessionId);
      const refreshed = await this.client.expire(key, SESSION_LOCK_TTL_SECONDS);
      if (!refreshed) {
        // EXPIRE reports failure when the key is missing, e.g. already expired.
        logger.warn('Session lock not found, nothing to refresh', { sessionId });
        return;
      }
      logger.debug('Session lock refreshed', { sessionId });
    } catch (error) {
      logger.error('Failed to refresh session lock', { error, sessionId });
    }
  }
}