# hierarchical_compression.py
"""
Hierarchical Compression
Implements the N0 (raw chunks), N1 (micro-summaries), and N2 (meta-summaries) levels
"""
import hashlib
import logging
from datetime import datetime
from typing import Any, Callable, Optional
logger = logging.getLogger(__name__)
class HierarchicalCompressor:
"""
Manages hierarchical context compression
N0: Raw chunks
N1: Micro-summaries (5-10 chunks → 1 summary)
N2: Meta-summaries (5-10 micro-summaries → 1 meta)
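Example: with the default ratios (5 and 5), 25 raw N0 chunks compress
into 5 N1 micro-summaries, which compress into a single N2 meta-summary.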
"""
def __init__(
self,
max_working_set_size: int = 10,
decay_hours: int = 168, # 7 days
compression_ratio_n1: int = 5,
compression_ratio_n2: int = 5,
):
"""
Args:
max_working_set_size: Maximum items in working set
decay_hours: Hours for relevance decay
compression_ratio_n1: How many N0 chunks → 1 N1 summary
compression_ratio_n2: How many N1 summaries → 1 N2 meta
"""
self.max_working_set_size = max_working_set_size
self.decay_hours = decay_hours
self.compression_ratio_n1 = compression_ratio_n1
self.compression_ratio_n2 = compression_ratio_n2
def compute_relevance_score(
self,
item: dict[str, Any],
query_embedding: Optional[list[float]] = None,
current_time: Optional[datetime] = None,
) -> float:
"""
Computes relevance score considering:
- Semantic similarity (if query provided)
- Temporal decay
- Access frequency
Args:
item: Item with metadata
query_embedding: Query embedding (optional)
current_time: Current timestamp
Returns:
Relevance score [0, 1]
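Example: with the default 168-hour decay window, an item with cosine
similarity 0.8, age 24 hours, and 10 recorded accesses scores
0.5 * 0.8 + 0.3 * (1 - 24 / 168) + 0.2 * (10 / 100) ≈ 0.68.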
"""
score = 0.0
current_time = current_time or datetime.utcnow()
# 1. Semantic similarity (weight: 0.5)
if query_embedding and "embedding" in item:
similarity: float = self._cosine_similarity(query_embedding, item["embedding"])
score += 0.5 * similarity
# 2. Temporal decay (weight: 0.3)
if "timestamp" in item.get("metadata", {}):
timestamp = datetime.fromisoformat(item["metadata"]["timestamp"])
age_hours = (current_time - timestamp).total_seconds() / 3600
decay_factor = max(0, 1 - (age_hours / self.decay_hours))
score += 0.3 * decay_factor
# 3. Access frequency (weight: 0.2)
access_count = item.get("metadata", {}).get("access_count", 0)
# Normalize to [0, 1] assuming max 100 accesses
access_factor = min(1.0, access_count / 100.0)
score += 0.2 * access_factor
return min(1.0, score)
def _cosine_similarity(self, vec1: list[float], vec2: list[float]) -> float:
"""Computes cosine similarity between vectors"""
import numpy as np
v1 = np.array(vec1, dtype=np.float64)
v2 = np.array(vec2, dtype=np.float64)
dot_product = np.dot(v1, v2)
norm_product = float(np.linalg.norm(v1) * np.linalg.norm(v2))
if norm_product == 0.0:
    # Zero-length vectors have no direction; report zero similarity instead of dividing by zero
    return 0.0
return float(dot_product / norm_product)
def create_micro_summary(
self, chunks: list[dict[str, Any]], llm_summarize_fn: Optional[Callable[[str], str]] = None
) -> dict[str, Any]:
"""
Creates micro-summary (N1) from N0 chunks
Args:
chunks: List of raw chunks
llm_summarize_fn: Function to generate summary
Returns:
Micro-summary with metadata
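Example (illustrative shape): {"id": "n1_<hash>", "level": "N1",
"text": "...", "source_chunks": [...], "metadata": {"timestamp": ...,
"chunk_count": ..., "compression_ratio": ...}}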
"""
if not chunks:
return {}
# Concatenate texts
combined_text = "\n".join([c.get("text", "") for c in chunks])
# Generate summary
if llm_summarize_fn:
summary_text = llm_summarize_fn(combined_text)
else:
# Fallback: first N words
summary_text = " ".join(combined_text.split()[:100]) + "..."
# Collect metadata
source_ids = [c.get("id") for c in chunks if c.get("id")]
timestamps = [c.get("metadata", {}).get("timestamp") for c in chunks]
timestamps = [t for t in timestamps if t]
# Create unique hash
content_hash = hashlib.sha256(combined_text.encode()).hexdigest()[:16]
return {
"id": f"n1_{content_hash}",
"level": "N1",
"text": summary_text,
"source_chunks": source_ids,
"metadata": {
"timestamp": max(timestamps) if timestamps else None,
"chunk_count": len(chunks),
"compression_ratio": len(combined_text) / len(summary_text) if summary_text else 1,
},
}
def create_meta_summary(
self, micro_summaries: list[dict[str, Any]], llm_summarize_fn: Optional[Callable[[str], str]] = None
) -> dict[str, Any]:
"""
Creates meta-summary (N2) from N1 micro-summaries
Args:
micro_summaries: List of micro-summaries
llm_summarize_fn: Function to generate summary
Returns:
Meta-summary with metadata
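The shape mirrors create_micro_summary, with level "N2" and
"source_summaries" in place of "source_chunks".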
"""
if not micro_summaries:
return {}
combined_text = "\n".join([s.get("text", "") for s in micro_summaries])
if llm_summarize_fn:
meta_text = llm_summarize_fn(combined_text)
else:
meta_text = " ".join(combined_text.split()[:50]) + "..."
source_ids = [s.get("id") for s in micro_summaries if s.get("id")]
content_hash = hashlib.sha256(combined_text.encode()).hexdigest()[:16]
return {
"id": f"n2_{content_hash}",
"level": "N2",
"text": meta_text,
"source_summaries": source_ids,
"metadata": {
"timestamp": datetime.utcnow().isoformat(),
"summary_count": len(micro_summaries),
},
}
def select_working_set(
self,
all_items: list[dict[str, Any]],
query_embedding: Optional[list[float]] = None,
) -> list[dict[str, Any]]:
"""
Selects optimized working set based on relevance
Args:
all_items: All available items
query_embedding: Current query embedding
Returns:
List of most relevant items
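Example: with max_working_set_size=10 and 50 candidate items, only the
10 highest-scoring items are returned, ordered by descending score.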
"""
# Calculate scores
scored_items = [
(
item,
self.compute_relevance_score(item, query_embedding),
)
for item in all_items
]
# Sort by descending score
scored_items.sort(key=lambda x: x[1], reverse=True)
# Return top-K
return [item for item, _ in scored_items[: self.max_working_set_size]]
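# Minimal usage sketch: exercises the N0 -> N1 -> N2 pipeline with the
# word-truncation fallback (no llm_summarize_fn) and then selects a small
# working set. All chunk ids, texts, and counts below are fabricated for
# illustration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    compressor = HierarchicalCompressor(max_working_set_size=3)
    # Fabricated N0 chunks with fresh timestamps and varying access counts
    chunks = [
        {
            "id": f"chunk_{i}",
            "text": f"Raw content of chunk {i} about hierarchical compression.",
            "metadata": {
                "timestamp": datetime.utcnow().isoformat(),
                "access_count": i * 10,
            },
        }
        for i in range(5)
    ]
    # N0 -> N1: without an LLM summarizer the first-100-words fallback is used
    micro = compressor.create_micro_summary(chunks)
    logger.info("N1 %s covers %d chunks", micro["id"], micro["metadata"]["chunk_count"])
    # N1 -> N2: even a single micro-summary yields a valid meta-summary
    meta = compressor.create_meta_summary([micro])
    logger.info("N2 %s", meta["id"])
    # No query embedding, so scoring falls back to temporal decay + access frequency
    working_set = compressor.select_working_set(chunks)
    logger.info("Working set: %s", [item["id"] for item in working_set])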