Skip to main content
Glama
session-manager.ts (15.8 kB)
/**
 * Session manager for MCP server state persistence
 * Ported from Python Phase 1C session_manager.py
 * Phase 1D: Added observability instrumentation
 */

import type { Session } from "../types/session.js";
import { SessionSchema, createSession, updateSessionActivity } from "../types/session.js";
import { RedisClient } from "./redis-client.js";
import { D1Client } from "./d1-client.js";
import { Logger } from "../observability/logger.js";
import { parseJsonSafe, safeD1Call, safeRedisCall } from "./storage-utils.js";
import {
  sessionCreateCounter,
  sessionCreateDuration,
  sessionGetCounter,
  sessionGetDuration,
  sessionUpdateCounter,
  sessionEndCounter,
  cacheHitCounter,
  cacheMissCounter,
  activeSessionsGauge,
  d1WriteCounter,
  d1ReadCounter,
} from "../observability/metrics.js";
import { trace } from "../observability/tracing.js";

/**
 * Value passed as the Redis `ex` option for live sessions.
 * NOTE(review): presumably seconds (86400 s = 24 h) — confirm against RedisClient.
 */
const ACTIVE_SESSION_TTL = 86400;

/**
 * Value passed as the Redis `ex` option once a session has been ended.
 * NOTE(review): presumably seconds (3600 s = 1 h) — confirm against RedisClient.
 */
const ENDED_SESSION_TTL = 3600;

/**
 * Shape of the columns read from the `sessions` table.
 * Declared as a type alias (not an interface) so it remains assignable if
 * `queryOne` constrains its type parameter with an index signature.
 */
type SessionRow = {
  session_id: string;
  user_id: string;
  adapter_type: string;
  created_at: string;
  last_active: string;
  version: number;
  domain_state: string;
  total_messages: number;
  total_cost_usd: number;
  metadata: string;
};

/** Build the Redis key under which a session is cached. */
function sessionKey(sessionId: string): string {
  return `session:${sessionId}`;
}

/** Narrow an unknown thrown value to an `Error` without an unchecked cast. */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Creates, reads, updates and ends sessions, using Redis as the fast cache and
 * D1 as the fallback store, and emitting metrics/logs for every operation.
 */
export class SessionManager {
  constructor(
    private readonly redis: RedisClient,
    private readonly d1: D1Client,
    private readonly logger: Logger = new Logger()
  ) {}

  /**
   * Create a new session with observability instrumentation.
   *
   * The session is written to Redis (awaited) and to D1 (fire-and-forget).
   * A failed Redis write does not fail the call: the session is still returned
   * and the D1 write is still started, but the outcome is recorded as
   * `redis_error` rather than `success`.
   *
   * @param userId - Owner of the new session.
   * @param adapterType - Adapter type stored on the session.
   * @returns The newly created session.
   * @throws Re-throws any unexpected error after recording an `error` metric.
   */
  @trace("session.create")
  async create(userId: string, adapterType: string): Promise<Session> {
    return this.logger.timer("session.create", { userId, adapterType }, async () => {
      const session = createSession(userId, adapterType);

      this.logger.info("Creating new session", {
        sessionId: session.sessionId,
        userId,
        adapterType,
      });

      try {
        // Write to Redis (blocking, fast)
        const redisKey = sessionKey(session.sessionId);
        const redisStart = Date.now();
        const redisSuccess = await safeRedisCall(this.logger, {
          operation: "session.create.redis_set",
          context: { sessionId: session.sessionId },
          fn: () => this.redis.set(redisKey, session, { ex: ACTIVE_SESSION_TTL }),
          fallbackValue: false,
        });
        const redisDuration = Date.now() - redisStart;

        if (redisSuccess) {
          activeSessionsGauge.increment();
        }

        // Write to D1 (async, non-blocking)
        void this.writeSessionToD1(session).catch(() => {
          /* Errors logged inside helper */
        });

        // Record exactly one outcome per call; previously a Redis failure was
        // counted as both "redis_error" and "success".
        const result = redisSuccess ? "success" : "redis_error";
        sessionCreateCounter.increment({ result });
        sessionCreateDuration.observe(redisDuration, { result });

        if (redisSuccess) {
          this.logger.info("Session created successfully", {
            sessionId: session.sessionId,
            duration: redisDuration,
          });
        } else {
          this.logger.warn("Session created but Redis write failed", {
            sessionId: session.sessionId,
            duration: redisDuration,
          });
        }

        return session;
      } catch (error) {
        sessionCreateCounter.increment({ result: "error" });
        this.logger.error(
          "Failed to create session",
          { sessionId: session.sessionId },
          toError(error)
        );
        throw error;
      }
    });
  }

