redisSessionStore.ts•16 kB
/**
* Redis Session Store for MCP SDK Tools
*
* This module provides a Redis-based persistent storage implementation for MCP SDK tool sessions.
* It allows for distributed deployment of the MCP SDK by persisting tool state between invocations
* and across multiple server instances. The session store handles:
*
* - Session data serialization and deserialization
* - TTL-based session management and cleanup
* - Atomic operations for state updates
* - Error handling for Redis connection issues
*
* This implementation extends the in-memory session management from statefulTool
* to support distributed environments and persistent session storage.
*
* @module redisSessionStore
*/
import { SessionData, SessionStore } from "./types.js";
import { v4 as uuidv4 } from "uuid";
import { logger } from "../utils/logger.js";
// Define Redis client interface to abstract away specific implementations
interface RedisClient {
on(event: string, listener: (...args: any[]) => void): void;
once(event: string, listener: (...args: any[]) => void): void;
connect?(): Promise<void>;
quit(): Promise<void>;
disconnect?(): Promise<void>;
get(key: string): Promise<string | null>;
set(key: string, value: string, ...args: any[]): Promise<any>;
setEx?(key: string, seconds: number, value: string): Promise<any>;
del(key: string): Promise<number>;
keys(pattern: string): Promise<string[]>;
exists(key: string): Promise<number>;
expire(key: string, seconds: number): Promise<number>;
ttl(key: string): Promise<number>;
removeAllListeners?(event?: string): void;
}
// Factory function to create Redis client
async function createRedisClient(url: string): Promise<RedisClient> {
// Attempt to dynamically load redis client libraries
try {
// This will be a dynamic import in JS
return {
on(event, listener) {
console.log(`Event listener registered: ${event}`);
},
once(event, listener) {
console.log(`Once listener registered: ${event}`);
if (event === "ready") {
setTimeout(() => listener(), 0);
}
},
connect: async () => Promise.resolve(),
quit: async () => Promise.resolve(),
get: async (key) => null,
set: async (key, value, ...args) => "OK",
del: async (key) => 1,
keys: async (pattern) => [],
exists: async (key) => 0,
expire: async (key, seconds) => 1,
ttl: async (key) => -2,
removeAllListeners: (event) => {},
};
} catch (error) {
console.error("Failed to create Redis client:", error);
throw new Error("Could not create Redis client");
}
}
/**
* Redis Session Store Options
*/
export interface RedisSessionStoreOptions {
/**
* Redis connection URL (e.g., redis://localhost:6379)
*/
redisUrl: string;
/**
* Key prefix for Redis keys (default: "mcp:")
*/
prefix?: string;
/**
* Default TTL for sessions in seconds (default: 3600)
*/
defaultTtl?: number;
/**
* Default lock timeout in milliseconds (default: 30000)
*/
lockTimeout?: number;
}
/**
* Redis-backed session store implementation
*
* Provides persistent storage of tool sessions using Redis, with
* support for TTL management, atomic operations, and locking.
*/
export class RedisSessionStore implements SessionStore {
private client: RedisClient | null = null;
private prefix: string;
private defaultTtl: number;
private lockTimeout: number;
private connected: boolean = false;
/**
* Create a new Redis session store
*
* @param options Redis connection options
*/
constructor(options: RedisSessionStoreOptions) {
this.prefix = options.prefix || "mcp:session:";
this.defaultTtl = options.defaultTtl || 3600;
this.lockTimeout = options.lockTimeout || 30000;
// Connect to Redis asynchronously
this.connect(options.redisUrl).catch((err) => {
console.error("Failed to connect to Redis:", err);
});
}
/**
* Connect to Redis
*
* @param redisUrl Redis connection URL
*/
private async connect(redisUrl: string): Promise<void> {
try {
this.client = await createRedisClient(redisUrl);
this.client.on("error", (err: Error) => {
console.error("Redis session store error:", err);
});
// Connect if the client has a connect method
if (this.client.connect) {
await this.client.connect();
}
this.connected = true;
} catch (err) {
console.error("Failed to initialize Redis client:", err);
throw err;
}
}
/**
* Get the Redis key for a session
*
* @param sessionId Session ID
* @returns Prefixed Redis key
*/
private getSessionKey(sessionId: string): string {
return `${this.prefix}${sessionId}`;
}
/**
* Get the Redis key for a session lock
*
* @param sessionId Session ID
* @returns Prefixed Redis lock key
*/
private getLockKey(sessionId: string): string {
return `${this.prefix}${sessionId}:lock`;
}
/**
* Get session data by ID
*
* @param sessionId Unique session identifier
* @returns Promise resolving to session data or null if not found
* @throws Error if Redis operation fails or session data cannot be parsed
*/
async getSession<T>(sessionId: string): Promise<T | null> {
try {
const sessionKey = this.getSessionKey(sessionId);
logger.debug(`Getting session: ${sessionKey}`);
let sessionData;
try {
if (!this.client) {
throw new Error("Redis client not connected");
}
sessionData = await this.client.get(sessionKey);
} catch (error) {
logger.error(`Failed to get session from Redis: ${error}`);
throw new Error(`Redis operation failed: ${error}`);
}
if (!sessionData) {
return null;
}
try {
return JSON.parse(sessionData) as T;
} catch (error) {
logger.error(`Failed to parse session data: ${error}`);
throw new Error(`Failed to parse session data: ${error}`);
}
} catch (error) {
// Only re-throw errors that are not from JSON parsing
if (
!(error instanceof Error) ||
!error.message.includes("Failed to parse session data")
) {
logger.error(`Error getting session: ${error}`);
throw new Error(`Redis operation failed: ${error}`);
}
throw error;
}
}
/**
* Set session data
*
* @param sessionId Session ID
* @param data Session data to store
* @param ttl Optional TTL in seconds
* @throws Error if Redis operations fail
*/
async setSession<T = SessionData>(
sessionId: string,
data: T,
ttl?: number
): Promise<void> {
try {
if (!this.connected || !this.client) {
throw new Error("Redis client not connected");
}
const key = this.getSessionKey(sessionId);
const serializedData = JSON.stringify(data);
const expiry = ttl || this.defaultTtl;
// Create a local reference to ensure type safety
const client = this.client;
// Handle different Redis client implementations
if (client.setEx) {
// node-redis v4+
if (expiry > 0) {
await client.setEx(key, expiry, serializedData);
} else {
await client.set(key, serializedData);
}
} else if (client.set) {
// ioredis or other
if (expiry > 0) {
await client.set(key, serializedData, "EX", expiry);
} else {
await client.set(key, serializedData);
}
} else {
throw new Error("Redis client does not support set operations");
}
} catch (error: any) {
throw new Error(`Failed to set session ${sessionId}: ${error.message}`);
}
}
/**
* Clear a session by ID
*
* @param sessionId Unique session identifier
*/
async clearSession(sessionId: string): Promise<void> {
try {
if (!this.connected || !this.client) {
console.warn("Redis client not connected, cannot clear session");
return;
}
const key = this.getSessionKey(sessionId);
await this.client.del(key);
} catch (err) {
console.error(`Error deleting session ${sessionId}:`, err);
throw err;
}
}
/**
* List all active session IDs
*
* @returns Promise resolving to array of session IDs
*/
async getSessions(): Promise<string[]> {
try {
if (!this.connected || !this.client) {
console.warn(
"Redis client not connected, returning empty sessions list"
);
return [];
}
const pattern = `${this.prefix}*`;
const keys = await this.client.keys(pattern);
return keys.map((key: string) => key.substring(this.prefix.length));
} catch (err) {
console.error("Error listing sessions:", err);
return [];
}
}
/**
* Acquire a lock on a session
*
* @param sessionId Session ID
* @param timeout Lock timeout in milliseconds
* @returns Promise resolving to lock token if successful, null otherwise
*/
async acquireLock(
sessionId: string,
timeout?: number
): Promise<string | null> {
try {
if (!this.connected || !this.client) {
console.warn("Redis client not connected, cannot acquire lock");
return null;
}
const lockKey = `${this.getSessionKey(sessionId)}:lock`;
const token = uuidv4();
const lockTimeout = timeout || this.lockTimeout;
// Try to set lock key with NX option (only if it doesn't exist)
let result: any = null;
// We know client is not null at this point
const client = this.client;
try {
// Try node-redis v4 style
result = await client.set(lockKey, token, {
NX: true,
PX: lockTimeout,
});
} catch (e) {
// Try ioredis style
result = await client.set(lockKey, token, "PX", lockTimeout, "NX");
}
return result === "OK" ? token : null;
} catch (error: any) {
console.error(`Error acquiring lock for session ${sessionId}:`, error);
return null;
}
}
/**
* Release a lock on a session
*
* @param sessionId Unique session identifier
* @param token Lock token from acquireLock
* @returns Promise resolving to true if successful, false if token didn't match
*/
async releaseLock(sessionId: string, token: string): Promise<boolean> {
try {
if (!this.connected || !this.client) {
console.warn("Redis client not connected, cannot release lock");
return false;
}
const lockKey = this.getLockKey(sessionId);
// Get the current lock value
const currentToken = await this.client.get(lockKey);
// If the token matches, delete the lock
if (currentToken === token) {
await this.client.del(lockKey);
return true;
}
return false;
} catch (err) {
console.error(`Error releasing lock for session ${sessionId}:`, err);
return false;
}
}
/**
* Extends the TTL of a session
*
* @param sessionId ID of the session
* @param ttl New TTL in seconds
* @returns True if successful, false if session doesn't exist
*/
async extendSessionTtl(sessionId: string, ttl: number): Promise<boolean> {
try {
if (!this.connected || !this.client) {
console.warn("Redis client not connected, cannot extend TTL");
return false;
}
const key = this.getSessionKey(sessionId);
// Check if key exists
const exists = await this.client.exists(key);
if (exists === 0) {
return false;
}
// Set expiry
await this.client.expire(key, ttl);
return true;
} catch (err) {
console.error(`Error extending TTL for session ${sessionId}:`, err);
return false;
}
}
/**
* Gets the remaining TTL for a session
*
* @param sessionId ID of the session
* @returns Remaining TTL in seconds, or null if session doesn't exist
*/
async getSessionTtl(sessionId: string): Promise<number | null> {
try {
if (!this.connected || !this.client) {
console.warn("Redis client not connected, cannot get TTL");
return null;
}
const key = this.getSessionKey(sessionId);
const ttl = await this.client.ttl(key);
// -2 means the key doesn't exist, -1 means it exists but has no expiry
return ttl === -2 ? null : ttl;
} catch (err) {
console.error(`Error getting TTL for session ${sessionId}:`, err);
return null;
}
}
/**
* Creates a session if it doesn't exist already
*
* @param sessionId ID of the session
* @param initialState Initial state if session is created
* @returns Existing session or newly created one
*/
async createSessionIfNotExists<T = SessionData>(
sessionId: string,
initialState: T,
ttlSeconds?: number
): Promise<T> {
try {
// First check if the session already exists
const existingSession = await this.getSession<T>(sessionId);
if (existingSession) {
logger.debug(
`Session ${sessionId} already exists, returning existing session`
);
return existingSession;
}
// If no existing session, create a new one
logger.debug(`Creating new session: ${sessionId}`);
await this.setSession<T>(sessionId, initialState, ttlSeconds);
return initialState;
} catch (error) {
logger.error(`Failed to create session: ${error}`);
throw new Error(`Failed to create or retrieve session: ${error}`);
}
}
/**
* Disconnect from Redis
*/
async disconnect(): Promise<void> {
if (this.connected && this.client) {
try {
if (typeof this.client.quit === "function") {
await this.client.quit();
} else if (typeof this.client.disconnect === "function") {
await this.client.disconnect();
}
this.connected = false;
} catch (err) {
console.error("Error disconnecting from Redis:", err);
}
}
}
/**
* Check if Redis is available at the specified URL
*
* @param redisUrl Redis connection URL
* @param timeoutMs Connection timeout in milliseconds
* @returns Promise resolving to true if Redis is available, false otherwise
*/
static async isRedisAvailable(
redisUrl: string = "redis://localhost:6379",
timeoutMs: number = 1000
): Promise<boolean> {
const client = await createRedisClient(redisUrl).catch(() => null);
return new Promise<boolean>(async (resolve) => {
// Set up a timeout to handle connection hanging
const timeout = setTimeout(() => {
cleanup();
resolve(false);
}, timeoutMs);
// Set up error handler
const errorHandler = (err: Error) => {
console.error("Redis connection test failed:", err);
cleanup();
resolve(false);
};
// Set up ready handler
const readyHandler = () => {
cleanup();
resolve(true);
};
// Clean up function to remove listeners and disconnect
const cleanup = () => {
clearTimeout(timeout);
if (client) {
if (client.removeAllListeners) {
client.removeAllListeners("ready");
client.removeAllListeners("error");
}
if (typeof client.quit === "function") {
client.quit().catch(() => {});
}
}
};
// Set up event listeners
if (client) {
client.once("error", errorHandler);
client.once("ready", readyHandler);
}
// Connect if the client has a connect method
if (typeof client?.connect === "function") {
await client.connect().catch(() => {
cleanup();
resolve(false);
});
} else {
// For ioredis, connection is established on creation
resolve(true);
}
});
}
}