# import_data.py (18.9 kB)
#!/usr/bin/env python3
"""
Neo4j Data Import Script for MCP Standalone Initialization
This script imports seed data into the MCP standalone Neo4j database
and loads the schema to create a fully functional psychological intelligence system.
Usage:
    python import_data.py --seed-data seed_data.cypher --schema ../../graphs/lila-graph-schema-v8.json
"""
import os
import sys
import argparse
import json
import time
from pathlib import Path
from neo4j import GraphDatabase
from typing import Dict, Any
class Neo4jDataImporter:
    """Imports psychological intelligence data and schema into Neo4j for MCP standalone."""
    def __init__(self, uri: str, user: str, password: str, max_retries: int = 30):
        """Initialize Neo4j connection with retry logic."""
        self.uri = uri
        self.user = user
        self.password = password
        self.max_retries = max_retries
        self.driver = None
        self._connect_with_retry()
    def _connect_with_retry(self):
        """Connect to Neo4j with retry logic for container startup."""
        for attempt in range(self.max_retries):
            try:
                self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
                # Test the connection
                with self.driver.session() as session:
                    session.run("RETURN 1")
                print(f"✓ Connected to Neo4j at {self.uri}")
                return
            except Exception as e:
                if attempt < self.max_retries - 1:
                    print(f"⏳ Neo4j not ready (attempt {attempt + 1}/{self.max_retries}), waiting...")
                    time.sleep(2)
                else:
                    raise Exception(f"Failed to connect to Neo4j after {self.max_retries} attempts: {e}")
    def close(self):
        """Close Neo4j connection."""
        if self.driver:
            self.driver.close()
    def clear_database(self):
        """Clear all existing data from the database."""
        print("🗑️ Clearing existing database...")
        with self.driver.session() as session:
            session.run("MATCH (n) DETACH DELETE n")
        print("✓ Database cleared")
    def load_schema(self, schema_path: Path):
        """Load schema constraints, indexes, and actual persona data from JSON file."""
        print(f"📋 Loading schema from {schema_path}")
        if not schema_path.exists():
            print(f"⚠️ Schema file not found: {schema_path}")
            return
        try:
            with open(schema_path, 'r') as f:
                schema = json.load(f)
            with self.driver.session() as session:
                # Create constraints for PersonaAgent
                constraints = [
                    "CREATE CONSTRAINT persona_id_unique IF NOT EXISTS FOR (p:PersonaAgent) REQUIRE p.persona_id IS UNIQUE",
                    "CREATE CONSTRAINT persona_name_unique IF NOT EXISTS FOR (p:PersonaAgent) REQUIRE p.name IS UNIQUE",
                ]
                # Create constraints for Memory
                constraints.extend([
                    "CREATE CONSTRAINT memory_id_unique IF NOT EXISTS FOR (m:Memory) REQUIRE m.memory_id IS UNIQUE",
                ])
                # Create constraints for Goal
                constraints.extend([
                    "CREATE CONSTRAINT goal_id_unique IF NOT EXISTS FOR (g:Goal) REQUIRE g.goal_id IS UNIQUE",
                ])
                # Create indexes for better performance
                indexes = [
                    "CREATE INDEX persona_attachment_style IF NOT EXISTS FOR (p:PersonaAgent) ON (p.attachment_style)",
                    "CREATE INDEX memory_type IF NOT EXISTS FOR (m:Memory) ON (m.memory_type)",
                    "CREATE INDEX goal_type IF NOT EXISTS FOR (g:Goal) ON (g.goal_type)",
                    "CREATE INDEX relationship_type IF NOT EXISTS FOR ()-[r:RELATIONSHIP]-() ON (r.relationship_type)",
                ]
                # Execute constraints
                for constraint in constraints:
                    try:
                        session.run(constraint)
                        print(f"✓ Created constraint: {constraint.split('FOR')[1].split('REQUIRE')[0].strip()}")
                    except Exception as e:
                        print(f"⚠️ Constraint creation failed (may already exist): {e}")
                # Execute indexes
                for index in indexes:
                    try:
                        session.run(index)
                        print(f"✓ Created index: {index.split('ON')[1].strip()}")
                    except Exception as e:
                        print(f"⚠️ Index creation failed (may already exist): {e}")
                # Load actual persona data from family_graph
                self._load_family_graph_data(schema, session)
        except Exception as e:
            print(f"❌ Schema loading failed: {e}")
    def _load_family_graph_data(self, schema: dict, session):
        """Load personas and relationships from family_graph JSON structure."""
        family_graph = schema.get("family_graph", {})
        nodes = family_graph.get("nodes", [])
        edges = family_graph.get("edges", [])
        if not nodes:
            print("⚠️ No family_graph nodes found in schema")
            return
        print(f"👥 Loading {len(nodes)} personas from schema...")
        # Create personas
        for node in nodes:
            try:
                persona_id = node['name'].lower()
                # Map behavioral style to Big Five traits
                personality = self._map_behavioral_to_bigfive(node.get('behavioral_style', ''))
                # Parse attachment style
                attachment_style = node['attachment_style'].lower().split()[0]  # "secure", "anxious", etc.
                create_query = """
                CREATE (p:PersonaAgent {
                    persona_id: $persona_id,
                    name: $name,
                    age: $age,
                    role: $role,
                    description: $description,
                    attachment_style: $attachment_style,
                    journey_stage: $journey_stage,
                    behavioral_style: $behavioral_style,
                    current_challenge: $current_challenge,
                    knowsAbout: $knowsAbout,
                    openness: $openness,
                    conscientiousness: $conscientiousness,
                    extraversion: $extraversion,
                    agreeableness: $agreeableness,
                    neuroticism: $neuroticism,
                    trust_level: 0.7,
                    communication_style: $communication_style,
                    created_at: datetime(),
                    updated_at: datetime()
                })
                """
                # Determine communication style from behavioral style
                comm_style = "empathetic" if "S" in node.get('behavioral_style', '') else "analytical"
                params = {
                    'persona_id': persona_id,
                    'name': node['name'],
                    'age': node['age'],
                    'role': node['role'],
                    'description': node['description'],
                    'attachment_style': attachment_style,
                    'journey_stage': node.get('journey_stage', ''),
                    'behavioral_style': node.get('behavioral_style', ''),
                    'current_challenge': node.get('current_challenge', ''),
                    'knowsAbout': node.get('knowsAbout', ''),
                    'openness': personality['openness'],
                    'conscientiousness': personality['conscientiousness'],
                    'extraversion': personality['extraversion'],
                    'agreeableness': personality['agreeableness'],
                    'neuroticism': personality['neuroticism'],
                    'communication_style': comm_style
                }
                session.run(create_query, params)
                print(f"  ✓ Created persona: {node['name']} ({persona_id})")
            except Exception as e:
                print(f"❌ Failed to create persona {node.get('name', 'unknown')}: {e}")
        # Create relationships
        if edges:
            print(f"🔗 Loading {len(edges)} relationships from schema...")
            for edge in edges:
                try:
                    from_id = edge['from'].lower()
                    to_id = edge['to'].lower()
                    # Create bidirectional relationship
                    rel_query = """
                    MATCH (p1:PersonaAgent {persona_id: $from_id}), (p2:PersonaAgent {persona_id: $to_id})
                    CREATE (p1)-[:RELATIONSHIP {
                        trust_level: $trust_level,
                        intimacy_level: $intimacy_level,
                        relationship_strength: $strength,
                        relationship_type: $rel_type,
                        emotional_valence: $emotional_valence,
                        interaction_count: 0,
                        created_at: datetime(),
                        updated_at: datetime()
                    }]->(p2)
                    """
                    # Parse relationship type
                    rel_type = "intimate" if "intimate" in edge.get('type', '') else "friendship"
                    params = {
                        'from_id': from_id,
                        'to_id': to_id,
                        'trust_level': float(edge.get('trust_level', 7.0)),
                        'intimacy_level': float(edge.get('strength', 7.0)),  # Using strength as intimacy proxy
                        'strength': float(edge.get('strength', 7.0)),
                        'rel_type': rel_type,
                        'emotional_valence': float(edge.get('union_metric', 7.0)) / 10.0  # Normalize to 0-1
                    }
                    session.run(rel_query, params)
                    print(f"  ✓ Created relationship: {edge['from']} ↔ {edge['to']}")
                except Exception as e:
                    print(f"❌ Failed to create relationship {edge.get('from', '')} → {edge.get('to', '')}: {e}")
    def _map_behavioral_to_bigfive(self, behavioral_style: str) -> dict:
        """Map DISC behavioral style to Big Five personality traits."""
        # Default values
        traits = {
            'openness': 0.5,
            'conscientiousness': 0.5,
            'extraversion': 0.5,
            'agreeableness': 0.5,
            'neuroticism': 0.3
        }
        if not behavioral_style:
            return traits
        style = behavioral_style.upper()
        # Map based on DISC components
        if 'D' in style:  # Dominance
            traits['extraversion'] += 0.2
            traits['agreeableness'] -= 0.1
            traits['openness'] += 0.1
        if 'I' in style:  # Influence
            traits['extraversion'] += 0.3
            traits['openness'] += 0.2
            traits['agreeableness'] += 0.1
        if 'S' in style:  # Steadiness
            traits['agreeableness'] += 0.3
            traits['conscientiousness'] += 0.2
            traits['neuroticism'] -= 0.1
        if 'C' in style:  # Conscientiousness
            traits['conscientiousness'] += 0.3
            traits['openness'] += 0.1
            traits['neuroticism'] += 0.1
        # Normalize to 0-1 range
        for key in traits:
            traits[key] = max(0.0, min(1.0, traits[key]))
        return traits
    def import_seed_data(self, seed_data_path: Path):
        """Import seed data from Cypher file."""
        print(f"📊 Importing seed data from {seed_data_path}")
        if not seed_data_path.exists():
            print(f"⚠️ Seed data file not found: {seed_data_path}")
            return
        try:
            with open(seed_data_path, 'r') as f:
                cypher_script = f.read()
            # Split into individual statements
            statements = [stmt.strip() for stmt in cypher_script.split(';') if stmt.strip()]
            with self.driver.session() as session:
                for i, statement in enumerate(statements):
                    if statement and not statement.startswith('//'):
                        try:
                            session.run(statement)
                            if i % 10 == 0:  # Progress indicator
                                print(f"  Executed {i + 1}/{len(statements)} statements...")
                        except Exception as e:
                            print(f"⚠️ Statement failed: {statement[:50]}... Error: {e}")
            print(f"✓ Imported {len(statements)} statements")
        except Exception as e:
            print(f"❌ Seed data import failed: {e}")
    def create_default_personas(self):
        """Create default personas if no data was imported."""
        print("👥 Creating default personas...")
        default_personas = [
            {
                "persona_id": "lila",
                "name": "Lila",
                "age": 28,
                "role": "AI Research Assistant",
                "description": "A curious and empathetic AI assistant focused on understanding human psychology and relationships.",
                "attachment_style": "secure",
                "openness": 0.85,
                "conscientiousness": 0.80,
                "extraversion": 0.70,
                "agreeableness": 0.90,
                "neuroticism": 0.25,
                "trust_level": 0.80,
                "communication_style": "empathetic",
            },
            {
                "persona_id": "alex",
                "name": "Alex",
                "age": 32,
                "role": "Software Engineer",
                "description": "A thoughtful and analytical software engineer who values deep connections and authentic communication.",
                "attachment_style": "secure",
                "openness": 0.75,
                "conscientiousness": 0.85,
                "extraversion": 0.60,
                "agreeableness": 0.75,
                "neuroticism": 0.30,
                "trust_level": 0.75,
                "communication_style": "analytical",
            }
        ]
        with self.driver.session() as session:
            for persona in default_personas:
                # Create persona
                props_list = []
                for key, value in persona.items():
                    if isinstance(value, str):
                        props_list.append(f"{key}: '{value}'")
                    else:
                        props_list.append(f"{key}: {value}")
                props_str = ", ".join(props_list)
                create_query = f"CREATE (:PersonaAgent {{{props_str}}})"
                session.run(create_query)
            # Create a relationship between them
            relationship_query = """
            MATCH (lila:PersonaAgent {persona_id: 'lila'}), (alex:PersonaAgent {persona_id: 'alex'})
            CREATE (lila)-[:RELATIONSHIP {
                trust_level: 0.70,
                intimacy_level: 0.60,
                relationship_strength: 0.65,
                interaction_count: 5,
                relationship_type: 'friendship',
                emotional_valence: 0.75
            }]->(alex)
            """
            session.run(relationship_query)
        print("✓ Created default personas (Lila and Alex) with relationship")
    def verify_import(self):
        """Verify that data was imported successfully."""
        print("🔍 Verifying import...")
        with self.driver.session() as session:
            # Count personas
            result = session.run("MATCH (p:PersonaAgent) RETURN count(p) as count")
            persona_count = result.single()["count"]
            # Count relationships
            result = session.run("MATCH ()-[r:RELATIONSHIP]->() RETURN count(r) as count")
            relationship_count = result.single()["count"]
            # Count memories
            result = session.run("MATCH (m:Memory) RETURN count(m) as count")
            memory_count = result.single()["count"]
            # Count goals
            result = session.run("MATCH (g:Goal) RETURN count(g) as count")
            goal_count = result.single()["count"]
            print(f"✅ Import verification:")
            print(f"   - {persona_count} personas")
            print(f"   - {relationship_count} relationships")
            print(f"   - {memory_count} memories")
            print(f"   - {goal_count} goals")
            return persona_count > 0
