clustering.py (12.8 kB)
"""Clustering logic for memory consolidation.""" import math import re import uuid from collections import Counter from ..storage.models import Cluster, ClusterConfig, Memory def cosine_similarity(vec1: list[float], vec2: list[float]) -> float: """ Calculate cosine similarity between two vectors. Args: vec1: First vector vec2: Second vector Returns: Cosine similarity (0 to 1) """ if len(vec1) != len(vec2): raise ValueError("Vectors must have the same length") dot_product = sum(a * b for a, b in zip(vec1, vec2, strict=False)) mag1 = math.sqrt(sum(a * a for a in vec1)) mag2 = math.sqrt(sum(b * b for b in vec2)) if mag1 == 0 or mag2 == 0: return 0.0 return dot_product / (mag1 * mag2) def calculate_centroid(embeddings: list[list[float]]) -> list[float]: """ Calculate the centroid (average) of multiple embedding vectors. Args: embeddings: List of embedding vectors Returns: Centroid vector """ if not embeddings: return [] dim = len(embeddings[0]) centroid = [0.0] * dim for embed in embeddings: for i, val in enumerate(embed): centroid[i] += val for i in range(dim): centroid[i] /= len(embeddings) return centroid def tokenize_text(text: str) -> list[str]: """ Tokenize text into words for TF-IDF calculation. Args: text: Input text Returns: List of lowercase tokens """ # Remove punctuation and split on whitespace text = text.lower() text = re.sub(r"[^\w\s]", " ", text) tokens = text.split() return [t for t in tokens if len(t) > 2] # Filter out very short tokens def compute_tf(tokens: list[str]) -> dict[str, float]: """ Compute term frequency for tokens. Args: tokens: List of tokens Returns: Dictionary mapping term to frequency """ if not tokens: return {} counter = Counter(tokens) total = len(tokens) return {term: count / total for term, count in counter.items()} def compute_idf(documents: list[list[str]]) -> dict[str, float]: """ Compute inverse document frequency across documents. Args: documents: List of tokenized documents Returns: Dictionary mapping term to IDF score """ if not documents: return {} num_docs = len(documents) doc_freq: dict[str, int] = Counter() for doc in documents: unique_terms = set(doc) for term in unique_terms: doc_freq[term] += 1 return {term: math.log(num_docs / freq) for term, freq in doc_freq.items()} def tfidf_similarity(text1: str, text2: str, idf_scores: dict[str, float] | None = None) -> float: """ Calculate TF-IDF cosine similarity between two texts. Args: text1: First text text2: Second text idf_scores: Pre-computed IDF scores (optional, computed if not provided) Returns: Cosine similarity (0 to 1) """ tokens1 = tokenize_text(text1) tokens2 = tokenize_text(text2) if not tokens1 or not tokens2: return 0.0 # Compute TF for each document tf1 = compute_tf(tokens1) tf2 = compute_tf(tokens2) # If IDF scores not provided, compute them from these two documents if idf_scores is None: idf_scores = compute_idf([tokens1, tokens2]) # Get all unique terms all_terms = set(tf1.keys()) | set(tf2.keys()) # Compute TF-IDF vectors vec1 = [tf1.get(term, 0) * idf_scores.get(term, 0) for term in all_terms] vec2 = [tf2.get(term, 0) * idf_scores.get(term, 0) for term in all_terms] # Compute cosine similarity return cosine_similarity(vec1, vec2) def jaccard_similarity(tokens1: set[str], tokens2: set[str]) -> float: """ Calculate Jaccard similarity between two sets of tokens. 
Args: tokens1: First set of tokens tokens2: Second set of tokens Returns: Jaccard similarity (0 to 1) """ if not tokens1 or not tokens2: return 0.0 intersection = tokens1 & tokens2 union = tokens1 | tokens2 if not union: return 0.0 return len(intersection) / len(union) def text_similarity(text1: str, text2: str) -> float: """ Calculate text similarity using Jaccard similarity on tokens (fallback when embeddings unavailable). This method works well for duplicate detection and similar content clustering, avoiding the TF-IDF edge case where identical documents get 0 similarity. Args: text1: First text text2: Second text Returns: Similarity score (0 to 1) """ tokens1 = set(tokenize_text(text1)) tokens2 = set(tokenize_text(text2)) return jaccard_similarity(tokens1, tokens2) def cluster_memories_simple(memories: list[Memory], config: ClusterConfig) -> list[Cluster]: """ Cluster memories using simple similarity-based grouping. Uses single-linkage clustering with cosine similarity threshold. Automatically falls back to Jaccard text similarity when embeddings are unavailable. Args: memories: List of memories (with or without embeddings) config: Clustering configuration Returns: List of clusters """ if not memories: return [] # Detect if we should use embeddings or text similarity # Use embeddings ONLY if ALL memories have embeddings, otherwise fall back to text similarity memories_with_embed = [m for m in memories if m.embed is not None] use_embeddings = len(memories_with_embed) == len(memories) and len(memories_with_embed) > 0 # Always use all memories (fallback handles mixed cases gracefully) active_memories = memories if not active_memories: return [] # Early termination: if we have too few memories, return individual clusters if len(active_memories) < config.min_cluster_size: return [] # Track which memories are in which cluster memory_to_cluster: dict[str, int] = {} clusters: list[list[Memory]] = [] # Cache for similarity calculations to avoid recomputation similarity_cache: dict[tuple[str, str], float] = {} for memory in active_memories: # Find clusters similar to this memory similar_clusters = [] for cluster_idx, cluster_memories in enumerate(clusters): # Early termination: skip if cluster is already at max size if len(cluster_memories) >= config.max_cluster_size: continue # Check if memory is similar to any in this cluster for cluster_mem in cluster_memories: # Use cache for similarity calculation cache_key: tuple[str, str] = ( min(memory.id, cluster_mem.id), max(memory.id, cluster_mem.id), ) if cache_key not in similarity_cache: # Calculate similarity using appropriate method if use_embeddings: if memory.embed is not None and cluster_mem.embed is not None: similarity_cache[cache_key] = cosine_similarity( memory.embed, cluster_mem.embed ) else: similarity_cache[cache_key] = 0.0 else: # Fallback to TF-IDF text similarity similarity_cache[cache_key] = text_similarity( memory.content, cluster_mem.content ) similarity = similarity_cache[cache_key] if similarity >= config.threshold: similar_clusters.append(cluster_idx) break # Found a match in this cluster if not similar_clusters: # Start new cluster clusters.append([memory]) memory_to_cluster[memory.id] = len(clusters) - 1 else: # Merge into first similar cluster # (and potentially merge similar clusters together) target_idx = similar_clusters[0] clusters[target_idx].append(memory) memory_to_cluster[memory.id] = target_idx # Merge other similar clusters into target for idx in sorted(similar_clusters[1:], reverse=True): 
clusters[target_idx].extend(clusters[idx]) for mem in clusters[idx]: memory_to_cluster[mem.id] = target_idx del clusters[idx] # Convert to Cluster objects result_clusters = [] for cluster_memories in clusters: # Filter by size constraints if len(cluster_memories) < config.min_cluster_size: continue if len(cluster_memories) > config.max_cluster_size: # Split large clusters (simplified: just take first max_size) cluster_memories = cluster_memories[: config.max_cluster_size] # Calculate centroid and cohesion if use_embeddings: embeddings = [m.embed for m in cluster_memories if m.embed is not None] centroid = calculate_centroid(embeddings) if embeddings else None # Calculate average pairwise similarity (cohesion) if len(embeddings) > 1: similarities = [] for i in range(len(embeddings)): for j in range(i + 1, len(embeddings)): sim = cosine_similarity(embeddings[i], embeddings[j]) similarities.append(sim) cohesion = sum(similarities) / len(similarities) else: cohesion = 1.0 else: # Use text similarity for cohesion calculation (fallback) centroid = None if len(cluster_memories) > 1: similarities = [] for i in range(len(cluster_memories)): for j in range(i + 1, len(cluster_memories)): sim = text_similarity( cluster_memories[i].content, cluster_memories[j].content ) similarities.append(sim) cohesion = sum(similarities) / len(similarities) else: cohesion = 1.0 # Determine suggested action based on cohesion if cohesion >= 0.9: suggested_action = "auto-merge" elif cohesion >= 0.75: suggested_action = "llm-review" else: suggested_action = "keep-separate" cluster = Cluster( id=str(uuid.uuid4()), memories=cluster_memories, centroid=centroid, cohesion=cohesion, suggested_action=suggested_action, ) result_clusters.append(cluster) return result_clusters def find_duplicate_candidates( memories: list[Memory], threshold: float = 0.88 ) -> list[tuple[Memory, Memory, float]]: """ Find pairs of memories that are likely duplicates based on similarity. Automatically falls back to Jaccard text similarity when embeddings are unavailable. Args: memories: List of memories (with or without embeddings) threshold: Similarity threshold for considering duplicates Returns: List of (memory1, memory2, similarity) tuples """ candidates = [] # Detect if we should use embeddings or text similarity # Use embeddings ONLY if ALL memories have embeddings, otherwise fall back to text similarity memories_with_embed = [m for m in memories if m.embed is not None] use_embeddings = len(memories_with_embed) == len(memories) and len(memories_with_embed) > 0 # Always use all memories (fallback handles mixed cases gracefully) active_memories = memories for i in range(len(active_memories)): for j in range(i + 1, len(active_memories)): mem1 = active_memories[i] mem2 = active_memories[j] # Calculate similarity using appropriate method if use_embeddings: if mem1.embed is not None and mem2.embed is not None: similarity = cosine_similarity(mem1.embed, mem2.embed) else: continue # Skip pairs without embeddings else: # Fallback to TF-IDF text similarity similarity = text_similarity(mem1.content, mem2.content) if similarity >= threshold: candidates.append((mem1, mem2, similarity)) # Sort by similarity descending candidates.sort(key=lambda x: x[2], reverse=True) return candidates
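
A minimal usage sketch (not part of the module above): it assumes Memory can be constructed with id, content, and embed, and that ClusterConfig takes threshold, min_cluster_size, and max_cluster_size, matching the attributes the functions above read; the real model constructors in ..storage.models may require additional fields.

# Hypothetical usage sketch; field names for Memory and ClusterConfig are
# inferred from the attributes used above and may not match the real models.
memories = [
    Memory(id="m1", content="Set up the nightly database backup job", embed=None),
    Memory(id="m2", content="Set up the nightly database backup cron job", embed=None),
    Memory(id="m3", content="Notes from the quarterly planning meeting", embed=None),
]
config = ClusterConfig(threshold=0.5, min_cluster_size=2, max_cluster_size=10)

# No memory has an embedding, so clustering falls back to Jaccard text similarity.
clusters = cluster_memories_simple(memories, config)
for cluster in clusters:
    print(cluster.id, round(cluster.cohesion, 2), cluster.suggested_action)

# Pairwise duplicate detection; a lower threshold suits the text-only fallback.
for mem1, mem2, score in find_duplicate_candidates(memories, threshold=0.5):
    print(f"{mem1.id} <-> {mem2.id}: {score:.2f}")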
