Class RaftNode

java.lang.Object
com.loomcache.server.consensus.RaftNode
All Implemented Interfaces:
RaftNodeApi

public class RaftNode extends Object implements RaftNodeApi
Full Raft consensus node implementation.

Implements the Raft protocol (Ongaro & Ousterhout, 2014), including:
• Leader election with randomized timeouts
• Log replication (AppendEntries RPC)
• Commit index advancement with majority quorum
• Heartbeats from leader to followers
• Log compaction / snapshots
• Dynamic membership changes (AddServer / RemoveServer)

State machine application is delegated to a callback so any upper layer (DistributedMap, DistributedQueue, etc.) can consume committed entries.

Threading model:
• The election timer runs on a single scheduled virtual thread.
• The heartbeat/replication timer runs on a separate scheduled virtual thread.
• Incoming RPCs are processed on the caller's virtual thread (from TcpServer).
• All state mutations are guarded by a single ReentrantLock rather than a read-write lock, because election and replication interleave reads and writes.

  • Field Details

    • UNSAFE_CONFIG_CHANGE_DISABLED_MESSAGE

      public static final String UNSAFE_CONFIG_CHANGE_DISABLED_MESSAGE
    • DEFAULT_COMPACTION_THRESHOLD

      public static final long DEFAULT_COMPACTION_THRESHOLD
  • Constructor Details

    • RaftNode

      public RaftNode(String nodeId, int instanceNumber)
      Construct a RaftNode in the default group ("raft-0"). Used by single-group (non-sharded) deployments and most existing tests.
      Parameters:
      nodeId - the stable node identifier (must not be null)
      instanceNumber - the instance number used for logging (must be >= 0)
    • RaftNode

      public RaftNode(String nodeId, int instanceNumber, String groupName)
      Construct a RaftNode bound to a specific Raft group. Used by RaftGroupManager when creating per-group instances for sharded deployments. The groupName is stamped on every outbound Raft RPC (in Message.mapName()) so peers can route incoming messages to the correct group.
      Parameters:
      nodeId - the stable node identifier (must not be null)
      instanceNumber - the instance number used for logging (must be >= 0)
      groupName - the name of the Raft group this node belongs to (must not be null)
  • Method Details

    • getCurrentTerm

      public long getCurrentTerm()
      Specified by:
      getCurrentTerm in interface RaftNodeApi
    • getLeaseExpiry

      public long getLeaseExpiry()
      Returns the current leader lease expiry timestamp (System.nanoTime()-based).
    • setWalWriter

      public void setWalWriter(@Nullable WalWriter walWriter)
      Set the WAL writer for persistence. When set, committed entries are persisted to the WAL before acknowledgement, and the WAL is fsync'd on commit when walWriter.isSyncOnCommit() is true. Without a WAL writer, acknowledged writes may be lost on crash.
      Parameters:
      walWriter - The WAL writer instance, or null to disable WAL fsyncs
    • setMetadataStore

      public void setMetadataStore(@Nullable RaftMetadataStore store)
      Set the metadata store for durable Raft state (currentTerm, votedFor). Per Raft §5.2, these MUST be persisted before responding to any RPC.
      Parameters:
      store - the metadata store, or null to disable persistence (unsafe)
    • recoverMetadata

      public void recoverMetadata() throws IOException
      Recover persisted Raft metadata (currentTerm, votedFor, commit boundary) from disk. MUST be called before start() when metadata persistence is configured.
      Throws:
      IOException
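The setMetadataStore/recoverMetadata contract is to persist currentTerm and votedFor before answering an RPC and to reload them before start(). A common way to meet it is to write a temp file, fsync it, and rename it atomically. The sketch below assumes that approach. FileMetadata, its two-line file format, and the file names are hypothetical and are not the RaftMetadataStore API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical durable holder for (currentTerm, votedFor); not the real RaftMetadataStore. */
final class FileMetadata {
    /** votedFor is null when this node has not voted in currentTerm. */
    record State(long currentTerm, String votedFor) {}

    private final Path file;

    FileMetadata(Path dir) { this.file = dir.resolve("raft-meta"); }

    /** Persist before answering any RPC: write a temp file, force it to disk, then rename atomically. */
    void save(State s) throws IOException {
        Path tmp = file.resolveSibling("raft-meta.tmp");
        String body = s.currentTerm() + "\n" + (s.votedFor() == null ? "" : s.votedFor()) + "\n";
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
            ch.force(true); // data must be on disk before the rename makes it visible
        }
        // Readers see either the old file or the new one, never a torn write.
        // A production version would also fsync the parent directory after the rename.
        Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Recover at startup, before start(); a missing file means a fresh node at term 0. */
    State load() throws IOException {
        if (!Files.exists(file)) return new State(0L, null);
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        long term = Long.parseLong(lines.get(0));
        String voted = lines.size() > 1 && !lines.get(1).isEmpty() ? lines.get(1) : null;
        return new State(term, voted);
    }
}
```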
    • setSnapshotStore

      public void setSnapshotStore(@Nullable SnapshotStore store)
      Set the snapshot store for persistent snapshot management. When set, auto-compaction will save state machine snapshots to disk and truncate the WAL, enabling fast crash recovery.
      Parameters:
      store - the SnapshotStore instance, or null to disable snapshots
    • setCompactionThreshold

      public void setCompactionThreshold(long threshold)
    • setLoomMetrics

      public void setLoomMetrics(@Nullable LoomMetrics metrics)
      Set the LoomMetrics instance for instrumentation. When set, Raft operations will be instrumented with latency and validation metrics.
      Parameters:
      metrics - the LoomMetrics instance, or null to disable metrics
    • setLastApplied

      public void setLastApplied(long index)
      Set the last applied index, used during startup recovery from a disk snapshot to prevent re-applying entries already included in the snapshot.
      Parameters:
      index - the snapshot index that has been restored
    • setCommitIndex

      public void setCommitIndex(long index)
      Set the commit index, used during startup recovery to restore committed state. This only ever advances the commit index; a value lower than the current one does not lower it.
      Parameters:
      index - the commit index to restore to
    • recoverCommittedEntries

      public void recoverCommittedEntries(long durableCommitIndex)
      Replay committed log entries that were recovered from durable storage. The caller must rebuild the in-memory log first, then invoke this method before the node starts serving traffic.
      Parameters:
      durableCommitIndex - highest committed index recovered from persistent metadata
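The recovery methods above must run in a particular order before the node serves traffic:
1. Restore the snapshot boundary (setLastApplied).
2. Rebuild the in-memory log.
3. Advance the commit index monotonically (setCommitIndex).
4. Replay the entries in (lastApplied, durableCommitIndex].

The model below is a hypothetical sketch of that ordering. RecoveryModel, its list-backed log, and the assumption that snapshotted entries count as committed are illustrative and are not RaftNode internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of startup recovery; names mirror RaftNode only for illustration. */
final class RecoveryModel {
    private final List<String> log = new ArrayList<>(); // log.get(i) holds index snapshotIndex + 1 + i
    private final Consumer<String> stateMachine;
    private long snapshotIndex;
    private long lastApplied;
    private long commitIndex;

    RecoveryModel(Consumer<String> stateMachine) { this.stateMachine = stateMachine; }

    /** Like setLastApplied(snapshotIndex): entries covered by the snapshot are never re-applied. */
    void restoreSnapshot(long index) {
        snapshotIndex = index;
        lastApplied = index;
        commitIndex = Math.max(commitIndex, index); // assumption: snapshotted entries are committed
    }

    /** Rebuild the in-memory log from durable storage, in index order. */
    void appendRecovered(String entry) { log.add(entry); }

    /** Like setCommitIndex: only ever moves forward. */
    void setCommitIndex(long index) { commitIndex = Math.max(commitIndex, index); }

    /** Like recoverCommittedEntries: apply (lastApplied, durableCommitIndex] that is present in the log. */
    void recoverCommittedEntries(long durableCommitIndex) {
        setCommitIndex(durableCommitIndex);
        long upTo = Math.min(commitIndex, snapshotIndex + log.size());
        for (long i = lastApplied + 1; i <= upTo; i++) {
            stateMachine.accept(log.get((int) (i - snapshotIndex - 1)));
            lastApplied = i;
        }
    }

    long lastApplied() { return lastApplied; }
    long commitIndex() { return commitIndex; }
}
```

For example, with a snapshot at index 2, recovered entries for indexes 3 to 5, and a durable commit index of 4, only indexes 3 and 4 are replayed.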
    • setSnapshotInstallWarnSeconds

      public void setSnapshotInstallWarnSeconds(long seconds)
    • setElectionTimeouts

      public void setElectionTimeouts(long minMs, long maxMs)
      Configure election timeout bounds (min/max). Used in tests to speed up elections so they do not have to wait out production-length timeouts.
      Parameters:
      minMs - minimum election timeout in milliseconds
      maxMs - maximum election timeout in milliseconds
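The class description says election timeouts are randomized and run on a scheduled virtual thread, and setElectionTimeouts supplies the bounds. Below is a minimal sketch of such a timer. It assumes Java 21 virtual threads and a uniform draw in [minMs, maxMs]. ElectionTimer is hypothetical and is not RaftNode's internal timer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/** Hypothetical election timer: one scheduled virtual thread, re-armed with a fresh random timeout. */
final class ElectionTimer {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory());
    private final long minMs;
    private final long maxMs;
    private final Runnable onTimeout;
    private ScheduledFuture<?> pending;

    ElectionTimer(long minMs, long maxMs, Runnable onTimeout) {
        if (minMs <= 0 || maxMs < minMs) throw new IllegalArgumentException("need 0 < minMs <= maxMs");
        this.minMs = minMs;
        this.maxMs = maxMs;
        this.onTimeout = onTimeout;
    }

    /** Uniform in [minMs, maxMs]; randomization makes repeated split votes unlikely. */
    long nextTimeoutMs() {
        return ThreadLocalRandom.current().nextLong(minMs, maxMs + 1);
    }

    /** Call on start and on every valid heartbeat: cancel the old deadline and arm a new one. */
    synchronized void reset() {
        if (pending != null) pending.cancel(false);
        pending = scheduler.schedule(onTimeout, nextTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    void stop() { scheduler.shutdownNow(); }
}
```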
    • setHeartbeatInterval

      public void setHeartbeatInterval(long intervalMs)
      Configure heartbeat interval. Used in tests to speed up leader-follower synchronization.
      Parameters:
      intervalMs - heartbeat interval in milliseconds
    • setReplicationInterval

      public void setReplicationInterval(long intervalMs)
      Configure replication interval. Used in tests to control replication frequency.
      Parameters:
      intervalMs - replication check interval in milliseconds
    • addPeer

      public void addPeer(String peerId)
      Add a peer to the cluster. Used to dynamically add new nodes (e.g., during AddServer membership changes).
      Parameters:
      peerId - the ID of the peer to add
    • initializeStaticVoters

      public void initializeStaticVoters(Collection<String> voterIds)
      Initialize a fixed voter set before the Raft node starts.

      This is the only supported production bootstrap path until joint-consensus live voter changes are implemented. It does not append a config-change log entry; every member must be started with the same explicit voter list.

      Parameters:
      voterIds - complete static voter set, including this node
    • removePeer

      public void removePeer(String peerId)
      Remove a peer from the cluster. Used to dynamically remove nodes (e.g., during RemoveServer membership changes). Also cleans up leader state tracking for this peer.
      Parameters:
      peerId - the ID of the peer to remove
    • addLearner

      public void addLearner(String learnerId)
      Add a learner (non-voting member) to the cluster. Learners receive log replication from the leader but do NOT:
      • Participate in elections
      • Count toward quorum decisions

      Used for safe scaling (add learner first, then promote to voter via ADD_SERVER).

      Parameters:
      learnerId - the ID of the learner to add
    • removeLearner

      public void removeLearner(String learnerId)
      Remove a learner from the cluster. Cleans up all replication state tracking for the learner.
      Parameters:
      learnerId - the ID of the learner to remove
    • promoteLearner

      public boolean promoteLearner(String learnerId)
      Promote a learner to a voting member (voter). The learner must already have been added via addLearner or LEARNER_ADD config change. This is typically called as part of a config change to upgrade the learner.
      Parameters:
      learnerId - the ID of the learner to promote
      Returns:
      true if promotion succeeded, false if the node is not a learner
    • isLearner

      public boolean isLearner(String nodeId)
      Check if a node is a learner (non-voting member).
      Parameters:
      nodeId - the ID to check
      Returns:
      true if the node is a learner
    • getLearners

      public Set<String> getLearners()
      Get the set of all learners in the cluster.
      Specified by:
      getLearners in interface RaftNodeApi
      Returns:
      a copy of the learners set
    • getVoters

      public Set<String> getVoters()
      Get the set of all voters (non-learner members) in the cluster.
      Specified by:
      getVoters in interface RaftNodeApi
      Returns:
      a copy of the voter set (clusterMembers minus learners)
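Because getVoters() is clusterMembers minus learners, only voters' match indexes count toward commit. The sketch below shows majority commit-index advancement under that rule. It includes the standard Raft restriction that a leader commits by counting replicas only for entries from its current term. QuorumCommit and the termAt callback are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.LongUnaryOperator;

/** Hypothetical helper: majority commit index computed over voters only (learners excluded). */
final class QuorumCommit {
    /**
     * @param voters        voting members, including the leader itself
     * @param matchIndex    per-member highest replicated index (may also contain learners, which are ignored)
     * @param currentCommit the leader's commit index before this check
     * @param currentTerm   the leader's current term
     * @param termAt        returns the term of the log entry at a given index
     */
    static long commitIndex(Set<String> voters, Map<String, Long> matchIndex,
                            long currentCommit, long currentTerm, LongUnaryOperator termAt) {
        long[] acked = voters.stream().mapToLong(v -> matchIndex.getOrDefault(v, 0L)).toArray();
        Arrays.sort(acked);                             // ascending
        long candidate = acked[(acked.length - 1) / 2]; // highest index stored on a majority of voters
        // Only current-term entries are committed by counting replicas (Raft §5.4.2).
        if (candidate > currentCommit && termAt.applyAsLong(candidate) == currentTerm) {
            return candidate;
        }
        return currentCommit;
    }
}
```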
    • start

      public void start()
    • stop

      public void stop()
    • submitCommand

      public CompletableFuture<Object> submitCommand(byte[] command)
      Submit a command to be replicated across the cluster.

      SEMANTICS (At-Most-Once): This method provides at-most-once semantics: the command is appended to the leader's log and a replication attempt is initiated, but the future may complete with failure even if the command was eventually committed by the cluster. This is the standard Raft behavior: writes are not idempotent at the RPC level.

      DURABILITY: When the future completes with success, the entry is guaranteed to be:

      • Replicated on a majority of nodes
      • Persisted to the write-ahead log (WAL)
      • fsync'd to disk if walWriter.isSyncOnCommit() is true
      • Applied to the state machine
      Without fsync, a crash immediately after acknowledgment could lose the write. With fsync enabled, durability is guaranteed (trade-off: higher latency).

      IDEMPOTENCY: Clients that retry commands to get at-least-once delivery MUST use idempotency keys so that a retried command takes effect only once:

      • Embed a unique client-side identifier (e.g., UUID) in each command
      • On leader change, retry with the same command (and same ID)
      • Server deduplicates based on the ID, returning cached result if already applied
      Without idempotency keys, retries on leader change can apply the same command twice.
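Raft treats command bytes as opaque, so the deduplication described above has to live in the state machine. Every replica applies entries in the same log order there, so every replica makes the same decision. The sketch below assumes the request ID has already been decoded from the command. DedupingStateMachine is hypothetical and keeps results forever, which a real system would bound.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical state-machine wrapper that applies each client request ID at most once. */
final class DedupingStateMachine {
    // Unbounded here for brevity; a real implementation would expire old IDs.
    private final Map<String, Object> resultsById = new HashMap<>();
    private final Function<String, Object> apply;

    DedupingStateMachine(Function<String, Object> apply) { this.apply = apply; }

    /** Called for every committed entry, in log order, on every replica. */
    Object applyCommitted(String requestId, String payload) {
        if (resultsById.containsKey(requestId)) {
            return resultsById.get(requestId); // duplicate from a retry: return the first result
        }
        Object result = apply.apply(payload);
        resultsById.put(requestId, result);
        return result;
    }
}
```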

      LEADER CHANGES: If the leader steps down or crashes while a command is pending:

      • The future will complete exceptionally with LeaderChangedException
      • The caller MUST retry against the new leader (obtain via the exception's leaderId field)
      • The original command may or may not have been committed; the caller cannot assume it was lost
      • If using idempotency keys, retrying is safe; the new leader will deduplicate
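Below is a client-side sketch of the retry rule. On a leader change, the client resends the same command bytes, which carry the same idempotency key, to the leader named by the exception. Neither helper type is part of the documented API:
• LeaderMoved is a hypothetical stand-in for LeaderChangedException; only its leaderId field is modeled.
• CommandSender is a hypothetical transport that reaches a given node's submitCommand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Stand-in for LeaderChangedException; only the leaderId field is modeled. */
final class LeaderMoved extends RuntimeException {
    final String leaderId;

    LeaderMoved(String leaderId) {
        super("leader is now " + leaderId);
        this.leaderId = leaderId;
    }
}

/** Hypothetical transport that forwards a command to a specific node's submitCommand. */
interface CommandSender {
    CompletableFuture<Object> submit(String nodeId, byte[] command);
}

final class RetryingClient {
    /** The command must already embed its idempotency key, so resending the same bytes is safe. */
    static Object submitWithRetry(CommandSender sender, String leaderGuess, byte[] command, int maxAttempts)
            throws InterruptedException, ExecutionException {
        String target = leaderGuess;
        for (int attempt = 1; ; attempt++) {
            try {
                return sender.submit(target, command).get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof LeaderMoved moved && attempt < maxAttempts) {
                    target = moved.leaderId; // outcome unknown: retry the SAME bytes on the new leader
                    continue;
                }
                throw e;
            }
        }
    }
}
```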

      THREAD SAFETY: This method is thread-safe. Calls are serialized internally by stateLock. Multiple threads can call this simultaneously; all updates to the log and match index are atomic.

      CONSTRAINTS: Only the current leader can accept commands. Followers reject all submissions with NotLeaderException.

      Specified by:
      submitCommand in interface RaftNodeApi
      Parameters:
      command - the serialized command to replicate (must not be null). Content is opaque to Raft; the state machine defines semantics.
      Returns:
      a CompletableFuture<Object> that:
      • Completes with true when the command is committed and applied
      • Fails with NotLeaderException if this node is not the leader
      • Fails with LeaderChangedException if leadership changes before commit
      • Fails with IllegalStateException if the node is stopped
      • Fails with other exceptions on log I/O errors
      Throws:
      IllegalStateException - if the node is stopped; this is reported through the returned future, which fails with this exception
    • readIndex

      public CompletableFuture<Long> readIndex()
      Perform a linearizable read using the ReadIndex protocol.

      This method confirms the node is still the valid leader (via lease check), then waits until the state machine has applied entries up to the current commitIndex before completing. The caller can then safely read from local state.

      Returns:
      a future that completes with the readIndex once it's safe to read
      Throws:
      NotLeaderException - if this node is not the leader
    • awaitLinearizableReadToken

      public long awaitLinearizableReadToken(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      Wait until it is safe to serve a linearizable local read and return a term token that the caller must revalidate before publishing the read result.
      Specified by:
      awaitLinearizableReadToken in interface RaftNodeApi
      Throws:
      InterruptedException
      TimeoutException
    • awaitLinearizableReadToken

      public long awaitLinearizableReadToken() throws InterruptedException, TimeoutException
      Specified by:
      awaitLinearizableReadToken in interface RaftNodeApi
      Throws:
      InterruptedException
      TimeoutException
    • isLinearizableReadTokenValid

      public boolean isLinearizableReadTokenValid(long readTerm)
      Revalidate a read token after the state machine has been read. If this returns false, the caller must discard the already-read result and retry or redirect.
      Specified by:
      isLinearizableReadTokenValid in interface RaftNodeApi
    • withStateMachineReadLock

      public <T> T withStateMachineReadLock(Callable<T> readAction) throws Exception
      Run a local state-machine read while excluding in-progress state-machine apply.

      ReadIndex/lease checks prove the leader is current and has applied through the read index; this guard additionally prevents a read from interleaving with a later multi-key apply that is already mutating local structures.

      Throws:
      Exception
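The three read-path methods above are meant to be combined:
1. Obtain a token with awaitLinearizableReadToken.
2. Read local state inside withStateMachineReadLock.
3. Revalidate the token with isLinearizableReadTokenValid before publishing the result.

The sketch below extracts those documented signatures into a hypothetical ReadPath interface. RaftNode does not declare it, so an adapter would be needed. The 500 ms timeout and the retry limit are arbitrary assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical interface holding the subset of RaftNode methods used for local linearizable reads. */
interface ReadPath {
    long awaitLinearizableReadToken(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
    boolean isLinearizableReadTokenValid(long readTerm);
    <T> T withStateMachineReadLock(Callable<T> readAction) throws Exception;
}

final class LinearizableReads {
    /** Token, then guarded local read, then revalidate; retry if the term moved during the read. */
    static <T> T read(ReadPath node, Callable<T> localRead, int maxAttempts) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            long token = node.awaitLinearizableReadToken(500, TimeUnit.MILLISECONDS);
            T value = node.withStateMachineReadLock(localRead);
            if (node.isLinearizableReadTokenValid(token)) {
                return value; // still valid: safe to publish
            }
            // Token invalid: discard the value and try again (or redirect to the new leader).
        }
        throw new IllegalStateException("leadership kept changing; redirect the read");
    }
}
```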
    • submitConfigChange

      public CompletableFuture<Object> submitConfigChange(ConfigChange change)
      Submit a safe dynamic membership change (AddServer or RemoveServer).

      Safety Guarantees:

      • Only ONE config change at a time (rejects if pendingConfigChange != null)
      • For AddServer: waits until new server is caught up before committing
      • Config changes are committed with majority quorum (like normal entries)
      • For RemoveServer: if removing self, first transfers leadership
      Specified by:
      submitConfigChange in interface RaftNodeApi
      Parameters:
      change - the ConfigChange to submit (ADD_SERVER or REMOVE_SERVER)
      Returns:
      a future that completes when the config change is committed
      Throws:
      IllegalStateException - if the node is stopped or is not the leader
      IllegalArgumentException - if a config change is already in progress
    • submitLearnerAdd

      public CompletableFuture<Object> submitLearnerAdd(String learnerId, String address)
      Add a learner (non-voting member) to the cluster for safe scaling. Learners receive log replication but do NOT participate in elections or count toward quorum.

      Usage pattern:
      1. submitLearnerAdd("new-node", "localhost:9000") adds the node as a learner.
      2. Wait for replication to catch up (monitor replication lag).
      3. submitConfigChange(ADD_SERVER) promotes it to a voting member.

      Parameters:
      learnerId - the ID of the learner to add
      address - the network address of the learner
      Returns:
      a CompletableFuture that completes when the LEARNER_ADD config change is committed
    • submitLearnerRemove

      public CompletableFuture<Object> submitLearnerRemove(String learnerId)
      Remove a learner from the cluster. Use this to cleanly remove read replicas or observation nodes.
      Parameters:
      learnerId - the ID of the learner to remove
      Returns:
      a CompletableFuture that completes when the LEARNER_REMOVE config change is committed
    • getLearnerReplicationLag

      public long getLearnerReplicationLag(String learnerId)
      Get the replication lag of a learner (how far behind commitIndex it is). This helps determine when a learner is caught up enough to be promoted to voter.
      Parameters:
      learnerId - the ID of the learner
      Returns:
      the replication lag (commitIndex - learnerMatchIndex), or -1 if learner not found
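Following the usage pattern under submitLearnerAdd, a caller can poll a learner's lag and promote it only once the lag is small. This is a hypothetical helper, and several pieces are assumptions:
• lagOf stands in for getLearnerReplicationLag, where -1 means the node is not a learner.
• promote stands in for whatever submitConfigChange(ADD_SERVER) call the caller builds; the ConfigChange constructor is not shown on this page.
• The threshold and polling interval are arbitrary.

```java
import java.util.function.ToLongFunction;

/** Hypothetical promotion helper built on getLearnerReplicationLag semantics (-1 = not a learner). */
final class LearnerPromotion {
    /** Polls lag until it is at most maxLag, then runs promote; returns false if the timeout elapses. */
    static boolean promoteWhenCaughtUp(ToLongFunction<String> lagOf, String learnerId, long maxLag,
                                       long timeoutMs, long pollMs, Runnable promote)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (System.nanoTime() - deadline < 0) { // overflow-safe nanoTime comparison
            long lag = lagOf.applyAsLong(learnerId);
            if (lag < 0) throw new IllegalArgumentException(learnerId + " is not a learner");
            if (lag <= maxLag) {
                promote.run();
                return true;
            }
            Thread.sleep(pollMs);
        }
        return false;
    }
}
```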
    • handleVoteRequest

      public Message handleVoteRequest(Message msg)
      Handle an incoming RequestVote RPC. Called from CacheNode message dispatch.
    • handleVoteResponse

      public void handleVoteResponse(Message msg)
      Handle an incoming VoteResponse.
    • handlePreVoteRequest

      public Message handlePreVoteRequest(Message msg)
      Handle an incoming PreVote request. Respond yes if the candidate's term is >= ours AND its log is at least as up-to-date as ours. Unlike a real RequestVote, this does NOT update our term or votedFor.
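The PreVote grant decision can be written as a pure function. The sketch assumes the usual Raft definition of "up-to-date" (§5.4.1): compare last log terms first, then last log indexes. PreVoteRule is hypothetical and omits any additional checks RaftNode may apply.

```java
/** Hypothetical PreVote grant rule; a PreVote never mutates currentTerm or votedFor. */
final class PreVoteRule {
    /** Raft §5.4.1: a later last term wins; with equal last terms, the longer-or-equal log wins. */
    static boolean logIsUpToDate(long candLastTerm, long candLastIndex, long myLastTerm, long myLastIndex) {
        if (candLastTerm != myLastTerm) return candLastTerm > myLastTerm;
        return candLastIndex >= myLastIndex;
    }

    static boolean grantPreVote(long candTerm, long candLastTerm, long candLastIndex,
                                long myTerm, long myLastTerm, long myLastIndex) {
        return candTerm >= myTerm && logIsUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex);
    }
}
```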
    • handlePreVoteResponse

      public void handlePreVoteResponse(Message msg)
      Handle an incoming PreVote response. If we receive a majority, proceed to a real election.
    • handleAppendEntries

      public Message handleAppendEntries(Message msg)
      Handle incoming AppendEntries RPC (follower side).
    • handleInstallSnapshot

      public Message handleInstallSnapshot(Message msg)
      Handle incoming InstallSnapshot RPC (follower side). The leader sends this when the follower's nextIndex is behind the leader's lastSnapshotIndex. The follower replaces its log with the snapshot and updates its snapshot metadata.

      Supports chunked transfer for large snapshots. Includes SHA-256 checksum validation and latency instrumentation.

      Format per chunk: [term(8)][lastSnapshotIndex(8)][lastSnapshotTerm(8)][offset(8)][done(1)][chunkLen(4)][chunk...][sha256(32) if done=true]
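Below is a decoder sketch for the per-chunk layout above. It relies on three assumptions that the format line does not state:
• Fields are big-endian (ByteBuffer's default).
• done is a single byte, where non-zero means true.
• The SHA-256 digest covers the reassembled snapshot rather than the final chunk alone.

SnapshotChunk is hypothetical. The encode method exists only to round-trip the decoder.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical decoder for the documented chunk layout; byte order and checksum scope are assumptions. */
record SnapshotChunk(long term, long lastSnapshotIndex, long lastSnapshotTerm, long offset,
                     boolean done, byte[] chunk, byte[] sha256) {

    static SnapshotChunk decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // big-endian by default
        long term = buf.getLong();
        long index = buf.getLong();
        long snapTerm = buf.getLong();
        long offset = buf.getLong();
        boolean done = buf.get() != 0;
        int len = buf.getInt();
        if (len < 0 || len > buf.remaining()) throw new IllegalArgumentException("bad chunkLen " + len);
        byte[] chunk = new byte[len];
        buf.get(chunk);
        byte[] digest = null;
        if (done) {
            digest = new byte[32];
            buf.get(digest); // BufferUnderflowException if the trailer is truncated
        }
        return new SnapshotChunk(term, index, snapTerm, offset, done, chunk, digest);
    }

    /** Builds a payload in the same layout; pass a 32-byte digest only for the final chunk. */
    static byte[] encode(long term, long index, long snapTerm, long offset, byte[] chunk, byte[] sha256OrNull) {
        boolean done = sha256OrNull != null;
        ByteBuffer buf = ByteBuffer.allocate(8 * 4 + 1 + 4 + chunk.length + (done ? 32 : 0));
        buf.putLong(term).putLong(index).putLong(snapTerm).putLong(offset);
        buf.put((byte) (done ? 1 : 0)).putInt(chunk.length).put(chunk);
        if (done) buf.put(sha256OrNull);
        return buf.array();
    }

    /** On the final chunk: compare the SHA-256 of the reassembled snapshot with the sender's digest. */
    static boolean verify(byte[] fullSnapshot, byte[] expected) throws NoSuchAlgorithmException {
        byte[] actual = MessageDigest.getInstance("SHA-256").digest(fullSnapshot);
        return MessageDigest.isEqual(actual, expected);
    }
}
```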

    • handleInstallSnapshotResponse

      public void handleInstallSnapshotResponse(Message msg)
      Handle InstallSnapshot response (leader side). Updates nextIndex for the follower after successful snapshot installation.
    • handleAppendResponse

      public void handleAppendResponse(Message msg)
      Handle AppendEntries response (leader side).
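On the leader, an AppendEntries response typically updates the follower's matchIndex and nextIndex as sketched below. This follows the Raft paper's rules (§5.3): advance on success, or decrement nextIndex and retry after a failed consistency check. ReplicationProgress is hypothetical. It omits term handling (stepping down on a higher term) and faster back-off optimizations.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical leader-side bookkeeping for AppendEntries responses. */
final class ReplicationProgress {
    final Map<String, Long> nextIndex = new HashMap<>();
    final Map<String, Long> matchIndex = new HashMap<>();

    /**
     * @param prevLogIndex index immediately preceding the entries that were sent
     * @param entryCount   number of entries carried by the request
     */
    void onAppendResponse(String peer, boolean success, long prevLogIndex, int entryCount) {
        if (success) {
            long replicated = prevLogIndex + entryCount;
            // max() keeps stale or reordered responses from moving matchIndex backwards
            long match = Math.max(matchIndex.getOrDefault(peer, 0L), replicated);
            matchIndex.put(peer, match);
            nextIndex.put(peer, match + 1);
        } else {
            // Consistency check failed: back off one entry (never below 1) and resend
            long next = nextIndex.getOrDefault(peer, 1L);
            nextIndex.put(peer, Math.max(1L, next - 1));
        }
    }
}
```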
    • handleHeartbeat

      public Message handleHeartbeat(Message msg)
      Handle a Raft heartbeat (simplified AppendEntries with no entries). Used for leader liveness detection.
    • takeSnapshot

      public void takeSnapshot(long snapshotIndex, long snapshotTerm, byte[] snapshotData)
      Take a snapshot of the state machine at a specific log index.

      The snapshot data is provided by the upper layer (e.g., a serialized copy of the state machine). This method triggers log compaction to remove entries before the snapshot index, improving memory usage and recovery time.

      Parameters:
      snapshotIndex - the log index up to which the snapshot is taken
      snapshotTerm - the term of the entry at snapshotIndex
      snapshotData - the serialized state machine snapshot
    • captureSnapshotForHotBackup

      public RaftNode.SnapshotCapture captureSnapshotForHotBackup()
      Capture the currently applied state machine image for an out-of-band Hot Backup. This does not persist to the live snapshot store and does not compact the Raft log.
      Returns:
      snapshot bytes plus the applied Raft boundary they represent
    • forceCompaction

      public void forceCompaction(long snapshotIndex, long snapshotTerm, byte[] snapshotData) throws IOException
      Force log compaction on a running node (for integration tests and admin tooling).

      Unlike takeSnapshot(long, long, byte[]), this method is safe to call on a running node. If a SnapshotStore is configured, the snapshot is persisted to disk first (matching the production auto-compaction path). If no store is configured, the compaction is performed purely in memory. The requested boundary must already be committed and applied on this node, and the supplied term must match the log.

      Parameters:
      snapshotIndex - the log index up to which to compact
      snapshotTerm - the term of the entry at snapshotIndex
      snapshotData - the serialized state machine snapshot
      Throws:
      IOException - if persisting the snapshot to disk fails
    • addServer

      public CompletableFuture<Object> addServer(String newNodeId)
      Add a new server to the cluster (leader only).

      Uses the single-server change approach from Raft dissertation §4. The change is committed as a special log entry; this method returns a future that completes when the change is committed.

      Parameters:
      newNodeId - the ID of the new node to add
      Returns:
      a future that completes when the add is committed, or fails if not leader
    • removeServer

      public CompletableFuture<Object> removeServer(String targetNodeId)
      Remove a server from the cluster (leader only).

      The removal is committed as a special log entry. This method returns a future that completes when the removal is committed. The target node will no longer receive replication updates after this is committed.

      Parameters:
      targetNodeId - the ID of the node to remove
      Returns:
      a future that completes when the removal is committed, or fails if not leader
    • canServeLocalRead

      public boolean canServeLocalRead()
      Check if this node can serve a linearizable read from local state.

      Returns true if:
      1. This node is the leader
      2. The leader lease has not expired
      3. All entries up to commitIndex have been applied
      4. At least one entry in the current term has been committed (etcd-7331)

      Condition 4 is critical: without it, a newly elected leader could serve stale reads because it doesn't know the true commit point yet. The leader must commit a no-op in its own term first to establish this.

      This avoids the cost of a full quorum read-index round-trip.

      Specified by:
      canServeLocalRead in interface RaftNodeApi
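The four conditions of canServeLocalRead map directly onto a boolean check. In this sketch, condition 4 is modeled as "the entry at commitIndex has the current term". That is equivalent because terms never decrease along the log. The lease is compared with System.nanoTime() semantics, as getLeaseExpiry() describes. LocalReadState and its fields are hypothetical.

```java
/** Hypothetical snapshot of the state canServeLocalRead inspects, plus the four-part check. */
record LocalReadState(boolean isLeader, long leaseExpiryNanos, long commitIndex, long lastApplied,
                      long currentTerm, long termOfCommitIndex) {

    boolean canServeLocalRead(long nowNanos) {
        boolean leaseValid = nowNanos - leaseExpiryNanos < 0;        // overflow-safe nanoTime comparison
        boolean caughtUp = lastApplied >= commitIndex;               // condition 3
        boolean committedInTerm = termOfCommitIndex == currentTerm;  // condition 4 (no-op committed)
        return isLeader && leaseValid && caughtUp && committedInTerm;
    }
}
```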
    • isLeaseValid

      public boolean isLeaseValid()
      Check if the leader lease is currently valid. The role check and the lease check must both be made under stateLock to prevent a TOCTOU race between them.
      Specified by:
      isLeaseValid in interface RaftNodeApi
    • getLeaderId

      public @Nullable String getLeaderId()
      Specified by:
      getLeaderId in interface RaftNodeApi
    • getClusterMembers

      public Set<String> getClusterMembers()
      Specified by:
      getClusterMembers in interface RaftNodeApi
    • isLeader

      public boolean isLeader()
      Specified by:
      isLeader in interface RaftNodeApi
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface RaftNodeApi
    • getNextIndex

      public Map<String,Long> getNextIndex()
    • getMatchIndex

      public Map<String,Long> getMatchIndex()
    • getFollowerLagMetrics

      public Map<String,Long> getFollowerLagMetrics()
      Get follower replication lag metrics: per-peer elapsed time in nanoseconds since the last successful ACK. The value is computed as System.nanoTime() - lastAckTimeNanos, so high values indicate a slow or degraded follower.
      Specified by:
      getFollowerLagMetrics in interface RaftNodeApi
    • getFollowerMatchIndexMetrics

      public Map<String,Long> getFollowerMatchIndexMetrics()
      Get follower match index metrics: per-peer highest replicated log index. Returns Map<peerId, matchIndex> showing replication progress on each follower. A low match index relative to commit index means the follower is behind.
      Specified by:
      getFollowerMatchIndexMetrics in interface RaftNodeApi
    • getElectionsStarted

      public long getElectionsStarted()
      Total elections started (including pre-vote) since node start.
      Specified by:
      getElectionsStarted in interface RaftNodeApi
    • getElectionsWon

      public long getElectionsWon()
      Total elections won (became leader) since node start.
    • getPreVotesStarted

      public long getPreVotesStarted()
      Total pre-vote rounds started since node start.
    • getCommandsSubmitted

      public long getCommandsSubmitted()
      Total commands submitted to this leader since node start.
      Specified by:
      getCommandsSubmitted in interface RaftNodeApi
    • getCommandsCommitted

      public long getCommandsCommitted()
      Total commands committed (replicated to majority) since node start.
      Specified by:
      getCommandsCommitted in interface RaftNodeApi
    • getReplicationLag

      public long getReplicationLag()
      Replication lag: commit index minus last applied, i.e. the number of committed entries not yet applied to the local state machine.
      Specified by:
      getReplicationLag in interface RaftNodeApi
    • awaitReplicationDrained

      public boolean awaitReplicationDrained(long timeout, TimeUnit unit) throws InterruptedException
      Blocks until replication lag reaches zero or the timeout expires.
      Specified by:
      awaitReplicationDrained in interface RaftNodeApi
      Parameters:
      timeout - the maximum time to wait
      unit - the time unit
      Returns:
      true if replication lag reached zero, false if timed out
      Throws:
      InterruptedException - if the current thread is interrupted
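One way to implement the documented wait without polling is to signal a Condition whenever the commit index or last-applied index moves. ApplyProgress is a hypothetical sketch of that idea. It uses the same lag definition as getReplicationLag (commitIndex minus lastApplied) and is not RaftNode's internal code.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical lag tracker showing one way to implement awaitReplicationDrained. */
final class ApplyProgress {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private long commitIndex;
    private long lastApplied;

    void onCommit(long index) {
        lock.lock();
        try {
            commitIndex = Math.max(commitIndex, index);
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void onApplied(long index) {
        lock.lock();
        try {
            lastApplied = index;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true once commitIndex - lastApplied reaches zero, false if the timeout elapses first. */
    boolean awaitDrained(long timeout, TimeUnit unit) throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        lock.lock();
        try {
            while (commitIndex - lastApplied > 0) {
                if (remaining <= 0) return false;
                remaining = changed.awaitNanos(remaining); // loop also absorbs spurious wakeups
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```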
    • getStateApplyErrors

      public long getStateApplyErrors()
      Total state machine application errors since node start.
      Specified by:
      getStateApplyErrors in interface RaftNodeApi
    • getLogSize

      public long getLogSize()
      Total entries in Raft log (including snapshotted entries).
      Specified by:
      getLogSize in interface RaftNodeApi
    • handleTimeoutNow

      public Message handleTimeoutNow(Message msg)
      Handle an incoming TIMEOUT_NOW RPC from the leader. This forces the follower to immediately start an election, bypassing the normal election timeout. This is the standard Raft leadership transfer protocol.
      Parameters:
      msg - the TIMEOUT_NOW message
      Returns:
      a TIMEOUT_NOW_RESPONSE message
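A leader handing off leadership (for example, before stepDown()) typically picks the most caught-up follower. It sends TIMEOUT_NOW only once that follower holds the leader's full log, as in the leadership-transfer procedure of the Raft dissertation. TransferTarget is a hypothetical sketch of that selection; RaftNode's actual target choice is not documented on this page.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical target selection for leadership transfer. */
final class TransferTarget {
    /** Picks the voter other than the leader with the highest matchIndex. */
    static Optional<String> pick(String self, Set<String> voters, Map<String, Long> matchIndex) {
        return voters.stream()
                .filter(v -> !v.equals(self))
                .max((a, b) -> Long.compare(matchIndex.getOrDefault(a, 0L), matchIndex.getOrDefault(b, 0L)));
    }

    /** TIMEOUT_NOW is only worth sending once the target has every entry in the leader's log. */
    static boolean readyForTimeoutNow(String target, Map<String, Long> matchIndex, long leaderLastLogIndex) {
        return matchIndex.getOrDefault(target, 0L) >= leaderLastLogIndex;
    }
}
```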
    • getTimeSinceLastHeartbeatMs

      public long getTimeSinceLastHeartbeatMs()
      Get time since last heartbeat received in milliseconds. For followers, this indicates how long since the last AppendEntries RPC from leader. For leaders and candidates, this is not meaningful and returns 0.
      Specified by:
      getTimeSinceLastHeartbeatMs in interface RaftNodeApi
      Returns:
      milliseconds since last heartbeat received, or 0 if no heartbeat received
    • stepDown

      public void stepDown()
      Gracefully step down from leader role (if leader). This is called during shutdown to allow another node to become leader. Non-leaders safely return immediately.
    • getElectionStats

      public ElectionStats getElectionStats()
      Get a snapshot of current election statistics.
      Returns:
      ElectionStats record with current election metrics
    • getReplicationStats

      public ReplicationStats getReplicationStats()
      Get a snapshot of current replication statistics.
      Returns:
      ReplicationStats record with current replication metrics