Skip to main content
Glama
decay.py6.51 kB
"""Temporal decay functions for memory scoring. Supports multiple decay models: - power_law (default): (1 + dt/t0)^(-alpha) - exponential: exp(-lambda * dt) - two_component: w * exp(-lambda_fast*dt) + (1-w) * exp(-lambda_slow*dt) """ import math import time # Re-export math utilities for backward compatibility with existing imports # These functions were refactored to math_utils.py but are imported here to maintain # the public API (used by core/__init__.py and server.py) from .math_utils import calculate_decay_lambda, calculate_halflife # noqa: F401 # Constants SECONDS_PER_DAY = 86400.0 SECONDS_PER_HOUR = 3600.0 BISECTION_MAX_YEARS = 10.0 BISECTION_MAX_SECONDS = 3650 * SECONDS_PER_DAY # ~10 years BISECTION_EXPANSION_ITERATIONS = 32 BISECTION_PRECISION_ITERATIONS = 60 def calculate_score( use_count: int, last_used: int, strength: float, now: int | None = None, lambda_: float | None = None, beta: float | None = None, ) -> float: """Calculate current score using the configured decay model. If `lambda_` is provided, uses exponential decay explicitly (for backward compatibility and tests). Otherwise, branches by `config.decay_model`. """ from ..config import get_config if now is None: now = int(time.time()) config = get_config() if lambda_ is None: lambda_ = config.decay_lambda if beta is None: beta = config.decay_beta time_delta = max(0, now - last_used) # Calculate components # Add 1 to use_count so new memories (use_count=0) don't get zero score # This gives new memories a grace period before decay dominates use_component = math.pow(use_count + 1, beta) # If lambda_ explicitly provided, force exponential path if lambda_ is not None and (getattr(config, "decay_model", "power_law") != "exponential"): decay_component = math.exp(-lambda_ * time_delta) else: model = getattr(config, "decay_model", "power_law") if model == "power_law": # Derive t0 from alpha and target half-life alpha = config.pl_alpha t_half = config.pl_halflife_days * SECONDS_PER_DAY # t0 = H / (2^(1/alpha) - 1) denom = math.pow(2.0, 1.0 / alpha) - 1.0 t0 = t_half / denom if denom > 0 else t_half decay_component = math.pow(1.0 + (time_delta / t0), -alpha) elif model == "two_component": w = config.tc_weight_fast decay_component = w * math.exp(-config.tc_lambda_fast * time_delta) + ( 1.0 - w ) * math.exp(-config.tc_lambda_slow * time_delta) else: # exponential decay_component = math.exp(-lambda_ * time_delta) return use_component * decay_component * strength def time_until_threshold( current_score: float, threshold: float, last_used: int, lambda_: float | None = None, ) -> float | None: """Calculate seconds until score drops below threshold. If `lambda_` is provided, uses exponential closed-form. Otherwise, branches by configured decay model. For two-component decay, uses numeric bisection. """ from ..config import get_config if current_score <= threshold: return None config = get_config() now = int(time.time()) if lambda_ is not None or getattr(config, "decay_model", "power_law") == "exponential": if lambda_ is None: lambda_ = config.decay_lambda # threshold = current_score * exp(-lambda * t) -> t = -ln(threshold/current)/lambda time_delta = -math.log(threshold / current_score) / lambda_ elapsed = now - last_used remaining = time_delta - elapsed return max(0, remaining) # Factor out K * f(dt). Let f be decay function; current_score = K * f(elapsed). elapsed = now - last_used def f(dt: float) -> float: model = getattr(config, "decay_model", "power_law") if model == "power_law": alpha = config.pl_alpha t_half = config.pl_halflife_days * SECONDS_PER_DAY denom = math.pow(2.0, 1.0 / alpha) - 1.0 t0 = t_half / denom if denom > 0 else t_half return math.pow(1.0 + (dt / t0), -alpha) elif model == "two_component": return config.tc_weight_fast * math.exp(-config.tc_lambda_fast * dt) + ( 1.0 - config.tc_weight_fast ) * math.exp(-config.tc_lambda_slow * dt) else: return math.exp(-config.decay_lambda * dt) # We want t such that K*f(elapsed + t) = threshold; K = current_score / f(elapsed) # => f(elapsed + t) = threshold * f(elapsed) / current_score f_elapsed = f(float(elapsed)) target = (threshold * f_elapsed) / current_score # If target >= f_elapsed, then threshold already reached or below (shouldn't happen due to early return) # Bisection search for t >= 0 with upper bound expansion lo = 0.0 hi = SECONDS_PER_HOUR # start with 1 hour # Expand until f(elapsed + hi) <= target or cap for _ in range(BISECTION_EXPANSION_ITERATIONS): if f(elapsed + hi) <= target: break hi *= 2.0 if hi > BISECTION_MAX_SECONDS: # cap ~10 years break # If even at very large hi we haven't crossed, return None (effectively never) if f(elapsed + hi) > target: return None for _ in range(BISECTION_PRECISION_ITERATIONS): # high-precision bisection mid = (lo + hi) / 2.0 if f(elapsed + mid) <= target: hi = mid else: lo = mid remaining = hi return max(0.0, remaining) def project_score_at_time( use_count: int, last_used: int, strength: float, target_time: int, lambda_: float | None = None, beta: float | None = None, ) -> float: """ Project what the memory score will be at a future time. Args: use_count: Number of times memory has been accessed last_used: Unix timestamp when memory was last used strength: Base strength multiplier target_time: Unix timestamp to project to lambda_: Decay constant (defaults to config value) beta: Use count exponent (defaults to config value) Returns: Projected score at target_time """ return calculate_score( use_count=use_count, last_used=last_used, strength=strength, now=target_time, lambda_=lambda_, beta=beta, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex'

If you have feedback or need assistance with the MCP directory API, please join our Discord server