Skip to main content
Glama

MCP Memory Service

#!/usr/bin/env python3 # Copyright 2024 Heinrich Krupp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Import memories from JSON exports into SQLite-vec database. This script imports memories from one or more JSON export files into a central SQLite-vec database, handling deduplication and preserving original timestamps and metadata. """ import asyncio import sys import logging import argparse import json from pathlib import Path from datetime import datetime # Add project src to path project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root / "src")) from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage from mcp_memory_service.sync.importer import MemoryImporter from mcp_memory_service.config import SQLITE_VEC_PATH, STORAGE_BACKEND # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def get_default_db_path() -> Path: """Get the default database path for this platform.""" if STORAGE_BACKEND == 'sqlite_vec' and SQLITE_VEC_PATH: return Path(SQLITE_VEC_PATH) else: # Fallback to BASE_DIR if not using sqlite_vec backend from mcp_memory_service.config import BASE_DIR return Path(BASE_DIR) / "sqlite_vec.db" async def import_memories( json_files: list, db_path: Path, deduplicate: bool = True, add_source_tags: bool = True, dry_run: bool = False ): """Import memories from JSON files into database.""" logger.info(f"Starting memory import to {db_path}") logger.info(f"JSON files: {[str(f) for f in json_files]}") # Validate input files for json_file in json_files: if not json_file.exists(): logger.error(f"JSON file not found: {json_file}") return False # Quick validation of JSON format try: with open(json_file, 'r') as f: data = json.load(f) if "export_metadata" not in data or "memories" not in data: logger.error(f"Invalid export format in {json_file}") return False except Exception as e: logger.error(f"Error reading {json_file}: {str(e)}") return False try: # Initialize storage logger.info("Initializing SQLite-vec storage...") storage = SqliteVecMemoryStorage(str(db_path)) await storage.initialize() # Create importer importer = MemoryImporter(storage) # Show analysis first logger.info("Analyzing import files...") analysis = await importer.analyze_import(json_files) logger.info(f"Import Analysis:") logger.info(f" Total memories to process: {analysis['total_memories']}") logger.info(f" Unique memories: {analysis['unique_memories']}") logger.info(f" Potential duplicates: {analysis['potential_duplicates']}") logger.info(f" Import conflicts: {len(analysis['conflicts'])}") logger.info(f" Sources:") for source, stats in analysis['sources'].items(): logger.info(f" {source}: {stats['new_memories']}/{stats['total_memories']} new memories") if analysis['conflicts']: logger.warning(f"Found {len(analysis['conflicts'])} conflicts between import files") # Ask for confirmation if not dry run if not dry_run: logger.info("") response = input("Proceed with import? (y/N): ") if response.lower() != 'y': logger.info("Import cancelled by user") return False # Perform import logger.info(f"{'[DRY RUN] ' if dry_run else ''}Starting import...") result = await importer.import_from_json( json_files=json_files, deduplicate=deduplicate, add_source_tags=add_source_tags, dry_run=dry_run ) # Show results logger.info(f"Import {'simulation ' if dry_run else ''}completed!") logger.info(f" Files processed: {result['files_processed']}") logger.info(f" Total processed: {result['total_processed']}") logger.info(f" Successfully imported: {result['imported']}") logger.info(f" Duplicates skipped: {result['duplicates_skipped']}") logger.info(f" Errors: {result['errors']}") logger.info(f" Source breakdown:") for source, stats in result['sources'].items(): logger.info(f" {source}: {stats['imported']}/{stats['total']} imported, {stats['duplicates']} duplicates") if not dry_run and result['imported'] > 0: # Show next steps logger.info("") logger.info("Next steps:") logger.info("1. Verify the imported memories using the web interface or API") logger.info("2. Set up Litestream for ongoing synchronization") logger.info("3. Configure replica nodes to sync from this central database") return result['errors'] == 0 except Exception as e: logger.error(f"Import failed: {str(e)}") return False async def main(): """Main function.""" parser = argparse.ArgumentParser( description="Import memories from JSON exports into SQLite-vec database", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Import single JSON file python import_memories.py windows_export.json # Import multiple JSON files python import_memories.py windows_export.json macbook_export.json # Import to specific database python import_memories.py --db-path /path/to/sqlite_vec.db exports/*.json # Dry run to see what would be imported python import_memories.py --dry-run exports/*.json # Import without deduplication (allow duplicates) python import_memories.py --no-deduplicate exports/*.json # Import without adding source tags python import_memories.py --no-source-tags exports/*.json """ ) parser.add_argument( "json_files", nargs="+", type=Path, help="JSON export files to import" ) parser.add_argument( "--db-path", type=Path, default=get_default_db_path(), help=f"Path to SQLite-vec database (default: {get_default_db_path()})" ) parser.add_argument( "--dry-run", action="store_true", help="Analyze imports without actually storing data" ) parser.add_argument( "--no-deduplicate", action="store_true", help="Allow duplicate memories (don't skip based on content hash)" ) parser.add_argument( "--no-source-tags", action="store_true", help="Don't add source machine tags to imported memories" ) parser.add_argument( "--verbose", action="store_true", help="Enable verbose logging" ) args = parser.parse_args() # Set logging level if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Show configuration logger.info("Memory Import Configuration:") logger.info(f" Database: {args.db_path}") logger.info(f" JSON files: {[str(f) for f in args.json_files]}") logger.info(f" Dry run: {args.dry_run}") logger.info(f" Deduplicate: {not args.no_deduplicate}") logger.info(f" Add source tags: {not args.no_source_tags}") logger.info("") # Validate JSON files exist missing_files = [f for f in args.json_files if not f.exists()] if missing_files: logger.error(f"Missing JSON files: {missing_files}") sys.exit(1) # Run import success = await import_memories( json_files=args.json_files, db_path=args.db_path, deduplicate=not args.no_deduplicate, add_source_tags=not args.no_source_tags, dry_run=args.dry_run ) sys.exit(0 if success else 1) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server