Skip to main content
Glama

MCP Memory Service

#!/usr/bin/env python3 # Copyright 2024 Heinrich Krupp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Export memories from SQLite-vec database to JSON format. This script exports all memories from a local SQLite-vec database, preserving timestamps, metadata, and adding source tracking for multi-machine synchronization. """ import asyncio import sys import logging import argparse import platform from pathlib import Path from datetime import datetime # Add project src to path project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root / "src")) from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage from mcp_memory_service.sync.exporter import MemoryExporter from mcp_memory_service.config import SQLITE_VEC_PATH, STORAGE_BACKEND # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def get_default_db_path() -> Path: """Get the default database path for this platform.""" if STORAGE_BACKEND == 'sqlite_vec' and SQLITE_VEC_PATH: return Path(SQLITE_VEC_PATH) else: # Fallback to BASE_DIR if not using sqlite_vec backend from mcp_memory_service.config import BASE_DIR return Path(BASE_DIR) / "sqlite_vec.db" def get_default_output_filename() -> str: """Generate a default output filename based on machine and timestamp.""" machine_name = platform.node().lower().replace(' ', '-') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') return f"{machine_name}_memories_export_{timestamp}.json" async def export_memories( db_path: Path, output_file: Path, include_embeddings: bool = False, filter_tags: list = None ): """Export memories from database to JSON file.""" logger.info(f"Starting memory export from {db_path}") # Check if database exists if not db_path.exists(): logger.error(f"Database not found at {db_path}") logger.info("Available storage locations:") logger.info(f" Default: {get_default_db_path()}") return False try: # Initialize storage logger.info("Initializing SQLite-vec storage...") storage = SqliteVecMemoryStorage(str(db_path)) await storage.initialize() # Create exporter exporter = MemoryExporter(storage) # Show summary first logger.info("Analyzing database...") summary = await exporter.export_summary() logger.info(f"Database analysis:") logger.info(f" Total memories: {summary['total_memories']}") logger.info(f" Machine: {summary['machine_name']}") logger.info(f" Date range: {summary['date_range']['earliest']} to {summary['date_range']['latest']}") logger.info(f" Memory types: {summary['memory_types']}") logger.info(f" Top tags: {list(summary['tag_counts'].items())[:5]}") logger.info(f" Estimated size: {summary['estimated_json_size_mb']:.1f} MB") # Perform export logger.info(f"Exporting to {output_file}...") result = await exporter.export_to_json( output_file=output_file, include_embeddings=include_embeddings, filter_tags=filter_tags ) if result["success"]: logger.info("Export completed successfully!") logger.info(f" Exported: {result['exported_count']} memories") logger.info(f" Output file: {result['output_file']}") logger.info(f" File size: {result['file_size_bytes'] / 1024 / 1024:.2f} MB") logger.info(f" Source machine: {result['source_machine']}") # Show next steps logger.info("") logger.info("Next steps:") logger.info("1. Transfer this JSON file to your central server") logger.info("2. Run import_memories.py on the central server") logger.info("3. Set up Litestream for ongoing synchronization") return True else: logger.error("Export failed") return False except Exception as e: logger.error(f"Export failed: {str(e)}") return False async def main(): """Main function.""" parser = argparse.ArgumentParser( description="Export memories from SQLite-vec database to JSON", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Export all memories with default settings python export_memories.py # Export from specific database python export_memories.py --db-path /path/to/sqlite_vec.db # Export to specific file python export_memories.py --output my_export.json # Export only memories with specific tags python export_memories.py --filter-tags claude-code,architecture # Include embedding vectors (increases file size significantly) python export_memories.py --include-embeddings """ ) parser.add_argument( "--db-path", type=Path, default=get_default_db_path(), help=f"Path to SQLite-vec database (default: {get_default_db_path()})" ) parser.add_argument( "--output", type=Path, default=get_default_output_filename(), help=f"Output JSON file (default: {get_default_output_filename()})" ) parser.add_argument( "--include-embeddings", action="store_true", help="Include embedding vectors in export (increases file size)" ) parser.add_argument( "--filter-tags", nargs="*", help="Only export memories with these tags" ) parser.add_argument( "--verbose", action="store_true", help="Enable verbose logging" ) args = parser.parse_args() # Set logging level if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Show configuration logger.info("Memory Export Configuration:") logger.info(f" Database: {args.db_path}") logger.info(f" Output: {args.output}") logger.info(f" Include embeddings: {args.include_embeddings}") logger.info(f" Filter tags: {args.filter_tags}") logger.info(f" Platform: {platform.system()} {platform.release()}") logger.info("") # Run export success = await export_memories( db_path=args.db_path, output_file=args.output, include_embeddings=args.include_embeddings, filter_tags=args.filter_tags ) sys.exit(0 if success else 1) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server