#!/usr/bin/env python3
"""
Find and remove duplicate memories from the database.
Duplicates can occur when:
1. Same content was ingested multiple times
2. Re-ingestion after encoding fixes created duplicates
3. Manual storage of similar content
"""
import sqlite3
import json
import sys
import hashlib
import urllib.request
import urllib.parse
import ssl
from pathlib import Path
from collections import defaultdict
from datetime import datetime


def load_config():
    """Load configuration from Claude hooks config file."""
    config_path = Path.home() / '.claude' / 'hooks' / 'config.json'
    if config_path.exists():
        with open(config_path) as f:
            return json.load(f)
    return None
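

# The hooks config consumed by main() is expected to look roughly like the
# sketch below (only the keys read in main() are required; the endpoint and
# token values here are placeholders, not real ones):
# {
#   "memoryService": {
#     "endpoint": "https://memory.example.local:8443",
#     "apiKey": "<token>"
#   }
# }
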
def get_memories_from_api(endpoint, api_key):
    """Retrieve all memories from the API endpoint using pagination."""
    try:
        # Create an SSL context that accepts self-signed certificates
        # (verification disabled; intended for trusted local endpoints only)
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        
        all_memories = []
        page = 1
        page_size = 100  # Use reasonable page size
        
        while True:
            # Create request for current page
            url = f"{endpoint}/api/memories?page={page}&page_size={page_size}"
            req = urllib.request.Request(url)
            req.add_header('Authorization', f'Bearer {api_key}')
            
            # Make request
            with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
                if response.status != 200:
                    print(f"❌ API request failed: {response.status}")
                    return []
                
                data = response.read().decode('utf-8')
                api_response = json.loads(data)
            
            # Extract memories from this page
            page_memories = api_response.get('memories', [])
            total = api_response.get('total', 0)
            has_more = api_response.get('has_more', False)
            
            all_memories.extend(page_memories)
            print(f"Retrieved page {page}: {len(page_memories)} memories (total so far: {len(all_memories)}/{total})")
            
            if not has_more:
                break
                
            page += 1
        
        print(f"✅ Retrieved all {len(all_memories)} memories from API")
        
        # Convert API format to internal format
        converted_memories = []
        for mem in all_memories:
            converted_memories.append((
                mem.get('content_hash', ''),
                mem.get('content', ''),
                json.dumps(mem.get('tags', [])),
                mem.get('created_at', ''),
                json.dumps(mem.get('metadata', {}))
            ))
        
        return converted_memories
        
    except Exception as e:
        print(f"❌ Error retrieving memories from API: {e}")
        return []
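

# The paging loop above assumes a response body shaped roughly like the
# following (inferred from the keys this function reads, not from a formal
# API spec; values are placeholders):
# {
#   "memories": [
#     {"content_hash": "...", "content": "...", "tags": ["..."],
#      "created_at": "2024-01-01T00:00:00Z", "metadata": {}}
#   ],
#   "total": 123,
#   "has_more": false
# }
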
def content_similarity_hash(content):
    """Create a hash for content similarity detection."""
    # Normalize content for comparison
    normalized = content.strip().lower()
    # Remove extra whitespace
    normalized = ' '.join(normalized.split())
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()[:16]
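

# For example (behaviour of the normalization above, not an external API):
# content_similarity_hash("Hello   World\n") == content_similarity_hash("hello world")
# because both normalize to "hello world", while any wording change yields a
# different 16-character hash prefix.
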
def find_duplicates(memories_source, similarity_threshold=0.95):
    """
    Find duplicate memories from either database or API.
    
    Args:
        memories_source: Either a database path (str) or list of memories from API
        similarity_threshold: Threshold for considering memories duplicates (0.0-1.0)
    
    Returns:
        Dict of duplicate groups
    """
    if isinstance(memories_source, str):
        # Database path provided
        conn = sqlite3.connect(memories_source)
        cursor = conn.cursor()
        
        print("Scanning for duplicate memories...")
        
        # Get all memories
        cursor.execute("""
            SELECT content_hash, content, tags, created_at, metadata
            FROM memories 
            ORDER BY created_at DESC
        """)
        
        all_memories = cursor.fetchall()
        conn.close()
    else:
        # API memories provided
        print("Analyzing memories from API...")
        all_memories = memories_source
    
    print(f"Found {len(all_memories)} total memories")
    
    # Group by content similarity
    content_groups = defaultdict(list)
    exact_content_groups = defaultdict(list)
    
    for memory in all_memories:
        content_hash, content, tags_json, created_at, metadata_json = memory
        
        # Parse tags and metadata (both stored as JSON strings)
        try:
            tags = json.loads(tags_json) if tags_json else []
        except (json.JSONDecodeError, TypeError):
            tags = []

        try:
            metadata = json.loads(metadata_json) if metadata_json else {}
        except (json.JSONDecodeError, TypeError):
            metadata = {}
        
        # Exact content match
        exact_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
        exact_content_groups[exact_hash].append({
            'hash': content_hash,
            'content': content,
            'tags': tags,
            'created_at': created_at,
            'metadata': metadata,
            'content_length': len(content)
        })
        
        # Similar content match (normalized)
        similarity_hash = content_similarity_hash(content)
        content_groups[similarity_hash].append({
            'hash': content_hash,
            'content': content,
            'tags': tags,
            'created_at': created_at,
            'metadata': metadata,
            'content_length': len(content)
        })
    
    # Find actual duplicates (groups with > 1 memory)
    exact_duplicates = {k: v for k, v in exact_content_groups.items() if len(v) > 1}
    similar_duplicates = {k: v for k, v in content_groups.items() if len(v) > 1}
    
    return {
        'exact': exact_duplicates,
        'similar': similar_duplicates,
        'total_memories': len(all_memories)
    }
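

# Note: every exact-duplicate group is also a similar-duplicate group, because
# identical strings normalize to the same similarity hash; remove_duplicates()
# below skips those already-covered groups when reporting the "similar" section.
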
def analyze_duplicate_group(group):
    """Analyze a group of duplicate memories to determine which to keep."""
    if len(group) <= 1:
        return None
        
    # Sort by creation date (newest first)
    sorted_group = sorted(group, key=lambda x: x['created_at'], reverse=True)
    
    analysis = {
        'group_size': len(group),
        'recommended_keep': None,
        'recommended_delete': [],
        'reasons': []
    }
    
    # Prefer memories with utf8-fixed tag (these are the corrected versions)
    utf8_fixed = [m for m in sorted_group if 'utf8-fixed' in m['tags']]
    if utf8_fixed:
        analysis['recommended_keep'] = utf8_fixed[0]
        analysis['recommended_delete'] = [m for m in sorted_group if m != utf8_fixed[0]]
        analysis['reasons'].append('Keeping UTF8-fixed version')
        return analysis
    
    # Prefer newer memories
    analysis['recommended_keep'] = sorted_group[0]  # Newest
    analysis['recommended_delete'] = sorted_group[1:]  # Older ones
    analysis['reasons'].append('Keeping newest version')
    
    # Check for different tag sets
    keep_tags = set(analysis['recommended_keep']['tags'])
    for delete_mem in analysis['recommended_delete']:
        delete_tags = set(delete_mem['tags'])
        if delete_tags != keep_tags:
            analysis['reasons'].append(f'Tag differences: {delete_tags - keep_tags}')
    
    return analysis
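

# Worked example: for three copies of the same note where one carries the
# 'utf8-fixed' tag, that copy is recommended for keeping regardless of age and
# the other two for deletion; without that tag, the newest copy wins.
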
def remove_duplicates(db_path, duplicate_groups, dry_run=True):
    """
    Remove duplicate memories from the database.
    
    Args:
        db_path: Path to the SQLite database
        duplicate_groups: Dict of duplicate groups from find_duplicates()
        dry_run: If True, only show what would be deleted
    """
    # A database connection is only needed when deleting; the API analysis path
    # calls this function with db_path=None for a dry run, so skip connecting then.
    conn = sqlite3.connect(db_path) if db_path else None
    cursor = conn.cursor() if conn is not None else None
    
    total_to_delete = 0
    deletion_plan = []
    
    print(f"\n{'DRY RUN - ' if dry_run else ''}Analyzing duplicate groups...")
    
    # Process exact duplicates first
    print(f"\n=== EXACT DUPLICATES ===")
    for content_hash, group in duplicate_groups['exact'].items():
        analysis = analyze_duplicate_group(group)
        if analysis:
            total_to_delete += len(analysis['recommended_delete'])
            deletion_plan.extend(analysis['recommended_delete'])
            
            print(f"\nDuplicate group: {len(group)} memories")
            print(f"  Keep: {analysis['recommended_keep']['hash'][:20]}... ({analysis['recommended_keep']['created_at']})")
            print(f"  Tags: {', '.join(analysis['recommended_keep']['tags'][:3])}")
            print(f"  Delete: {len(analysis['recommended_delete'])} older versions")
            for reason in analysis['reasons']:
                print(f"  Reason: {reason}")
    
    # Process similar duplicates (but not exact)
    print(f"\n=== SIMILAR DUPLICATES ===")
    processed_exact_hashes = set()
    for group in duplicate_groups['exact'].values():
        for mem in group:
            processed_exact_hashes.add(mem['hash'])
    
    for similarity_hash, group in duplicate_groups['similar'].items():
        # Skip if these are exact duplicates we already processed
        group_hashes = {mem['hash'] for mem in group}
        if group_hashes.issubset(processed_exact_hashes):
            continue
            
        analysis = analyze_duplicate_group(group)
        if analysis:
            print(f"\nSimilar group: {len(group)} memories")
            print(f"  Keep: {analysis['recommended_keep']['hash'][:20]}... ({analysis['recommended_keep']['created_at']})")
            print(f"  Content preview: {analysis['recommended_keep']['content'][:100]}...")
            print(f"  Would delete: {len(analysis['recommended_delete'])} similar versions")
            # Don't auto-delete similar (only exact) in this version
    
    print(f"\n{'DRY RUN SUMMARY' if dry_run else 'DELETION SUMMARY'}:")
    print(f"Total exact duplicates to delete: {total_to_delete}")
    print(f"Current total memories: {duplicate_groups['total_memories']}")
    print(f"After cleanup: {duplicate_groups['total_memories'] - total_to_delete}")
    
    if not dry_run and total_to_delete > 0 and conn is not None:
        print(f"\n{'='*50}")
        print("DELETING DUPLICATE MEMORIES...")
        
        deleted_count = 0
        for mem_to_delete in deletion_plan:
            try:
                # Delete from memories table
                cursor.execute("DELETE FROM memories WHERE content_hash = ?", (mem_to_delete['hash'],))
                
                # Also try to remove the matching embedding row; ignore failures
                # since the embeddings table may not exist or may use a different schema
                try:
                    cursor.execute("DELETE FROM memory_embeddings WHERE rowid = ?", (mem_to_delete['hash'],))
                except sqlite3.Error:
                    pass
                    
                deleted_count += 1
                if deleted_count % 10 == 0:
                    print(f"  Deleted {deleted_count}/{total_to_delete}...")
                    
            except Exception as e:
                print(f"  Error deleting {mem_to_delete['hash'][:20]}: {e}")
        
        conn.commit()
        print(f"✅ Successfully deleted {deleted_count} duplicate memories")
        
        # Verify final count
        cursor.execute("SELECT COUNT(*) FROM memories")
        final_count = cursor.fetchone()[0]
        print(f"📊 Final memory count: {final_count}")
    
    elif dry_run and total_to_delete > 0:
        print(f"\nTo actually delete these {total_to_delete} duplicates, re-run with the --execute flag")

    if conn is not None:
        conn.close()
    return total_to_delete
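

# Typical programmatic use (a sketch; the path is illustrative and the CLI in
# main() below wires these calls up for you):
#   dups = find_duplicates('/path/to/sqlite_vec.db')
#   remove_duplicates('/path/to/sqlite_vec.db', dups, dry_run=True)   # preview only
#   remove_duplicates('/path/to/sqlite_vec.db', dups, dry_run=False)  # delete exact duplicates
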
def main():
    """Main entry point."""
    import argparse
    
    parser = argparse.ArgumentParser(description='Find and remove duplicate memories')
    parser.add_argument('--db-path', type=str,
                        help='Path to SQLite database (if not using API)')
    parser.add_argument('--use-api', action='store_true',
                        help='Use API endpoint from config instead of database')
    parser.add_argument('--execute', action='store_true',
                        help='Actually delete the duplicates (default is dry run)')
    parser.add_argument('--similarity-threshold', type=float, default=0.95,
                        help='Similarity threshold for duplicate detection (0.0-1.0)')
    
    args = parser.parse_args()
    
    # Try to load config first
    config = load_config()
    
    if args.use_api or (not args.db_path and config):
        if not config:
            print("❌ No configuration found. Use --db-path for local database or ensure config exists.")
            sys.exit(1)
        
        endpoint = config.get('memoryService', {}).get('endpoint')
        api_key = config.get('memoryService', {}).get('apiKey')
        
        if not endpoint or not api_key:
            print("❌ API endpoint or key not found in configuration")
            sys.exit(1)
        
        print(f"🌐 Using API endpoint: {endpoint}")
        
        # Get memories from API
        memories = get_memories_from_api(endpoint, api_key)
        if not memories:
            print("❌ Failed to retrieve memories from API")
            sys.exit(1)
        
        # Find duplicates
        duplicates = find_duplicates(memories, args.similarity_threshold)
        
        if not duplicates['exact'] and not duplicates['similar']:
            print("✅ No duplicates found!")
            return
        
        print(f"\nFound:")
        print(f"  - {len(duplicates['exact'])} exact duplicate groups")
        print(f"  - {len(duplicates['similar'])} similar content groups")
        
        if args.execute:
            print("⚠️  API-based deletion not yet implemented. Use database path for deletion.")
        else:
            # Analysis-only dry run; remove_duplicates() skips the database
            # connection entirely when db_path is None
            remove_duplicates(None, duplicates, dry_run=True)
            
    else:
        # Use a local database path (default to ~/.local/share/mcp-memory/sqlite_vec.db)
        db_path = args.db_path or str(Path.home() / '.local' / 'share' / 'mcp-memory' / 'sqlite_vec.db')
        
        if not Path(db_path).exists():
            print(f"❌ Database not found: {db_path}")
            print("💡 Try --use-api to use the API endpoint from config instead")
            sys.exit(1)
        
        # Find duplicates
        duplicates = find_duplicates(db_path, args.similarity_threshold)
        
        if not duplicates['exact'] and not duplicates['similar']:
            print("✅ No duplicates found!")
            return
        
        print(f"\nFound:")
        print(f"  - {len(duplicates['exact'])} exact duplicate groups")
        print(f"  - {len(duplicates['similar'])} similar content groups")
        
        # Remove duplicates
        remove_duplicates(db_path, duplicates, dry_run=not args.execute)


if __name__ == "__main__":
    main()