MCP Memory Service

# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Semantic clustering system for memory organization."""

import uuid
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from collections import Counter
import re

try:
    from sklearn.cluster import DBSCAN
    from sklearn.cluster import AgglomerativeClustering
    from sklearn.metrics import silhouette_score
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False

from .base import ConsolidationBase, ConsolidationConfig, MemoryCluster
from ..models.memory import Memory


class SemanticClusteringEngine(ConsolidationBase):
    """
    Creates semantic clusters of related memories for organization and compression.

    Uses embedding-based clustering algorithms (DBSCAN, Hierarchical) to group
    semantically similar memories, enabling efficient compression and retrieval.
    """

    def __init__(self, config: ConsolidationConfig):
        super().__init__(config)
        self.min_cluster_size = config.min_cluster_size
        self.algorithm = config.clustering_algorithm

        if not SKLEARN_AVAILABLE:
            self.logger.warning("sklearn not available, using simple clustering fallback")
            self.algorithm = 'simple'

    async def process(self, memories: List[Memory], **kwargs) -> List[MemoryCluster]:
        """Create semantic clusters from memories."""
        if not self._validate_memories(memories) or len(memories) < self.min_cluster_size:
            return []

        # Filter memories with embeddings
        memories_with_embeddings = [m for m in memories if m.embedding]

        if len(memories_with_embeddings) < self.min_cluster_size:
            self.logger.warning(
                f"Only {len(memories_with_embeddings)} memories have embeddings, "
                f"need at least {self.min_cluster_size}"
            )
            return []

        # Extract embeddings matrix
        embeddings = np.array([m.embedding for m in memories_with_embeddings])

        # Perform clustering
        if self.algorithm == 'dbscan':
            cluster_labels = await self._dbscan_clustering(embeddings)
        elif self.algorithm == 'hierarchical':
            cluster_labels = await self._hierarchical_clustering(embeddings)
        else:
            cluster_labels = await self._simple_clustering(embeddings)

        # Create cluster objects
        clusters = await self._create_clusters(memories_with_embeddings, cluster_labels, embeddings)

        # Filter by minimum cluster size
        valid_clusters = [c for c in clusters if len(c.memory_hashes) >= self.min_cluster_size]

        self.logger.info(
            f"Created {len(valid_clusters)} valid clusters from {len(memories_with_embeddings)} memories"
        )
        return valid_clusters

    async def _dbscan_clustering(self, embeddings: np.ndarray) -> np.ndarray:
        """Perform DBSCAN clustering on embeddings."""
        if not SKLEARN_AVAILABLE:
            return await self._simple_clustering(embeddings)

        # Adaptive epsilon based on data size and dimensionality
        n_samples, n_features = embeddings.shape
        eps = 0.5 - (n_samples / 10000) * 0.1  # Decrease eps for larger datasets
        eps = max(0.2, min(0.7, eps))  # Clamp between 0.2 and 0.7

        min_samples = max(2, self.min_cluster_size // 2)

        clustering = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine')
        labels = clustering.fit_predict(embeddings)

        self.logger.debug(f"DBSCAN: eps={eps}, min_samples={min_samples}, found {len(set(labels))} clusters")
        return labels
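
    # Worked example of the adaptive eps heuristic above (illustrative only):
    # for n_samples=3000, eps = 0.5 - (3000/10000)*0.1 = 0.47, which stays
    # inside the [0.2, 0.7] clamp; at n_samples=50000 the raw value would be
    # 0.0, so the clamp floors it at 0.2. Denser datasets therefore get a
    # tighter cosine-distance neighborhood before points are grouped.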
self.logger.debug(f"DBSCAN: eps={eps}, min_samples={min_samples}, found {len(set(labels))} clusters") return labels async def _hierarchical_clustering(self, embeddings: np.ndarray) -> np.ndarray: """Perform hierarchical clustering on embeddings.""" if not SKLEARN_AVAILABLE: return await self._simple_clustering(embeddings) # Estimate number of clusters (heuristic: sqrt of samples / 2) n_samples = embeddings.shape[0] n_clusters = max(2, min(n_samples // self.min_cluster_size, int(np.sqrt(n_samples) / 2))) clustering = AgglomerativeClustering( n_clusters=n_clusters, metric='cosine', linkage='average' ) labels = clustering.fit_predict(embeddings) self.logger.debug(f"Hierarchical: n_clusters={n_clusters}, found {len(set(labels))} clusters") return labels async def _simple_clustering(self, embeddings: np.ndarray) -> np.ndarray: """Simple fallback clustering using cosine similarity threshold.""" n_samples = embeddings.shape[0] labels = np.full(n_samples, -1) # Start with all as noise current_cluster = 0 similarity_threshold = 0.7 # Threshold for grouping for i in range(n_samples): if labels[i] != -1: # Already assigned continue # Start new cluster cluster_members = [i] labels[i] = current_cluster # Find similar memories for j in range(i + 1, n_samples): if labels[j] != -1: # Already assigned continue # Calculate cosine similarity similarity = np.dot(embeddings[i], embeddings[j]) / ( np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[j]) ) if similarity >= similarity_threshold: labels[j] = current_cluster cluster_members.append(j) # Only keep cluster if it meets minimum size if len(cluster_members) >= self.min_cluster_size: current_cluster += 1 else: # Mark as noise for member in cluster_members: labels[member] = -1 self.logger.debug(f"Simple clustering: threshold={similarity_threshold}, found {current_cluster} clusters") return labels async def _create_clusters( self, memories: List[Memory], labels: np.ndarray, embeddings: np.ndarray ) -> List[MemoryCluster]: """Create MemoryCluster objects from clustering results.""" clusters = [] unique_labels = set(labels) for label in unique_labels: if label == -1: # Skip noise points continue # Get memories in this cluster cluster_indices = np.where(labels == label)[0] cluster_memories = [memories[i] for i in cluster_indices] cluster_embeddings = embeddings[cluster_indices] if len(cluster_memories) < self.min_cluster_size: continue # Calculate centroid embedding centroid = np.mean(cluster_embeddings, axis=0) # Calculate coherence score (average cosine similarity to centroid) coherence_scores = [] for embedding in cluster_embeddings: similarity = np.dot(embedding, centroid) / ( np.linalg.norm(embedding) * np.linalg.norm(centroid) ) coherence_scores.append(similarity) coherence_score = np.mean(coherence_scores) # Extract theme keywords theme_keywords = await self._extract_theme_keywords(cluster_memories) # Create cluster cluster = MemoryCluster( cluster_id=str(uuid.uuid4()), memory_hashes=[m.content_hash for m in cluster_memories], centroid_embedding=centroid.tolist(), coherence_score=float(coherence_score), created_at=datetime.now(), theme_keywords=theme_keywords, metadata={ 'algorithm': self.algorithm, 'cluster_size': len(cluster_memories), 'average_memory_age': self._calculate_average_age(cluster_memories), 'tag_distribution': self._analyze_tag_distribution(cluster_memories) } ) clusters.append(cluster) return clusters async def _extract_theme_keywords(self, memories: List[Memory]) -> List[str]: """Extract theme keywords that represent the 
cluster.""" # Combine all content all_text = ' '.join([m.content for m in memories]) # Collect all tags all_tags = [] for memory in memories: all_tags.extend(memory.tags) # Count tag frequency tag_counts = Counter(all_tags) # Extract frequent words from content (simple approach) words = re.findall(r'\b[a-zA-Z]{4,}\b', all_text.lower()) word_counts = Counter(words) # Remove common stop words stop_words = { 'this', 'that', 'with', 'have', 'will', 'from', 'they', 'know', 'want', 'been', 'good', 'much', 'some', 'time', 'very', 'when', 'come', 'here', 'just', 'like', 'long', 'make', 'many', 'over', 'such', 'take', 'than', 'them', 'well', 'were', 'what', 'work', 'your', 'could', 'should', 'would', 'there', 'their', 'these', 'about', 'after', 'again', 'before', 'being', 'between', 'during', 'under', 'where', 'while', 'other', 'through', 'against' } # Filter and get top words filtered_words = {word: count for word, count in word_counts.items() if word not in stop_words and count > 1} # Combine tags and words, prioritize tags theme_keywords = [] # Add top tags (weight by frequency) for tag, count in tag_counts.most_common(5): if count > 1: # Tag appears in multiple memories theme_keywords.append(tag) # Add top words for word, count in sorted(filtered_words.items(), key=lambda x: x[1], reverse=True)[:10]: if word not in theme_keywords: theme_keywords.append(word) return theme_keywords[:10] # Limit to top 10 def _calculate_average_age(self, memories: List[Memory]) -> float: """Calculate average age of memories in days.""" now = datetime.now() ages = [] for memory in memories: if memory.created_at: created_dt = datetime.utcfromtimestamp(memory.created_at) age_days = (now - created_dt).days ages.append(age_days) elif memory.timestamp: age_days = (now - memory.timestamp).days ages.append(age_days) return sum(ages) / len(ages) if ages else 0.0 def _analyze_tag_distribution(self, memories: List[Memory]) -> Dict[str, int]: """Analyze tag distribution within the cluster.""" all_tags = [] for memory in memories: all_tags.extend(memory.tags) return dict(Counter(all_tags)) async def merge_similar_clusters( self, clusters: List[MemoryCluster], similarity_threshold: float = 0.8 ) -> List[MemoryCluster]: """Merge clusters that are very similar to each other.""" if len(clusters) <= 1: return clusters # Calculate pairwise similarities between cluster centroids centroids = np.array([cluster.centroid_embedding for cluster in clusters]) merged = [False] * len(clusters) result_clusters = [] for i, cluster1 in enumerate(clusters): if merged[i]: continue # Start with current cluster merge_group = [i] merged[i] = True # Find similar clusters to merge for j in range(i + 1, len(clusters)): if merged[j]: continue # Calculate cosine similarity between centroids similarity = np.dot(centroids[i], centroids[j]) / ( np.linalg.norm(centroids[i]) * np.linalg.norm(centroids[j]) ) if similarity >= similarity_threshold: merge_group.append(j) merged[j] = True # Create merged cluster if len(merge_group) == 1: # No merging needed result_clusters.append(clusters[i]) else: # Merge clusters merged_cluster = await self._merge_cluster_group( [clusters[idx] for idx in merge_group] ) result_clusters.append(merged_cluster) self.logger.info(f"Merged {len(clusters)} clusters into {len(result_clusters)}") return result_clusters async def _merge_cluster_group(self, clusters: List[MemoryCluster]) -> MemoryCluster: """Merge a group of similar clusters into one.""" # Combine all memory hashes all_memory_hashes = [] for cluster in clusters: 

    async def _merge_cluster_group(self, clusters: List[MemoryCluster]) -> MemoryCluster:
        """Merge a group of similar clusters into one."""
        # Combine all memory hashes
        all_memory_hashes = []
        for cluster in clusters:
            all_memory_hashes.extend(cluster.memory_hashes)

        # Calculate new centroid (average of all centroids weighted by cluster size)
        total_size = sum(len(cluster.memory_hashes) for cluster in clusters)
        weighted_centroid = np.zeros(len(clusters[0].centroid_embedding))

        for cluster in clusters:
            weight = len(cluster.memory_hashes) / total_size
            centroid = np.array(cluster.centroid_embedding)
            weighted_centroid += weight * centroid

        # Combine theme keywords
        all_keywords = []
        for cluster in clusters:
            all_keywords.extend(cluster.theme_keywords)
        keyword_counts = Counter(all_keywords)
        merged_keywords = [kw for kw, count in keyword_counts.most_common(10)]

        # Calculate average coherence score
        total_memories = sum(len(cluster.memory_hashes) for cluster in clusters)
        weighted_coherence = sum(
            cluster.coherence_score * len(cluster.memory_hashes) / total_memories
            for cluster in clusters
        )

        return MemoryCluster(
            cluster_id=str(uuid.uuid4()),
            memory_hashes=all_memory_hashes,
            centroid_embedding=weighted_centroid.tolist(),
            coherence_score=weighted_coherence,
            created_at=datetime.now(),
            theme_keywords=merged_keywords,
            metadata={
                'algorithm': f"{self.algorithm}_merged",
                'cluster_size': len(all_memory_hashes),
                'merged_from': [cluster.cluster_id for cluster in clusters],
                'merge_timestamp': datetime.now().isoformat()
            }
        )
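
A minimal usage sketch of the engine's public surface (process followed by merge_similar_clusters). It assumes ConsolidationConfig accepts min_cluster_size and clustering_algorithm keyword arguments and that Memory instances already carry embeddings, as inferred from the code above; the actual constructors in the repository may differ.

import asyncio

async def cluster_memories(memories):
    # Hypothetical config construction; field names inferred from __init__ above.
    config = ConsolidationConfig(min_cluster_size=3, clustering_algorithm='dbscan')
    engine = SemanticClusteringEngine(config)

    # memories: List[Memory] from the storage layer, each with an embedding set.
    clusters = await engine.process(memories)

    # Optionally collapse near-duplicate clusters (centroid cosine similarity >= 0.8).
    clusters = await engine.merge_similar_clusters(clusters, similarity_threshold=0.8)

    for cluster in clusters:
        print(cluster.cluster_id, cluster.theme_keywords, round(cluster.coherence_score, 3))
    return clusters

# asyncio.run(cluster_memories(memories))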
