MCP Memory Service

#!/usr/bin/env python3
"""
Check for memories without timestamps in the MCP memory service database.
This script analyzes the storage backend for entries missing timestamp data.
"""
import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

import asyncio
import json
from typing import List, Dict, Any
from datetime import datetime

from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory


class TimestampAnalyzer:
    """Analyze the memory database for entries with missing timestamps."""

    def __init__(self, storage_backend: str = "sqlite_vec", db_path: str = None):
        self.storage_backend = storage_backend
        self.db_path = db_path or os.path.expanduser("~/.mcp_memory_service/storage.db")
        self.storage = None

    async def setup(self):
        """Initialize the storage backend."""
        print(f"=== Analyzing {self.storage_backend} database for timestamp issues ===")
        print(f"Database path: {self.db_path}")

        if not os.path.exists(self.db_path):
            print(f"❌ Database file not found: {self.db_path}")
            print("Possible locations:")
            common_paths = [
                os.path.expanduser("~/.mcp_memory_service/storage.db"),
                os.path.expanduser("~/.mcp_memory_service/memory.db"),
                "storage.db",
                "memory.db"
            ]
            for path in common_paths:
                if os.path.exists(path):
                    print(f"  ✅ Found: {path}")
                else:
                    print(f"  ❌ Not found: {path}")
            return False

        try:
            self.storage = SqliteVecMemoryStorage(
                db_path=self.db_path,
                embedding_model="all-MiniLM-L6-v2"
            )
            await self.storage.initialize()
            print("✅ Storage initialized successfully")
            return True
        except Exception as e:
            print(f"❌ Failed to initialize storage: {e}")
            return False

    async def get_all_memories(self) -> List[Memory]:
        """Retrieve all memories from the database."""
        try:
            # SQLite-Vec caps k at 4096 for KNN queries, so use direct database access
            return await self.get_memories_direct_query()
        except Exception as e:
            print(f"❌ Error with direct query, trying search approach: {e}")
            return await self.get_memories_via_search()

    async def get_memories_direct_query(self) -> List[Memory]:
        """Get all memories using direct database queries."""
        import sqlite3

        memories = []
        conn = None
        try:
            # Connect directly to the SQLite database
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row

            # Get all memory records
            cursor = conn.execute("""
                SELECT id, content, content_hash, tags, memory_type,
                       created_at, created_at_iso, updated_at, updated_at_iso, metadata
                FROM memories
                ORDER BY created_at DESC
            """)

            rows = cursor.fetchall()
            print(f"📊 Found {len(rows)} memory records in database")

            for i, row in enumerate(rows):
                try:
                    # Safely parse JSON fields
                    tags = []
                    if row['tags']:
                        try:
                            tags = json.loads(row['tags'])
                        except (json.JSONDecodeError, TypeError):
                            tags = []

                    metadata = {}
                    if row['metadata']:
                        try:
                            metadata = json.loads(row['metadata'])
                        except (json.JSONDecodeError, TypeError):
                            metadata = {}

                    # Reconstruct a Memory object from the database row
                    memory_dict = {
                        'content': row['content'] or '',
                        'content_hash': row['content_hash'] or '',
                        'tags': tags,
                        'memory_type': row['memory_type'] or 'unknown',
                        'created_at': row['created_at'],
                        'created_at_iso': row['created_at_iso'],
                        'updated_at': row['updated_at'],
                        'updated_at_iso': row['updated_at_iso'],
                        'metadata': metadata
                    }

                    memory = Memory.from_dict(memory_dict)
                    memories.append(memory)

                except Exception as e:
                    print(f"⚠️ Error processing memory {i+1}: {e}")
                    # Continue processing other memories
                    continue

            conn.close()
            return memories

        except Exception as e:
            print(f"❌ Direct query failed: {e}")
            if conn is not None:
                conn.close()
            return []

    async def get_memories_via_search(self) -> List[Memory]:
        """Get memories using search with smaller batches."""
        memories = []
        try:
            # Try different search queries with smaller limits
            search_queries = ["", "memory", "note", "session"]

            for query in search_queries:
                try:
                    results = await self.storage.retrieve(query, n_results=1000)  # Well under the 4096 limit
                    batch_memories = [result.memory for result in results]

                    # Deduplicate based on content_hash
                    existing_hashes = {m.content_hash for m in memories}
                    new_memories = [m for m in batch_memories if m.content_hash not in existing_hashes]
                    memories.extend(new_memories)

                    print(f"📊 Query '{query}': {len(batch_memories)} results, {len(new_memories)} new")

                except Exception as e:
                    print(f"⚠️ Query '{query}' failed: {e}")
                    continue

            print(f"📊 Total unique memories retrieved: {len(memories)}")
            return memories

        except Exception as e:
            print(f"❌ All search approaches failed: {e}")
            return []

    def analyze_timestamp_fields(self, memories: List[Memory]) -> Dict[str, Any]:
        """Analyze timestamp fields across all memories."""
        analysis = {
            "total_memories": len(memories),
            "missing_created_at": 0,
            "missing_created_at_iso": 0,
            "missing_both_timestamps": 0,
            "invalid_timestamps": 0,
            "problematic_memories": [],
            "timestamp_formats": set(),
            "timestamp_range": {"earliest": None, "latest": None}
        }

        for memory in memories:
            has_created_at = memory.created_at is not None
            has_created_at_iso = memory.created_at_iso is not None

            # Track missing timestamp fields
            if not has_created_at:
                analysis["missing_created_at"] += 1
            if not has_created_at_iso:
                analysis["missing_created_at_iso"] += 1
            if not has_created_at and not has_created_at_iso:
                analysis["missing_both_timestamps"] += 1
                analysis["problematic_memories"].append({
                    "content_hash": memory.content_hash,
                    "content_preview": memory.content[:100] + "..." if len(memory.content) > 100 else memory.content,
                    "tags": memory.tags,
                    "memory_type": memory.memory_type,
                    "issue": "missing_both_timestamps"
                })

            # Track timestamp formats and ranges
            if has_created_at_iso:
                analysis["timestamp_formats"].add(type(memory.created_at_iso).__name__)

            if has_created_at:
                try:
                    if analysis["timestamp_range"]["earliest"] is None or memory.created_at < analysis["timestamp_range"]["earliest"]:
                        analysis["timestamp_range"]["earliest"] = memory.created_at
                    if analysis["timestamp_range"]["latest"] is None or memory.created_at > analysis["timestamp_range"]["latest"]:
                        analysis["timestamp_range"]["latest"] = memory.created_at
                except (TypeError, ValueError):
                    analysis["invalid_timestamps"] += 1
                    analysis["problematic_memories"].append({
                        "content_hash": memory.content_hash,
                        "content_preview": memory.content[:100] + "..." if len(memory.content) > 100 else memory.content,
                        "created_at": str(memory.created_at),
                        "issue": "invalid_timestamp"
                    })

        # Convert set to list for JSON serialization
        analysis["timestamp_formats"] = list(analysis["timestamp_formats"])
        return analysis

    def print_analysis_report(self, analysis: Dict[str, Any]):
        """Print a detailed analysis report."""
        print("\n" + "=" * 70)
        print("TIMESTAMP ANALYSIS REPORT")
        print("=" * 70)

        total = analysis["total_memories"]

        print("\n📊 OVERVIEW:")
        print(f"  Total memories analyzed: {total}")
        print(f"  Missing created_at (float): {analysis['missing_created_at']}")
        print(f"  Missing created_at_iso (ISO string): {analysis['missing_created_at_iso']}")
        print(f"  Missing both timestamps: {analysis['missing_both_timestamps']}")
        print(f"  Invalid timestamp values: {analysis['invalid_timestamps']}")

        if total > 0:
            print("\n📈 PERCENTAGES:")
            print(f"  Missing created_at: {analysis['missing_created_at']/total*100:.1f}%")
            print(f"  Missing created_at_iso: {analysis['missing_created_at_iso']/total*100:.1f}%")
            print(f"  Missing both: {analysis['missing_both_timestamps']/total*100:.1f}%")
            print(f"  Invalid timestamps: {analysis['invalid_timestamps']/total*100:.1f}%")

        print("\n🕐 TIMESTAMP RANGE:")
        if analysis["timestamp_range"]["earliest"] and analysis["timestamp_range"]["latest"]:
            earliest = datetime.fromtimestamp(analysis["timestamp_range"]["earliest"])
            latest = datetime.fromtimestamp(analysis["timestamp_range"]["latest"])
            print(f"  Earliest: {earliest} ({analysis['timestamp_range']['earliest']})")
            print(f"  Latest: {latest} ({analysis['timestamp_range']['latest']})")
        else:
            print("  No valid timestamps found")

        print("\n📝 TIMESTAMP FORMATS DETECTED:")
        for fmt in analysis["timestamp_formats"]:
            print(f"  - {fmt}")

        if analysis["problematic_memories"]:
            print(f"\n⚠️ PROBLEMATIC MEMORIES ({len(analysis['problematic_memories'])}):")
            for i, memory in enumerate(analysis["problematic_memories"][:10]):  # Show first 10
                print(f"  {i+1}. Issue: {memory['issue']}")
                print(f"     Content: {memory['content_preview']}")
                print(f"     Hash: {memory['content_hash']}")
                if 'tags' in memory:
                    print(f"     Tags: {memory.get('tags', [])}")
                print()

            if len(analysis["problematic_memories"]) > 10:
                print(f"  ... and {len(analysis['problematic_memories']) - 10} more")

        # Health assessment
        print("\n🏥 DATABASE HEALTH ASSESSMENT:")
        if analysis["missing_both_timestamps"] == 0:
            print("  ✅ EXCELLENT: All memories have at least one timestamp field")
        elif analysis["missing_both_timestamps"] < total * 0.1:
            print(f"  ⚠️ GOOD: Only {analysis['missing_both_timestamps']} memories missing all timestamps")
        elif analysis["missing_both_timestamps"] < total * 0.5:
            print(f"  ⚠️ CONCERNING: {analysis['missing_both_timestamps']} memories missing all timestamps")
        else:
            print(f"  ❌ CRITICAL: {analysis['missing_both_timestamps']} memories missing all timestamps")

        if analysis["missing_created_at"] > 0 or analysis["missing_created_at_iso"] > 0:
            print("  💡 RECOMMENDATION: Run the timestamp migration script to fix missing fields")

    async def run_analysis(self):
        """Run the complete timestamp analysis."""
        if not await self.setup():
            return False

        memories = await self.get_all_memories()
        if not memories:
            print("⚠️ No memories found in database")
            return False

        analysis = self.analyze_timestamp_fields(memories)
        self.print_analysis_report(analysis)

        # Save detailed report to file
        report_file = "timestamp_analysis_report.json"
        with open(report_file, 'w') as f:
            # Convert any datetime objects to strings for JSON serialization
            json_analysis = analysis.copy()
            if json_analysis["timestamp_range"]["earliest"]:
                json_analysis["timestamp_range"]["earliest_iso"] = datetime.fromtimestamp(
                    json_analysis["timestamp_range"]["earliest"]).isoformat()
            if json_analysis["timestamp_range"]["latest"]:
                json_analysis["timestamp_range"]["latest_iso"] = datetime.fromtimestamp(
                    json_analysis["timestamp_range"]["latest"]).isoformat()
            json.dump(json_analysis, f, indent=2, default=str)

        print(f"\n📄 Detailed report saved to: {report_file}")
        return analysis["missing_both_timestamps"] == 0


async def main():
    """Main analysis execution."""
    import argparse

    parser = argparse.ArgumentParser(description="Check for memories without timestamps")
    parser.add_argument("--db-path", help="Path to database file")
    parser.add_argument("--storage", default="sqlite_vec", choices=["sqlite_vec"],
                        help="Storage backend to analyze")
    args = parser.parse_args()

    analyzer = TimestampAnalyzer(
        storage_backend=args.storage,
        db_path=args.db_path
    )

    success = await analyzer.run_analysis()
    return 0 if success else 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
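
A typical invocation, assuming the script is saved in the repository's scripts directory as check_missing_timestamps.py (this page does not show the filename, so that path is a placeholder; the sys.path.insert at the top of the file expects the script to live one level below the repo root, next to src):

    python scripts/check_missing_timestamps.py --db-path ~/.mcp_memory_service/storage.db

The script prints the report to stdout, writes timestamp_analysis_report.json to the working directory, and exits with status 0 only when no memory is missing both created_at and created_at_iso.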

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'
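
For programmatic access, here is a minimal Python sketch that calls the same endpoint using only the standard library. The response shape is not documented on this page, so the JSON is simply pretty-printed as-is:

    import json
    import urllib.request

    # Same endpoint as the curl example above
    url = "https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service"

    # Fetch this server's directory entry and pretty-print the JSON response
    with urllib.request.urlopen(url) as resp:
        server_info = json.load(resp)
    print(json.dumps(server_info, indent=2))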

If you have feedback or need assistance with the MCP directory API, please join our Discord server.