Skip to main content
Glama
storage.js10.8 kB
/** * JSON 文件存储层 * 提供内存数据的持久化存储功能 */ import fs from 'fs/promises'; import path from 'path'; import { fileURLToPath } from 'url'; import { prepareModalitiesForStorage } from './modalities.js'; import { optimizeMemory } from '../utils/data-optimizer.js'; import { deduplicateImageModalities } from '../utils/image-processor.js'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); // 默认存储目录(项目根目录下的 data 文件夹) const DEFAULT_DATA_DIR = path.join(__dirname, '../../data'); // 目录存在性缓存,避免重复检查 const dirExistsCache = new Set(); /** * 确保数据目录存在(带缓存优化) * @param {string} dirPath - 目录路径 */ async function ensureDir(dirPath) { // 检查缓存 if (dirExistsCache.has(dirPath)) { return; } try { await fs.access(dirPath); // 目录存在,加入缓存 dirExistsCache.add(dirPath); } catch (error) { if (error.code === 'ENOENT') { // 目录不存在,创建它 try { await fs.mkdir(dirPath, { recursive: true }); dirExistsCache.add(dirPath); } catch (mkdirError) { // 处理并发创建的竞态条件 if (mkdirError.code === 'EEXIST') { dirExistsCache.add(dirPath); } else { // 其他错误则抛出 throw mkdirError; } } } else { // 其他访问错误(权限等) throw error; } } } /** * 加载 JSON 文件,如果文件不存在则返回默认值 * @param {string} filePath - 文件路径 * @param {*} defaultValue - 默认值 * @returns {Promise<*>} 加载的数据或默认值 */ export async function loadJsonFileIfExists(filePath, defaultValue = null) { try { const content = await fs.readFile(filePath, 'utf-8'); return JSON.parse(content); } catch (error) { if (error.code === 'ENOENT') { // 文件不存在,返回默认值 return defaultValue; } // 其他错误(如 JSON 解析错误)抛出 console.error(`Error loading JSON file ${filePath}:`, error); throw error; } } // 写入缓存和批次处理 const writeCache = new Map(); // filePath -> { data, timer, dirty } const WRITE_DELAY_MS = 1000; // 延迟写入时间(1秒) const MAX_RETRIES = 3; // 最大重试次数 /** * 保存数据到 JSON 文件(带重试机制) * @param {string} filePath - 文件路径 * @param {*} data - 要保存的数据 * @param {number} retryCount - 当前重试次数 */ async function saveJsonFileWithRetry(filePath, data, retryCount = 0) { try { // 确保目录存在 await ensureDir(path.dirname(filePath)); // 序列化并保存 const content = JSON.stringify(data, null, 2); await fs.writeFile(filePath, content, 'utf-8'); } catch (error) { if (retryCount < MAX_RETRIES && (error.code === 'EBUSY' || error.code === 'EAGAIN')) { // 文件正在被使用,延迟后重试 const delay = Math.pow(2, retryCount) * 100; // 指数退避 await new Promise(resolve => setTimeout(resolve, delay)); return saveJsonFileWithRetry(filePath, data, retryCount + 1); } console.error(`Error saving JSON file ${filePath}:`, error); throw error; } } /** * 保存数据到 JSON 文件(带缓存和批次处理) * @param {string} filePath - 文件路径 * @param {*} data - 要保存的数据 * @param {boolean} immediate - 是否立即写入 */ export async function saveJsonFile(filePath, data, immediate = false) { if (immediate) { // 立即写入模式 if (writeCache.has(filePath)) { const cached = writeCache.get(filePath); clearTimeout(cached.timer); writeCache.delete(filePath); } return saveJsonFileWithRetry(filePath, data); } // 延迟写入模式 const cached = writeCache.get(filePath); if (cached) { // 更新缓存数据并重置定时器 clearTimeout(cached.timer); cached.data = data; cached.dirty = true; } else { // 创建新缓存项 writeCache.set(filePath, { data, dirty: true, timer: null }); } // 设置延迟写入定时器 const cacheEntry = writeCache.get(filePath); cacheEntry.timer = setTimeout(async () => { if (cacheEntry.dirty) { try { await saveJsonFileWithRetry(filePath, cacheEntry.data); cacheEntry.dirty = false; } catch (error) { console.error(`Failed to flush write cache for ${filePath}:`, error); } } writeCache.delete(filePath); }, WRITE_DELAY_MS); } /** * 刷新所有待写入的缓存(用于优雅关机) */ export async function flushAllWrites() { const flushPromises = []; for (const [filePath, cached] of writeCache.entries()) { if (cached.dirty) { clearTimeout(cached.timer); flushPromises.push( saveJsonFileWithRetry(filePath, cached.data) .catch(error => { console.error(`Error flushing ${filePath}:`, error); }) ); } else { clearTimeout(cached.timer); } } await Promise.all(flushPromises); writeCache.clear(); } function stripEphemeralFields(object) { if (!object || typeof object !== 'object') { return; } for (const key of Object.keys(object)) { if (key.startsWith('_')) { delete object[key]; } } } function normalizeDateLike(value) { if (value instanceof Date) { return value.toISOString(); } return value; } function prepareShortTermMemoryForStorage(memory) { const sanitized = { ...memory }; stripEphemeralFields(sanitized); // 統一時間戳為 timestamp 字段(ISO 字符串) const timeSource = sanitized.time_stamp ?? sanitized.timestamp ?? sanitized.timeStamp; if (timeSource) { sanitized.timestamp = normalizeDateLike(timeSource); } delete sanitized.time_stamp; delete sanitized.timeStamp; if (Array.isArray(memory.keywords)) { sanitized.keywords = memory.keywords .filter(kw => kw && typeof kw.word === 'string') .map(kw => ({ word: kw.word, weight: Number.isFinite(kw.weight) ? kw.weight : 1 })); } // 準備 modalities,優先使用 modalities 字段 let modalities = prepareModalitiesForStorage(memory.modalities ?? memory.attachments ?? []); // 去除重複的圖像 modalities = deduplicateImageModalities(modalities); // 只保存 modalities,不保存冗餘的 attachments sanitized.modalities = modalities; delete sanitized.attachments; // 應用數據優化(去重關鍵詞等) return optimizeMemory(sanitized, { normalizeTimestamps: false, // 已經處理過了 removeAttachmentsRedundancy: false, // 已經處理過了 deduplicateKeywords: true }); } function prepareLongTermMemoryForStorage(memory) { const sanitized = { ...memory }; stripEphemeralFields(sanitized); // 標準化時間戳 if (sanitized.createdAt) { sanitized.createdAt = normalizeDateLike(sanitized.createdAt); } if (sanitized.updatedAt) { sanitized.updatedAt = normalizeDateLike(sanitized.updatedAt); } // 準備 modalities,優先使用 modalities 字段 let modalities = prepareModalitiesForStorage(memory.modalities ?? memory.attachments ?? []); // 去除重複的圖像 modalities = deduplicateImageModalities(modalities); // 只保存 modalities,不保存冗餘的 attachments sanitized.modalities = modalities; delete sanitized.attachments; return sanitized; } /** * 存储管理器类 * 管理特定对话的记忆存储 */ export class StorageManager { /** * @param {string} conversationId - 对话 ID * @param {string} [dataDir] - 数据目录路径 */ constructor(conversationId, dataDir = DEFAULT_DATA_DIR) { this.conversationId = conversationId; this.dataDir = dataDir; this.conversationDir = path.join(dataDir, this.sanitizeId(conversationId)); } /** * 清理 ID,使其适合作为文件/目录名 * @param {string} id - 原始 ID * @returns {string} 清理后的 ID */ sanitizeId(id) { return id.replace(/[^a-zA-Z0-9_-]/g, '_'); } /** * 获取短期记忆文件路径 * @returns {string} */ getShortTermPath() { return path.join(this.conversationDir, 'short-term-memory.json'); } /** * 获取长期记忆文件路径 * @returns {string} */ getLongTermPath() { return path.join(this.conversationDir, 'long-term-memory.json'); } /** * 加载短期记忆 * @returns {Promise<Array>} */ async loadShortTermMemories() { return await loadJsonFileIfExists(this.getShortTermPath(), []); } /** * 保存短期记忆 * @param {Array} memories */ async saveShortTermMemories(memories) { const dataToSave = Array.isArray(memories) ? memories.map(prepareShortTermMemoryForStorage) : []; await saveJsonFile(this.getShortTermPath(), dataToSave); } /** * 加载长期记忆 * @returns {Promise<Array>} */ async loadLongTermMemories() { return await loadJsonFileIfExists(this.getLongTermPath(), []); } /** * 保存长期记忆 * @param {Array} memories */ async saveLongTermMemories(memories) { const dataToSave = Array.isArray(memories) ? memories.map(prepareLongTermMemoryForStorage) : []; await saveJsonFile(this.getLongTermPath(), dataToSave); } /** * 检查对话数据是否存在 * @returns {Promise<boolean>} */ async exists() { try { await fs.access(this.conversationDir); return true; } catch { return false; } } /** * 删除对话的所有数据 */ async deleteAll() { try { await fs.rm(this.conversationDir, { recursive: true, force: true }); } catch (error) { console.error(`Error deleting conversation data:`, error); throw error; } } } /** * 获取所有已存储的对话 ID 列表 * @param {string} [dataDir] - 数据目录路径 * @returns {Promise<string[]>} */ export async function listConversations(dataDir = DEFAULT_DATA_DIR) { try { await ensureDir(dataDir); const entries = await fs.readdir(dataDir, { withFileTypes: true }); return entries .filter(entry => entry.isDirectory()) .map(entry => entry.name); } catch (error) { console.error('Error listing conversations:', error); return []; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/win10ogod/memory-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server