Skip to main content
Glama

MCP Memory Service

#!/usr/bin/env python3 # Copyright 2024 Heinrich Krupp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Migration Validation Script for MCP Memory Service This script validates that a migration from ChromaDB to SQLite-vec was successful by checking data integrity, required fields, and identifying any corruption. """ import argparse import asyncio import hashlib import json import os import sqlite3 import sys from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Optional, Tuple # Add project root to path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) try: from mcp_memory_service.storage.chroma import ChromaMemoryStorage from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage from mcp_memory_service.utils.hashing import generate_content_hash IMPORTS_AVAILABLE = True except ImportError: IMPORTS_AVAILABLE = False print("Warning: MCP modules not available. Running in standalone mode.") class ValidationReport: """Container for validation results.""" def __init__(self): self.total_memories = 0 self.valid_memories = 0 self.issues = [] self.warnings = [] self.tag_issues = [] self.hash_mismatches = [] self.missing_fields = [] self.timestamp_issues = [] self.encoding_issues = [] def add_issue(self, issue: str): """Add a critical issue.""" self.issues.append(issue) def add_warning(self, warning: str): """Add a warning.""" self.warnings.append(warning) def is_valid(self) -> bool: """Check if validation passed.""" return len(self.issues) == 0 def print_report(self): """Print the validation report.""" print("\n" + "="*60) print("MIGRATION VALIDATION REPORT") print("="*60) print(f"\n📊 Statistics:") print(f" Total memories: {self.total_memories}") print(f" Valid memories: {self.valid_memories}") print(f" Validation rate: {self.valid_memories/self.total_memories*100:.1f}%" if self.total_memories > 0 else "N/A") if self.issues: print(f"\n❌ Critical Issues ({len(self.issues)}):") for i, issue in enumerate(self.issues[:10], 1): print(f" {i}. {issue}") if len(self.issues) > 10: print(f" ... and {len(self.issues) - 10} more") if self.warnings: print(f"\n⚠️ Warnings ({len(self.warnings)}):") for i, warning in enumerate(self.warnings[:10], 1): print(f" {i}. {warning}") if len(self.warnings) > 10: print(f" ... and {len(self.warnings) - 10} more") if self.tag_issues: print(f"\n🏷️ Tag Issues ({len(self.tag_issues)}):") for i, issue in enumerate(self.tag_issues[:5], 1): print(f" {i}. {issue}") if len(self.tag_issues) > 5: print(f" ... and {len(self.tag_issues) - 5} more") if self.hash_mismatches: print(f"\n🔑 Hash Mismatches ({len(self.hash_mismatches)}):") for i, mismatch in enumerate(self.hash_mismatches[:5], 1): print(f" {i}. {mismatch}") if len(self.hash_mismatches) > 5: print(f" ... and {len(self.hash_mismatches) - 5} more") if self.timestamp_issues: print(f"\n🕐 Timestamp Issues ({len(self.timestamp_issues)}):") for i, issue in enumerate(self.timestamp_issues[:5], 1): print(f" {i}. {issue}") if len(self.timestamp_issues) > 5: print(f" ... and {len(self.timestamp_issues) - 5} more") # Final verdict print("\n" + "="*60) if self.is_valid(): print("✅ VALIDATION PASSED") if self.warnings: print(f" (with {len(self.warnings)} warnings)") else: print("❌ VALIDATION FAILED") print(f" Found {len(self.issues)} critical issues") print("="*60) class MigrationValidator: """Tool for validating migrated data.""" def __init__(self, sqlite_path: str, chroma_path: Optional[str] = None): self.sqlite_path = sqlite_path self.chroma_path = chroma_path self.report = ValidationReport() def validate_content_hash(self, content: str, stored_hash: str) -> bool: """Validate that content hash is correct.""" if not stored_hash: return False # Generate expected hash expected_hash = hashlib.sha256(content.encode()).hexdigest() return expected_hash == stored_hash def validate_tags(self, tags_str: str) -> Tuple[bool, List[str]]: """Validate tag format and return cleaned tags.""" if not tags_str: return True, [] try: # Tags should be comma-separated tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()] # Check for common corruption patterns issues = [] for tag in tags: if '\n' in tag or '\r' in tag: issues.append(f"Newline in tag: {repr(tag)}") if len(tag) > 100: issues.append(f"Tag too long: {tag[:50]}...") if tag.startswith('[') or tag.endswith(']'): issues.append(f"Array syntax in tag: {tag}") return len(issues) == 0, tags except Exception as e: return False, [] def validate_timestamp(self, timestamp: Any, field_name: str) -> bool: """Validate timestamp value.""" if timestamp is None: return False try: if isinstance(timestamp, (int, float)): # Check if timestamp is reasonable (between 2000 and 2100) if 946684800 <= float(timestamp) <= 4102444800: return True return False except: return False def validate_metadata(self, metadata_str: str) -> Tuple[bool, Dict]: """Validate metadata JSON.""" if not metadata_str: return True, {} try: metadata = json.loads(metadata_str) if not isinstance(metadata, dict): return False, {} return True, metadata except json.JSONDecodeError: return False, {} async def validate_sqlite_database(self) -> bool: """Validate the SQLite-vec database.""" print(f"Validating SQLite database: {self.sqlite_path}") if not Path(self.sqlite_path).exists(): self.report.add_issue(f"Database file not found: {self.sqlite_path}") return False try: conn = sqlite3.connect(self.sqlite_path) # Check if tables exist tables = conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall() table_names = [t[0] for t in tables] if 'memories' not in table_names: self.report.add_issue("Missing 'memories' table") return False # Check schema schema = conn.execute("PRAGMA table_info(memories)").fetchall() column_names = [col[1] for col in schema] required_columns = [ 'content_hash', 'content', 'tags', 'memory_type', 'metadata', 'created_at', 'updated_at' ] for col in required_columns: if col not in column_names: self.report.add_issue(f"Missing required column: {col}") # Validate each memory cursor = conn.execute(""" SELECT id, content_hash, content, tags, memory_type, metadata, created_at, updated_at FROM memories ORDER BY id """) for row in cursor: memory_id, content_hash, content, tags, memory_type = row[:5] metadata, created_at, updated_at = row[5:] self.report.total_memories += 1 memory_valid = True # Validate content and hash if not content: self.report.missing_fields.append(f"Memory {memory_id}: missing content") memory_valid = False elif not content_hash: self.report.missing_fields.append(f"Memory {memory_id}: missing content_hash") memory_valid = False elif not self.validate_content_hash(content, content_hash): self.report.hash_mismatches.append( f"Memory {memory_id}: hash mismatch (hash: {content_hash[:8]}...)" ) memory_valid = False # Validate tags if tags: valid_tags, cleaned_tags = self.validate_tags(tags) if not valid_tags: self.report.tag_issues.append( f"Memory {memory_id}: malformed tags: {tags[:50]}..." ) # Validate timestamps if not self.validate_timestamp(created_at, 'created_at'): self.report.timestamp_issues.append( f"Memory {memory_id}: invalid created_at: {created_at}" ) memory_valid = False if not self.validate_timestamp(updated_at, 'updated_at'): self.report.timestamp_issues.append( f"Memory {memory_id}: invalid updated_at: {updated_at}" ) # Validate metadata if metadata: valid_meta, meta_dict = self.validate_metadata(metadata) if not valid_meta: self.report.warnings.append( f"Memory {memory_id}: invalid metadata JSON" ) # Check for encoding issues try: content.encode('utf-8') except UnicodeEncodeError: self.report.encoding_issues.append( f"Memory {memory_id}: encoding issues in content" ) memory_valid = False if memory_valid: self.report.valid_memories += 1 conn.close() # Check for critical issues if self.report.total_memories == 0: self.report.add_issue("No memories found in database") elif self.report.valid_memories < self.report.total_memories * 0.5: self.report.add_issue( f"Less than 50% of memories are valid ({self.report.valid_memories}/{self.report.total_memories})" ) return True except Exception as e: self.report.add_issue(f"Database validation error: {e}") return False async def compare_with_chroma(self) -> bool: """Compare migrated data with original ChromaDB.""" if not self.chroma_path or not IMPORTS_AVAILABLE: print("Skipping ChromaDB comparison (not available)") return True print(f"Comparing with ChromaDB: {self.chroma_path}") try: # Load ChromaDB memories chroma_storage = ChromaMemoryStorage(path=self.chroma_path) collection = chroma_storage.collection if not collection: self.report.add_warning("Could not access ChromaDB collection") return True # Get count from ChromaDB chroma_results = collection.get() chroma_count = len(chroma_results.get('ids', [])) print(f" ChromaDB memories: {chroma_count}") print(f" SQLite memories: {self.report.total_memories}") if chroma_count > 0: migration_rate = self.report.total_memories / chroma_count * 100 if migration_rate < 95: self.report.add_warning( f"Only {migration_rate:.1f}% of ChromaDB memories were migrated" ) elif migration_rate > 105: self.report.add_warning( f"SQLite has {migration_rate:.1f}% of ChromaDB count (possible duplicates)" ) return True except Exception as e: self.report.add_warning(f"Could not compare with ChromaDB: {e}") return True async def run(self) -> bool: """Run the validation process.""" print("\n" + "="*60) print("MCP Memory Service - Migration Validator") print("="*60) # Validate SQLite database if not await self.validate_sqlite_database(): self.report.print_report() return False # Compare with ChromaDB if available if self.chroma_path: await self.compare_with_chroma() # Print report self.report.print_report() return self.report.is_valid() def find_databases() -> Tuple[Optional[str], Optional[str]]: """Try to find SQLite and ChromaDB databases.""" sqlite_path = None chroma_path = None # Check environment variables sqlite_path = os.environ.get('MCP_MEMORY_SQLITE_PATH') chroma_path = os.environ.get('MCP_MEMORY_CHROMA_PATH') # Check default locations home = Path.home() if sys.platform == 'darwin': # macOS default_base = home / 'Library' / 'Application Support' / 'mcp-memory' elif sys.platform == 'win32': # Windows default_base = Path(os.getenv('LOCALAPPDATA', '')) / 'mcp-memory' else: # Linux default_base = home / '.local' / 'share' / 'mcp-memory' if not sqlite_path: # Try to find SQLite database possible_sqlite = [ default_base / 'sqlite_vec.db', default_base / 'sqlite_vec_migrated.db', default_base / 'memory_migrated.db', Path.cwd() / 'sqlite_vec.db', ] for path in possible_sqlite: if path.exists(): sqlite_path = str(path) print(f"Found SQLite database: {path}") break if not chroma_path: # Try to find ChromaDB possible_chroma = [ home / '.mcp_memory_chroma', default_base / 'chroma_db', Path.cwd() / 'chroma_db', ] for path in possible_chroma: if path.exists(): chroma_path = str(path) print(f"Found ChromaDB: {path}") break return sqlite_path, chroma_path def main(): """Main entry point.""" parser = argparse.ArgumentParser( description="Validate ChromaDB to SQLite-vec migration", formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( 'sqlite_path', nargs='?', help='Path to SQLite-vec database (default: auto-detect)' ) parser.add_argument( '--chroma-path', help='Path to original ChromaDB for comparison' ) parser.add_argument( '--compare', action='store_true', help='Compare with ChromaDB (requires ChromaDB path)' ) parser.add_argument( '--fix', action='store_true', help='Attempt to fix common issues (experimental)' ) args = parser.parse_args() # Find databases if args.sqlite_path: sqlite_path = args.sqlite_path else: sqlite_path, detected_chroma = find_databases() if not args.chroma_path and detected_chroma: args.chroma_path = detected_chroma if not sqlite_path: print("Error: Could not find SQLite database") print("Please specify the path or set MCP_MEMORY_SQLITE_PATH") return 1 # Run validation validator = MigrationValidator(sqlite_path, args.chroma_path) success = asyncio.run(validator.run()) return 0 if success else 1 if __name__ == "__main__": sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server