"""
Result deduplication.
Removes duplicate content across sources using URL normalization,
title matching, and content similarity detection.
Typically removes 30-40% of results as duplicates.
"""
import hashlib
import json
import logging
import re
import urllib.parse
from typing import Any
from core.quality import QualityScorer
__all__ = ["deduplicate_results"]
logger = logging.getLogger(__name__)
# ══════════════════════════════════════════════════════════════════════════════
# Deduplication
# ══════════════════════════════════════════════════════════════════════════════
def deduplicate_results(
results: dict[str, list[dict[str, Any]]],
) -> dict[str, list[dict[str, Any]]]:
"""
Remove duplicate content across sources.
Keeps the highest-quality version of each unique result based on:
1. URL normalization (same page)
2. Title matching (same question/topic)
3. Content similarity (same subject being discussed)
Args:
results: Dict mapping source names to result lists
Returns:
Deduplicated results dict
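
    Example (illustrative; field shapes assumed from this module's own accessors):
        results = {
            "stackoverflow": [{"url": "https://stackoverflow.com/q/1", "title": "Regex in Python"}],
            "hn": [{"url": "https://www.stackoverflow.com/q/1/", "title": "Regex in Python"}],
        }
        deduped = deduplicate_results(results)
        # Both URLs normalize to "stackoverflow.com/q/1", so only one copy survives.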
"""
scorer = QualityScorer()
best_by_key: dict[str, dict[str, Any]] = {}
key_to_source: dict[str, str] = {}
title_to_key: dict[str, str] = {}
content_fingerprints: dict[str, str] = {} # fingerprint -> key
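    # Pass 1: collapse duplicates into best_by_key; pass 2 (below) rebuilds per-source lists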
for source, items in results.items():
for item in items:
key = _build_key(item)
            title = _normalize_title(item.get("title") or "")
            # If the normalized title was seen before, reuse that key
            if title and title in title_to_key:
                key = title_to_key[title]
            # A matching content fingerprint (same topic across sources)
            # takes precedence over the URL/title key
            fingerprint = _content_fingerprint(item)
            if fingerprint and fingerprint in content_fingerprints:
                key = content_fingerprints[fingerprint]
            # Score the item, preserving any existing score (including 0.0)
            scored = {**item, "source": item.get("source", source)}
            if scored.get("quality_score") is None:
                scored["quality_score"] = scorer.score(scored)
# Keep highest quality version
existing = best_by_key.get(key)
if not existing or scored["quality_score"] > existing["quality_score"]:
best_by_key[key] = scored
key_to_source[key] = scored["source"]
if title:
title_to_key[title] = key
if fingerprint:
content_fingerprints[fingerprint] = key
# Rebuild results by source (normalize sub-sources to base source)
deduped: dict[str, list[dict[str, Any]]] = {s: [] for s in results}
for key, item in best_by_key.items():
source = key_to_source.get(key, item.get("source", "unknown"))
# Normalize sub-sources to base source (e.g., "discourse:python.org" -> "discourse")
        base_source = source.split(":", 1)[0]
# Map to original source keys
if base_source in deduped:
deduped[base_source].append(item)
elif source in deduped:
deduped[source].append(item)
else:
# Try to find a matching base source in the original results
matched = False
for orig_source in results.keys():
if orig_source.startswith(base_source) or base_source.startswith(
orig_source.split(":")[0]
):
deduped[orig_source].append(item)
matched = True
break
if not matched:
deduped.setdefault(base_source, []).append(item)
# Log stats
original = sum(len(items) for items in results.values())
final = sum(len(items) for items in deduped.values())
removed = original - final
if removed > 0:
pct = (removed / original * 100) if original > 0 else 0
logger.info(f"Deduplication: removed {removed} ({pct:.1f}%)")
return deduped
def _build_key(item: dict[str, Any]) -> str:
"""Build deduplication key from URL, title, or content hash."""
# Try URL first
    if url := (item.get("url") or "").strip():
try:
parsed = urllib.parse.urlparse(url)
            # removeprefix, not lstrip: lstrip("www.") strips characters, not the prefix
            host = parsed.netloc.lower().removeprefix("www.")
path = parsed.path.rstrip("/")
return f"{host}{path}"
except Exception:
return url.rstrip("/").split("?")[0]
# Try title
if title := item.get("title", ""):
normalized = _normalize_title(title)
if len(normalized) > 12:
return normalized
# Fall back to content hash
content = item.get("snippet") or item.get("content") or ""
if content:
return hashlib.md5(content.encode()).hexdigest()
    return hashlib.md5(json.dumps(item, sort_keys=True, default=str).encode()).hexdigest()
def _normalize_title(title: str) -> str:
"""Normalize title for comparison."""
t = title.lower().strip()
    # Strip common site suffixes (suffix-anchored, so mid-title matches survive)
    for suffix in (" - stack overflow", " | hacker news", " | stackoverflow"):
        t = t.removesuffix(suffix)
return re.sub(r"\s+", " ", t)
def _content_fingerprint(item: dict[str, Any]) -> str | None:
"""
Generate a fingerprint for content similarity detection.
Extracts key terms (project names, libraries, specific technical terms)
to identify when multiple results discuss the same subject.
"""
    title = (item.get("title") or "").lower()
    snippet = (item.get("snippet") or "").lower()[:500]
content = f"{title} {snippet}"
    # Extract significant terms from the original (non-lowercased) text,
    # since capitalization signals project/library names
    orig_content = f"{item.get('title') or ''} {(item.get('snippet') or '')[:500]}"
# Find project/library names (CamelCase or all caps, 3+ chars)
project_names = set()
    # CamelCase names, including trailing acronyms (e.g., FastAPI, PyTorch, NumPy)
    camel_matches = re.findall(r"\b([A-Z][a-z]+(?:[A-Z]+[a-z]*)+)\b", orig_content)
project_names.update(m.lower() for m in camel_matches)
# All caps acronyms (e.g., API, SQL, HTTP) - 2-6 chars
acronym_matches = re.findall(r"\b([A-Z]{2,6})\b", orig_content)
project_names.update(
m.lower()
for m in acronym_matches
if m.lower() not in {"the", "and", "for", "not", "but", "how", "why", "what"}
)
# Package/library names (lowercase with hyphens/underscores)
pkg_matches = re.findall(
r"\b([a-z][a-z0-9]*[-_][a-z0-9]+(?:[-_][a-z0-9]+)*)\b", content
)
project_names.update(pkg_matches)
# Common library names that might appear in various forms
known_libs = re.findall(
r"\b(regex|numpy|pandas|pytorch|tensorflow|fastapi|flask|django|"
r"react|vue|angular|express|node|rust|python|java|go|typescript)\b",
content,
)
project_names.update(known_libs)
    # If we found project names, fingerprint on up to three of the longest,
    # alphabetized so the key is order-stable
    if project_names:
        significant = sorted(project_names, key=len, reverse=True)[:3]
        return "|".join(sorted(significant))
    return None
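

if __name__ == "__main__":
    # Minimal smoke test with fabricated inputs; illustrative only, since real
    # results come from upstream connectors and carry more fields (snippet,
    # source, quality_score). The duplicate URLs below normalize to the same
    # key, so only the higher-scoring copy should survive.
    sample = {
        "stackoverflow": [
            {"url": "https://stackoverflow.com/q/1", "title": "Async in FastAPI"},
        ],
        "discourse:python.org": [
            {"url": "https://www.stackoverflow.com/q/1/", "title": "Async in FastAPI"},
        ],
    }
    logging.basicConfig(level=logging.INFO)
    print(deduplicate_results(sample))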