Skip to main content
Glama
agent-runtime.manager.ts10.4 kB
import { Injectable, Logger } from '@nestjs/common'; import { getGuardValue, type AgentConfig } from '@snakagent/core'; import { BaseAgent } from '@snakagent/agents'; const parsePositiveInt = ( value: string | undefined, fallback: number ): number => { if (!value) { return fallback; } const parsed = Number.parseInt(value, 10); if (!Number.isFinite(parsed) || parsed <= 0) { return fallback; } return parsed; }; const parseDurationMs = (fallback: number): number => { const ttlMsRaw = process.env.AGENT_RUNTIME_CACHE_TTL_MS; if (ttlMsRaw) { const parsed = Number.parseInt(ttlMsRaw, 10); if (Number.isFinite(parsed) && parsed >= 0) { return parsed; } } const ttlSecondsRaw = process.env.AGENT_RUNTIME_CACHE_TTL_SECONDS; if (ttlSecondsRaw) { const parsed = Number.parseInt(ttlSecondsRaw, 10); if (Number.isFinite(parsed) && parsed >= 0) { return parsed * 1000; } } return fallback; }; export interface AgentRuntimeSeed { agentId: string; userId: string; cfgVersion: number; runtime: AgentConfig.Runtime; agent?: BaseAgent; rebuild: () => Promise<AgentRuntimeSeed | null>; ttlMs?: number; } interface CacheEntry { agentId: string; userId: string; cfgVersion: number; runtime: AgentConfig.Runtime; agent?: BaseAgent; rebuild: () => Promise<AgentRuntimeSeed | null>; expiresAt: number; refCount: number; lastAccess: number; } @Injectable() export class AgentRuntimeManager { private readonly logger = new Logger(AgentRuntimeManager.name); private readonly cache = new Map<string, CacheEntry>(); private readonly inflight = new Map<string, Promise<void>>(); private readonly maxEntries: number; private readonly defaultTtlMs: number; constructor() { const guardMaxEntries = getGuardValue( 'agent_runtime.cache.max_entries' ) as number; const guardMaxTtlMs = getGuardValue( 'agent_runtime.cache.max_ttl_ms' ) as number; this.maxEntries = parsePositiveInt( process.env.AGENT_RUNTIME_CACHE_MAX_ENTRIES, guardMaxEntries ); if (this.maxEntries > guardMaxEntries) { this.logger.warn( `AGENT_RUNTIME_CACHE_MAX_ENTRIES resolved to ${this.maxEntries}; capping to guard limit ${guardMaxEntries}` ); this.maxEntries = guardMaxEntries; } const resolvedTtlMs = parseDurationMs(guardMaxTtlMs); if (resolvedTtlMs > guardMaxTtlMs) { this.logger.warn( `AGENT_RUNTIME_CACHE_TTL resolved to ${resolvedTtlMs}; capping to guard limit ${guardMaxTtlMs}` ); this.defaultTtlMs = guardMaxTtlMs; } else { this.defaultTtlMs = resolvedTtlMs; } } /** * Shadow-mode registration used to prime the runtime cache without serving responses from it. * Records hit/miss metrics and updates the cached payload when necessary. */ async shadowSeed(seed: AgentRuntimeSeed): Promise<void> { await this.withInflight(seed.agentId, async () => { const now = Date.now(); this.pruneExpired(now); const existing = this.cache.get(seed.agentId); if ( existing && existing.cfgVersion === seed.cfgVersion && !this.isExpired(existing, now) ) { existing.runtime = seed.runtime; existing.agent = seed.agent; existing.rebuild = seed.rebuild; existing.userId = seed.userId; existing.expiresAt = this.computeExpires(now, seed.ttlMs); existing.lastAccess = now; this.bump(seed.agentId, existing); return; } await this.installSeed(seed, now); }); } /** * Acquire a runtime from cache, incrementing its reference count. */ async acquire(agentId: string): Promise<AgentConfig.Runtime | null> { return this.withInflight(agentId, async () => this.doAcquire(agentId)); } /** * Acquire both runtime and agent from cache, incrementing its reference count. */ async acquireWithAgent( agentId: string ): Promise<{ runtime: AgentConfig.Runtime; agent?: BaseAgent } | null> { return this.withInflight(agentId, async () => this.doAcquireWithAgent(agentId) ); } private doAcquire(agentId: string): AgentConfig.Runtime | null { const now = Date.now(); this.pruneExpired(now); const entry = this.cache.get(agentId); if (!entry) { return null; } if (this.isExpired(entry, now)) { if (entry.refCount === 0) { this.cache.delete(agentId); } return null; } entry.refCount += 1; entry.lastAccess = now; this.bump(agentId, entry); return entry.runtime; } private doAcquireWithAgent( agentId: string ): { runtime: AgentConfig.Runtime; agent?: BaseAgent } | null { const now = Date.now(); this.pruneExpired(now); const entry = this.cache.get(agentId); if (!entry) { return null; } if (this.isExpired(entry, now)) { if (entry.refCount === 0) { this.cache.delete(agentId); } return null; } entry.refCount += 1; entry.lastAccess = now; this.bump(agentId, entry); return { runtime: entry.runtime, agent: entry.agent }; } /** * Release a previously acquired runtime, decrementing its reference count. */ release(agentId: string): void { const entry = this.cache.get(agentId); if (!entry) { return; } if (entry.refCount > 0) { entry.refCount -= 1; } if (entry.refCount === 0 && this.isExpired(entry, Date.now())) { this.cache.delete(agentId); void this.safeDispose(entry.agent); } } /** * Handle configuration invalidation by atomically rebuilding and swapping the cached runtime. */ async onInvalidate(agentId: string, cfgVersion: number): Promise<void> { await this.withInflight(agentId, async () => { const entry = this.cache.get(agentId); if (!entry) { this.logger.debug( `No cached runtime to invalidate for agent ${agentId}` ); return; } try { const rebuiltSeed = await entry.rebuild(); if (!rebuiltSeed) { this.logger.warn( `Runtime rebuild returned null during invalidation for agent ${agentId}; evicting entry` ); this.cache.delete(agentId); return; } if (rebuiltSeed.cfgVersion < cfgVersion) { this.logger.warn( `Rebuilt runtime version ${rebuiltSeed.cfgVersion} is older than invalidation version ${cfgVersion} for agent ${agentId}` ); } await this.installSeed(rebuiltSeed); } catch (error) { this.logger.warn( `Failed to rebuild runtime during invalidation for agent ${agentId}`, { error } ); throw error; } }); } get size(): number { return this.cache.size; } private async withInflight<T>( agentId: string, op: () => Promise<T> ): Promise<T> { const pending = this.inflight.get(agentId); if (pending) { try { await pending; } catch (error) { this.logger.debug( `Previous cache operation failed for agent ${agentId}`, { error } ); } } const task = op(); const sealed = task.then(() => undefined).catch(() => undefined); this.inflight.set(agentId, sealed); try { return await task; } finally { const current = this.inflight.get(agentId); if (current === sealed) { this.inflight.delete(agentId); } } } private async installSeed( seed: AgentRuntimeSeed, now = Date.now() ): Promise<void> { const expiresAt = this.computeExpires(now, seed.ttlMs); const existing = this.cache.get(seed.agentId); if (existing) { const oldAgent = existing.agent; const wasPinned = existing.refCount > 0; existing.userId = seed.userId; existing.cfgVersion = seed.cfgVersion; existing.runtime = seed.runtime; existing.agent = seed.agent; existing.rebuild = seed.rebuild; existing.expiresAt = expiresAt; existing.lastAccess = now; this.bump(seed.agentId, existing); if (oldAgent && !wasPinned) { void this.safeDispose(oldAgent); } } else { const entry: CacheEntry = { agentId: seed.agentId, userId: seed.userId, cfgVersion: seed.cfgVersion, runtime: seed.runtime, agent: seed.agent, rebuild: seed.rebuild, expiresAt, refCount: 0, lastAccess: now, }; this.bump(seed.agentId, entry); this.trimCache(); } } private bump(agentId: string, entry: CacheEntry): void { this.cache.delete(agentId); this.cache.set(agentId, entry); } private pruneExpired(now: number): void { for (const [agentId, entry] of this.cache) { if (entry.refCount === 0 && this.isExpired(entry, now)) { this.cache.delete(agentId); void this.safeDispose(entry.agent); } } } private trimCache(): void { if (this.cache.size <= this.maxEntries) { return; } const removable: string[] = []; for (const [agentId, entry] of this.cache) { if (entry.refCount === 0) { removable.push(agentId); } } for (const agentId of removable) { if (this.cache.size <= this.maxEntries) { break; } const removed = this.cache.get(agentId); this.cache.delete(agentId); if (removed?.agent) { void this.safeDispose(removed.agent); } } if (this.cache.size > this.maxEntries) { this.logger.warn( `Agent runtime cache still above capacity (${this.cache.size}/${this.maxEntries}) due to pinned entries` ); } } private isExpired(entry: CacheEntry, now: number): boolean { return entry.expiresAt <= now; } private computeExpires(now: number, ttlOverride?: number): number { const ttlMs = ttlOverride != null ? ttlOverride : this.defaultTtlMs; if (ttlMs <= 0) { return Number.MAX_SAFE_INTEGER; } return now + ttlMs; } private async safeDispose(agent?: BaseAgent): Promise<void> { try { if (agent && typeof agent.dispose === 'function') { await agent.dispose(); } } catch (error) { this.logger.warn(`Failed to dispose agent`, { error }); } } } export default AgentRuntimeManager;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KasarLabs/snak'

If you have feedback or need assistance with the MCP directory API, please join our Discord server