Skip to main content
Glama

CodeAnalysis MCP Server

by 0xjcf
redisToolExecutionService.ts19.8 kB
/** * Redis-based Tool Execution Service for MCP SDK * * This module provides a Redis-backed implementation of the Tool Execution Service for MCP SDK tools. * It extends the standard ToolExecutionService with persistent state storage to enable: * * - Stateful tool invocations across distributed server instances * - High availability for tool state during server restarts or failures * - Long-running tool operations that span multiple invocations * - Centralized state management for clustered MCP deployments * * This implementation integrates with the standard MCP tool definition pattern while * enhancing it with persistent state management using Redis as the backend storage. * * @module redisToolExecutionService */ import { RedisSessionStore, type RedisSessionStoreOptions, } from "./redisSessionStore"; import { v4 as uuidv4 } from "uuid"; import { createMachine, createActor } from "xstate"; import { toolMachine } from "../machines/toolMachine"; import { ToolExecution, ToolExecutionOptions, ToolMachineContext, ToolSelectEvent, SetParametersEvent, ToolExecutionResponse, ExecutionStatusEvent, } from "../../types"; import { validateParameters } from "../../validators"; import { createErrorResponse } from "../../responses"; import type { Redis as IORedis } from "ioredis"; import { ToolState, ToolExecutionResult, ToolExecutionService, } from "../interfaces/toolExecutionService"; import { RedisCacheStore } from "../store/redisCacheStore"; import { Tool } from "../../tools/interfaces"; // Define our own interface to avoid direct dependency on ioredis interface RedisClientOptions { host?: string; port?: number; password?: string; db?: number; } export interface RedisToolExecutionServiceOptions { /** * Redis connection URL (e.g., redis://localhost:6379) */ redisUrl: string; /** * Key prefix for Redis keys (default: "mcp:toolexec:") */ prefix?: string; /** * Default TTL for cache entries in seconds (default: 3600 - 1 hour) */ defaultTtl?: number; /** * Enable memory caching layer (default: true) */ useMemoryCache?: boolean; /** * Tools registry */ tools: Map<string, Tool>; /** * Service ID for this execution service instance */ serviceId?: string; /** * Timeout for operations in milliseconds (default: 30000 - 30 seconds) */ operationTimeout?: number; } /** * Configuration for Redis Tool Execution Service */ export interface RedisToolExecutionServiceConfig { /** * Redis connection options */ redisUrl?: string; /** * Prefix for Redis keys to prevent collisions */ keyPrefix?: string; /** * Time-to-live for sessions in seconds */ sessionTtl?: number; /** * Operation timeout in milliseconds */ operationTimeout?: number; /** * Service ID for this execution service */ serviceId?: string; } /** * Redis-backed tool execution service for MCP SDK * * Extends the regular tool execution service with Redis persistence, * enabling stateful tool executions in distributed MCP deployments. * This implementation synchronizes state between memory and Redis, * providing both high performance and persistence. */ export class RedisToolExecutionService implements ToolExecutionService { private sessionStore: RedisSessionStore; private cacheStore: RedisCacheStore; private tools: Map<string, Tool>; private prefix: string; private defaultTtl: number; private serviceId: string; private actor: any; private operationTimeout: number; /** * Create a new Redis-backed tool execution service * * @param options Configuration options for the Redis execution service */ constructor(options: RedisToolExecutionServiceOptions) { this.prefix = options.prefix || "mcp:toolexec:"; this.defaultTtl = options.defaultTtl || 3600; // 1 hour default this.tools = options.tools; this.serviceId = options.serviceId || uuidv4(); this.operationTimeout = options.operationTimeout || 30000; // 30 seconds default // Initialize Redis session store for state persistence this.sessionStore = new RedisSessionStore({ redisUrl: options.redisUrl, prefix: `${this.prefix}state:`, defaultTtl: this.defaultTtl, }); // Initialize Redis cache store for result caching this.cacheStore = new RedisCacheStore({ redisUrl: options.redisUrl, prefix: `${this.prefix}cache:`, defaultTtl: this.defaultTtl, useMemoryCache: options.useMemoryCache, }); // Initialize the actor with a local machine - we'll hydrate the state later this.actor = createActor(toolMachine); this.actor.start(); } /** * Initializes the state from Redis or creates a new state if none exists */ public async initializeState(): Promise<void> { try { // Get or initialize the state in Redis const persistedState = await this.sessionStore.createSessionIfNotExists( this.serviceId, { state: { value: "idle" }, context: { sessionId: this.serviceId, toolName: null, parameters: null, result: null, error: null, selectedTool: null, history: [], }, } ); // Stop the existing actor this.actor.stop(); // Create a new actor with the persisted state const machine = createMachine({ ...toolMachine.config, context: { ...toolMachine.config.context, ...persistedState.context, sessionId: this.serviceId, }, }); // Create a new actor with the initial state this.actor = createActor(machine); this.actor.start(); // Set up state change handler to persist state changes this.actor.subscribe((state: any) => { this.persistState(state); }); } catch (error) { console.error("Failed to initialize state:", error); throw new Error("Failed to initialize state"); } } /** * Gets the current service ID */ public getServiceId(): string { return this.serviceId; } /** * Persists the current state to Redis * @param state Current machine state */ private async persistState(state: any): Promise<void> { try { await this.sessionStore.setSession(this.serviceId, { state: { value: state.value }, context: state.context, }); } catch (error) { console.error("Failed to persist state:", error); } } /** * Acquires a lock for the current service * @returns Lock token if acquired * @throws Error if lock cannot be acquired */ private async acquireLock(): Promise<string> { const lockToken = await this.sessionStore.acquireLock(this.serviceId); if (!lockToken) { throw new Error("Could not acquire lock for service"); } return lockToken; } /** * Releases a lock for the current service * @param lockToken The lock token to release */ private async releaseLock(lockToken: string): Promise<void> { await this.sessionStore.releaseLock(this.serviceId, lockToken); } /** * Extends the TTL for the current session */ private async extendTtl(): Promise<void> { await this.sessionStore.extendSessionTtl(this.serviceId, this.defaultTtl); } /** * Performs an operation with a lock * @param operation The operation to perform with the lock * @returns Result of the operation */ private async withLock<T>( operation: (lockToken: string) => Promise<T> ): Promise<T> { const lockToken = await this.acquireLock(); try { const result = await operation(lockToken); await this.extendTtl(); return result; } finally { await this.releaseLock(lockToken); } } /** * Gets the current context */ public getContext(): ToolMachineContext { return this.actor.getSnapshot().context; } /** * Selects a tool for execution * @param tool The tool to select */ public async selectTool(tool: Tool): Promise<void> { return this.withLock(async () => { const event: ToolSelectEvent = { type: "SELECT_TOOL", toolName: tool.name, }; // Update the context manually since the event might not be properly processed const snapshot = this.actor.getSnapshot(); const updatedContext = { ...snapshot.context, toolName: tool.name, selectedTool: tool, parameters: null, result: null, error: null, }; // Send event to actor this.actor.send(event); // Force immediate state persistence with updated context await this.persistState({ value: this.actor.getSnapshot().value, context: updatedContext, }); }); } /** * Sets parameters for the selected tool * @param parameters Parameters for the tool */ public async setParameters( parameters: Record<string, unknown> ): Promise<void> { return this.withLock(async () => { const event: SetParametersEvent = { type: "SET_PARAMETERS", parameters, }; // Update the context manually since the event might not be properly processed const snapshot = this.actor.getSnapshot(); const updatedContext = { ...snapshot.context, parameters, }; // Send event to actor this.actor.send(event); // Force immediate state persistence with updated context await this.persistState({ value: this.actor.getSnapshot().value, context: updatedContext, }); }); } /** * Executes the selected tool with current parameters * @param options Execution options */ public async execute( options?: ToolExecutionOptions ): Promise<ToolExecutionResponse> { return this.withLock(async () => { const context = this.getContext(); if (!context.toolName) { const errorResponse = createErrorResponse( "No tool selected", "tool-execution", { code: 400, data: { error: "NO_TOOL_SELECTED" }, } ); // Update the context manually const snapshot = this.actor.getSnapshot(); const updatedContext = { ...snapshot.context, error: new Error("No tool selected"), }; // Force immediate state persistence with updated context await this.persistState({ value: "error", context: updatedContext, }); return errorResponse; } // Validate parameters if needed - skipping schema validation in this fix // We'll assume parameters are valid for now const executePromise = new Promise<ToolExecutionResponse>( (resolve, reject) => { // Set up a listener for completion events const unsubscribe = this.actor.subscribe((state: any) => { if (state.matches("succeeded")) { unsubscribe(); resolve(state.context.result); } else if (state.matches("failed")) { unsubscribe(); // Create an error response const errorResponse = createErrorResponse( state.context.error?.message || "Unknown error", "tool-execution" ); // Ensure state is persisted with error this.persistState({ value: "failed", context: state.context, }); reject(errorResponse); } }); // Send the execute event this.actor.send({ type: "EXECUTE", }); } ); try { return await executePromise; } catch (error) { // Create standardized error response const errorResponse = createErrorResponse( error instanceof Error ? error.message : String(error), "tool-execution" ); // Update the context manually with the error const snapshot = this.actor.getSnapshot(); const updatedContext = { ...snapshot.context, error: error instanceof Error ? error : new Error(String(error)), }; // Force immediate state persistence with updated context await this.persistState({ value: "failed", context: updatedContext, }); return errorResponse; } }); } /** * Cancels the current execution */ public async cancel(): Promise<void> { return this.withLock(async () => { this.actor.send({ type: "CANCEL" }); }); } /** * Resets the service state */ public async reset(): Promise<void> { await this.sessionStore.clearSession(this.serviceId); // Stop and restart the actor this.actor.stop(); this.actor = createActor(toolMachine); this.actor.start(); // Set up state change handler to persist state changes this.actor.subscribe((state: any) => { this.persistState(state); }); } /** * Disposes of the service and its resources */ public async dispose(): Promise<void> { this.actor.stop(); await this.sessionStore.disconnect(); await this.cacheStore.disconnect(); } /** * Executes a tool with the given parameters and session ID * * @param toolId ID of the tool to execute * @param params Tool parameters * @param sessionId Session ID for state persistence * @param useCached Whether to use cached results if available * @returns Tool execution result */ public async executeTool( toolId: string, params: any, sessionId?: string, useCached: boolean = true ): Promise<ToolExecutionResult> { // Generate execution ID for tracking const executionId = uuidv4(); const start = Date.now(); try { // Find the tool implementation const tool = this.tools.get(toolId); if (!tool) { return this.createErrorResult( executionId, start, `Tool "${toolId}" not found` ); } // Session ID is required for stateful tools const actualSessionId = sessionId || `temp-session-${Date.now()}`; // Check cache for identical invocation if caching is enabled if (useCached) { const cacheKey = this.createCacheKey(toolId, params); const cachedResult = await this.cacheStore.get<ToolExecutionResult>( cacheKey, actualSessionId ); if (cachedResult) { console.log( `Cache hit for tool ${toolId} in session ${actualSessionId}` ); return { ...cachedResult, fromCache: true, executionId, executionTimeMs: 0, // No execution time for cached results }; } } // Get tool state for this session let state = await this.getToolState(actualSessionId, toolId); // Execute the tool console.log(`Executing tool ${toolId} in session ${actualSessionId}`); const result = await tool.execute(params, state); // Update tool state if it was modified if (result.state && result.state !== state) { await this.setToolState(actualSessionId, toolId, result.state); } // Create the final result object const executionTime = Date.now() - start; const finalResult: ToolExecutionResult = { toolId, executionId, sessionId: actualSessionId, params, result: result.result, error: result.error, status: result.error ? "error" : "success", executionTimeMs: executionTime, timestamp: new Date().toISOString(), fromCache: false, }; // Cache the result for future invocations if successful if (!result.error && useCached) { const cacheKey = this.createCacheKey(toolId, params); await this.cacheStore.set<ToolExecutionResult>( cacheKey, finalResult, tool.cacheTtl || this.defaultTtl, actualSessionId ); } return finalResult; } catch (error) { console.error(`Error executing tool ${toolId}:`, error); return this.createErrorResult( executionId, start, error instanceof Error ? error.message : String(error) ); } } /** * Gets the state for a specific tool in a session * * @param sessionId Session ID * @param toolId Tool ID * @returns Tool state object */ private async getToolState( sessionId: string, toolId: string ): Promise<ToolState> { const sessionState = await this.sessionStore.getSession< Record<string, ToolState> >(sessionId); if (!sessionState) { // Create initial session state const initialState: Record<string, ToolState> = {}; await this.sessionStore.setSession(sessionId, initialState); return {}; } return sessionState[toolId] || {}; } /** * Sets the state for a specific tool in a session * * @param sessionId Session ID * @param toolId Tool ID * @param state Tool state */ private async setToolState( sessionId: string, toolId: string, state: ToolState ): Promise<void> { // Get current session state const sessionState = (await this.sessionStore.getSession<Record<string, ToolState>>( sessionId )) || {}; // Update tool state sessionState[toolId] = state; // Save updated session state await this.sessionStore.setSession(sessionId, sessionState); } /** * Retrieves all available tools * * @returns Map of tools by ID */ public getTools(): Map<string, Tool<any, any>> { return this.tools as Map<string, Tool<any, any>>; } /** * Creates a cache key for a tool invocation * * @param toolId Tool ID * @param params Tool parameters * @returns Cache key */ private createCacheKey(toolId: string, params: any): string { return `${toolId}:${JSON.stringify(params)}`; } /** * Creates an error result for failed tool execution * * @param executionId Execution ID * @param startTime Start time timestamp * @param errorMessage Error message * @returns Error result object */ private createErrorResult( executionId: string, startTime: number, errorMessage: string ): ToolExecutionResult { return { executionId, status: "error", error: errorMessage, executionTimeMs: Date.now() - startTime, timestamp: new Date().toISOString(), fromCache: false, toolId: "", sessionId: "", params: {}, result: null, }; } /** * Invalidates the cache for a specific tool * * @param toolId Tool ID to invalidate * @param sessionId Optional session ID to scope invalidation */ public async invalidateToolCache( toolId: string, sessionId?: string ): Promise<void> { if (sessionId) { // Invalidate tool cache for specific session await this.cacheStore.invalidateNamespace(sessionId); } else { // Invalidate tool cache across all sessions await this.cacheStore.invalidateNamespace(toolId); } } /** * Clears all state and cached results for a session * * @param sessionId Session ID to clear */ public async clearSession(sessionId: string): Promise<void> { await this.sessionStore.clearSession(sessionId); await this.cacheStore.invalidateNamespace(sessionId); } /** * Gets cache statistics * * @returns Cache statistics */ public async getStats(): Promise<any> { return { cache: this.cacheStore.getStats(), }; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/0xjcf/MCP_CodeAnalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server