semantic_merge.py (10.9 kB)
"""Semantic Merge agent for intelligent memory consolidation. This agent processes merge requests from ClusterDetector, combining similar memories while preserving all unique information. From contracts/agent-api.md: Scan Contract: - MUST only process from beads issues (not free scan) - MUST filter for consolidation:merge label Process Contract: - MUST preserve all unique entities - MUST preserve all unique content - MUST create consolidated_from relations - MUST archive (not delete) originals - MUST close beads issue on success """ from __future__ import annotations import json import logging import time import uuid from typing import TYPE_CHECKING, Any from cortexgraph.agents.base import ConsolidationAgent from cortexgraph.agents.beads_integration import ( claim_issue, close_issue, query_consolidation_issues, ) from cortexgraph.agents.models import MergeResult from cortexgraph.core.consolidation import ( calculate_merged_strength, merge_content_smart, merge_entities, merge_metadata, ) from cortexgraph.storage.models import Memory, MemoryStatus, Relation if TYPE_CHECKING: from cortexgraph.storage.jsonl_storage import JSONLStorage def get_storage() -> JSONLStorage: """Get storage instance. Separated for testability.""" from cortexgraph.context import get_db return get_db() logger = logging.getLogger(__name__) class SemanticMerge(ConsolidationAgent[MergeResult]): """Combines clustered memories intelligently. Unlike DecayAnalyzer and ClusterDetector which scan storage directly, SemanticMerge reads from beads issues created by ClusterDetector. This queue-based approach allows for human oversight and audit trails. Example: >>> merge = SemanticMerge(dry_run=True) >>> issue_ids = merge.scan() # Returns beads issue IDs >>> for issue_id in issue_ids: ... result = merge.process_item(issue_id) ... print(f"Merged {len(result.source_ids)} into {result.new_memory_id}") """ def __init__( self, dry_run: bool = False, rate_limit: int = 100, ) -> None: """Initialize SemanticMerge agent. Args: dry_run: If True, preview merges without making changes rate_limit: Max operations per minute """ super().__init__(dry_run=dry_run, rate_limit=rate_limit) self._storage: JSONLStorage | None = None self._pending_issues: dict[str, dict[str, Any]] = {} # Cache issue data by ID @property def storage(self) -> JSONLStorage: """Get storage instance (lazy initialization).""" if self._storage is None: self._storage = get_storage() return self._storage def scan(self) -> list[str]: """Scan beads for merge issues from ClusterDetector. Returns: List of beads issue IDs to process Contract: - MUST only process from beads issues (not free scan) - MUST filter for consolidation:merge label - MUST NOT modify any data """ # Query beads for merge issues issues = query_consolidation_issues(agent="merge", status="open") # Cache issue data for process_item self._pending_issues = {issue["id"]: issue for issue in issues} # Return issue IDs return list(self._pending_issues.keys()) def process_item(self, issue_id: str) -> MergeResult: """Process a single merge request from beads. 
Args: issue_id: Beads issue ID containing merge request Returns: MergeResult with merge outcome Contract: - MUST preserve all unique entities - MUST preserve all unique content - MUST create consolidated_from relations - MUST archive (not delete) originals - MUST close beads issue on success - If dry_run=True, MUST NOT modify any data Raises: ValueError: If issue_id is invalid or memories not found RuntimeError: If merge fails """ # Get issue data issue = self._pending_issues.get(issue_id) if not issue: # Try to fetch from beads issues = query_consolidation_issues(agent="merge", status="open") for i in issues: if i["id"] == issue_id: issue = i break if not issue: raise ValueError(f"Unknown beads issue: {issue_id}") # Parse issue notes for memory IDs try: notes = json.loads(issue.get("notes", "{}")) memory_ids = notes.get("memory_ids", []) cluster_id = notes.get("cluster_id", "unknown") except json.JSONDecodeError as e: raise ValueError(f"Invalid notes JSON in issue {issue_id}") from e if len(memory_ids) < 2: raise ValueError(f"Issue {issue_id} has fewer than 2 memories to merge") # Fetch source memories # Check memories dict first (for tests), then try connected storage methods source_memories = [] for mem_id in memory_ids: mem = None # First try direct dict access (for mocks/tests) if hasattr(self.storage, "memories") and isinstance(self.storage.memories, dict): mem = self.storage.memories.get(mem_id) # Then try storage methods (for real storage) if mem is None: try: if hasattr(self.storage, "get_memory"): mem = self.storage.get_memory(mem_id) except RuntimeError: # Storage not connected - already checked dict above pass if mem is None: raise ValueError(f"Memory not found: {mem_id}") source_memories.append(mem) # Collect all unique entities all_entities: set[str] = set() for mem in source_memories: entities = getattr(mem, "entities", []) or [] all_entities.update(entities) # Collect all unique tags all_tags: set[str] = set() for mem in source_memories: tags = getattr(mem, "tags", []) or [] all_tags.update(tags) # Merge content (deduplicate while preserving unique info) content_parts = [] for mem in source_memories: content = getattr(mem, "content", "") if content and content not in content_parts: content_parts.append(content) merged_content = "\n\n---\n\n".join(content_parts) # Generate new memory ID new_memory_id = str(uuid.uuid4()) # Create content diff summary content_diff = ( f"Merged {len(source_memories)} memories about {', '.join(list(all_entities)[:3])}" ) if self.dry_run: # Dry run - don't modify anything logger.info(f"[DRY RUN] Would merge {memory_ids} into {new_memory_id}") return MergeResult( new_memory_id=new_memory_id, source_ids=memory_ids, relation_ids=[], # No relations created in dry run content_diff=content_diff, entities_preserved=len(all_entities), success=True, beads_issue_id=issue_id, ) # === LIVE MODE: Actually perform the merge === # Claim the issue if not claim_issue(issue_id): raise RuntimeError(f"Failed to claim issue {issue_id}") try: # Get cohesion from issue notes (for strength calculation) cohesion = notes.get("cohesion", 0.8) # Use consolidation module for intelligent merging (T053) merged_content = merge_content_smart(source_memories) merged_meta = merge_metadata(source_memories) merged_entities_list = merge_entities(source_memories) merged_strength = calculate_merged_strength(source_memories, cohesion) # Calculate timestamps: earliest created, latest used earliest_created = min( getattr(m, "created_at", int(time.time())) for m in source_memories ) 
latest_used = max(getattr(m, "last_used", int(time.time())) for m in source_memories) total_use_count = sum(getattr(m, "use_count", 0) for m in source_memories) # Create the merged memory merged_memory = Memory( id=new_memory_id, content=merged_content, meta=merged_meta, entities=merged_entities_list, created_at=earliest_created, last_used=latest_used, use_count=total_use_count, strength=merged_strength, status=MemoryStatus.ACTIVE, ) # Save merged memory self.storage.save_memory(merged_memory) logger.info(f"Created merged memory {new_memory_id}") # Create consolidated_from relations (T054) relation_ids = [] now = int(time.time()) for orig_id in memory_ids: relation = Relation( id=str(uuid.uuid4()), from_memory_id=new_memory_id, to_memory_id=orig_id, relation_type="consolidated_from", strength=1.0, created_at=now, metadata={ "cluster_id": cluster_id, "cohesion": cohesion, "beads_issue_id": issue_id, }, ) self.storage.create_relation(relation) relation_ids.append(relation.id) logger.info(f"Created {len(relation_ids)} consolidated_from relations") # Archive original memories (T055) - status change, not delete for orig_id in memory_ids: self.storage.update_memory(orig_id, status=MemoryStatus.ARCHIVED) logger.info(f"Archived {len(memory_ids)} original memories") # Close the beads issue (T056) close_issue(issue_id, f"Merged into {new_memory_id}") return MergeResult( new_memory_id=new_memory_id, source_ids=memory_ids, relation_ids=relation_ids, content_diff=content_diff, entities_preserved=len(all_entities), success=True, beads_issue_id=issue_id, ) except Exception as e: logger.error(f"Merge failed for {issue_id}: {e}") raise RuntimeError(f"Merge failed: {e}") from e
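For orientation, below is a minimal driver sketch that wires scan() to process_item() the way the class docstring's doctest suggests. The import path cortexgraph.agents.semantic_merge is an assumption inferred from the file name and the sibling imports above, and the per-issue error handling is illustrative rather than prescribed by the module.

from cortexgraph.agents.semantic_merge import SemanticMerge  # assumed module path


def run_pending_merges(dry_run: bool = True) -> None:
    """Process every open consolidation:merge issue queued by ClusterDetector."""
    agent = SemanticMerge(dry_run=dry_run)
    for issue_id in agent.scan():  # beads issue IDs, not memory IDs
        try:
            result = agent.process_item(issue_id)
        except (ValueError, RuntimeError) as exc:
            # Malformed issues or failed merges are reported per issue; the loop continues.
            print(f"{issue_id}: skipped ({exc})")
            continue
        print(
            f"{issue_id}: merged {len(result.source_ids)} memories into "
            f"{result.new_memory_id} ({result.entities_preserved} entities preserved)"
        )


if __name__ == "__main__":
    run_pending_merges(dry_run=True)  # set dry_run=False to actually merge and archive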

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex'
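The same lookup in Python, as a minimal sketch using the requests library; the response schema is not documented on this page, so the example simply pretty-prints whatever JSON the endpoint returns:

import json

import requests  # third-party HTTP client, assumed to be installed

resp = requests.get(
    "https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex",
    timeout=10,
)
resp.raise_for_status()
print(json.dumps(resp.json(), indent=2))  # schema not shown here; print as-is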

If you have feedback or need assistance with the MCP directory API, please join our Discord server.