Skip to main content
Glama
kv.ts8.39 kB
// Cloudflare KV storage with encryption support // Provider-agnostic version from Spotify MCP import type { ProviderTokens, RsRecord, SessionRecord, SessionStore, TokenStore, Transaction, } from './interface.js'; import { MemorySessionStore, MemoryTokenStore } from './memory.js'; // Cloudflare KV namespace type type KVNamespace = { get(key: string): Promise<string | null>; put( key: string, value: string, options?: { expiration?: number; expirationTtl?: number }, ): Promise<void>; delete(key: string): Promise<void>; }; type EncryptFn = (plaintext: string) => Promise<string> | string; type DecryptFn = (ciphertext: string) => Promise<string> | string; function ttl(seconds: number): number { return Math.floor(Date.now() / 1000) + seconds; } function toJson(value: unknown): string { return JSON.stringify(value); } function fromJson<T>(value: string | null): T | null { if (!value) { return null; } try { return JSON.parse(value) as T; } catch { return null; } } export class KvTokenStore implements TokenStore { private kv: KVNamespace; private encrypt: EncryptFn; private decrypt: DecryptFn; private fallback: MemoryTokenStore; constructor( kv: KVNamespace, options?: { encrypt?: EncryptFn; decrypt?: DecryptFn; fallback?: MemoryTokenStore; }, ) { this.kv = kv; this.encrypt = options?.encrypt ?? ((s) => s); this.decrypt = options?.decrypt ?? ((s) => s); this.fallback = options?.fallback ?? new MemoryTokenStore(); } private async putJson( key: string, value: unknown, options?: { expiration?: number; expirationTtl?: number }, ): Promise<void> { try { const raw = await this.encrypt(toJson(value)); await this.kv.put(key, raw, options); } catch (error) { // KV write failed (likely quota exceeded) - log but don't crash // Fallback memory store will still have the data console.error('[KV] Write failed:', (error as Error).message); throw error; // Re-throw so caller knows KV failed } } private async getJson<T>(key: string): Promise<T | null> { const raw = await this.kv.get(key); if (!raw) { return null; } const plain = await this.decrypt(raw); return fromJson<T>(plain); } async storeRsMapping( rsAccess: string, provider: ProviderTokens, rsRefresh?: string, ): Promise<RsRecord> { const rec: RsRecord = { rs_access_token: rsAccess, rs_refresh_token: rsRefresh ?? crypto.randomUUID(), provider: { ...provider }, created_at: Date.now(), }; // CRITICAL: Store in memory fallback FIRST // If KV fails (quota/network), memory still has it await this.fallback.storeRsMapping(rsAccess, provider, rsRefresh); // Then try KV (may fail due to quota) try { await Promise.all([ this.putJson(`rs:access:${rec.rs_access_token}`, rec), this.putJson(`rs:refresh:${rec.rs_refresh_token}`, rec), ]); } catch (error) { console.warn( '[KV] Failed to persist RS mapping (using memory fallback):', (error as Error).message, ); // Don't throw - memory fallback has the data } return rec; } async getByRsAccess(rsAccess: string): Promise<RsRecord | null> { const rec = await this.getJson<RsRecord>(`rs:access:${rsAccess}`); return rec ?? (await this.fallback.getByRsAccess(rsAccess)); } async getByRsRefresh(rsRefresh: string): Promise<RsRecord | null> { const rec = await this.getJson<RsRecord>(`rs:refresh:${rsRefresh}`); return rec ?? (await this.fallback.getByRsRefresh(rsRefresh)); } async updateByRsRefresh( rsRefresh: string, provider: ProviderTokens, maybeNewRsAccess?: string, ): Promise<RsRecord | null> { const existing = await this.getJson<RsRecord>(`rs:refresh:${rsRefresh}`); if (!existing) { return this.fallback.updateByRsRefresh(rsRefresh, provider, maybeNewRsAccess); } const next: RsRecord = { rs_access_token: maybeNewRsAccess || existing.rs_access_token, rs_refresh_token: rsRefresh, provider: { ...provider }, created_at: Date.now(), }; // Update memory fallback first await this.fallback.updateByRsRefresh(rsRefresh, provider, maybeNewRsAccess); // Then try KV (may fail due to quota) try { await Promise.all([ this.kv.delete(`rs:access:${existing.rs_access_token}`), this.putJson(`rs:access:${next.rs_access_token}`, next), this.putJson(`rs:refresh:${rsRefresh}`, next), ]); } catch (error) { console.warn( '[KV] Failed to update RS mapping (using memory fallback):', (error as Error).message, ); // Don't throw - memory fallback has the data } return next; } async saveTransaction( txnId: string, txn: Transaction, ttlSeconds = 600, ): Promise<void> { // Memory fallback first (critical for OAuth flow) await this.fallback.saveTransaction(txnId, txn); // KV is optional (nice to have for persistence across instances) try { await this.putJson(`txn:${txnId}`, txn, { expiration: ttl(ttlSeconds) }); } catch (error) { console.warn( '[KV] Failed to save transaction (using memory):', (error as Error).message, ); // Don't throw - memory has it } } async getTransaction(txnId: string): Promise<Transaction | null> { const txn = await this.getJson<Transaction>(`txn:${txnId}`); return txn ?? (await this.fallback.getTransaction(txnId)); } async deleteTransaction(txnId: string): Promise<void> { await this.kv.delete(`txn:${txnId}`); await this.fallback.deleteTransaction(txnId); } async saveCode(code: string, txnId: string, ttlSeconds = 600): Promise<void> { // Memory fallback first (critical for OAuth flow) await this.fallback.saveCode(code, txnId); // KV is optional try { await this.putJson(`code:${code}`, { v: txnId }, { expiration: ttl(ttlSeconds) }); } catch (error) { console.warn( '[KV] Failed to save code (using memory):', (error as Error).message, ); // Don't throw - memory has it } } async getTxnIdByCode(code: string): Promise<string | null> { const obj = await this.getJson<{ v: string }>(`code:${code}`); return obj?.v ?? (await this.fallback.getTxnIdByCode(code)); } async deleteCode(code: string): Promise<void> { await this.kv.delete(`code:${code}`); await this.fallback.deleteCode(code); } } const SESSION_KEY_PREFIX = 'session:'; const SESSION_TTL_SECONDS = 24 * 60 * 60; export class KvSessionStore implements SessionStore { private kv: KVNamespace; private encrypt: EncryptFn; private decrypt: DecryptFn; private fallback: MemorySessionStore; constructor( kv: KVNamespace, options?: { encrypt?: EncryptFn; decrypt?: DecryptFn; fallback?: MemorySessionStore; }, ) { this.kv = kv; this.encrypt = options?.encrypt ?? ((s) => s); this.decrypt = options?.decrypt ?? ((s) => s); this.fallback = options?.fallback ?? new MemorySessionStore(); } private async putSession(key: string, value: SessionRecord): Promise<void> { const raw = await this.encrypt(toJson(value)); await this.kv.put(`${SESSION_KEY_PREFIX}${key}`, raw, { expiration: ttl(SESSION_TTL_SECONDS), }); await this.fallback.put(key, value); } private async getSession(key: string): Promise<SessionRecord | null> { const raw = await this.kv.get(`${SESSION_KEY_PREFIX}${key}`); if (!raw) { return this.fallback.get(key); } const plain = await this.decrypt(raw); return fromJson<SessionRecord>(plain); } async ensure(sessionId: string): Promise<void> { const existing = await this.getSession(sessionId); if (!existing) { await this.putSession(sessionId, { created_at: Date.now() }); } } async get(sessionId: string): Promise<SessionRecord | null> { return this.getSession(sessionId); } async put(sessionId: string, value: SessionRecord): Promise<void> { await this.putSession(sessionId, value); } async delete(sessionId: string): Promise<void> { await this.kv.delete(`${SESSION_KEY_PREFIX}${sessionId}`); await this.fallback.delete(sessionId); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/iceener/tesla-streamable-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server