Skip to main content
Glama
RedisClient.ts8.13 kB
import { createClient, RedisClientType } from 'redis'; import { ProjectState, ProjectStateSchema, CheckpointHistory, CheckpointHistorySchema, SessionLock, SessionLockSchema, createInitialState, } from '../types/schema.js'; import { logger } from '../utils/logger.js'; import { getCurrentTimestamp } from '../utils/time.js'; import { validateProjectState } from '../utils/validation.js'; const MAX_RETRIES = 3; const RETRY_BASE_DELAY_MS = 100; const MAX_CHECKPOINT_HISTORY = 5; const SESSION_LOCK_TTL_SECONDS = 300; // 5 minutes /** * Redis client with optimistic locking for atomic state updates */ export class RedisClient { private client: RedisClientType; private connected: boolean = false; constructor(redisUrl: string) { this.client = createClient({ url: redisUrl }) as RedisClientType; // Error handling this.client.on('error', (err) => { logger.error('Redis client error', { error: err }); }); this.client.on('connect', () => { logger.info('Redis client connected'); }); } /** * Connects to Redis */ async connect(): Promise<void> { if (this.connected) { return; } try { await this.client.connect(); this.connected = true; logger.info('Redis connection established'); } catch (error) { logger.error('Failed to connect to Redis', { error }); throw new Error(`Redis connection failed: ${error}`); } } /** * Disconnects from Redis */ async disconnect(): Promise<void> { if (!this.connected) { return; } try { await this.client.quit(); this.connected = false; logger.info('Redis connection closed'); } catch (error) { logger.error('Error disconnecting from Redis', { error }); } } /** * Generates the Redis key for project state */ private getStateKey(sessionId: string): string { return `project:state:${sessionId}`; } /** * Generates the Redis key for checkpoint history */ private getHistoryKey(sessionId: string): string { return `project:history:${sessionId}`; } /** * Generates the Redis key for session lock */ private getLockKey(sessionId: string): string { return `project:lock:${sessionId}`; } /** * Retrieves the current project state */ async getState(sessionId: string): Promise<ProjectState> { try { const key = this.getStateKey(sessionId); const data = await this.client.json.get(key); if (!data) { logger.info('No existing state found, creating initial state', { sessionId }); return createInitialState(sessionId); } // Validate with centralized validator return validateProjectState(data); } catch (error) { logger.error('Failed to get state from Redis', { error, sessionId }); throw new Error(`Failed to retrieve state: ${error}`); } } /** * Updates state with optimistic locking using WATCH/MULTI/EXEC */ async updateStateWithLock( sessionId: string, updater: (state: ProjectState) => Promise<ProjectState> ): Promise<ProjectState> { const key = this.getStateKey(sessionId); for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { try { // Start watching the key await this.client.watch(key); // Get current state const currentState = await this.getState(sessionId); // Apply updater function const updatedState = await updater(currentState); // Increment version for optimistic locking updatedState.meta.version = currentState.meta.version + 1; updatedState.meta.last_access = getCurrentTimestamp(); // Validate updated state const validated = ProjectStateSchema.parse(updatedState); // Execute transaction const multi = this.client.multi(); multi.json.set(key, '$', validated as any); const result = await multi.exec(); // Check if transaction succeeded if (result === null) { // Transaction failed due to concurrent modification const delay = RETRY_BASE_DELAY_MS * Math.pow(2, attempt); logger.warn('Concurrent modification detected, retrying', { attempt: attempt + 1, delay, }); await new Promise((resolve) => setTimeout(resolve, delay)); continue; } logger.info('State updated successfully', { sessionId, version: validated.meta.version, }); return validated; } catch (error) { await this.client.unwatch(); logger.error('Failed to update state', { error, attempt, sessionId }); if (attempt === MAX_RETRIES - 1) { throw new Error(`Failed to update state after ${MAX_RETRIES} attempts: ${error}`); } } } throw new Error('Unreachable code'); } /** * Saves a checkpoint to history (async, doesn't block) */ async saveCheckpointHistory( sessionId: string, checkpoint: CheckpointHistory ): Promise<void> { try { const key = this.getHistoryKey(sessionId); // Validate checkpoint const validated = CheckpointHistorySchema.parse(checkpoint); // Add to list (LPUSH adds to front) await this.client.lPush(key, JSON.stringify(validated)); // Trim to keep only last MAX_CHECKPOINT_HISTORY entries await this.client.lTrim(key, 0, MAX_CHECKPOINT_HISTORY - 1); logger.info('Checkpoint saved to history', { sessionId, version: validated.version, }); } catch (error) { logger.error('Failed to save checkpoint history', { error, sessionId }); // Don't throw - this is non-critical } } /** * Retrieves checkpoint history */ async getCheckpointHistory(sessionId: string): Promise<CheckpointHistory[]> { try { const key = this.getHistoryKey(sessionId); const items = await this.client.lRange(key, 0, -1); const checkpoints: CheckpointHistory[] = []; for (const item of items) { try { const parsed = CheckpointHistorySchema.parse(JSON.parse(item)); checkpoints.push(parsed); } catch (error) { logger.warn('Invalid checkpoint in history, skipping', { error }); } } return checkpoints; } catch (error) { logger.error('Failed to get checkpoint history', { error, sessionId }); return []; } } /** * Acquires a session lock to detect concurrent sessions */ async acquireSessionLock(sessionId: string): Promise<boolean> { try { const key = this.getLockKey(sessionId); const existing = await this.client.get(key); if (existing) { const lock = SessionLockSchema.parse(JSON.parse(existing)); logger.warn('Session already locked by another process', { lock }); return false; } const lock: SessionLock = { session_id: sessionId, locked_at: getCurrentTimestamp(), ttl_seconds: SESSION_LOCK_TTL_SECONDS, }; await this.client.setEx(key, SESSION_LOCK_TTL_SECONDS, JSON.stringify(lock)); logger.info('Session lock acquired', { sessionId }); return true; } catch (error) { logger.error('Failed to acquire session lock', { error, sessionId }); return false; } } /** * Releases the session lock */ async releaseSessionLock(sessionId: string): Promise<void> { try { const key = this.getLockKey(sessionId); await this.client.del(key); logger.info('Session lock released', { sessionId }); } catch (error) { logger.error('Failed to release session lock', { error, sessionId }); } } /** * Refreshes the session lock TTL */ async refreshSessionLock(sessionId: string): Promise<void> { try { const key = this.getLockKey(sessionId); await this.client.expire(key, SESSION_LOCK_TTL_SECONDS); logger.debug('Session lock refreshed', { sessionId }); } catch (error) { logger.error('Failed to refresh session lock', { error, sessionId }); } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/coderdeep11/claude-memory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server