def main():
    """Parse the command line, run the import and report the outcome.

    Steps: connect to Neo4j, load the schema, import the seed file (or create
    the default personas when the file is missing and ``--create-defaults``
    is given), then verify the result. The connection is always closed once
    it has been opened. Any exception is printed and the process exits with
    status 1.
    """
    parser = argparse.ArgumentParser(description="Import data into MCP standalone Neo4j")
    parser.add_argument("--seed-data", default="seed_data.cypher",
                       help="Seed data Cypher file (default: seed_data.cypher)")
    parser.add_argument("--schema", default="graphs/lila-graph-schema-v8.json",
                       help="Schema JSON file (default: graphs/lila-graph-schema-v8.json)")
    parser.add_argument("--uri", default="bolt://localhost:7687",
                       help="Neo4j URI (default: bolt://localhost:7687)")
    parser.add_argument("--user", default="neo4j",
                       help="Neo4j username (default: neo4j)")
    parser.add_argument("--password",
                       help="Neo4j password (or set NEO4J_PASSWORD env var)")
    parser.add_argument("--create-defaults", action="store_true",
                       help="Create default personas if no seed data found")
    args = parser.parse_args()

    # Get password from argument or environment.
    # NOTE(review): a hard-coded fallback password is used when neither is
    # given; confirm this is acceptable outside local development.
    password = args.password or os.getenv("NEO4J_PASSWORD", "passw0rd")
    print("🚀 Initializing MCP Standalone Neo4j Database")
    print(f"🔗 Connecting to Neo4j at {args.uri}")
    try:
        importer = Neo4jDataImporter(args.uri, args.user, password)
        try:
            # Load schema first
            importer.load_schema(Path(args.schema))
            # Import seed data if available
            seed_data_path = Path(args.seed_data)
            if seed_data_path.exists():
                importer.import_seed_data(seed_data_path)
            elif args.create_defaults:
                print("📦 No seed data found, creating default personas...")
                importer.create_default_personas()
            else:
                print("⚠️ No seed data found and --create-defaults not specified")
            # Verify the import
            success = importer.verify_import()
        finally:
            # Release the driver even when one of the steps above raised.
            importer.close()
        if success:
            print("🎉 MCP Standalone database initialization complete!")
        else:
            print("⚠️ Database initialization completed but no data was imported")
    except Exception as e:
        print(f"❌ Import failed: {e}")
        sys.exit(1)
# Script entry point: run the import only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()