history-manager.ts (12.5 kB)
/**
 * History manager for MCP server conversation storage
 * Ported from Python Phase 1C history_manager.py
 * Phase 1D: Added observability instrumentation
 */
import type { Message } from "../types/message.js";
import { MessageSchema } from "../types/message.js";
import { RedisClient } from "./redis-client.js";
import { D1Client } from "./d1-client.js";
import { nanoid } from "nanoid";
import { Logger } from "../observability/logger.js";
import {
  d1WriteCounter,
  d1ReadCounter,
  cacheHitCounter,
  cacheMissCounter,
} from "../observability/metrics.js";
import { trace } from "../observability/tracing.js";
import { parseJsonSafe, safeD1Call, safeRedisCall } from "./storage-utils.js";

export class HistoryManager {
  constructor(
    private redis: RedisClient,
    private d1: D1Client,
    private logger: Logger = new Logger()
  ) {}

  /**
   * Add a message to conversation history with observability instrumentation
   */
  @trace("history.add_message")
  async addMessage(sessionId: string, message: Message): Promise<string> {
    return this.logger.timer("history.addMessage", { sessionId, role: message.role }, async () => {
      const messageId = nanoid();
      const redisKey = `history:${sessionId}`;

      this.logger.debug("Adding message to history", { sessionId, messageId, role: message.role });

      try {
        // Compress large messages (>1KB) for Redis storage
        const messageJson = JSON.stringify(message);
        const compressedMessage =
          messageJson.length > 1024 ? await this.compressMessage(message) : messageJson;

        // Add to Redis cache (blocking, fast)
        const redisStartTime = Date.now();
        const redisSuccess = await safeRedisCall(this.logger, {
          operation: "history.add.redis_rpush",
          context: { sessionId, messageId },
          fn: () => this.redis.rpush(redisKey, compressedMessage),
          fallbackValue: false,
        });
        const redisDuration = Date.now() - redisStartTime;

        if (redisSuccess) {
          await safeRedisCall(this.logger, {
            operation: "history.add.redis_trim",
            context: { sessionId },
            fn: () => this.redis.ltrim(redisKey, -20, -1),
            fallbackValue: false,
          });
          await safeRedisCall(this.logger, {
            operation: "history.add.redis_expire",
            context: { sessionId },
            fn: () => this.redis.expire(redisKey, 3600),
            fallbackValue: false,
          });
        }

        // Write to D1 (async, non-blocking)
        void safeD1Call(this.logger, {
          operation: "history.add.d1_insert",
          context: { sessionId, messageId },
          rethrow: false,
          fn: () =>
            this.d1.execute(
              `INSERT INTO messages (
                message_id, session_id, role, content,
                tool_calls, tool_call_id, timestamp, metadata
              ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
              messageId,
              sessionId,
              message.role,
              message.content,
              message.toolCalls ? JSON.stringify(message.toolCalls) : null,
              message.toolCallId ?? null,
              message.timestamp,
              message.metadata ? JSON.stringify(message.metadata) : null
            ),
          onSuccess: () => d1WriteCounter.increment({ table: "messages", result: "success" }),
          onError: () => d1WriteCounter.increment({ table: "messages", result: "error" }),
        });

        this.logger.info("Message added to history successfully", {
          sessionId,
          messageId,
          role: message.role,
          redisDuration,
          compressed: compressedMessage !== messageJson,
        });

        return messageId;
      } catch (error) {
        this.logger.error(
          "Failed to add message to history",
          { sessionId, role: message.role },
          error as Error
        );
        throw error;
      }
    });
  }

  /**
   * Get conversation history with observability instrumentation
   */
  @trace("history.get_history")
  async getHistory(sessionId: string, limit = 50): Promise<Message[]> {
    return this.logger.timer("history.getHistory", { sessionId, limit }, async () => {
      const redisKey = `history:${sessionId}`;

      this.logger.debug("Retrieving conversation history", { sessionId, limit });

      try {
        // Try Redis cache first (last 20 messages)
        const redisStartTime = Date.now();
        const cachedMessages =
          (await safeRedisCall(this.logger, {
            operation: "history.get.redis_lrange",
            context: { sessionId },
            fn: () => this.redis.lrange<string>(redisKey, -20, -1),
            fallbackValue: [],
          })) ?? [];
        const redisDuration = Date.now() - redisStartTime;

        if (cachedMessages.length > 0) {
          // Parse cached messages (use up to limit)
          const messages: Message[] = [];
          const messagesToUse = cachedMessages.slice(-limit);

          for (const cached of messagesToUse) {
            const parsed = parseJsonSafe<Message>(cached, "cached history message", this.logger, {
              sessionId,
            });
            if (!parsed) {
              continue;
            }
            try {
              messages.push(MessageSchema.parse(parsed));
            } catch (error) {
              this.logger.warn("Cached message failed schema validation", {
                sessionId,
                error: (error as Error).message,
              });
            }
          }

          cacheHitCounter.increment({ operation: "history.getHistory" });
          this.logger.info("History retrieved from cache", {
            sessionId,
            messageCount: messages.length,
            redisDuration,
          });

          return messages;
        }

        // Cache miss - fetch from D1
        cacheMissCounter.increment({ operation: "history.getHistory" });

        const d1StartTime = Date.now();
        const rows = await safeD1Call(this.logger, {
          operation: "history.get.d1_query",
          context: { sessionId, limit },
          fn: () =>
            this.d1.query<{
              message_id: string;
              session_id: string;
              role: string;
              content: string;
              tool_calls: string | null;
              tool_call_id: string | null;
              timestamp: string;
              metadata: string | null;
            }>(
              `SELECT * FROM messages
               WHERE session_id = ?
               ORDER BY timestamp DESC
               LIMIT ?`,
              sessionId,
              limit
            ),
          onSuccess: () => d1ReadCounter.increment({ table: "messages", result: "success" }),
          onError: () => d1ReadCounter.increment({ table: "messages", result: "error" }),
        });
        const d1Duration = Date.now() - d1StartTime;

        // Convert rows to messages and reverse (oldest first)
        const messages: Message[] = (rows ?? []).reverse().map((row) => ({
          role: row.role as Message["role"], // Type assertion - validated by MessageSchema
          content: row.content,
          toolCalls: parseJsonSafe(row.tool_calls, "row.tool_calls", this.logger) ?? undefined,
          toolCallId: row.tool_call_id ?? undefined,
          timestamp: row.timestamp,
          metadata: parseJsonSafe(row.metadata, "row.metadata", this.logger) ?? undefined,
        }));

        // Cache messages back to Redis (last 20 messages)
        if (messages.length > 0) {
          void (async () => {
            try {
              for (const message of messages.slice(-20)) {
                await safeRedisCall(this.logger, {
                  operation: "history.cache.redis_rpush",
                  context: { sessionId },
                  fn: () => this.redis.rpush(redisKey, JSON.stringify(message)),
                  fallbackValue: false,
                });
              }
              await safeRedisCall(this.logger, {
                operation: "history.cache.redis_trim",
                context: { sessionId },
                fn: () => this.redis.ltrim(redisKey, -20, -1),
                fallbackValue: false,
              });
              await safeRedisCall(this.logger, {
                operation: "history.cache.redis_expire",
                context: { sessionId },
                fn: () => this.redis.expire(redisKey, 3600),
                fallbackValue: false,
              });
            } catch (error) {
              // Ignore Redis caching errors - non-blocking
              this.logger.warn("Failed to cache messages in Redis", {
                sessionId,
                error: (error as Error).message,
              });
            }
          })();
        }

        this.logger.info("History retrieved from database", {
          sessionId,
          messageCount: messages.length,
          redisDuration,
          d1Duration,
          totalDuration: redisDuration + d1Duration,
        });

        return messages;
      } catch (error) {
        d1ReadCounter.increment({ table: "messages", result: "error" });
        this.logger.error(
          "Failed to retrieve conversation history",
          { sessionId, limit },
          error as Error
        );
        throw error;
      }
    });
  }

  /**
   * Get paginated history with cursor
   */
  async getHistoryPage(
    sessionId: string,
    cursor?: string,
    limit = 50
  ): Promise<{ messages: Message[]; nextCursor?: string }> {
    const queryLimit = limit + 1; // Get one extra to check if there are more

    let sql = `SELECT * FROM messages WHERE session_id = ?`;
    const params: unknown[] = [sessionId];

    if (cursor) {
      sql += ` AND timestamp > ?`;
      params.push(cursor);
    }

    sql += ` ORDER BY timestamp ASC LIMIT ?`;
    params.push(queryLimit);

    const rows = await this.d1.query<{
      message_id: string;
      session_id: string;
      role: string;
      content: string;
      tool_calls: string | null;
      tool_call_id: string | null;
      timestamp: string;
      metadata: string | null;
    }>(sql, ...params);

    const hasMore = rows.length === queryLimit;
    const messagesToReturn = hasMore ? rows.slice(0, -1) : rows;

    const messages: Message[] = messagesToReturn.map((row) => ({
      role: row.role as Message["role"], // Type assertion - validated by MessageSchema
      content: row.content,
      toolCalls: row.tool_calls ? JSON.parse(row.tool_calls) : undefined,
      toolCallId: row.tool_call_id ?? undefined,
      timestamp: row.timestamp,
      metadata: row.metadata ? JSON.parse(row.metadata) : undefined,
    }));

    const nextCursor = hasMore
      ? messagesToReturn[messagesToReturn.length - 1].timestamp
      : undefined;

    return {
      messages,
      nextCursor,
    };
  }

  /**
   * Get message count for session
   */
  async getMessageCount(sessionId: string): Promise<number> {
    const result = await this.d1.queryOne<{ count: number }>(
      "SELECT COUNT(*) as count FROM messages WHERE session_id = ?",
      sessionId
    );
    return result?.count ?? 0;
  }

  /**
   * Clear history for session (dangerous - use with caution)
   */
  async clearHistory(sessionId: string): Promise<boolean> {
    const redisKey = `history:${sessionId}`;

    // Clear Redis cache
    await this.redis.delete(redisKey);

    // Clear D1 storage
    try {
      await this.d1.execute("DELETE FROM messages WHERE session_id = ?", sessionId);
      return true;
    } catch (error) {
      console.error("Failed to clear history from D1", { sessionId, error });
      return false;
    }
  }

  /**
   * Compress large messages for Redis storage
   * In a real implementation, this would use compression like gzip
   * For now, just return the JSON (placeholder)
   */
  private async compressMessage(message: Message): Promise<string> {
    // TODO: Implement compression (gzip, lz4, etc.)
    return JSON.stringify(message);
  }

  /**
   * Decompress messages from Redis
   * In a real implementation, this would decompress
   * For now, just parse JSON (placeholder)
   * @internal
   * Reserved for future compression implementation
   */
  // @ts-expect-error - Unused method, reserved for future compression implementation
  private async _decompressMessage(_compressed: string): Promise<Message> {
    // TODO: Implement decompression
    return JSON.parse(_compressed);
  }
}
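
For orientation, a minimal usage sketch of the class above. The RedisClient and D1Client constructor arguments are hypothetical placeholders (their real signatures live in redis-client.ts and d1-client.ts, which are not part of this file), and the Message literal follows the fields this file actually reads: role, content, timestamp.

import { HistoryManager } from "./history-manager.js";
import { RedisClient } from "./redis-client.js";
import { D1Client } from "./d1-client.js";

// Hypothetical wiring: constructor arguments depend on the real client classes.
const history = new HistoryManager(new RedisClient(/* redis config */), new D1Client(/* D1 binding */));

// Append a user turn; addMessage returns the generated nanoid message ID.
const messageId = await history.addMessage("session-123", {
  role: "user",
  content: "Hello",
  timestamp: new Date().toISOString(),
});

// Read the conversation back (Redis cache first, D1 on miss), oldest message first.
const recent = await history.getHistory("session-123", 50);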
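
getHistoryPage implements keyset pagination: it requests limit + 1 rows ordered by timestamp ascending and uses the extra row only to decide whether a nextCursor should be returned. A sketch of draining a session page by page follows; the dumpSession helper is mine, not part of the repo. Note that because the cursor filter is `timestamp > ?`, any rows sharing the last timestamp of a page would be skipped, so unique, monotonically increasing timestamps are assumed.

import type { Message } from "../types/message.js";
import { HistoryManager } from "./history-manager.js";

// Hypothetical helper: walk every page of a session's history via the cursor API.
async function dumpSession(history: HistoryManager, sessionId: string): Promise<Message[]> {
  const all: Message[] = [];
  let cursor: string | undefined;
  do {
    const page = await history.getHistoryPage(sessionId, cursor, 100);
    all.push(...page.messages);
    cursor = page.nextCursor; // timestamp of the last returned row, or undefined when done
  } while (cursor !== undefined);
  return all;
}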
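
The compressMessage and _decompressMessage placeholders currently just round-trip JSON. One way the gzip TODO could be filled in is sketched below, using the Web Streams CompressionStream API (available in Cloudflare Workers and Node 18+) plus base64 encoding so the stored value stays a plain Redis string. This is an assumption about the intended approach, not the project's committed design; the read path in getHistory would also need to detect and decompress such values, since it currently parses raw JSON from the cache.

import type { Message } from "../types/message.js";

// Sketch only: gzip + base64 bodies for the two placeholder methods above.
// Assumes CompressionStream/DecompressionStream exist in the target runtime.
async function compressMessage(message: Message): Promise<string> {
  const json = JSON.stringify(message);
  const gzipped = new Blob([json]).stream().pipeThrough(new CompressionStream("gzip"));
  const bytes = new Uint8Array(await new Response(gzipped).arrayBuffer());
  let binary = "";
  for (const byte of bytes) binary += String.fromCharCode(byte);
  return btoa(binary);
}

async function decompressMessage(compressed: string): Promise<Message> {
  const bytes = Uint8Array.from(atob(compressed), (c) => c.charCodeAt(0));
  const stream = new Blob([bytes]).stream().pipeThrough(new DecompressionStream("gzip"));
  return JSON.parse(await new Response(stream).text()) as Message;
}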
