"""News deduplication utilities using SimHash algorithm."""
import hashlib
import logging
import re
from collections import Counter
from typing import Any, Dict, List, Optional, Set, Union


class NewsDeduplicator:
"""News deduplication using SimHash similarity."""

    def __init__(self, threshold: float = 0.85):
"""Initialize deduplicator.
Args:
threshold: Similarity threshold for duplicate detection (0.0 - 1.0)
"""
self.threshold = threshold
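        # Note: with 64-bit SimHashes, a threshold of 0.85 tolerates a Hamming
        # distance of up to 9 bits (1 - 9/64 ≈ 0.859, while 1 - 10/64 ≈ 0.844).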
self.hash_cache: Dict[str, int] = {} # Cache for computed hashes
self.seen_hashes: Set[int] = set() # Set of previously seen hashes
self.logger = logging.getLogger("deduplicator")

    async def is_duplicate(self, news_item: Dict[str, Any], existing_items: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None) -> bool:
"""Check if news item is a duplicate.
Args:
news_item: News item to check
existing_items: Optional list of existing items to compare against
Returns:
True if item is a duplicate, False otherwise
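        Note:
            If the item is not a duplicate, its SimHash is recorded so that
            later calls can match against it.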
"""
# Create text representation for comparison
text = self._create_comparison_text(news_item)
# Calculate SimHash for the item
item_hash = self.calculate_simhash(text)
        # Check previously seen hashes: exact match first, then near-duplicates within the threshold
        if item_hash in self.seen_hashes:
            return True
        if any(self.calculate_similarity(item_hash, seen) >= self.threshold for seen in self.seen_hashes):
            return True
# If existing_items provided, compare against them
if existing_items:
# Handle single item or list of items
items_to_check = [existing_items] if isinstance(existing_items, dict) else existing_items
for existing_item in items_to_check:
existing_text = self._create_comparison_text(existing_item)
existing_hash = self.calculate_simhash(existing_text)
similarity = self.calculate_similarity(item_hash, existing_hash)
if similarity >= self.threshold:
return True
# Store hash for future comparisons
self.seen_hashes.add(item_hash)
self.hash_cache[text] = item_hash
return False

    def _create_comparison_text(self, news_item: Dict[str, Any]) -> str:
"""Create text representation for comparison.
Args:
news_item: News item dictionary
Returns:
Combined text for comparison
"""
parts = []
        # Title carries the strongest signal
        title = news_item.get('title')
        if title:
            parts.append(title.strip())
        # Use only the first 500 characters of the content
        content = news_item.get('content')
        if content:
            parts.append(content.strip()[:500])
return ' '.join(parts)

    def calculate_simhash(self, text: str) -> int:
"""Calculate SimHash for given text.
Args:
text: Input text
Returns:
64-bit SimHash as integer
"""
if not text:
return 0
# Check cache first
if text in self.hash_cache:
return self.hash_cache[text]
# Tokenize and clean text
tokens = self._tokenize(text)
if not tokens:
return 0
# Count token frequencies
token_counts = Counter(tokens)
# Initialize bit vector
bit_vector = [0] * 64
# Process each unique token
for token, count in token_counts.items():
# Hash the token
token_hash = self._hash_token(token)
# For each bit position
for i in range(64):
bit = (token_hash >> i) & 1
if bit:
bit_vector[i] += count
else:
bit_vector[i] -= count
# Generate final hash
simhash = 0
for i in range(64):
if bit_vector[i] > 0:
simhash |= (1 << i)
# Cache the result
self.hash_cache[text] = simhash
return simhash

    def _tokenize(self, text: str) -> List[str]:
"""Tokenize text for SimHash calculation.
Args:
text: Input text
Returns:
List of tokens
"""
# Convert to lowercase
text = text.lower()
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Extract Korean, English words and numbers
tokens = re.findall(r'[가-힣]+|[a-zA-Z]+|[0-9]+', text)
# Filter out very short tokens
tokens = [token for token in tokens if len(token) >= 2]
# Also create bigrams for better similarity detection
bigrams = []
for i in range(len(tokens) - 1):
bigrams.append(tokens[i] + tokens[i + 1])
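        # e.g. 'Breaking News 2024' -> tokens ['breaking', 'news', '2024']
        # plus bigrams ['breakingnews', 'news2024']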
return tokens + bigrams

    def _hash_token(self, token: str) -> int:
"""Hash a single token.
Args:
token: Token to hash
Returns:
64-bit hash as integer
"""
        # MD5 gives a stable, evenly distributed digest; this is not a security-sensitive use
hash_bytes = hashlib.md5(token.encode('utf-8')).digest()
# Convert first 8 bytes to 64-bit integer
return int.from_bytes(hash_bytes[:8], byteorder='big')

    def calculate_similarity(self, hash1: int, hash2: int) -> float:
"""Calculate similarity between two SimHashes.
Args:
hash1: First hash
hash2: Second hash
Returns:
Similarity score between 0.0 and 1.0
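        Example:
            Hashes differing in 8 of 64 bits give 1 - 8/64 = 0.875.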
"""
# XOR to find differing bits
xor_result = hash1 ^ hash2
# Count number of differing bits (Hamming distance)
hamming_distance = bin(xor_result).count('1')
# Convert to similarity (0.0 = completely different, 1.0 = identical)
similarity = 1.0 - (hamming_distance / 64.0)
return similarity

    def clear_cache(self) -> None:
"""Clear the hash cache and seen hashes."""
self.hash_cache.clear()
self.seen_hashes.clear()
self.logger.info("Deduplicator cache cleared")

    def get_cache_size(self) -> int:
"""Get current cache size.
Returns:
Number of cached items
"""
return len(self.hash_cache)

    def set_threshold(self, threshold: float) -> None:
"""Update similarity threshold.
Args:
threshold: New threshold value (0.0 - 1.0)
"""
if not 0.0 <= threshold <= 1.0:
raise ValueError("Threshold must be between 0.0 and 1.0")
self.threshold = threshold
self.logger.info(f"Similarity threshold updated to {threshold}")