temporal.py•7.8 kB
"""
Temporal Domain for time-aware memory processing.
The Temporal Domain is responsible for:
- Managing memory decay and importance over time
- Temporal indexing and sequencing
- Chronological relationship tracking
- Time-based memory consolidation
- Recency effects in retrieval
"""
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List
from loguru import logger
from memory_mcp.domains.persistence import PersistenceDomain
class TemporalDomain:
    """
    Manages time-aware memory processing.
    
    This domain handles temporal aspects of memory, including
    decay over time, recency-based relevance, and time-based
    consolidation of memories.
    """
    
    def __init__(self, config: Dict[str, Any], persistence_domain: PersistenceDomain) -> None:
        """
        Initialize the temporal domain.
        
        Args:
            config: Configuration dictionary
            persistence_domain: Reference to the persistence domain
        """
        self.config = config
        self.persistence_domain = persistence_domain
        self.last_consolidation = datetime.now()
    
    async def initialize(self) -> None:
        """Initialize the temporal domain."""
        logger.info("Initializing Temporal Domain")
        
        # Schedule initial consolidation if needed
        consolidation_interval = self.config["memory"].get("consolidation_interval_hours", 24)
        self.consolidation_interval = timedelta(hours=consolidation_interval)
        
        # Get last consolidation time from persistence
        last_consolidation = await self.persistence_domain.get_metadata("last_consolidation")
        if last_consolidation:
            try:
                self.last_consolidation = datetime.fromisoformat(last_consolidation)
            except ValueError:
                logger.warning(f"Invalid last_consolidation timestamp: {last_consolidation}")
                self.last_consolidation = datetime.now()
        
        # Check if consolidation is due
        if datetime.now() - self.last_consolidation > self.consolidation_interval:
            logger.info("Consolidation is due. Will run after initialization.")
            # Note: We don't run consolidation here to avoid slow startup
            # It will run on the next memory operation
    
    async def process_new_memory(self, memory: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a new memory with temporal information.
        
        Args:
            memory: The memory to process
            
        Returns:
            Processed memory with temporal information
        """
        # Add timestamps
        now = datetime.now().isoformat()
        memory["created_at"] = now
        memory["last_accessed"] = now
        memory["last_modified"] = now
        memory["access_count"] = 0
        
        return memory
    
    async def update_memory_access(self, memory_id: str) -> None:
        """
        Update the access time for a memory.
        
        Args:
            memory_id: ID of the memory to update
        """
        # Get the memory
        memory = await self.persistence_domain.get_memory(memory_id)
        if not memory:
            logger.warning(f"Memory {memory_id} not found for access update")
            return
        
        # Update access time and count
        memory["last_accessed"] = datetime.now().isoformat()
        memory["access_count"] = memory.get("access_count", 0) + 1
        
        # Save the updated memory
        current_tier = await self.persistence_domain.get_memory_tier(memory_id)
        await self.persistence_domain.update_memory(memory, current_tier)
        
        # Check if consolidation is due
        await self._check_consolidation()
    
    async def update_memory_modification(self, memory: Dict[str, Any]) -> Dict[str, Any]:
        """
        Update the modification time for a memory.
        
        Args:
            memory: The memory to update
            
        Returns:
            Updated memory
        """
        memory["last_modified"] = datetime.now().isoformat()
        return memory
    
    async def adjust_memory_relevance(
        self,
        memories: List[Dict[str, Any]],
        query: str
    ) -> List[Dict[str, Any]]:
        """
        Adjust memory relevance based on temporal factors.
        
        Args:
            memories: List of memories to adjust
            query: The query string
            
        Returns:
            Adjusted memories
        """
        # Weight configuration
        recency_weight = self.config["memory"].get("retrieval", {}).get("recency_weight", 0.3)
        importance_weight = self.config["memory"].get("retrieval", {}).get("importance_weight", 0.7)
        
        now = datetime.now()
        adjusted_memories = []
        
        for memory in memories:
            # Calculate recency score
            last_accessed_str = memory.get("last_accessed", memory.get("created_at"))
            try:
                last_accessed = datetime.fromisoformat(last_accessed_str)
                days_since_access = (now - last_accessed).days
                # Recency score: 1.0 for just accessed, decreasing with time
                recency_score = 1.0 / (1.0 + days_since_access)
            except (ValueError, TypeError):
                recency_score = 0.5  # Default if timestamp is invalid
            
            # Get importance score
            importance_score = memory.get("importance", 0.5)
            
            # Get similarity score (from semantic search)
            similarity_score = memory.get("similarity", 0.5)
            
            # Combine scores
            combined_score = (
                similarity_score * (1.0 - recency_weight - importance_weight) +
                recency_score * recency_weight +
                importance_score * importance_weight
            )
            
            # Update memory with combined score
            memory["adjusted_score"] = combined_score
            memory["recency_score"] = recency_score
            
            adjusted_memories.append(memory)
        
        # Sort by combined score
        adjusted_memories.sort(key=lambda m: m["adjusted_score"], reverse=True)
        
        return adjusted_memories
    
    async def _check_consolidation(self) -> None:
        """Check if memory consolidation is due and run if needed."""
        now = datetime.now()
        
        # Check if enough time has passed since last consolidation
        if now - self.last_consolidation > self.consolidation_interval:
            logger.info("Running memory consolidation")
            await self._consolidate_memories()
            
            # Update last consolidation time
            self.last_consolidation = now
            await self.persistence_domain.set_metadata("last_consolidation", now.isoformat())
    
    async def _consolidate_memories(self) -> None:
        """
        Consolidate memories based on temporal patterns.
        
        This includes:
        - Moving old short-term memories to long-term
        - Archiving rarely accessed long-term memories
        - Adjusting importance scores based on access patterns
        """
        # Placeholder for consolidation logic
        logger.info("Memory consolidation not yet implemented")
    
    async def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the temporal domain.
        
        Returns:
            Temporal domain statistics
        """
        return {
            "last_consolidation": self.last_consolidation.isoformat(),
            "next_consolidation": (self.last_consolidation + self.consolidation_interval).isoformat(),
            "status": "initialized"
        }