Skip to main content
Glama
portel-dev

NCP - Natural Context Provider

by portel-dev
csv-cache.ts17.5 kB
/** * CSV-based incremental cache for NCP * Enables resumable indexing by appending each MCP as it's indexed */ import { createWriteStream, WriteStream, fsync, openSync, fsyncSync, closeSync } from 'fs'; import { mkdir, readFile, writeFile, access, constants } from 'fs/promises'; import { join, dirname } from 'path'; import { fileURLToPath } from 'url'; import { createHash } from 'crypto'; import { logger } from '../utils/logger.js'; export interface CachedTool { mcpName: string; toolId: string; toolName: string; description: string; hash: string; timestamp: string; } export interface CachedMCP { name: string; hash: string; toolCount: number; timestamp: string; tools: CachedTool[]; } export interface FailedMCP { name: string; lastAttempt: string; // ISO timestamp errorType: string; // 'timeout', 'connection_refused', 'unknown' errorMessage: string; attemptCount: number; nextRetry: string; // ISO timestamp - when to retry next } export interface CacheMetadata { version: string; profileName: string; profileHash: string; ncpVersion: string; // Track NCP version to detect code changes createdAt: string; lastUpdated: string; totalMCPs: number; totalTools: number; indexedMCPs: Map<string, string>; // mcpName -> mcpHash failedMCPs: Map<string, FailedMCP>; // mcpName -> failure info } export class CSVCache { private csvPath: string; private metaPath: string; private writeStream: WriteStream | null = null; private metadata: CacheMetadata | null = null; constructor(private cacheDir: string, private profileName: string) { // Debug logging to track profile name if (process.env.NCP_DEBUG === 'true') { console.error(`[DEBUG CSV] CSVCache initialized with profileName: ${profileName}`); console.error(`[DEBUG CSV] csvPath: ${join(cacheDir, `${profileName}-tools.csv`)}`); console.error(`[DEBUG CSV] metaPath: ${join(cacheDir, `${profileName}-cache-meta.json`)}`); } this.csvPath = join(cacheDir, `${profileName}-tools.csv`); this.metaPath = join(cacheDir, `${profileName}-cache-meta.json`); } /** * Initialize cache - create files if needed */ async initialize(): Promise<void> { // Ensure cache directory exists await mkdir(dirname(this.csvPath), { recursive: true }); // Get current NCP version const currentVersion = await this.getNcpVersion(); // Load or create metadata try { await access(this.metaPath, constants.F_OK); // File exists, read it const content = await readFile(this.metaPath, 'utf-8'); const parsed = JSON.parse(content); // Convert objects back to Maps const loadedMetadata = { ...parsed, indexedMCPs: new Map(Object.entries(parsed.indexedMCPs || {})), failedMCPs: new Map(Object.entries(parsed.failedMCPs || {})) }; // Check if NCP version changed (code updated) if (loadedMetadata.ncpVersion !== currentVersion) { logger.info(`NCP version changed (${loadedMetadata.ncpVersion || 'unknown'} → ${currentVersion}), clearing failed MCPs`); loadedMetadata.ncpVersion = currentVersion; loadedMetadata.failedMCPs.clear(); // Clear failures when code changes } this.metadata = loadedMetadata; await this.saveMetadata(); } catch (error) { // File doesn't exist or parse error - will create new metadata below if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { logger.warn(`Failed to load cache metadata: ${error}`); } this.metadata = null; } if (!this.metadata) { this.metadata = { version: '1.0', profileName: this.profileName, profileHash: '', ncpVersion: currentVersion, createdAt: new Date().toISOString(), lastUpdated: new Date().toISOString(), totalMCPs: 0, totalTools: 0, indexedMCPs: new Map(), failedMCPs: new Map() }; } } /** * Get NCP version from package.json (async) */ private async getNcpVersion(): Promise<string> { try { // Get __dirname equivalent in ES modules const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); // Try multiple paths to find package.json const possiblePaths = [ join(__dirname, '../../package.json'), // From dist/cache join(process.cwd(), 'package.json'), // From CWD join(__dirname, '../../../package.json') // From node_modules ]; for (const pkgPath of possiblePaths) { try { await access(pkgPath, constants.F_OK); const content = await readFile(pkgPath, 'utf-8'); const pkg = JSON.parse(content); if (pkg.version) { return pkg.version; } } catch { // Try next path continue; } } return '1.0.0'; // Fallback version } catch (error) { logger.debug(`Could not determine NCP version: ${error}`); return '1.0.0'; } } /** * Validate cache against current profile configuration (async) */ async validateCache(currentProfileHash: string): Promise<boolean> { if (!this.metadata) return false; // Check if profile configuration changed if (this.metadata.profileHash !== currentProfileHash) { logger.info('Profile configuration changed, cache invalid'); return false; } // Check if CSV file exists try { await access(this.csvPath, constants.F_OK); } catch { logger.info('CSV cache file missing'); return false; } // Check cache age (invalidate after 7 days) const cacheAge = Date.now() - new Date(this.metadata.createdAt).getTime(); const maxAge = 7 * 24 * 60 * 60 * 1000; // 7 days if (cacheAge > maxAge) { logger.info('Cache older than 7 days, invalidating'); return false; } return true; } /** * Get list of already-indexed MCPs with their hashes */ getIndexedMCPs(): Map<string, string> { return this.metadata?.indexedMCPs || new Map(); } /** * Check if an MCP is already indexed and up-to-date */ isMCPIndexed(mcpName: string, currentHash: string): boolean { const cached = this.metadata?.indexedMCPs.get(mcpName); return cached === currentHash; } /** * Remove MCP from index (forces re-indexing) */ removeMCPFromIndex(mcpName: string): void { if (this.metadata?.indexedMCPs) { this.metadata.indexedMCPs.delete(mcpName); } } /** * Load all cached tools from CSV (async) */ async loadCachedTools(): Promise<CachedTool[]> { try { await access(this.csvPath, constants.F_OK); } catch { return []; } try { const content = await readFile(this.csvPath, 'utf-8'); const lines = content.trim().split('\n'); // Skip header if (lines.length <= 1) return []; const tools: CachedTool[] = []; for (let i = 1; i < lines.length; i++) { const parts = this.parseCSVLine(lines[i]); if (parts.length >= 6) { tools.push({ mcpName: parts[0], toolId: parts[1], toolName: parts[2], description: parts[3], hash: parts[4], timestamp: parts[5] }); } } return tools; } catch (error) { logger.error(`Failed to load cached tools: ${error}`); return []; } } /** * Load cached tools for a specific MCP (async) */ async loadMCPTools(mcpName: string): Promise<CachedTool[]> { const allTools = await this.loadCachedTools(); return allTools.filter(t => t.mcpName === mcpName); } /** * Start incremental writing (append mode) */ async startIncrementalWrite(profileHash: string): Promise<void> { let isNewCache = true; try { await access(this.csvPath, constants.F_OK); isNewCache = false; } catch { // File doesn't exist, it's a new cache } // Always update profile hash (critical for cache validation) if (this.metadata) { this.metadata.profileHash = profileHash; } if (isNewCache) { // Create new cache file with header this.writeStream = createWriteStream(this.csvPath, { flags: 'w' }); this.writeStream.write('mcp_name,tool_id,tool_name,description,hash,timestamp\n'); // Initialize metadata for new cache if (this.metadata) { this.metadata.createdAt = new Date().toISOString(); this.metadata.indexedMCPs.clear(); } } else { // Append to existing cache this.writeStream = createWriteStream(this.csvPath, { flags: 'a' }); } } /** * Append tools from an MCP to cache */ async appendMCP(mcpName: string, tools: CachedTool[], mcpHash: string): Promise<void> { if (!this.writeStream) { throw new Error('Cache writer not initialized. Call startIncrementalWrite() first.'); } // Write each tool as a CSV row for (const tool of tools) { const row = this.formatCSVLine([ tool.mcpName, tool.toolId, tool.toolName, tool.description, tool.hash, tool.timestamp ]); this.writeStream.write(row + '\n'); } // Force flush to disk for crash safety await this.flushWriteStream(); // Update metadata if (this.metadata) { this.metadata.indexedMCPs.set(mcpName, mcpHash); this.metadata.totalMCPs = this.metadata.indexedMCPs.size; this.metadata.totalTools += tools.length; this.metadata.lastUpdated = new Date().toISOString(); // Save metadata after each MCP (for crash safety) this.saveMetadata(); } logger.info(`📝 Appended ${tools.length} tools from ${mcpName} to cache`); } /** * Finalize cache writing */ async finalize(): Promise<void> { if (this.writeStream) { // Wait for stream to finish writing before closing await new Promise<void>((resolve, reject) => { this.writeStream!.end((err: any) => { if (err) reject(err); else resolve(); }); }); this.writeStream = null; } await this.saveMetadata(); logger.debug(`Cache finalized: ${this.metadata?.totalTools} tools from ${this.metadata?.totalMCPs} MCPs`); } /** * Clear cache completely */ async clear(): Promise<void> { try { // Try to delete CSV file try { await access(this.csvPath, constants.F_OK); const fs = await import('fs/promises'); await fs.unlink(this.csvPath); } catch { // File doesn't exist, skip } // Try to delete metadata file try { await access(this.metaPath, constants.F_OK); const fs = await import('fs/promises'); await fs.unlink(this.metaPath); } catch { // File doesn't exist, skip } this.metadata = null; logger.info('Cache cleared'); } catch (error) { logger.error(`Failed to clear cache: ${error}`); } } /** * Save metadata to disk with fsync for crash safety (async) */ private async saveMetadata(): Promise<void> { if (!this.metadata) return; try { // Convert Maps to objects for JSON serialization const metaToSave = { ...this.metadata, indexedMCPs: Object.fromEntries(this.metadata.indexedMCPs), failedMCPs: Object.fromEntries(this.metadata.failedMCPs) }; // Write metadata file asynchronously await writeFile(this.metaPath, JSON.stringify(metaToSave, null, 2), 'utf-8'); // Force sync to disk (open file, fsync, close) const fd = openSync(this.metaPath, 'r+'); await new Promise<void>((resolve, reject) => { fsync(fd, (err) => { closeSync(fd); if (err) reject(err); else resolve(); }); }); } catch (error) { logger.error(`Failed to save metadata: ${error}`); } } /** * Force flush write stream to disk */ private async flushWriteStream(): Promise<void> { if (!this.writeStream) return; return new Promise((resolve, reject) => { // Wait for any pending writes to drain if (this.writeStream!.writableNeedDrain) { this.writeStream!.once('drain', () => { // Then force sync to disk const fd = (this.writeStream as any).fd; if (fd !== undefined) { fsync(fd, (err) => { if (err) reject(err); else resolve(); }); } else { resolve(); } }); } else { // No drain needed, just sync to disk const fd = (this.writeStream as any).fd; if (fd !== undefined) { fsync(fd, (err) => { if (err) reject(err); else resolve(); }); } else { resolve(); } } }); } /** * Format CSV line with proper escaping */ private formatCSVLine(fields: string[]): string { return fields.map(field => { // Escape quotes and wrap in quotes if contains comma, quote, or newline if (field.includes(',') || field.includes('"') || field.includes('\n')) { return `"${field.replace(/"/g, '""')}"`; } return field; }).join(','); } /** * Parse CSV line handling quoted fields */ private parseCSVLine(line: string): string[] { const fields: string[] = []; let current = ''; let inQuotes = false; for (let i = 0; i < line.length; i++) { const char = line[i]; if (char === '"') { if (inQuotes && line[i + 1] === '"') { current += '"'; i++; // Skip next quote } else { inQuotes = !inQuotes; } } else if (char === ',' && !inQuotes) { fields.push(current); current = ''; } else { current += char; } } fields.push(current); return fields; } /** * Mark an MCP as failed with retry scheduling */ markFailed(mcpName: string, error: Error): void { if (!this.metadata) return; const existing = this.metadata.failedMCPs.get(mcpName); const attemptCount = (existing?.attemptCount || 0) + 1; // Exponential backoff: 1 hour, 6 hours, 24 hours, then always 24 hours const retryDelays = [ 60 * 60 * 1000, // 1 hour 6 * 60 * 60 * 1000, // 6 hours 24 * 60 * 60 * 1000 // 24 hours (then keep this) ]; const delayIndex = Math.min(attemptCount - 1, retryDelays.length - 1); const retryDelay = retryDelays[delayIndex]; // Determine error type let errorType = 'unknown'; if (error.message.includes('timeout') || error.message.includes('Probe timeout')) { errorType = 'timeout'; } else if (error.message.includes('ECONNREFUSED') || error.message.includes('connection')) { errorType = 'connection_refused'; } else if (error.message.includes('ENOENT') || error.message.includes('command not found')) { errorType = 'command_not_found'; } const failedMCP: FailedMCP = { name: mcpName, lastAttempt: new Date().toISOString(), errorType, errorMessage: error.message, attemptCount, nextRetry: new Date(Date.now() + retryDelay).toISOString() }; this.metadata.failedMCPs.set(mcpName, failedMCP); this.saveMetadata(); logger.info(`📋 Marked ${mcpName} as failed (attempt ${attemptCount}), will retry after ${new Date(failedMCP.nextRetry).toLocaleString()}`); } /** * Check if we should retry a failed MCP */ shouldRetryFailed(mcpName: string, forceRetry: boolean = false): boolean { if (!this.metadata) return true; const failed = this.metadata.failedMCPs.get(mcpName); if (!failed) return true; // Never tried, should try if (forceRetry) return true; // Force retry flag // Check if enough time has passed const now = new Date(); const nextRetry = new Date(failed.nextRetry); return now >= nextRetry; } /** * Clear all failed MCPs (for force retry) */ clearFailedMCPs(): void { if (!this.metadata) return; this.metadata.failedMCPs.clear(); this.saveMetadata(); logger.info('Cleared all failed MCPs'); } /** * Get failed MCPs count */ getFailedMCPsCount(): number { return this.metadata?.failedMCPs.size || 0; } /** * Get failed MCPs that are ready for retry */ getRetryReadyFailedMCPs(): string[] { if (!this.metadata) return []; const now = new Date(); const ready: string[] = []; for (const [name, failed] of this.metadata.failedMCPs) { const nextRetry = new Date(failed.nextRetry); if (now >= nextRetry) { ready.push(name); } } return ready; } /** * Check if an MCP is in the failed list */ isMCPFailed(mcpName: string): boolean { if (!this.metadata) return false; return this.metadata.failedMCPs.has(mcpName); } /** * Hash profile configuration for change detection */ static hashProfile(profile: any): string { const str = JSON.stringify(profile, Object.keys(profile).sort()); return createHash('sha256').update(str).digest('hex'); } /** * Hash tool configuration for change detection */ static hashTools(tools: any[]): string { const str = JSON.stringify(tools.map(t => ({ name: t.name, description: t.description }))); return createHash('sha256').update(str).digest('hex'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/portel-dev/ncp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server