Skip to main content
Glama
by Coder-RL
redis-client.ts13.1 kB
import { createClient, RedisClientType, RedisDefaultModules, RedisFunctions, RedisModules, RedisScripts } from 'redis'; import { logger } from '@shared/logging'; export interface RedisConfig { url?: string; host?: string; port?: number; password?: string; database?: number; db?: number; // Alias for database property retryDelayOnFailover?: number; maxRetriesPerRequest?: number; connectTimeout?: number; commandTimeout?: number; } export type RedisClient = RedisClientType<RedisDefaultModules & RedisModules, RedisFunctions, RedisScripts>; class RedisConnectionManager { private client: RedisClient | null = null; private subscribers: Map<string, RedisClient> = new Map(); private isConnected = false; private isShuttingDown = false; private connectionMetrics = { connectionAttempts: 0, successfulConnections: 0, failedConnections: 0, reconnections: 0, lastConnectionTime: null as Date | null, commands: { executed: 0, failed: 0, totalLatency: 0, }, }; constructor(private config: RedisConfig) { this.initializeClient(); } private initializeClient(): void { const clientConfig = { url: this.config.url || `redis://${this.config.host || 'localhost'}:${this.config.port || 6379}`, password: this.config.password, database: this.config.database || 0, socket: { connectTimeout: this.config.connectTimeout || 5000, commandTimeout: this.config.commandTimeout || 5000, reconnectStrategy: (retries: number) => { if (retries > 10) { logger.error('Redis maximum reconnection attempts reached'); return false; } const delay = Math.min(retries * 100, 3000); logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`); return delay; }, }, }; this.client = createClient(clientConfig); // Connection event handlers this.client.on('connect', () => { this.connectionMetrics.connectionAttempts++; logger.info('Redis connecting...'); }); this.client.on('ready', () => { this.isConnected = true; this.connectionMetrics.successfulConnections++; this.connectionMetrics.lastConnectionTime = new Date(); logger.info('Redis connection ready', { attempts: this.connectionMetrics.connectionAttempts, successful: this.connectionMetrics.successfulConnections, }); }); this.client.on('error', (error) => { this.isConnected = false; this.connectionMetrics.failedConnections++; logger.error('Redis connection error', { error: error.message, attempts: this.connectionMetrics.connectionAttempts, failed: this.connectionMetrics.failedConnections, }); }); this.client.on('reconnecting', () => { this.connectionMetrics.reconnections++; logger.info('Redis reconnecting...', { reconnections: this.connectionMetrics.reconnections, }); }); this.client.on('end', () => { this.isConnected = false; logger.info('Redis connection ended'); }); } /** * Connect to Redis */ async connect(): Promise<void> { if (!this.client) { throw new Error('Redis client not initialized'); } try { await this.client.connect(); logger.info('Redis client connected successfully'); } catch (error) { logger.error('Failed to connect to Redis', { error: error instanceof Error ? error.message : 'Unknown error', }); throw error; } } /** * Test the Redis connection */ async testConnection(): Promise<boolean> { try { if (!this.client || !this.isConnected) { return false; } const start = Date.now(); const result = await this.client.ping(); const latency = Date.now() - start; logger.debug('Redis ping successful', { result, latency }); return result === 'PONG'; } catch (error) { logger.error('Redis ping failed', { error: error instanceof Error ? error.message : 'Unknown error', }); return false; } } /** * Execute Redis command with metrics tracking */ private async executeCommand<T>(operation: string, command: () => Promise<T>): Promise<T> { const start = Date.now(); try { const result = await command(); const duration = Date.now() - start; this.connectionMetrics.commands.executed++; this.connectionMetrics.commands.totalLatency += duration; logger.debug(`Redis ${operation} executed`, { duration, avgLatency: this.connectionMetrics.commands.totalLatency / this.connectionMetrics.commands.executed, }); return result; } catch (error) { const duration = Date.now() - start; this.connectionMetrics.commands.failed++; logger.error(`Redis ${operation} failed`, { duration, error: error instanceof Error ? error.message : 'Unknown error', }); throw error; } } /** * Set a key-value pair with optional expiration */ async set(key: string, value: string, expireInSeconds?: number): Promise<string | null> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('SET', async () => { if (expireInSeconds) { return await this.client!.setEx(key, expireInSeconds, value); } return await this.client!.set(key, value); }); } /** * Get a value by key */ async get(key: string): Promise<string | null> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('GET', () => this.client!.get(key)); } /** * Set JSON object with optional expiration */ async setJSON(key: string, object: any, expireInSeconds?: number): Promise<string | null> { const serialized = JSON.stringify(object); return this.set(key, serialized, expireInSeconds); } /** * Get and parse JSON object */ async getJSON<T = any>(key: string): Promise<T | null> { const value = await this.get(key); if (!value) return null; try { return JSON.parse(value) as T; } catch (error) { logger.error('Failed to parse JSON from Redis', { key, error: error instanceof Error ? error.message : 'Unknown error', }); return null; } } /** * Delete one or more keys */ async del(keys: string | string[]): Promise<number> { if (!this.client) { throw new Error('Redis client not connected'); } const keyArray = Array.isArray(keys) ? keys : [keys]; return this.executeCommand('DEL', () => this.client!.del(keyArray)); } /** * Check if key exists */ async exists(key: string): Promise<boolean> { if (!this.client) { throw new Error('Redis client not connected'); } const result = await this.executeCommand('EXISTS', () => this.client!.exists(key)); return result === 1; } /** * Set expiration for a key */ async expire(key: string, seconds: number): Promise<boolean> { if (!this.client) { throw new Error('Redis client not connected'); } const result = await this.executeCommand('EXPIRE', () => this.client!.expire(key, seconds)); return result; } /** * Get time to live for a key */ async ttl(key: string): Promise<number> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('TTL', () => this.client!.ttl(key)); } /** * Get all keys matching a pattern */ async keys(pattern: string): Promise<string[]> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('KEYS', () => this.client!.keys(pattern)); } /** * Publish a message to a channel */ async publish(channel: string, message: string): Promise<number> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('PUBLISH', () => this.client!.publish(channel, message)); } /** * Subscribe to channels */ async subscribe(channels: string[], onMessage: (channel: string, message: string) => void): Promise<string> { if (!this.client) { throw new Error('Redis client not connected'); } const subscriberId = `sub_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; const subscriber = this.client.duplicate(); try { await subscriber.connect(); this.subscribers.set(subscriberId, subscriber); await subscriber.subscribe(channels, (message, channel) => { logger.debug('Redis message received', { channel, messageLength: message.length }); onMessage(channel, message); }); logger.info('Redis subscribed to channels', { channels, subscriberId }); return subscriberId; } catch (error) { // Clean up on failure if (subscriber.isOpen) { await subscriber.disconnect(); } throw error; } } /** * Unsubscribe from channels */ async unsubscribe(subscriberId: string): Promise<void> { const subscriber = this.subscribers.get(subscriberId); if (!subscriber) { logger.warn(`Subscriber ${subscriberId} not found`); return; } try { await subscriber.unsubscribe(); await subscriber.disconnect(); this.subscribers.delete(subscriberId); logger.info(`Unsubscribed and cleaned up subscriber ${subscriberId}`); } catch (error) { logger.error(`Failed to unsubscribe ${subscriberId}`, { error: error instanceof Error ? error.message : 'Unknown error', }); } } /** * Get connection metrics */ getMetrics(): typeof this.connectionMetrics & { isConnected: boolean } { return { ...this.connectionMetrics, isConnected: this.isConnected, commands: { ...this.connectionMetrics.commands }, }; } /** * Health check for Redis connection */ async healthCheck(): Promise<{ status: 'healthy' | 'unhealthy'; metrics: any; latency?: number; }> { const start = Date.now(); try { const isConnected = await this.testConnection(); const latency = Date.now() - start; return { status: isConnected ? 'healthy' : 'unhealthy', metrics: this.getMetrics(), latency, }; } catch (error) { return { status: 'unhealthy', metrics: this.getMetrics(), }; } } /** * Add message to Redis stream */ async xadd(streamName: string, id: string, field: string, value: string): Promise<string> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('XADD', () => this.client!.xAdd(streamName, id, { [field]: value })); } /** * Read messages from Redis stream */ async xread(streams: Record<string, string>, options?: { count?: number; block?: number }): Promise<any> { if (!this.client) { throw new Error('Redis client not connected'); } return this.executeCommand('XREAD', () => this.client!.xRead( Object.entries(streams).map(([key, id]) => ({ key, id })), options || {} )); } /** * Get raw Redis client (use with caution) */ getClient(): RedisClient | null { return this.client; } /** * Close the Redis connection */ async close(): Promise<void> { if (this.isShuttingDown) return; this.isShuttingDown = true; // Close all subscribers first for (const [subscriberId, subscriber] of this.subscribers) { try { await subscriber.disconnect(); logger.debug(`Closed subscriber ${subscriberId}`); } catch (error) { logger.error(`Failed to close subscriber ${subscriberId}`, { error: error instanceof Error ? error.message : 'Unknown error', }); } } this.subscribers.clear(); // Close main client if (this.client) { await this.client.quit(); this.client = null; this.isConnected = false; logger.info('Redis client disconnected'); } } } // Create and export the Redis instance const redisConfig: RedisConfig = { url: process.env.REDIS_URL, host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), password: process.env.REDIS_PASSWORD, database: parseInt(process.env.REDIS_DATABASE || '0'), connectTimeout: parseInt(process.env.REDIS_CONNECT_TIMEOUT || '5000'), commandTimeout: parseInt(process.env.REDIS_COMMAND_TIMEOUT || '5000'), }; export const redis = new RedisConnectionManager(redisConfig); // Graceful shutdown handler process.on('SIGINT', async () => { logger.info('Received SIGINT, closing Redis connection...'); await redis.close(); }); process.on('SIGTERM', async () => { logger.info('Received SIGTERM, closing Redis connection...'); await redis.close(); }); export { RedisConnectionManager };

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Coder-RL/Claude_MCPServer_Dev1'

If you have feedback or need assistance with the MCP directory API, please join our Discord server