# migrate_to_postgres.py
"""
Migration script to move from SQLite to PostgreSQL memory system
"""
import asyncio
import json
import sys
from pathlib import Path
from chatmemorysystem import ChatMemorySystem
from postgres_memory_system import PostgresChatMemorySystem
async def migrate_sqlite_to_postgres(
    sqlite_db_path: str = "chat_memory.db",
    postgres_config_key: str = "db3",
    *,
    conversation_limit: int = 1000,
) -> None:
    """Copy every stored memory from a SQLite ChatMemorySystem into PostgreSQL.

    The conversation list is read from SQLite, then each conversation's
    memories are fetched and re-stored one by one through
    ``PostgresChatMemorySystem.store_memory``. Progress is printed to stdout.

    Args:
        sqlite_db_path: Path to the SQLite database file. If the file does
            not exist, a notice is printed and the function returns without
            creating any memory-system object.
        postgres_config_key: Key handed to ``PostgresChatMemorySystem`` to
            select the PostgreSQL connection settings.
        conversation_limit: ``limit`` passed to ``get_conversation_list`` on
            both backends. Defaults to 1000, the value that used to be
            hard-coded. A warning is printed when SQLite returns at least
            this many conversations, because the list may be truncated.

    Raises:
        SystemExit: With status 1 when any step of the migration raises.
    """
    print("Starting migration from SQLite to PostgreSQL...")

    # Guard clause: nothing to do when the source database is absent.
    if not Path(sqlite_db_path).exists():
        print(f"SQLite database {sqlite_db_path} not found. Nothing to migrate.")
        return

    try:
        print("Initializing SQLite memory system...")
        sqlite_memory = ChatMemorySystem(sqlite_db_path)

        print("Initializing PostgreSQL memory system...")
        postgres_memory = PostgresChatMemorySystem(postgres_config_key)
        await postgres_memory.initialize()

        # Entered only after initialize() succeeded, so close() is always
        # paired with a successful initialize() -- including when a later
        # step raises and we are about to exit with an error status.
        try:
            print("Retrieving conversation list from SQLite...")
            conversations = await sqlite_memory.get_conversation_list(
                limit=conversation_limit
            )
            total = len(conversations)
            print(f"Found {total} conversations to migrate")
            if total >= conversation_limit:
                # Hitting the cap means we cannot tell whether more exist.
                print(
                    f"Warning: conversation list reached the limit of "
                    f"{conversation_limit}; some conversations may not be migrated."
                )

            # NOTE(review): store_memory is called unconditionally, so running
            # the migration twice presumably duplicates rows -- confirm.
            for i, conversation in enumerate(conversations, 1):
                conversation_id = conversation.conversation_id
                print(f"Migrating conversation {i}/{total}: {conversation_id}")

                memories = await sqlite_memory.get_memory(conversation_id, limit=None)
                for memory in memories:
                    await postgres_memory.store_memory(
                        conversation_id=memory.conversation_id,
                        content=memory.content,
                        metadata=memory.metadata,
                        role=memory.role,
                        importance=memory.importance,
                    )
                print(f" Migrated {len(memories)} memories")

            print("Migration completed successfully!")

            # Verification only reports the PostgreSQL conversation count.
            print("\nVerifying migration...")
            postgres_conversations = await postgres_memory.get_conversation_list(
                limit=conversation_limit
            )
            print(f"PostgreSQL now contains {len(postgres_conversations)} conversations")
        finally:
            await postgres_memory.close()

        print("\nMigration verification complete!")
    except Exception as e:
        # Top-level boundary of the script: report and exit non-zero.
        print(f"Migration failed: {e}")
        sys.exit(1)
async def test_postgres_connection(postgres_config_key: str = "db3") -> None:
    """Smoke-test the PostgreSQL memory backend with a store/read round trip.

    A single test memory is stored under the conversation id
    ``"migration_test"`` and read back. When the read succeeds the test
    conversation is deleted again.

    Args:
        postgres_config_key: Key handed to ``PostgresChatMemorySystem`` to
            select the PostgreSQL connection settings.

    Raises:
        SystemExit: With status 1 when any operation raises, or when the
            stored test memory cannot be read back.
    """
    print("Testing PostgreSQL connection...")
    round_trip_ok = False

    try:
        postgres_memory = PostgresChatMemorySystem(postgres_config_key)
        await postgres_memory.initialize()

        # Entered only after initialize() succeeded; guarantees the
        # connection is closed even when one of the operations below raises.
        try:
            test_conv_id = "migration_test"

            await postgres_memory.store_memory(
                conversation_id=test_conv_id,
                content="Test message for migration verification",
                role="user",
            )

            memories = await postgres_memory.get_memory(test_conv_id)
            round_trip_ok = bool(memories)

            if round_trip_ok:
                print("✅ PostgreSQL connection and basic operations working!")
                await postgres_memory.delete_conversation(test_conv_id)
                print("✅ Test data cleaned up")
            else:
                print("❌ Failed to retrieve test memory")
        finally:
            await postgres_memory.close()
    except Exception as e:
        print(f"❌ PostgreSQL connection test failed: {e}")
        sys.exit(1)

    # A failed read-back is a failed test: report it through the exit status
    # as well, matching the exception path above.
    if not round_trip_ok:
        sys.exit(1)
if __name__ == "__main__":
    # Command-line entry point.
    #   test [key]               -> run the PostgreSQL smoke test
    #   migrate [db_path] [key]  -> run the full SQLite -> PostgreSQL migration
    # With no arguments the usage text is printed.
    print("=== PostgreSQL Migration Tool ===\n")

    # argv[0] is the script name; the rest is the command and its options.
    cli_args = sys.argv[1:]

    if not cli_args:
        print("Usage:")
        print(" python migrate_to_postgres.py test [key] # Test PostgreSQL connection")
        print(" python migrate_to_postgres.py migrate [db_path] [key] # Migrate from SQLite")
        print("")
        print("Examples:")
        print(" python migrate_to_postgres.py test")
        print(" python migrate_to_postgres.py migrate chat_memory.db db3")
    elif cli_args[0] == "test":
        # Optional key lets the user test the same connection that
        # `migrate` would be pointed at; defaults to "db3" as before.
        postgres_key = cli_args[1] if len(cli_args) > 1 else "db3"
        asyncio.run(test_postgres_connection(postgres_key))
    elif cli_args[0] == "migrate":
        sqlite_path = cli_args[1] if len(cli_args) > 1 else "chat_memory.db"
        postgres_key = cli_args[2] if len(cli_args) > 2 else "db3"
        asyncio.run(migrate_sqlite_to_postgres(sqlite_path, postgres_key))
    else:
        print("Unknown command. Use 'test' or 'migrate'")
        # An unrecognised command is an error; signal it to the calling shell.
        sys.exit(1)