  /**
   * Helper method to write a new session to D1 with observability.
   * Runs through `safeD1Call` with `rethrow: false` and records a D1 write
   * metric for success or error.
   */
  private async writeSessionToD1(session: Session): Promise<void> {
    await safeD1Call(this.logger, {
      operation: "session.create.d1_insert",
      context: { sessionId: session.sessionId },
      rethrow: false,
      fn: () =>
        this.d1.execute(
          `INSERT INTO sessions (
            session_id, user_id, adapter_type, created_at, last_active,
            version, domain_state, total_messages, total_cost_usd, metadata
          ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
          session.sessionId,
          session.userId,
          session.adapterType,
          session.createdAt,
          session.lastActive,
          session.version,
          JSON.stringify(session.domainState),
          session.metadata.totalMessages,
          session.metadata.totalCostUsd,
          JSON.stringify(session.metadata)
        ),
      onSuccess: () => d1WriteCounter.increment({ table: "sessions", result: "success" }),
      onError: () => d1WriteCounter.increment({ table: "sessions", result: "error" }),
    });
  }

  /**
   * Helper method to persist an updated session to D1 with observability.
   * The UPDATE is guarded by `version = expectedVersion`, mirroring the
   * optimistic-lock check performed in `update`.
   */
  private async writeSessionUpdateToD1(
    sessionId: string,
    updatedSession: Session,
    expectedVersion: number
  ): Promise<void> {
    await safeD1Call(this.logger, {
      operation: "session.update.d1",
      context: { sessionId },
      rethrow: false,
      fn: () =>
        this.d1.execute(
          `UPDATE sessions
           SET last_active = ?, version = ?, domain_state = ?,
               total_messages = ?, total_cost_usd = ?, metadata = ?
           WHERE session_id = ? AND version = ?`,
          updatedSession.lastActive,
          updatedSession.version,
          JSON.stringify(updatedSession.domainState),
          updatedSession.metadata.totalMessages,
          updatedSession.metadata.totalCostUsd,
          JSON.stringify(updatedSession.metadata),
          sessionId,
          expectedVersion
        ),
      onSuccess: () => d1WriteCounter.increment({ table: "sessions", result: "success" }),
      onError: () => d1WriteCounter.increment({ table: "sessions", result: "error" }),
    });
  }

  /**
   * Rebuild a `Session` from a `sessions` table row.
   * JSON columns are parsed with `parseJsonSafe`; when parsing yields nothing,
   * the scalar columns of the row are used as the fallback.
   */
  private rowToSession(row: SessionRow, sessionId: string): Session {
    const domainState =
      parseJsonSafe<Record<string, unknown>>(
        row.domain_state,
        "session domain_state",
        this.logger,
        { sessionId }
      ) ?? {};

    const metadata =
      parseJsonSafe<Session["metadata"]>(row.metadata, "session metadata", this.logger, {
        sessionId,
      }) ?? {
        totalMessages: row.total_messages,
        totalCostUsd: row.total_cost_usd,
        activeTools: [],
        lastActivity: row.last_active,
      };

    return {
      sessionId: row.session_id,
      userId: row.user_id,
      adapterType: row.adapter_type,
      createdAt: row.created_at,
      lastActive: row.last_active,
      version: row.version,
      domainState,
      metadata: {
        // Individual fields may be missing from the stored JSON, so fall back per field.
        totalMessages: metadata.totalMessages ?? row.total_messages,
        totalCostUsd: metadata.totalCostUsd ?? row.total_cost_usd,
        activeTools: metadata.activeTools ?? [],
        lastActivity: metadata.lastActivity ?? row.last_active,
      },
    };
  }

  /**
   * Get a session by ID with observability instrumentation.
   *
   * Reads Redis first; on a miss (or when the cached value fails
   * `SessionSchema` validation) falls back to D1 and re-caches the result.
   *
   * @param sessionId - ID of the session to load.
   * @returns The session, or `null` when it is found in neither store.
   * @throws Re-throws any unexpected error after recording an `error` metric.
   */
  @trace("session.get")
  async get(sessionId: string): Promise<Session | null> {
    return this.logger.timer("session.get", { sessionId }, async () => {
      const redisKey = sessionKey(sessionId);

      this.logger.debug("Retrieving session", { sessionId });

      try {
        // Try Redis first (fast path)
        const redisStart = Date.now();
        const cached: Session | null = await safeRedisCall(this.logger, {
          operation: "session.get.redis_get",
          context: { sessionId },
          fn: () => this.redis.get<Session>(redisKey),
          fallbackValue: null,
        });
        const redisDuration = Date.now() - redisStart;

        if (cached) {
          try {
            // Validate the cached data
            const session = SessionSchema.parse(cached);
            cacheHitCounter.increment({ operation: "session.get" });
            sessionGetCounter.increment({ result: "cache_hit" });
            sessionGetDuration.observe(redisDuration, { result: "cache_hit" });

            this.logger.debug("Session retrieved from cache", {
              sessionId,
              duration: redisDuration,
            });

            return session;
          } catch (error) {
            // Fall through to the D1 path below.
            this.logger.warn("Invalid cached session data, falling back to D1", {
              sessionId,
              error: toError(error).message,
            });
          }
        }

        // Cache miss - fallback to D1 (slow path)
        cacheMissCounter.increment({ operation: "session.get" });

        const d1Start = Date.now();
        const row = await safeD1Call(this.logger, {
          operation: "session.get.d1_query",
          context: { sessionId },
          rethrow: false,
          fn: () =>
            this.d1.queryOne<SessionRow>("SELECT * FROM sessions WHERE session_id = ?", sessionId),
          fallbackValue: null,
          onSuccess: (result) =>
            d1ReadCounter.increment({
              table: "sessions",
              result: result ? "success" : "not_found",
            }),
          onError: () => d1ReadCounter.increment({ table: "sessions", result: "error" }),
        });
        const d1Duration = Date.now() - d1Start;
        const totalDuration = d1Duration + redisDuration;

        if (!row) {
          sessionGetCounter.increment({ result: "not_found" });
          sessionGetDuration.observe(totalDuration, { result: "not_found" });

          this.logger.info("Session not found", { sessionId, totalDuration });
          return null;
        }

        // Reconstruct session from database row
        const session = this.rowToSession(row, sessionId);

        // Re-cache in Redis
        await safeRedisCall(this.logger, {
          operation: "session.get.redis_set_cache",
          context: { sessionId },
          fn: () => this.redis.set(redisKey, session, { ex: ACTIVE_SESSION_TTL }),
          fallbackValue: false,
        });

        sessionGetCounter.increment({ result: "cache_miss" });
        sessionGetDuration.observe(totalDuration, { result: "cache_miss" });

        this.logger.info("Session retrieved from database and cached", {
          sessionId,
          redisDuration,
          d1Duration,
          totalDuration,
        });

        return session;
      } catch (error) {
        sessionGetCounter.increment({ result: "error" });
        this.logger.error("Failed to retrieve session", { sessionId }, toError(error));
        throw error;
      }
    });
  }

  /**
   * Update a session with optimistic locking and observability instrumentation.
   *
   * @param sessionId - ID of the session to update.
   * @param updates - Fields to merge over the current session. `sessionId`,
   *   `version` and `lastActive` are always set by this method and cannot be
   *   overridden through `updates`.
   * @param expectedVersion - Version the caller believes is current.
   * @returns `true` when the Redis write succeeded; `false` when the session
   *   is missing, the version does not match, or the Redis write failed.
   * @throws Re-throws any unexpected error after recording an `error` metric.
   */
  @trace("session.update")
  async update(
    sessionId: string,
    updates: Partial<Session>,
    expectedVersion: number
  ): Promise<boolean> {
    return this.logger.timer("session.update", { sessionId, expectedVersion }, async () => {
      const redisKey = sessionKey(sessionId);

      this.logger.debug("Updating session", { sessionId, expectedVersion });

      try {
        // Get current session
        const currentSession = await this.get(sessionId);
        if (!currentSession) {
          sessionUpdateCounter.increment({ result: "not_found" });
          this.logger.info("Session not found for update", { sessionId });
          return false;
        }

        // Check version (optimistic locking)
        if (currentSession.version !== expectedVersion) {
          sessionUpdateCounter.increment({ result: "version_conflict" });
          this.logger.warn("Version conflict in session update", {
            sessionId,
            expectedVersion,
            actualVersion: currentSession.version,
          });
          return false; // Conflict detected
        }

        // Apply updates. The identity field is pinned after the spread so a
        // stray `sessionId` in `updates` cannot store a mismatched record
        // under this session's key.
        // NOTE(review): userId/adapterType/createdAt from `updates` reach Redis
        // but are not part of the D1 UPDATE — confirm whether they should be
        // pinned as well.
        const updatedSession: Session = {
          ...currentSession,
          ...updates,
          sessionId: currentSession.sessionId,
          version: currentSession.version + 1,
          lastActive: new Date().toISOString(),
        };

        // Update Redis (blocking)
        const redisStart = Date.now();
        const success = await safeRedisCall(this.logger, {
          operation: "session.update.redis_set",
          context: { sessionId },
          fn: () => this.redis.set(redisKey, updatedSession, { ex: ACTIVE_SESSION_TTL }),
          fallbackValue: false,
        });
        const redisDuration = Date.now() - redisStart;

        if (!success) {
          sessionUpdateCounter.increment({ result: "redis_error" });
          this.logger.error("Failed to update session in Redis", { sessionId });
          return false;
        }

        // Update D1 (async)
        void this.writeSessionUpdateToD1(sessionId, updatedSession, expectedVersion).catch(() => {
          /* Errors already logged */
        });

        sessionUpdateCounter.increment({ result: "success" });
        this.logger.info("Session updated successfully", {
          sessionId,
          newVersion: updatedSession.version,
          redisDuration,
        });

        return true;
      } catch (error) {
        sessionUpdateCounter.increment({ result: "error" });
        this.logger.error(
          "Failed to update session",
          { sessionId, expectedVersion },
          toError(error)
        );
        throw error;
      }
    });
  }

  /**
   * End a session (mark as inactive) with observability instrumentation.
   *
   * The session is rewritten to Redis with the shorter ended-session TTL and
   * its `last_active`/`metadata` columns are updated in D1 (awaited).
   *
   * @param sessionId - ID of the session to end.
   * @returns `true` when both writes completed; `false` when the session is
   *   missing or either write is reported as failed.
   * @throws Re-throws any unexpected error after recording an `error` metric.
   */
  @trace("session.end")
  async end(sessionId: string): Promise<boolean> {
    return this.logger.timer("session.end", { sessionId }, async () => {
      const redisKey = sessionKey(sessionId);

      this.logger.debug("Ending session", { sessionId });

      try {
        // Get current session
        const session = await this.get(sessionId);
        if (!session) {
          sessionEndCounter.increment({ result: "not_found" });
          this.logger.info("Session not found for ending", { sessionId });
          return false;
        }

        // Update with ended status
        const endedSession = updateSessionActivity(session);

        // Update Redis
        const redisStart = Date.now();
        const success = await safeRedisCall(this.logger, {
          operation: "session.end.redis_set",
          context: { sessionId },
          fn: () => this.redis.set(redisKey, endedSession, { ex: ENDED_SESSION_TTL }),
          fallbackValue: false,
        });
        const redisDuration = Date.now() - redisStart;

        if (!success) {
          sessionEndCounter.increment({ result: "redis_error" });
          this.logger.error("Failed to end session in Redis", { sessionId });
          return false;
        }

        // Update D1
        const d1Result = await safeD1Call(this.logger, {
          operation: "session.end.d1",
          context: { sessionId },
          rethrow: false,
          fn: () =>
            this.d1.execute(
              "UPDATE sessions SET last_active = ?, metadata = ? WHERE session_id = ?",
              endedSession.lastActive,
              JSON.stringify(endedSession.metadata),
              sessionId
            ),
          onSuccess: () => d1WriteCounter.increment({ table: "sessions", result: "success" }),
          onError: () => d1WriteCounter.increment({ table: "sessions", result: "error" }),
        });

        // NOTE(review): assumes safeD1Call yields `undefined` only when the
        // call failed (no fallbackValue is supplied here) — confirm that a
        // successful `execute` never resolves to `undefined`.
        if (d1Result === undefined) {
          sessionEndCounter.increment({ result: "d1_error" });
          return false;
        }

        activeSessionsGauge.decrement();
        sessionEndCounter.increment({ result: "success" });
        this.logger.info("Session ended successfully", {
          sessionId,
          redisDuration,
        });

        return true;
      } catch (error) {
        sessionEndCounter.increment({ result: "error" });
        this.logger.error("Failed to end session", { sessionId }, toError(error));
        throw error;
      }
    });
  }

  /**
   * Check if session exists.
   * Only the Redis key is consulted; D1 is not queried. Redis errors propagate
   * to the caller.
   */
  async exists(sessionId: string): Promise<boolean> {
    return this.redis.exists(sessionKey(sessionId));
  }

  /**
   * Get time until session expires.
   * Returns whatever `redis.ttl` reports for the session key; Redis errors
   * propagate to the caller.
   */
  async getTimeToExpiry(sessionId: string): Promise<number> {
    return this.redis.ttl(sessionKey(sessionId));
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hummbl-dev/mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.