Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
transaction.go9.73 kB
// Package cypher - Transaction support for Cypher queries. // // Implements BEGIN/COMMIT/ROLLBACK for Neo4j-compatible transaction control. package cypher import ( "context" "fmt" "strings" "github.com/orneryd/nornicdb/pkg/storage" ) // TransactionContext holds the active transaction for a Cypher session. type TransactionContext struct { tx interface{} // *storage.BadgerTransaction (MemoryEngine now wraps BadgerEngine) engine storage.Engine active bool } // parseTransactionStatement checks if query is BEGIN/COMMIT/ROLLBACK. func (e *StorageExecutor) parseTransactionStatement(cypher string) (*ExecuteResult, error) { upper := strings.ToUpper(strings.TrimSpace(cypher)) switch { case upper == "BEGIN" || upper == "BEGIN TRANSACTION": return e.handleBegin() case upper == "COMMIT" || upper == "COMMIT TRANSACTION": return e.handleCommit() case upper == "ROLLBACK" || upper == "ROLLBACK TRANSACTION": return e.handleRollback() default: return nil, nil // Not a transaction statement } } // handleBegin starts a new explicit transaction. func (e *StorageExecutor) handleBegin() (*ExecuteResult, error) { if e.txContext != nil && e.txContext.active { return nil, fmt.Errorf("transaction already active") } // Unwrap AsyncEngine and WALEngine to get underlying engine for transactions engine := e.storage if asyncEngine, ok := engine.(*storage.AsyncEngine); ok { engine = asyncEngine.GetEngine() } if walEngine, ok := engine.(*storage.WALEngine); ok { engine = walEngine.GetEngine() } // Start transaction based on engine type // Both BadgerEngine and MemoryEngine (which wraps BadgerEngine) use BadgerTransaction switch eng := engine.(type) { case *storage.BadgerEngine: tx, err := eng.BeginTransaction() if err != nil { return nil, fmt.Errorf("failed to start transaction: %w", err) } e.txContext = &TransactionContext{ tx: tx, engine: eng, active: true, } case *storage.MemoryEngine: tx, err := eng.BeginTransaction() if err != nil { return nil, fmt.Errorf("failed to start transaction: %w", err) } e.txContext = &TransactionContext{ tx: tx, engine: eng, active: true, } default: return nil, fmt.Errorf("engine does not support transactions") } return &ExecuteResult{ Columns: []string{"status"}, Rows: [][]interface{}{{"Transaction started"}}, }, nil } // handleCommit commits the active transaction. func (e *StorageExecutor) handleCommit() (*ExecuteResult, error) { if e.txContext == nil || !e.txContext.active { return nil, fmt.Errorf("no active transaction") } // Commit based on transaction type // All engines now use BadgerTransaction (MemoryEngine wraps BadgerEngine) var err error switch tx := e.txContext.tx.(type) { case *storage.BadgerTransaction: err = tx.Commit() default: return nil, fmt.Errorf("unknown transaction type") } e.txContext.active = false e.txContext = nil if err != nil { return nil, fmt.Errorf("commit failed: %w", err) } return &ExecuteResult{ Columns: []string{"status"}, Rows: [][]interface{}{{"Transaction committed"}}, }, nil } // handleRollback rolls back the active transaction. func (e *StorageExecutor) handleRollback() (*ExecuteResult, error) { if e.txContext == nil || !e.txContext.active { return nil, fmt.Errorf("no active transaction") } // Rollback based on transaction type // All engines now use BadgerTransaction (MemoryEngine wraps BadgerEngine) var err error switch tx := e.txContext.tx.(type) { case *storage.BadgerTransaction: err = tx.Rollback() default: return nil, fmt.Errorf("unknown transaction type") } e.txContext.active = false e.txContext = nil if err != nil { return nil, fmt.Errorf("rollback failed: %w", err) } return &ExecuteResult{ Columns: []string{"status"}, Rows: [][]interface{}{{"Transaction rolled back"}}, }, nil } // executeInTransaction executes a query within the active transaction. func (e *StorageExecutor) executeInTransaction(ctx context.Context, cypher string, upperQuery string) (*ExecuteResult, error) { // Temporarily swap storage with transaction for scoped operations originalStorage := e.storage // All engines now use BadgerTransaction (MemoryEngine wraps BadgerEngine) if _, ok := e.txContext.tx.(*storage.BadgerTransaction); ok { // Use transaction-scoped operations // For now, execute against original storage (limitation) // Full implementation would use a transaction-aware storage adapter result, err := e.executeQueryAgainstStorage(ctx, cypher, upperQuery) e.storage = originalStorage return result, err } return nil, fmt.Errorf("unknown transaction type") } // executeQueryAgainstStorage executes query with current storage context. func (e *StorageExecutor) executeQueryAgainstStorage(ctx context.Context, cypher string, upperQuery string) (*ExecuteResult, error) { // Route query to appropriate handler // upperQuery is passed in to avoid redundant conversion upper := upperQuery // Normalize whitespace for compound query detection normalizedUpper := strings.ReplaceAll(strings.ReplaceAll(upper, "\n", " "), "\t", " ") // Check for CREATE...WITH...DELETE pattern first (special handling) if strings.HasPrefix(upper, "CREATE") && findKeywordIndex(cypher, "WITH") > 0 && findKeywordIndex(cypher, "DELETE") > 0 { return e.executeCompoundCreateWithDelete(ctx, cypher) } // Check for MATCH...CREATE...WITH...DELETE pattern (special handling) // This MUST come before the generic DELETE check below if strings.HasPrefix(upper, "MATCH") && findKeywordIndex(cypher, "CREATE") > 0 && findKeywordIndex(cypher, "WITH") > 0 && findKeywordIndex(cypher, "DELETE") > 0 { return e.executeCompoundMatchCreate(ctx, cypher) } // Check for DELETE queries (MATCH...DELETE, DETACH DELETE) // But NOT if it's a MATCH...CREATE...DELETE pattern (handled above) hasCreate := findKeywordIndex(cypher, "CREATE") > 0 if !hasCreate && (strings.Contains(normalizedUpper, " DELETE ") || strings.HasSuffix(normalizedUpper, " DELETE") || strings.Contains(normalizedUpper, "DETACH DELETE")) { return e.executeDelete(ctx, cypher) } // Check for SET queries (MATCH...SET but NOT MERGE...SET which is handled by executeMerge) // Also exclude ON CREATE SET / ON MATCH SET from MERGE // IMPORTANT: Also exclude compound MATCH...MERGE...SET queries which are handled by executeCompoundMatchMerge if strings.Contains(normalizedUpper, " SET ") && !strings.HasPrefix(upper, "MERGE") && !strings.Contains(normalizedUpper, "ON CREATE SET") && !strings.Contains(normalizedUpper, "ON MATCH SET") && findKeywordIndex(cypher, "MERGE") <= 0 { return e.executeSet(ctx, cypher) } // Check for REMOVE queries (MATCH...REMOVE) if strings.Contains(normalizedUpper, " REMOVE ") { return e.executeRemove(ctx, cypher) } switch { case strings.HasPrefix(upper, "CREATE CONSTRAINT"), strings.HasPrefix(upper, "CREATE FULLTEXT INDEX"), strings.HasPrefix(upper, "CREATE VECTOR INDEX"), strings.HasPrefix(upper, "CREATE INDEX"): // Schema commands - constraints and indexes return e.executeSchemaCommand(ctx, cypher) case strings.HasPrefix(upper, "CREATE"): return e.executeCreate(ctx, cypher) case strings.HasPrefix(upper, "MATCH"): // Check for shortestPath queries first if isShortestPathQuery(cypher) { query, err := e.parseShortestPathQuery(cypher) if err != nil { return nil, err } return e.executeShortestPathQuery(query) } // Check for compound MATCH...OPTIONAL MATCH queries if findKeywordIndex(cypher, "OPTIONAL MATCH") > 0 { return e.executeCompoundMatchOptionalMatch(ctx, cypher) } // Check for compound MATCH...CREATE queries if findKeywordIndex(cypher, "CREATE") > 0 { return e.executeCompoundMatchCreate(ctx, cypher) } // Check for compound MATCH...MERGE queries if findKeywordIndex(cypher, "MERGE") > 0 { return e.executeCompoundMatchMerge(ctx, cypher) } return e.executeMatch(ctx, cypher) case strings.HasPrefix(upper, "OPTIONAL MATCH"): return e.executeOptionalMatch(ctx, cypher) case strings.HasPrefix(upper, "MERGE"): return e.executeMerge(ctx, cypher) case strings.HasPrefix(upper, "DELETE"), strings.HasPrefix(upper, "DETACH DELETE"): return e.executeDelete(ctx, cypher) case strings.HasPrefix(upper, "SET"): return e.executeSet(ctx, cypher) case strings.HasPrefix(upper, "RETURN"): return e.executeReturn(ctx, cypher) case strings.HasPrefix(upper, "CALL"): return e.executeCall(ctx, cypher) case strings.HasPrefix(upper, "SHOW"): // Handle SHOW commands (indexes, constraints, procedures, etc.) switch { case strings.HasPrefix(upper, "SHOW INDEX"): return e.executeShowIndexes(ctx, cypher) case strings.HasPrefix(upper, "SHOW CONSTRAINT"): return e.executeShowConstraints(ctx, cypher) case strings.HasPrefix(upper, "SHOW PROCEDURE"): return e.executeShowProcedures(ctx, cypher) case strings.HasPrefix(upper, "SHOW FUNCTION"): return e.executeShowFunctions(ctx, cypher) case strings.HasPrefix(upper, "SHOW DATABASE"): return e.executeShowDatabase(ctx, cypher) default: return nil, fmt.Errorf("unsupported SHOW command in transaction: %s", cypher) } case strings.HasPrefix(upper, "DROP"): // DROP INDEX/CONSTRAINT - treat as no-op (NornicDB manages indexes internally) return &ExecuteResult{Columns: []string{}, Rows: [][]interface{}{}}, nil case strings.HasPrefix(upper, "UNWIND"): return e.executeUnwind(ctx, cypher) case strings.HasPrefix(upper, "WITH"): return e.executeWith(ctx, cypher) case strings.HasPrefix(upper, "FOREACH"): return e.executeForeach(ctx, cypher) default: return nil, fmt.Errorf("unsupported query type in transaction: %s", cypher) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server