MCP Memory Service

# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Semantic compression engine for memory cluster summarization."""

import numpy as np
from typing import List, Dict, Any, Optional, Tuple, Set
from datetime import datetime
from dataclasses import dataclass
from collections import Counter
import re
import hashlib

from .base import ConsolidationBase, ConsolidationConfig, MemoryCluster
from ..models.memory import Memory


@dataclass
class CompressionResult:
    """Result of compressing a memory cluster."""
    cluster_id: str
    compressed_memory: Memory
    compression_ratio: float
    key_concepts: List[str]
    temporal_span: Dict[str, Any]
    source_memory_count: int
    compression_metadata: Dict[str, Any]


class SemanticCompressionEngine(ConsolidationBase):
    """
    Creates condensed representations of memory clusters for efficient storage.

    This creates higher-level abstractions while preserving key information,
    using statistical methods and concept extraction to summarize clusters.
    """

    def __init__(self, config: ConsolidationConfig):
        super().__init__(config)
        self.max_summary_length = config.max_summary_length
        self.preserve_originals = config.preserve_originals

        # Word importance patterns
        self._important_patterns = {
            'technical_terms': re.compile(r'\b[A-Z][a-z]*[A-Z][a-zA-Z]*\b'),  # CamelCase
            'acronyms': re.compile(r'\b[A-Z]{2,}\b'),
            'numbers': re.compile(r'\b\d+(?:\.\d+)?\b'),
            'urls': re.compile(r'https?://[^\s]+'),
            'file_paths': re.compile(r'[/\\][^\s]+'),
            'quoted_text': re.compile(r'"([^"]*)"'),
            'code_blocks': re.compile(r'```[\s\S]*?```|`[^`]+`')
        }

    async def process(self, clusters: List[MemoryCluster], memories: List[Memory], **kwargs) -> List[CompressionResult]:
        """Compress memory clusters into condensed representations."""
        if not clusters:
            return []

        # Create memory hash lookup
        memory_lookup = {m.content_hash: m for m in memories}

        compression_results = []

        for cluster in clusters:
            # Get memories for this cluster
            cluster_memories = []
            for hash_val in cluster.memory_hashes:
                if hash_val in memory_lookup:
                    cluster_memories.append(memory_lookup[hash_val])

            if not cluster_memories:
                continue

            # Compress the cluster
            result = await self._compress_cluster(cluster, cluster_memories)
            if result:
                compression_results.append(result)

        self.logger.info(f"Compressed {len(compression_results)} clusters")
        return compression_results

    async def _compress_cluster(self, cluster: MemoryCluster, memories: List[Memory]) -> Optional[CompressionResult]:
        """Compress a single memory cluster."""
        if len(memories) < 2:
            return None

        # Extract key concepts and themes
        key_concepts = await self._extract_key_concepts(memories, cluster.theme_keywords)

        # Generate thematic summary
        summary = await self._generate_thematic_summary(memories, key_concepts)

        # Calculate temporal information
        temporal_span = self._calculate_temporal_span(memories)

        # Aggregate tags and metadata
        aggregated_tags = self._aggregate_tags(memories)
        aggregated_metadata = self._aggregate_metadata(memories)

        # Create compressed memory embedding (cluster centroid)
        compressed_embedding = cluster.centroid_embedding

        # Calculate compression ratio
        original_size = sum(len(m.content) for m in memories)
        compressed_size = len(summary)
        compression_ratio = compressed_size / original_size if original_size > 0 else 0

        # Create content hash for the compressed memory
        content_hash = hashlib.sha256(summary.encode()).hexdigest()

        # Create compressed memory object
        compressed_memory = Memory(
            content=summary,
            content_hash=content_hash,
            tags=aggregated_tags,
            memory_type='compressed_cluster',
            metadata={
                **aggregated_metadata,
                'cluster_id': cluster.cluster_id,
                'compression_date': datetime.now().isoformat(),
                'source_memory_count': len(memories),
                'compression_ratio': compression_ratio,
                'key_concepts': key_concepts,
                'temporal_span': temporal_span,
                'theme_keywords': cluster.theme_keywords,
                'coherence_score': cluster.coherence_score,
                'compression_version': '1.0'
            },
            embedding=compressed_embedding,
            created_at=datetime.now().timestamp(),
            created_at_iso=datetime.now().isoformat() + 'Z'
        )

        return CompressionResult(
            cluster_id=cluster.cluster_id,
            compressed_memory=compressed_memory,
            compression_ratio=compression_ratio,
            key_concepts=key_concepts,
            temporal_span=temporal_span,
            source_memory_count=len(memories),
            compression_metadata={
                'algorithm': 'semantic_compression_v1',
                'original_total_length': original_size,
                'compressed_length': compressed_size,
                'concept_count': len(key_concepts),
                'theme_keywords': cluster.theme_keywords
            }
        )

    async def _extract_key_concepts(self, memories: List[Memory], theme_keywords: List[str]) -> List[str]:
        """Extract key concepts from cluster memories."""
        all_text = ' '.join([m.content for m in memories])
        concepts = set()

        # Add theme keywords as primary concepts
        concepts.update(theme_keywords)

        # Extract important patterns
        for pattern_name, pattern in self._important_patterns.items():
            matches = pattern.findall(all_text)
            if pattern_name == 'quoted_text':
                # For quoted text, add the content inside quotes
                concepts.update(matches)
            else:
                concepts.update(matches)

        # Extract capitalized terms (potential proper nouns)
        capitalized = re.findall(r'\b[A-Z][a-z]{2,}\b', all_text)
        concepts.update(capitalized)

        # Extract frequent meaningful words
        words = re.findall(r'\b[a-zA-Z]{4,}\b', all_text.lower())
        word_counts = Counter(words)

        # Filter out common words
        stop_words = {
            'this', 'that', 'with', 'have', 'will', 'from', 'they', 'know',
            'want', 'been', 'good', 'much', 'some', 'time', 'very', 'when',
            'come', 'here', 'just', 'like', 'long', 'make', 'many', 'over',
            'such', 'take', 'than', 'them', 'well', 'were', 'what', 'work',
            'your', 'could', 'should', 'would', 'there', 'their', 'these',
            'about', 'after', 'again', 'before', 'being', 'between',
            'during', 'under', 'where', 'while', 'other', 'through',
            'against', 'without'
        }

        # Add frequent non-stop words
        for word, count in word_counts.most_common(20):
            if word not in stop_words and count >= 2:  # Must appear at least twice
                concepts.add(word)

        # Convert to list and limit
        concept_list = list(concepts)
        concept_list.sort(key=lambda x: word_counts.get(x.lower(), 0), reverse=True)

        return concept_list[:15]  # Limit to top 15 concepts

    async def _generate_thematic_summary(self, memories: List[Memory], key_concepts: List[str]) -> str:
        """Generate a thematic summary of the memory cluster."""
        # Analyze the memories to identify common themes and patterns
        all_content = [m.content for m in memories]

        # Extract representative sentences that contain key concepts
        representative_sentences = []
        concept_coverage = set()

        for memory in memories:
            sentences = self._split_into_sentences(memory.content)

            for sentence in sentences:
                sentence_concepts = set()
                sentence_lower = sentence.lower()

                # Check which concepts this sentence covers
                for concept in key_concepts:
                    if concept.lower() in sentence_lower:
                        sentence_concepts.add(concept)

                # If this sentence covers new concepts, include it
                new_concepts = sentence_concepts - concept_coverage
                if new_concepts and len(sentence) > 20:  # Minimum sentence length
                    representative_sentences.append({
                        'sentence': sentence.strip(),
                        'concepts': sentence_concepts,
                        'new_concepts': new_concepts,
                        'score': len(new_concepts) + len(sentence_concepts) * 0.1
                    })
                    concept_coverage.update(new_concepts)

        # Sort by score and select best sentences
        representative_sentences.sort(key=lambda x: x['score'], reverse=True)

        # Build summary
        summary_parts = []

        # Add cluster overview
        memory_count = len(memories)
        time_span = self._calculate_temporal_span(memories)
        concept_str = ', '.join(key_concepts[:5])

        overview = f"Cluster of {memory_count} related memories about {concept_str}"
        if time_span['span_days'] > 0:
            overview += f" spanning {time_span['span_days']} days"
        overview += "."
        summary_parts.append(overview)

        # Add key insights from representative sentences
        used_length = len(overview)
        remaining_length = self.max_summary_length - used_length - 100  # Reserve space for conclusion

        for sent_info in representative_sentences:
            sentence = sent_info['sentence']
            if used_length + len(sentence) < remaining_length:
                summary_parts.append(sentence)
                used_length += len(sentence)
            else:
                break

        # Add concept summary if space allows
        if used_length < self.max_summary_length - 50:
            concept_summary = f"Key concepts: {', '.join(key_concepts[:8])}."
            if used_length + len(concept_summary) < self.max_summary_length:
                summary_parts.append(concept_summary)

        summary = ' '.join(summary_parts)

        # Truncate if still too long
        if len(summary) > self.max_summary_length:
            summary = summary[:self.max_summary_length - 3] + '...'

        return summary

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple heuristics."""
        # Simple sentence splitting (could be improved with NLTK)
        sentences = re.split(r'[.!?]+\s+', text)

        # Filter out very short sentences and clean up
        clean_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) > 10:  # Minimum sentence length
                clean_sentences.append(sentence)

        return clean_sentences

    def _calculate_temporal_span(self, memories: List[Memory]) -> Dict[str, Any]:
        """Calculate temporal information for the memory cluster."""
        timestamps = []
        for memory in memories:
            if memory.created_at:
                timestamps.append(memory.created_at)
            elif memory.timestamp:
                timestamps.append(memory.timestamp.timestamp())

        if not timestamps:
            return {
                'start_time': None,
                'end_time': None,
                'span_days': 0,
                'span_description': 'unknown'
            }

        start_time = min(timestamps)
        end_time = max(timestamps)
        span_seconds = end_time - start_time
        span_days = int(span_seconds / (24 * 3600))

        # Create human-readable span description
        if span_days == 0:
            span_description = 'same day'
        elif span_days < 7:
            span_description = f'{span_days} days'
        elif span_days < 30:
            weeks = span_days // 7
            span_description = f'{weeks} week{"s" if weeks > 1 else ""}'
        elif span_days < 365:
            months = span_days // 30
            span_description = f'{months} month{"s" if months > 1 else ""}'
        else:
            years = span_days // 365
            span_description = f'{years} year{"s" if years > 1 else ""}'

        return {
            'start_time': start_time,
            'end_time': end_time,
            'span_days': span_days,
            'span_description': span_description,
            'start_iso': datetime.utcfromtimestamp(start_time).isoformat() + 'Z',
            'end_iso': datetime.utcfromtimestamp(end_time).isoformat() + 'Z'
        }

    def _aggregate_tags(self, memories: List[Memory]) -> List[str]:
        """Aggregate tags from cluster memories."""
        all_tags = []
        for memory in memories:
            all_tags.extend(memory.tags)

        # Count tag frequency
        tag_counts = Counter(all_tags)

        # Include tags that appear in multiple memories or are important
        aggregated_tags = ['cluster', 'compressed']  # Always include these

        for tag, count in tag_counts.most_common():
            if count > 1 or tag in {'important', 'critical', 'reference', 'project'}:
                if tag not in aggregated_tags:
                    aggregated_tags.append(tag)

        return aggregated_tags[:10]  # Limit to 10 tags

    def _aggregate_metadata(self, memories: List[Memory]) -> Dict[str, Any]:
        """Aggregate metadata from cluster memories."""
        aggregated = {
            'source_memory_hashes': [m.content_hash for m in memories]
        }

        # Collect unique metadata keys and their values
        all_metadata = {}
        for memory in memories:
            for key, value in memory.metadata.items():
                if key not in all_metadata:
                    all_metadata[key] = []
                all_metadata[key].append(value)

        # Aggregate metadata intelligently
        for key, values in all_metadata.items():
            unique_values = list(set(str(v) for v in values))

            if len(unique_values) == 1:
                # All memories have the same value
                aggregated[f'common_{key}'] = unique_values[0]
            elif len(unique_values) <= 5:
                # Small number of unique values, list them
                aggregated[f'varied_{key}'] = unique_values
            else:
                # Many unique values, just note the variety
                aggregated[f'{key}_variety_count'] = len(unique_values)

        return aggregated

    async def estimate_compression_benefit(
        self,
        clusters: List[MemoryCluster],
        memories: List[Memory]
    ) -> Dict[str, Any]:
        """Estimate the benefit of compressing given clusters."""
        memory_lookup = {m.content_hash: m for m in memories}

        total_original_size = 0
        total_compressed_size = 0
        compressible_clusters = 0

        for cluster in clusters:
            cluster_memories = [memory_lookup[h] for h in cluster.memory_hashes if h in memory_lookup]

            if len(cluster_memories) < 2:
                continue

            compressible_clusters += 1
            original_size = sum(len(m.content) for m in cluster_memories)

            # Estimate compressed size (rough approximation)
            estimated_compressed_size = min(self.max_summary_length, original_size // 3)

            total_original_size += original_size
            total_compressed_size += estimated_compressed_size

        overall_ratio = total_compressed_size / total_original_size if total_original_size > 0 else 1.0
        savings = total_original_size - total_compressed_size

        return {
            'compressible_clusters': compressible_clusters,
            'total_original_size': total_original_size,
            'estimated_compressed_size': total_compressed_size,
            'compression_ratio': overall_ratio,
            'estimated_savings_bytes': savings,
            'estimated_savings_percent': (1 - overall_ratio) * 100
        }
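The sketch below shows one way the engine above might be driven end to end: build a few Memory objects, group them into a MemoryCluster, estimate the savings, then run the compression. It is a minimal, hypothetical example, not part of the source file: the import paths, the ConsolidationConfig fields, and the Memory/MemoryCluster constructor arguments are inferred from the attributes the engine reads, so the real signatures in the repository may differ.

# Hypothetical usage sketch; constructor signatures and import paths are assumptions.
import asyncio
import hashlib
from datetime import datetime
from typing import List

from mcp_memory_service.consolidation.base import ConsolidationConfig, MemoryCluster  # assumed path
from mcp_memory_service.consolidation.compression import SemanticCompressionEngine    # assumed path
from mcp_memory_service.models.memory import Memory                                   # assumed path


def make_memory(content: str, tags: List[str]) -> Memory:
    # Hash the content the same way the engine hashes its summaries.
    return Memory(
        content=content,
        content_hash=hashlib.sha256(content.encode()).hexdigest(),
        tags=tags,
        memory_type='note',
        metadata={'source': 'example'},
        embedding=None,  # concept extraction and summarization do not require embeddings
        created_at=datetime.now().timestamp(),
        created_at_iso=datetime.now().isoformat() + 'Z',
    )


async def main() -> None:
    # Assumed config fields; the engine only reads max_summary_length and preserve_originals.
    config = ConsolidationConfig(max_summary_length=500, preserve_originals=True)
    engine = SemanticCompressionEngine(config)

    memories = [
        make_memory("Refactored the ChromaDB storage backend to batch writes.", ['project', 'storage']),
        make_memory("ChromaDB batch writes cut ingestion time roughly in half.", ['project', 'performance']),
    ]

    # Assumed MemoryCluster fields, mirroring the attributes the engine accesses.
    cluster = MemoryCluster(
        cluster_id='cluster-001',
        memory_hashes=[m.content_hash for m in memories],
        centroid_embedding=None,
        theme_keywords=['ChromaDB', 'batching'],
        coherence_score=0.82,
    )

    # Estimate the benefit first, then perform the actual compression.
    estimate = await engine.estimate_compression_benefit([cluster], memories)
    print(f"Estimated savings: {estimate['estimated_savings_percent']:.1f}%")

    results = await engine.process([cluster], memories)
    for result in results:
        print(result.compression_ratio, result.key_concepts)
        print(result.compressed_memory.content[:120])


asyncio.run(main())

Because preserve_originals is enabled in this sketch, the compressed memory would sit alongside the source memories rather than replacing them; the source hashes remain recoverable from the aggregated metadata key source_memory_hashes.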
