
MCP Memory Service

#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Enhanced SQLite migration script to fix timestamp formats in ChromaDB.
This improved version populates all timestamp columns with appropriate values.
"""
import sqlite3
import logging
import os
import sys
import platform
from pathlib import Path
import json
import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("sqlite_migration")

def find_claude_chroma_db():
    """
    Finds the Claude desktop ChromaDB storage location based on the operating system.
    """
    system = platform.system()
    home = Path.home()

    # List of potential ChromaDB locations for Claude desktop
    possible_locations = []

    if system == "Darwin":  # macOS
        # Standard iCloud Drive location
        icloud_path = home / "Library" / "Mobile Documents" / "com~apple~CloudDocs" / "AI" / "claude-memory" / "chroma_db"
        possible_locations.append(icloud_path)
        # Local AppData location
        local_path = home / "Library" / "Application Support" / "Claude" / "claude-memory" / "chroma_db"
        possible_locations.append(local_path)
    elif system == "Windows":
        # Standard Windows location
        appdata_path = Path(os.environ.get("LOCALAPPDATA", "")) / "Claude" / "claude-memory" / "chroma_db"
        possible_locations.append(appdata_path)
        # OneDrive potential path
        onedrive_path = home / "OneDrive" / "Documents" / "Claude" / "claude-memory" / "chroma_db"
        possible_locations.append(onedrive_path)
    elif system == "Linux":
        # Standard Linux location
        linux_path = home / ".config" / "Claude" / "claude-memory" / "chroma_db"
        possible_locations.append(linux_path)

    # Try to find config file that might tell us the location
    config_locations = []
    if system == "Darwin":
        config_locations.append(home / "Library" / "Application Support" / "Claude" / "config.json")
    elif system == "Windows":
        config_locations.append(Path(os.environ.get("APPDATA", "")) / "Claude" / "config.json")
    elif system == "Linux":
        config_locations.append(home / ".config" / "Claude" / "config.json")

    # Check if config file exists and try to read DB path from it
    for config_path in config_locations:
        if config_path.exists():
            try:
                with open(config_path, 'r') as f:
                    config = json.load(f)
                    if 'memoryStoragePath' in config:
                        mem_path = Path(config['memoryStoragePath']) / "chroma_db"
                        possible_locations.insert(0, mem_path)  # Prioritize this path
                        logger.info(f"Found memory path in config: {mem_path}")
            except Exception as e:
                logger.warning(f"Error reading config file {config_path}: {e}")

    # Check all possible locations
    for location in possible_locations:
        db_path = location / "chroma.sqlite3"
        if db_path.exists():
            logger.info(f"Found ChromaDB at: {db_path}")
            return str(db_path)

    logger.error("Could not find Claude's ChromaDB storage location")
    return None

def get_table_schema(cursor, table_name):
    """
    Retrieves the schema of a table to understand available columns.
    """
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = cursor.fetchall()
    return {col[1]: col for col in columns}  # returns dict with column name as key

def timestamp_to_all_types(timestamp_value):
    """
    Convert a timestamp to all possible formats:
    - integer (unix timestamp)
    - float (unix timestamp with milliseconds)
    - string (ISO format)
    """
    # Handle different input types
    timestamp_int = None

    if isinstance(timestamp_value, int):
        timestamp_int = timestamp_value
    elif isinstance(timestamp_value, float):
        timestamp_int = int(timestamp_value)
    elif isinstance(timestamp_value, str):
        try:
            # Try to parse as float first
            timestamp_int = int(float(timestamp_value))
        except ValueError:
            # Try to parse as ISO date
            try:
                dt = datetime.datetime.fromisoformat(timestamp_value.replace('Z', '+00:00'))
                timestamp_int = int(dt.timestamp())
            except ValueError:
                # Try different date formats
                for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y/%m/%d %H:%M:%S"]:
                    try:
                        dt = datetime.datetime.strptime(timestamp_value, fmt)
                        timestamp_int = int(dt.timestamp())
                        break
                    except ValueError:
                        continue

    if timestamp_int is None:
        raise ValueError(f"Could not convert timestamp value: {timestamp_value}")

    # Generate all formats
    timestamp_float = float(timestamp_int)

    # ISO format string representation
    dt = datetime.datetime.fromtimestamp(timestamp_int, tz=datetime.timezone.utc)
    timestamp_str = dt.isoformat().replace('+00:00', 'Z')

    return {
        'int': timestamp_int,
        'float': timestamp_float,
        'string': timestamp_str
    }
def migrate_timestamps_in_sqlite(db_path):
    """
    Enhanced migration that identifies timestamp data across all columns
    and populates all columns with consistent type values.
    """
    logger.info(f"Connecting to SQLite database at {db_path}")

    if not os.path.exists(db_path):
        logger.error(f"Database file not found: {db_path}")
        return False

    try:
        # Connect to the SQLite database
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Check if the embedding_metadata table exists
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='embedding_metadata'")
        if not cursor.fetchone():
            logger.error("Table embedding_metadata not found in database.")
            conn.close()
            return False

        # Get table schema to understand column structure
        schema = get_table_schema(cursor, "embedding_metadata")
        logger.info(f"Table schema: {schema}")

        # Find all timestamp entries from any column
        logger.info("Identifying all timestamp entries...")
        cursor.execute("""
            SELECT id, key, string_value, int_value, float_value
            FROM embedding_metadata
            WHERE key = 'timestamp'
        """)
        all_rows = cursor.fetchall()

        if not all_rows:
            logger.warning("No timestamp entries found in the database.")
            conn.close()
            return True

        logger.info(f"Found {len(all_rows)} timestamp entries to process")

        # Process each timestamp row
        processed_count = 0
        failed_count = 0

        for row in all_rows:
            id_val, key, string_val, int_val, float_val = row
            source_value = None
            source_type = None

            # Find which column has a non-NULL value
            if int_val is not None:
                source_value = int_val
                source_type = 'int'
            elif float_val is not None:
                source_value = float_val
                source_type = 'float'
            elif string_val is not None:
                source_value = string_val
                source_type = 'string'

            if source_value is None:
                logger.warning(f"Row ID {id_val} has no timestamp value in any column")
                failed_count += 1
                continue

            try:
                # Convert to all types
                logger.info(f"Processing ID {id_val}: {source_type} value {source_value}")
                all_formats = timestamp_to_all_types(source_value)

                # Update the row with all formats
                cursor.execute("""
                    UPDATE embedding_metadata
                    SET int_value = ?, float_value = ?, string_value = ?
                    WHERE id = ? AND key = 'timestamp'
                """, (all_formats['int'], all_formats['float'], all_formats['string'], id_val))
                processed_count += 1
            except Exception as e:
                logger.error(f"Error processing timestamp for ID {id_val}: {e}")
                failed_count += 1

        # Commit all changes
        conn.commit()

        # Verify the changes
        cursor.execute("""
            SELECT COUNT(*) FROM embedding_metadata
            WHERE key = 'timestamp'
            AND (int_value IS NULL OR float_value IS NULL OR string_value IS NULL)
        """)
        incomplete = cursor.fetchone()[0]

        cursor.execute("""
            SELECT COUNT(*) FROM embedding_metadata
            WHERE key = 'timestamp'
            AND int_value IS NOT NULL AND float_value IS NOT NULL AND string_value IS NOT NULL
        """)
        complete = cursor.fetchone()[0]

        logger.info("Migration summary:")
        logger.info(f"  - {processed_count} timestamp entries processed successfully")
        logger.info(f"  - {failed_count} timestamp entries failed to process")
        logger.info(f"  - {complete} timestamp entries now have values in all columns")
        logger.info(f"  - {incomplete} timestamp entries still have NULL values in some columns")

        # Show some examples of remaining problematic entries if any
        if incomplete > 0:
            cursor.execute("""
                SELECT id, key, string_value, int_value, float_value
                FROM embedding_metadata
                WHERE key = 'timestamp'
                AND (int_value IS NULL OR float_value IS NULL OR string_value IS NULL)
                LIMIT 5
            """)
            problem_rows = cursor.fetchall()
            logger.info(f"Examples of incomplete entries: {problem_rows}")

        conn.close()
        return incomplete == 0

    except Exception as e:
        logger.error(f"Error during SQLite migration: {e}")
        return False

def main():
    # Check if a database path was provided as a command-line argument
    if len(sys.argv) >= 2:
        db_path = sys.argv[1]
    else:
        # Try to automatically find the ChromaDB location
        db_path = find_claude_chroma_db()
        if not db_path:
            print("Could not automatically find Claude's ChromaDB location.")
            print("Please provide the path as a command-line argument:")
            print("python migrate_timestamps.py /path/to/chroma.sqlite3")
            sys.exit(1)

    print(f"Using database: {db_path}")
    success = migrate_timestamps_in_sqlite(db_path)

    if success:
        print("\n✅ Migration completed successfully!")
        print("All timestamps now have consistent values in all columns (int_value, float_value, and string_value).")
        sys.exit(0)
    else:
        print("\n⚠️ Migration completed with issues. Check the logs for details.")
        sys.exit(1)

if __name__ == "__main__":
    main()
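The script can be run directly. With no arguments it tries to auto-detect Claude desktop's ChromaDB location (checking the config file and the per-OS default paths above); otherwise, pass the database path explicitly. Assuming the file is saved as migrate_timestamps.py, the name used in its own help text:

    python migrate_timestamps.py
    python migrate_timestamps.py /path/to/chroma.sqlite3

It exits with status 0 only when every timestamp row ends up with all three value columns populated, and the conversion is deterministic, so it can be re-run until it reports success.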
