Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
replicator.go11.3 kB
package replication import ( "context" "errors" "sync" "time" ) // Errors returned by replication operations. var ( // ErrNotLeader is returned when a write is attempted on a non-leader node. ErrNotLeader = errors.New("not leader") // ErrNoLeader is returned when no leader is available in the cluster. ErrNoLeader = errors.New("no leader available") // ErrTimeout is returned when an operation times out. ErrTimeout = errors.New("operation timed out") // ErrClosed is returned when operating on a closed replicator. ErrClosed = errors.New("replicator is closed") // ErrStandbyMode is returned when writes are attempted on a standby node. ErrStandbyMode = errors.New("node is in standby mode") // ErrNotReady is returned when the replicator hasn't finished initialization. ErrNotReady = errors.New("replicator not ready") ) // Replicator is the unified interface for all replication modes. // It abstracts the complexity of different replication strategies behind // a simple interface that the rest of NornicDB uses. // // The Replicator handles: // - Write routing (to leader) // - Read routing (based on consistency level) // - Health monitoring // - Failover orchestration // // Example: // // replicator, _ := replication.NewReplicator(config, storage) // replicator.Start(ctx) // defer replicator.Shutdown() // // // All writes go through replicator // if err := replicator.Apply(cmd); err != nil { // if errors.Is(err, replication.ErrNotLeader) { // // Forward to leader // } // } type Replicator interface { // Start initializes and starts the replicator. // This should be called after the storage engine is ready. Start(ctx context.Context) error // Apply applies a write command to the cluster. // Returns ErrNotLeader if this node cannot accept writes. // The command is replicated according to the replication mode. Apply(cmd *Command, timeout time.Duration) error // ApplyBatch applies multiple commands atomically. ApplyBatch(cmds []*Command, timeout time.Duration) error // IsLeader returns true if this node can accept writes. // For standalone mode, this always returns true. // For HA standby, this returns true only on primary. // For Raft, this returns true only on the Raft leader. IsLeader() bool // LeaderAddr returns the address of the current leader. // Returns empty string if unknown or in standalone mode. LeaderAddr() string // LeaderID returns the ID of the current leader. LeaderID() string // Health returns the current health status of the replicator. Health() *HealthStatus // WaitForLeader blocks until a leader is elected or context is cancelled. WaitForLeader(ctx context.Context) error // Shutdown gracefully shuts down the replicator. // This should be called before closing the storage engine. Shutdown() error // Mode returns the current replication mode. Mode() ReplicationMode // NodeID returns this node's unique identifier. NodeID() string } // Command represents a write operation to be replicated. type Command struct { // Type identifies the operation type. Type CommandType // Data is the serialized operation data. Data []byte // Timestamp when the command was created. Timestamp time.Time // RequestID for idempotency/deduplication. RequestID string } // CommandType identifies the type of write operation. type CommandType uint8 const ( // CmdUnknown is an unknown command type. CmdUnknown CommandType = iota // CmdCreateNode creates a new node. CmdCreateNode // CmdUpdateNode updates an existing node. CmdUpdateNode // CmdDeleteNode deletes a node. CmdDeleteNode // CmdCreateEdge creates a new edge. CmdCreateEdge // CmdDeleteEdge deletes an edge. CmdDeleteEdge // CmdSetProperty sets a property on a node. CmdSetProperty // CmdBatchWrite is a batch of multiple writes. CmdBatchWrite // CmdCypher is a Cypher write query. CmdCypher // CmdVoteRequest is a Raft vote request. CmdVoteRequest // CmdVoteResponse is a Raft vote response. CmdVoteResponse // CmdAppendEntries is a Raft append entries request. CmdAppendEntries // CmdAppendEntriesResponse is a Raft append entries response. CmdAppendEntriesResponse ) // HealthStatus represents the health state of the replicator. type HealthStatus struct { // Mode is the replication mode. Mode ReplicationMode `json:"mode"` // NodeID is this node's identifier. NodeID string `json:"node_id"` // Role is the current role (leader, follower, standby, etc.). Role string `json:"role"` // IsLeader indicates if this node accepts writes. IsLeader bool `json:"is_leader"` // LeaderID is the current leader's ID (if known). LeaderID string `json:"leader_id,omitempty"` // LeaderAddr is the current leader's address. LeaderAddr string `json:"leader_addr,omitempty"` // State is the current state (ready, initializing, etc.). State string `json:"state"` // Healthy indicates overall health. Healthy bool `json:"healthy"` // ReplicationLag is the lag behind leader (for followers). ReplicationLag time.Duration `json:"replication_lag,omitempty"` // LastContact is the last successful contact with leader/peer. LastContact time.Time `json:"last_contact,omitempty"` // Peers contains status of peer nodes. Peers []PeerStatus `json:"peers,omitempty"` // Region is the region ID (for multi-region). Region string `json:"region,omitempty"` // CommitIndex is the last committed log index (for Raft). CommitIndex uint64 `json:"commit_index,omitempty"` // AppliedIndex is the last applied log index. AppliedIndex uint64 `json:"applied_index,omitempty"` // Term is the current Raft term. Term uint64 `json:"term,omitempty"` } // PeerStatus represents the status of a peer node. type PeerStatus struct { // ID is the peer's identifier. ID string `json:"id"` // Address is the peer's network address. Address string `json:"address"` // Healthy indicates if the peer is reachable. Healthy bool `json:"healthy"` // Lag is the replication lag for this peer. Lag uint64 `json:"lag,omitempty"` // LastContact is the last successful contact with this peer. LastContact time.Time `json:"last_contact,omitempty"` // State is the peer's current state. State string `json:"state,omitempty"` } // Storage is the interface that the storage engine must implement // to work with replication. This is a subset of the full storage.Engine // interface, containing only what replication needs. type Storage interface { // ApplyCommand applies a replicated command to storage. ApplyCommand(cmd *Command) error // GetWALPosition returns the current WAL position. GetWALPosition() (uint64, error) // GetWALEntries returns WAL entries starting from the given position. GetWALEntries(fromPosition uint64, maxEntries int) ([]*WALEntry, error) // WriteSnapshot writes a full snapshot to the given writer. WriteSnapshot(w SnapshotWriter) error // RestoreSnapshot restores state from a snapshot. RestoreSnapshot(r SnapshotReader) error } // WALEntry represents a write-ahead log entry. type WALEntry struct { // Position is the unique, monotonically increasing position. Position uint64 // Timestamp when the entry was created. Timestamp int64 // Command is the replicated command. Command *Command } // SnapshotWriter is used to write snapshot data. type SnapshotWriter interface { Write(p []byte) (n int, err error) } // SnapshotReader is used to read snapshot data. type SnapshotReader interface { Read(p []byte) (n int, err error) } // NewReplicator creates the appropriate Replicator based on configuration. // This is the main factory function for creating replicators. // // For standalone mode, this returns a no-op replicator that has zero overhead. // For other modes, it returns the appropriate distributed replicator. // // Example: // // config := replication.LoadFromEnv() // if err := config.Validate(); err != nil { // log.Fatal(err) // } // // replicator, err := replication.NewReplicator(config, storage) // if err != nil { // log.Fatal(err) // } func NewReplicator(config *Config, storage Storage) (Replicator, error) { switch config.Mode { case ModeStandalone: return NewStandaloneReplicator(config, storage), nil case ModeHAStandby: return NewHAStandbyReplicator(config, storage) case ModeRaft: return NewRaftReplicator(config, storage) case ModeMultiRegion: return NewMultiRegionReplicator(config, storage) default: return nil, errors.New("unknown replication mode: " + string(config.Mode)) } } // StandaloneReplicator is a no-op replicator for single-node operation. // It implements the Replicator interface with zero overhead. // All writes are applied directly to storage without any replication. type StandaloneReplicator struct { config *Config storage Storage mu sync.RWMutex started bool closed bool } // NewStandaloneReplicator creates a new standalone (no-op) replicator. func NewStandaloneReplicator(config *Config, storage Storage) *StandaloneReplicator { return &StandaloneReplicator{ config: config, storage: storage, } } // Start initializes the standalone replicator (no-op). func (r *StandaloneReplicator) Start(ctx context.Context) error { r.mu.Lock() defer r.mu.Unlock() if r.closed { return ErrClosed } r.started = true return nil } // Apply applies a command directly to storage. func (r *StandaloneReplicator) Apply(cmd *Command, timeout time.Duration) error { r.mu.RLock() if r.closed { r.mu.RUnlock() return ErrClosed } if !r.started { r.mu.RUnlock() return ErrNotReady } r.mu.RUnlock() return r.storage.ApplyCommand(cmd) } // ApplyBatch applies multiple commands directly to storage. func (r *StandaloneReplicator) ApplyBatch(cmds []*Command, timeout time.Duration) error { for _, cmd := range cmds { if err := r.Apply(cmd, timeout); err != nil { return err } } return nil } // IsLeader always returns true for standalone mode. func (r *StandaloneReplicator) IsLeader() bool { return true } // LeaderAddr returns empty string for standalone mode. func (r *StandaloneReplicator) LeaderAddr() string { return "" } // LeaderID returns this node's ID for standalone mode. func (r *StandaloneReplicator) LeaderID() string { return r.config.NodeID } // Health returns health status. func (r *StandaloneReplicator) Health() *HealthStatus { r.mu.RLock() defer r.mu.RUnlock() state := "initializing" if r.started { state = "ready" } if r.closed { state = "closed" } return &HealthStatus{ Mode: ModeStandalone, NodeID: r.config.NodeID, Role: "standalone", IsLeader: true, LeaderID: r.config.NodeID, State: state, Healthy: r.started && !r.closed, } } // WaitForLeader returns immediately for standalone mode. func (r *StandaloneReplicator) WaitForLeader(ctx context.Context) error { return nil } // Shutdown stops the standalone replicator. func (r *StandaloneReplicator) Shutdown() error { r.mu.Lock() defer r.mu.Unlock() r.closed = true return nil } // Mode returns the replication mode. func (r *StandaloneReplicator) Mode() ReplicationMode { return ModeStandalone } // NodeID returns this node's ID. func (r *StandaloneReplicator) NodeID() string { return r.config.NodeID } // Ensure StandaloneReplicator implements Replicator. var _ Replicator = (*StandaloneReplicator)(nil)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server