Class RaftNode
- All Implemented Interfaces:
RaftNodeApi
Implements the Raft protocol (Ongaro & Ousterhout, 2014), including:
- Leader election with randomized timeouts
- Log replication (AppendEntries RPC)
- Commit index advancement with majority quorum
- Heartbeats from leader to followers
- Log compaction / snapshots
- Dynamic membership changes (AddServer / RemoveServer)
State machine application is delegated to a callback so any upper layer (DistributedMap, DistributedQueue, etc.) can consume committed entries.
Threading model:
- The election timer runs on a single scheduled virtual thread.
- The heartbeat/replication timer runs on a separate scheduled virtual thread.
- Incoming RPCs are processed on the caller's virtual thread (from TcpServer).
- All state mutations are guarded by a ReentrantLock (not a read-write lock, since election and replication interleave reads and writes).
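The overview lists commit index advancement with majority quorum. The sketch below isolates the standard rule from the Raft paper (Figure 2): the leader commits the highest index that a majority of voters have replicated, but only if that entry belongs to its current term. CommitIndexCalc and its array-based inputs are hypothetical and do not reflect RaftNode's internals.

```java
import java.util.Arrays;

/** Hypothetical sketch of leader-side commit index advancement (Raft paper, Figure 2). */
class CommitIndexCalc {

    /**
     * @param matchIndex  highest replicated index per voter, including the leader's own last index
     * @param termAt      termAt[i] is the term of log entry i (index 0 unused)
     * @param currentTerm the leader's current term
     * @param commitIndex the current commit index
     * @return the new commit index, never lower than commitIndex
     */
    static long advance(long[] matchIndex, long[] termAt, long currentTerm, long commitIndex) {
        long[] sorted = matchIndex.clone();
        Arrays.sort(sorted);
        // In ascending order, at least a majority of voters have matchIndex >= sorted[(n - 1) / 2].
        long candidate = sorted[(sorted.length - 1) / 2];
        // Only entries from the current term are committed by counting replicas; earlier
        // entries become committed indirectly once a current-term entry commits.
        if (candidate > commitIndex && termAt[(int) candidate] == currentTerm) {
            return candidate;
        }
        return commitIndex;
    }
}
```

With matchIndex {5, 5, 3, 2, 4}, index 4 is on three of five voters, so it commits in term 2; the same replication state in term 3 commits nothing, because entry 4 is from an older term.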
-
Nested Class Summary
Nested Classes -
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
| Modifier and Type | Method | Description |
|---|---|---|
| void | addLearner(String learnerId) | Add a learner (non-voting member) to the cluster. |
| void | addPeer(...) | Add a peer to the cluster. |
| | addServer(...) | Add a new server to the cluster (leader only). |
| long | awaitLinearizableReadToken(...) | |
| long | awaitLinearizableReadToken(long timeout, TimeUnit unit) | Wait until it is safe to serve a linearizable local read and return a term token that the caller must revalidate before publishing the read result. |
| boolean | awaitReplicationDrained(long timeout, TimeUnit unit) | Blocks until replication lag reaches zero or the timeout expires. |
| boolean | canServeLocalRead() | Check if this node can serve a linearizable read from local state. |
| | captureSnapshotForHotBackup(...) | Capture the currently applied state machine image for an out-of-band Hot Backup. |
| void | forceCompaction(long snapshotIndex, long snapshotTerm, byte[] snapshotData) | Force log compaction on a running node (for integration tests and admin tooling). |
| | getClusterMembers(...) | |
| long | getCommandsCommitted() | Total commands committed (replicated to majority) since node start. |
| long | getCommandsSubmitted() | Total commands submitted to this leader since node start. |
| long | getCurrentTerm() | |
| long | getElectionsStarted() | Total elections started (including pre-vote) since node start. |
| | getElectionStats(...) | Get a snapshot of current election statistics. |
| long | getElectionsWon() | Total elections won (became leader) since node start. |
| | getFollowerLagMetrics(...) | Get follower replication lag metrics: per-peer elapsed time in nanoseconds since the last successful ACK. |
| | getFollowerMatchIndexMetrics(...) | Get follower match index metrics: per-peer highest replicated log index. |
| @Nullable String | getLeaderId(...) | |
| long | getLearnerReplicationLag(String learnerId) | Get the replication lag of a learner (how far behind commitIndex it is). |
| | getLearners(...) | Get the set of all learners in the cluster. |
| long | getLeaseExpiry() | Returns the current leader lease expiry timestamp (System.nanoTime()-based). |
| long | getLogSize() | Total entries in Raft log (including snapshotted entries). |
| | getMatchIndex(...) | |
| | getNextIndex(...) | |
| long | getPreVotesStarted() | Total pre-vote rounds started since node start. |
| long | getReplicationLag() | Replication lag: commit index minus last applied. |
| | getReplicationStats(...) | Get a snapshot of current replication statistics. |
| long | getStateApplyErrors() | Total state machine application errors since node start. |
| long | getTimeSinceLastHeartbeatMs() | Get time since last heartbeat received in milliseconds. |
| | getVoters(...) | Get the set of all voters (non-learner members) in the cluster. |
| | handleAppendEntries(...) | Handle incoming AppendEntries RPC (follower side). |
| void | handleAppendResponse(...) | Handle AppendEntries response (leader side). |
| | handleHeartbeat(Message msg) | Handle a Raft heartbeat (simplified AppendEntries with no entries). |
| | handleInstallSnapshot(...) | Handle incoming InstallSnapshot RPC (follower side). |
| void | handleInstallSnapshotResponse(...) | Handle InstallSnapshot response (leader side). |
| | handlePreVoteRequest(...) | Handle an incoming PreVote request. |
| void | handlePreVoteResponse(...) | Handle an incoming PreVote response. |
| | handleTimeoutNow(Message msg) | Handle an incoming TIMEOUT_NOW RPC from the leader. |
| | handleVoteRequest(Message msg) | Handle an incoming RequestVote RPC. |
| void | handleVoteResponse(...) | Handle an incoming VoteResponse. |
| void | initializeStaticVoters(Collection<String> voterIds) | Initialize a fixed voter set before the Raft node starts. |
| boolean | isLeader() | |
| boolean | isLearner(...) | Check if a node is a learner (non-voting member). |
| boolean | isLeaseValid() | Check if the leader lease is currently valid. |
| boolean | isLinearizableReadTokenValid(long readTerm) | Revalidate a read token after the state machine has been read. |
| boolean | isRunning() | |
| boolean | promoteLearner(String learnerId) | Promote a learner to a voting member (voter). |
| | readIndex(...) | Perform a linearizable read using the ReadIndex protocol. |
| void | recoverCommittedEntries(long durableCommitIndex) | Replay committed log entries that were recovered from durable storage. |
| void | recoverMetadata(...) | Recover persisted Raft metadata (currentTerm, votedFor, commit boundary) from disk. |
| void | removeLearner(String learnerId) | Remove a learner from the cluster. |
| void | removePeer(String peerId) | Remove a peer from the cluster. |
| | removeServer(String targetNodeId) | Remove a server from the cluster (leader only). |
| void | setCommitIndex(long index) | Set the commit index, used during startup recovery to restore committed state. |
| void | setCompactionThreshold(long threshold) | |
| void | setElectionTimeouts(long minMs, long maxMs) | Configure election timeout bounds (min/max). |
| void | setHeartbeatInterval(long intervalMs) | Configure heartbeat interval. |
| void | setLastApplied(long index) | Set the last applied index, used during startup recovery from a disk snapshot to prevent re-applying entries already included in the snapshot. |
| void | setLoomMetrics(@Nullable LoomMetrics metrics) | Set the LoomMetrics instance for instrumentation. |
| void | setMetadataStore(@Nullable RaftMetadataStore store) | Set the metadata store for durable Raft state (currentTerm, votedFor). |
| void | setReplicationInterval(long intervalMs) | Configure replication interval. |
| void | setSnapshotInstallWarnSeconds(long seconds) | |
| void | setSnapshotStore(@Nullable SnapshotStore store) | Set the snapshot store for persistent snapshot management. |
| void | setWalWriter(@Nullable WalWriter walWriter) | Set the WAL writer for persistence. |
| void | start() | |
| void | stepDown() | Gracefully step down from leader role (if leader). |
| void | stop() | |
| | submitCommand(byte[] command) | Submit a command to be replicated across the cluster. |
| | submitConfigChange(ConfigChange change) | Submit a safe dynamic membership change (AddServer or RemoveServer). |
| | submitLearnerAdd(String learnerId, String address) | Add a learner (non-voting member) to the cluster for safe scaling. |
| | submitLearnerRemove(String learnerId) | Remove a learner from the cluster. |
| void | takeSnapshot(long snapshotIndex, long snapshotTerm, byte[] snapshotData) | Take a snapshot of the state machine at a specific log index. |
| <T> T | withStateMachineReadLock(Callable<T> readAction) | Run a local state-machine read while excluding in-progress state-machine apply. |

Methods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface RaftNodeApi
getCommitIndex, getGroupName, getLastApplied, getNodeId, getRaftLog, getState, isRaftFaulted
-
Field Details
-
UNSAFE_CONFIG_CHANGE_DISABLED_MESSAGE
- See Also:
-
DEFAULT_COMPACTION_THRESHOLD
public static final long DEFAULT_COMPACTION_THRESHOLD
- See Also:
-
-
Constructor Details
-
RaftNode
Construct a RaftNode in the default group ("raft-0"). Used by single-group (non-sharded) deployments and most existing tests.
- Parameters:
nodeId - the stable node identifier (must not be null)
instanceNumber - the instance number used for logging (must be >= 0)
-
RaftNode
Construct a RaftNode bound to a specific Raft group. Used by RaftGroupManager when creating per-group instances for sharded deployments. The groupName is stamped on every outbound Raft RPC (in Message.mapName()) so peers can route incoming messages to the correct group.
- Parameters:
nodeId - the stable node identifier (must not be null)
instanceNumber - the instance number used for logging (must be >= 0)
groupName - the name of the Raft group this node belongs to (must not be null)
-
-
Method Details
-
getCurrentTerm
public long getCurrentTerm()
- Specified by:
getCurrentTerm in interface RaftNodeApi
-
getLeaseExpiry
public long getLeaseExpiry()
Returns the current leader lease expiry timestamp (System.nanoTime()-based).
-
setWalWriter
Set the WAL writer for persistence. When set, commit acknowledgement will fsync the WAL to ensure durability. Without it, acknowledged writes may be lost on a crash.
- Parameters:
walWriter - the WAL writer instance, or null to disable WAL fsyncs
-
setMetadataStore
Set the metadata store for durable Raft state (currentTerm, votedFor). Per Raft §5.2, these MUST be persisted before responding to any RPC.
- Parameters:
store - the metadata store, or null to disable persistence (unsafe)
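A minimal sketch of one common way to make such a metadata write crash-safe, assuming the state fits in a single small file: write a temporary file, fsync it, then atomically rename it over the old one. MetadataFile, its binary layout and the ".tmp" naming are hypothetical and unrelated to RaftMetadataStore's actual format. On POSIX systems, making the rename itself durable also requires fsyncing the parent directory, which this sketch omits.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical durable store for (currentTerm, votedFor); not RaftMetadataStore. */
class MetadataFile {
    record Meta(long currentTerm, String votedFor) {} // votedFor may be null

    static void save(Path target, Meta meta) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(meta.currentTerm());
            out.writeBoolean(meta.votedFor() != null);
            if (meta.votedFor() != null) {
                out.writeUTF(meta.votedFor());
            }
        }
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(bytes.toByteArray());
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // fsync the new contents before they become visible
        }
        // Atomic rename: a reader sees either the old file or the new one, never a torn write.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    static Meta load(Path target) throws IOException {
        try (DataInputStream in = new DataInputStream(Files.newInputStream(target))) {
            long term = in.readLong();
            String votedFor = in.readBoolean() ? in.readUTF() : null;
            return new Meta(term, votedFor);
        }
    }
}
```

The node would call save() and wait for it to return before sending a vote or an RPC response that depends on the new term.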
-
recoverMetadata
Recover persisted Raft metadata (currentTerm, votedFor, commit boundary) from disk. MUST be called before start() when metadata persistence is configured.
- Throws:
IOException
-
setSnapshotStore
Set the snapshot store for persistent snapshot management. When set, auto-compaction saves state machine snapshots to disk and truncates the WAL, enabling fast crash recovery.
- Parameters:
store - the SnapshotStore instance, or null to disable snapshots
-
setCompactionThreshold
public void setCompactionThreshold(long threshold)
-
setLoomMetrics
Set the LoomMetrics instance for instrumentation. When set, Raft operations are instrumented with latency and validation metrics.
- Parameters:
metrics - the LoomMetrics instance, or null to disable metrics
-
setLastApplied
public void setLastApplied(long index)
Set the last applied index, used during startup recovery from a disk snapshot to prevent re-applying entries already included in the snapshot.
- Parameters:
index - the snapshot index that has been restored
-
setCommitIndex
public void setCommitIndex(long index)
Set the commit index, used during startup recovery to restore committed state. The index only advances; this method never lowers the commit index.
- Parameters:
index - the commit index to restore to
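The "only advances" rule can be expressed as a max-accumulate. This sketch uses an AtomicLong for brevity, whereas the class description says RaftNode guards state with a ReentrantLock, so MonotonicIndex is a hypothetical illustration of the semantics, not of the locking.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical monotonic index: restore() can raise the value but never lower it. */
class MonotonicIndex {
    private final AtomicLong commitIndex = new AtomicLong(0);

    /** Sets the index to max(current, index) atomically and returns the resulting value. */
    long restore(long index) {
        return commitIndex.accumulateAndGet(index, Math::max);
    }

    long get() {
        return commitIndex.get();
    }
}
```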
-
recoverCommittedEntries
public void recoverCommittedEntries(long durableCommitIndex)
Replay committed log entries that were recovered from durable storage. The caller must rebuild the in-memory log first, then invoke this method before the node starts serving traffic.
- Parameters:
durableCommitIndex - highest committed index recovered from persistent metadata
-
setSnapshotInstallWarnSeconds
public void setSnapshotInstallWarnSeconds(long seconds)
-
setElectionTimeouts
public void setElectionTimeouts(long minMs, long maxMs)
Configure election timeout bounds (min/max). Used in tests so that elections complete faster than they would with the default timeouts.
- Parameters:
minMs - minimum election timeout in milliseconds
maxMs - maximum election timeout in milliseconds
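Randomized timeouts keep followers from timing out at the same moment and splitting the vote. A minimal sketch, assuming a fresh timeout is drawn uniformly from [minMs, maxMs] each time the election timer is reset; the distribution RaftNode actually uses is not stated here, and ElectionTimeouts with its argument validation is hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical randomized election timeout source. */
class ElectionTimeouts {
    private final long minMs;
    private final long maxMs;

    ElectionTimeouts(long minMs, long maxMs) {
        if (minMs <= 0 || maxMs < minMs) {
            throw new IllegalArgumentException("require 0 < minMs <= maxMs");
        }
        this.minMs = minMs;
        this.maxMs = maxMs;
    }

    /** Uniformly random timeout in [minMs, maxMs]; the upper bound of nextLong is exclusive. */
    long next() {
        return ThreadLocalRandom.current().nextLong(minMs, maxMs + 1);
    }
}
```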
-
setHeartbeatInterval
public void setHeartbeatInterval(long intervalMs)
Configure heartbeat interval. Used in tests to speed up leader-follower synchronization.
- Parameters:
intervalMs - heartbeat interval in milliseconds
-
setReplicationInterval
public void setReplicationInterval(long intervalMs)
Configure replication interval. Used in tests to control replication frequency.
- Parameters:
intervalMs - replication check interval in milliseconds
-
addPeer
Add a peer to the cluster. Used to dynamically add new nodes (e.g., during AddServer membership changes).
- Parameters:
peerId - the ID of the peer to add
-
initializeStaticVoters
Initialize a fixed voter set before the Raft node starts.
This is the only supported production bootstrap path until joint-consensus live voter changes are implemented. It does not append a config-change log entry; every member must be started with the same explicit voter list.
- Parameters:
voterIds - complete static voter set, including this node
-
removePeer
Remove a peer from the cluster. Used to dynamically remove nodes (e.g., during RemoveServer membership changes). Also cleans up leader state tracking for this peer.
- Parameters:
peerId - the ID of the peer to remove
-
addLearner
Add a learner (non-voting member) to the cluster. Learners receive log replication from the leader but do NOT:
- Participate in elections
- Count toward quorum decisions
Used for safe scaling (add the learner first, then promote it to voter via ADD_SERVER).
- Parameters:
learnerId - the ID of the learner to add
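Because learners do not count toward quorum, majorities are computed over voters only (voters = clusterMembers - learners, as getVoters documents). A hypothetical helper illustrating that rule:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical membership helpers; learner acks never contribute to a quorum. */
class Membership {
    static Set<String> voters(Set<String> clusterMembers, Set<String> learners) {
        Set<String> v = new HashSet<>(clusterMembers);
        v.removeAll(learners);
        return v;
    }

    /** True if the acknowledging nodes include a strict majority of the voters. */
    static boolean hasQuorum(Set<String> acks, Set<String> voters) {
        long votingAcks = acks.stream().filter(voters::contains).count();
        return votingAcks > voters.size() / 2;
    }
}
```

With members {a, b, c, d} and learner d, an ack from {a, d} is only one voter out of three and is not a quorum, while {a, b} is.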
-
removeLearner
Remove a learner from the cluster. Cleans up all replication state tracking for the learner.
- Parameters:
learnerId - the ID of the learner to remove
-
promoteLearner
Promote a learner to a voting member (voter). The learner must already have been added via addLearner or a LEARNER_ADD config change. This is typically called as part of a config change to upgrade the learner.
- Parameters:
learnerId - the ID of the learner to promote
- Returns:
- true if promotion succeeded, false if the node is not a learner
-
isLearner
Check if a node is a learner (non-voting member).
- Parameters:
nodeId - the ID to check
- Returns:
- true if the node is a learner
-
getLearners
Get the set of all learners in the cluster.
- Specified by:
getLearners in interface RaftNodeApi
- Returns:
- a copy of the learners set
-
getVoters
Get the set of all voters (non-learner members) in the cluster.
- Specified by:
getVoters in interface RaftNodeApi
- Returns:
- a copy of voters = clusterMembers - learners
-
start
public void start()
-
stop
public void stop()
-
submitCommand
Submit a command to be replicated across the cluster.
SEMANTICS (At-Most-Once): This method provides at-most-once semantics: the command is appended to the leader's log and a replication attempt is initiated, but the future may complete with failure even if the command was eventually committed by the cluster. This is standard Raft behavior: writes are not idempotent at the RPC level.
DURABILITY: When the future completes with success, the entry is guaranteed to be:
- Replicated on a majority of nodes
- Persisted to the write-ahead log (WAL)
- fsync'd to disk if walWriter.isSyncOnCommit() is true
- Applied to the state machine
IDEMPOTENCY: Clients that retry failed or timed-out submissions get at-least-once delivery, so they MUST use idempotency keys to avoid applying a command twice:
- Embed a unique client-side identifier (e.g., UUID) in each command
- On leader change, retry with the same command (and same ID)
- Server deduplicates based on the ID, returning cached result if already applied
LEADER CHANGES: If the leader steps down or crashes while a command is pending:
- The future will complete exceptionally with LeaderChangedException
- The caller MUST retry against the new leader (obtain it via the exception's leaderId field)
- The original command may or may not have been committed; the caller cannot assume it was lost
- If using idempotency keys, retrying is safe; the new leader will deduplicate
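The idempotency steps above are usually enforced in the state machine, which sees every committed entry in log order on every replica. This sketch swaps the per-command UUID for a per-client sequence number (a common variant of the same idea) and assumes each client has at most one command outstanding, so a retry always carries its latest sequence number. DedupStateMachine and Command are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical deduplicating state machine applied to committed entries. */
class DedupStateMachine {
    record Command(String clientId, long seq, String payload) {}

    private record Cached(long seq, String result) {}

    private final Map<String, Cached> lastByClient = new HashMap<>();
    private final UnaryOperator<String> apply;
    int appliedCount = 0;

    DedupStateMachine(UnaryOperator<String> apply) {
        this.apply = apply;
    }

    /** Called once per committed entry, in log order. */
    String onCommitted(Command cmd) {
        Cached prev = lastByClient.get(cmd.clientId());
        if (prev != null && cmd.seq() <= prev.seq()) {
            // Duplicate, e.g. a retry after a leader change that had in fact committed.
            return prev.result();
        }
        String result = apply.apply(cmd.payload());
        appliedCount++;
        lastByClient.put(cmd.clientId(), new Cached(cmd.seq(), result));
        return result;
    }
}
```

Because the dedup table is built from the log itself, every replica (including a new leader) reaches the same decision about which retries to skip.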
THREAD SAFETY: This method is thread-safe. Calls are serialized internally by stateLock. Multiple threads can call this simultaneously; all updates to the log and match index are atomic.
CONSTRAINTS: Only the current leader can accept commands. Followers reject all submissions with NotLeaderException.
- Specified by:
submitCommand in interface RaftNodeApi
- Parameters:
command - the serialized command to replicate (must not be null). Content is opaque to Raft; the state machine defines semantics.
- Returns:
- a CompletableFuture<Object> that:
  - Completes with true when the command is committed and applied
  - Fails with NotLeaderException if this node is not the leader
  - Fails with LeaderChangedException if leadership changes before commit
  - Fails with IllegalStateException if the node is stopped
  - Fails with other exceptions on log I/O errors
- Throws:
IllegalStateException - if the node is stopped (the future will fail with this)
-
readIndex
Perform a linearizable read using the ReadIndex protocol.
This method confirms that the node is still the valid leader (via a lease check), then waits until the state machine has applied entries up to the current commitIndex before completing. The caller can then safely read from local state.
- Returns:
- a future that completes with the readIndex once it's safe to read
- Throws:
NotLeaderException- if this node is not the leader
-
awaitLinearizableReadToken
public long awaitLinearizableReadToken(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
Wait until it is safe to serve a linearizable local read and return a term token that the caller must revalidate before publishing the read result.
- Specified by:
awaitLinearizableReadToken in interface RaftNodeApi
- Throws:
InterruptedException
TimeoutException
-
awaitLinearizableReadToken
- Specified by:
awaitLinearizableReadToken in interface RaftNodeApi
- Throws:
InterruptedException
TimeoutException
-
isLinearizableReadTokenValid
public boolean isLinearizableReadTokenValid(long readTerm)
Revalidate a read token after the state machine has been read. If this returns false, the caller must discard the already-read result and retry or redirect.
- Specified by:
isLinearizableReadTokenValid in interface RaftNodeApi
-
withStateMachineReadLock
Run a local state-machine read while excluding in-progress state-machine apply.
ReadIndex/lease checks prove that the leader is current and has applied through the read index; this guard additionally prevents a read from interleaving with a later multi-key apply that is already mutating local structures.
- Throws:
Exception
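Taken together, awaitLinearizableReadToken, withStateMachineReadLock and isLinearizableReadTokenValid suggest a read-then-revalidate loop. To keep the sketch self-contained it declares a hypothetical ReadPath interface holding only those three signatures as documented above; the one-second wait and the retry limit are arbitrary choices, not RaftNode defaults.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical client of the documented linearizable-read methods. */
class LinearizableReads {
    /** Subset of the RaftNode signatures documented above. */
    interface ReadPath {
        long awaitLinearizableReadToken(long timeout, TimeUnit unit)
                throws InterruptedException, TimeoutException;

        boolean isLinearizableReadTokenValid(long readTerm);

        <T> T withStateMachineReadLock(Callable<T> readAction) throws Exception;
    }

    /** Reads local state linearizably, retrying if leadership changed during the read. */
    static <T> T read(ReadPath node, Callable<T> readAction, int maxAttempts) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            long token = node.awaitLinearizableReadToken(1, TimeUnit.SECONDS);
            T value = node.withStateMachineReadLock(readAction);
            if (node.isLinearizableReadTokenValid(token)) {
                return value; // still valid for the same term: safe to publish
            }
            // The token is no longer valid: discard the value and try again.
        }
        throw new IllegalStateException("read token kept invalidating; redirect to the current leader");
    }
}
```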
-
submitConfigChange
Submit a safe dynamic membership change (AddServer or RemoveServer).
Safety Guarantees:
- Only ONE config change at a time (rejects if pendingConfigChange != null)
- For AddServer: waits until new server is caught up before committing
- Config changes are committed with majority quorum (like normal entries)
- For RemoveServer: if removing self, first transfers leadership
- Specified by:
submitConfigChange in interface RaftNodeApi
- Parameters:
change - the ConfigChange to submit (ADD_SERVER or REMOVE_SERVER)
- Returns:
- a future that completes when the config change is committed
- Throws:
IllegalStateException - if the node is stopped or not the leader
IllegalArgumentException - if a config change is already in progress
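The "only ONE config change at a time" guarantee amounts to a single pending slot. A minimal sketch using compare-and-set on an AtomicReference; ConfigChangeGate, Kind and Change are hypothetical stand-ins for ConfigChange, and unlike the documented Throws clause, this sketch reports a conflict by failing the returned future instead of throwing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-slot gate for membership changes. */
class ConfigChangeGate {
    enum Kind { ADD_SERVER, REMOVE_SERVER }

    record Change(Kind kind, String nodeId) {}

    private record Pending(Change change, CompletableFuture<Void> done) {}

    private final AtomicReference<Pending> pending = new AtomicReference<>();

    CompletableFuture<Void> submit(Change change) {
        Pending p = new Pending(change, new CompletableFuture<>());
        if (!pending.compareAndSet(null, p)) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("a config change is already in progress"));
        }
        // A real implementation would append the config entry to the log here.
        return p.done();
    }

    /** Called when the config entry commits; frees the slot and completes the caller's future. */
    void onCommitted(Change change) {
        Pending p = pending.get();
        if (p != null && p.change().equals(change) && pending.compareAndSet(p, null)) {
            p.done().complete(null);
        }
    }
}
```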
-
submitLearnerAdd
Add a learner (non-voting member) to the cluster for safe scaling. Learners receive log replication but do NOT participate in elections or count toward quorum.
Usage pattern:
1. submitLearnerAdd("new-node", "localhost:9000"): add as a learner
2. Wait for replication to catch up (monitor replication lag)
3. submitConfigChange(ADD_SERVER): promote to voting member
- Parameters:
learnerId - the ID of the learner to add
address - the network address of the learner
- Returns:
- a CompletableFuture that completes when the LEARNER_ADD config change is committed
-
submitLearnerRemove
Remove a learner from the cluster. Use this to cleanly remove read replicas or observation nodes.
- Parameters:
learnerId - the ID of the learner to remove
- Returns:
- a CompletableFuture that completes when the LEARNER_REMOVE config change is committed
-
getLearnerReplicationLag
Get the replication lag of a learner (how far behind commitIndex it is). This helps determine when a learner is caught up enough to be promoted to voter.
- Parameters:
learnerId - the ID of the learner
- Returns:
- the replication lag (commitIndex - learnerMatchIndex), or -1 if learner not found
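Step 2 of the learner usage pattern (wait for replication to catch up) can be a bounded polling loop over getLearnerReplicationLag. Here the lag source is passed in as a LongSupplier so the sketch runs without a cluster; the maxLag threshold and poll interval are assumptions the document does not specify, and a negative lag is treated as "learner not found", matching the documented -1 return value.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper that waits for a learner to catch up before promotion. */
class LearnerCatchUp {
    /** @return true if the lag dropped to maxLag or below before the timeout */
    static boolean awaitCaughtUp(LongSupplier lagSource, long maxLag, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            long lag = lagSource.getAsLong();
            if (lag < 0) {
                return false; // -1: learner unknown (never added or already removed)
            }
            if (lag <= maxLag) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            Thread.sleep(pollMs);
        }
    }
}
```

In practice the supplier would be something like () -> node.getLearnerReplicationLag("new-node"), followed by the ADD_SERVER config change only when this returns true.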
-
handleVoteRequest
-
handleVoteResponse
Handle an incoming VoteResponse.
-
handlePreVoteRequest
-
handlePreVoteResponse
Handle an incoming PreVote response. If a majority of pre-votes is granted, the node proceeds to a real election.
-
handleAppendEntries
-
handleInstallSnapshot
Handle incoming InstallSnapshot RPC (follower side). The leader sends this when the follower's nextIndex is behind the leader's lastSnapshotIndex. The follower replaces its log with the snapshot and updates its snapshot metadata.
Supports chunked transfer for large snapshots. Includes SHA-256 checksum validation and latency instrumentation.
Format per chunk: [term(8)][lastSnapshotIndex(8)][lastSnapshotTerm(8)][offset(8)][done(1)][chunkLen(4)][chunk...][sha256(32) if done=true]
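A follower-side parser for the chunk layout above might look like the following. Assumptions not stated in the document: integers are big-endian (ByteBuffer's default), done is encoded as a nonzero byte, and the SHA-256 on the final chunk covers the whole reassembled snapshot rather than only the last chunk. SnapshotChunks and Assembler are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical parser/reassembler for InstallSnapshot chunks. */
class SnapshotChunks {
    record Chunk(long term, long lastSnapshotIndex, long lastSnapshotTerm,
                 long offset, boolean done, byte[] data, byte[] sha256) {}

    static Chunk parse(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // big-endian by default (assumed wire order)
        long term = buf.getLong();
        long lastIndex = buf.getLong();
        long lastTerm = buf.getLong();
        long offset = buf.getLong();
        boolean done = buf.get() != 0;
        int len = buf.getInt();
        if (len < 0 || len > buf.remaining()) {
            throw new IllegalArgumentException("bad chunkLen " + len);
        }
        byte[] data = new byte[len];
        buf.get(data);
        byte[] sha = null;
        if (done) {
            sha = new byte[32];
            buf.get(sha); // BufferUnderflowException if the digest is missing
        }
        return new Chunk(term, lastIndex, lastTerm, offset, done, data, sha);
    }

    /** Appends chunks in order and verifies the digest when the final chunk arrives. */
    static final class Assembler {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        /** @return the complete snapshot after the final chunk, otherwise null */
        byte[] accept(Chunk c) throws NoSuchAlgorithmException {
            if (c.offset() != out.size()) {
                throw new IllegalStateException("expected offset " + out.size() + ", got " + c.offset());
            }
            out.write(c.data(), 0, c.data().length);
            if (!c.done()) {
                return null;
            }
            byte[] snapshot = out.toByteArray();
            byte[] actual = MessageDigest.getInstance("SHA-256").digest(snapshot);
            if (!MessageDigest.isEqual(actual, c.sha256())) {
                throw new IllegalStateException("snapshot checksum mismatch");
            }
            return snapshot;
        }
    }
}
```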
-
handleInstallSnapshotResponse
Handle InstallSnapshot response (leader side). Updates nextIndex for the follower after successful snapshot installation.
-
handleAppendResponse
Handle AppendEntries response (leader side).
-
handleHeartbeat
-
takeSnapshot
public void takeSnapshot(long snapshotIndex, long snapshotTerm, byte[] snapshotData)
Take a snapshot of the state machine at a specific log index.
The snapshot data is provided by the upper layer (e.g., a serialized copy of the state machine). This method triggers log compaction to remove entries before the snapshot index, improving memory usage and recovery time.
- Parameters:
snapshotIndex - the log index up to which the snapshot is taken
snapshotTerm - the term of the entry at snapshotIndex
snapshotData - the serialized state machine snapshot
-
captureSnapshotForHotBackup
Capture the currently applied state machine image for an out-of-band Hot Backup. This does not persist to the live snapshot store and does not compact the Raft log.
- Returns:
- snapshot bytes plus the applied Raft boundary they represent
-
forceCompaction
public void forceCompaction(long snapshotIndex, long snapshotTerm, byte[] snapshotData) throws IOException
Force log compaction on a running node (for integration tests and admin tooling).
Unlike takeSnapshot(long, long, byte[]), this method is safe to call on a running node. If a SnapshotStore is configured, the snapshot is persisted to disk first (matching the production auto-compaction path). If no store is configured, the compaction is performed purely in memory. The requested boundary must already be committed and applied on this node, and the supplied term must match the log.
- Parameters:
snapshotIndex - the log index up to which to compact
snapshotTerm - the term of the entry at snapshotIndex
snapshotData - the serialized state machine snapshot
- Throws:
IOException - if persisting the snapshot to disk fails
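The preconditions above (boundary committed and applied, term matching the log) can be checked before dropping entries from an in-memory log that tracks a snapshot offset. CompactingLog is a hypothetical model: it stores only entry terms and leaves out snapshot persistence through SnapshotStore.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory log whose prefix can be replaced by a snapshot. */
class CompactingLog {
    private long snapshotIndex = 0; // last index covered by the snapshot
    private long snapshotTerm = 0;
    private final List<Long> terms = new ArrayList<>(); // terms.get(i) is the term of index snapshotIndex + 1 + i
    long commitIndex = 0;
    long lastApplied = 0;

    void append(long term) {
        terms.add(term);
    }

    long lastIndex() {
        return snapshotIndex + terms.size();
    }

    long snapshotIndex() {
        return snapshotIndex;
    }

    long termAt(long index) {
        if (index == snapshotIndex) {
            return snapshotTerm;
        }
        if (index < snapshotIndex || index > lastIndex()) {
            throw new IndexOutOfBoundsException("index " + index);
        }
        return terms.get((int) (index - snapshotIndex - 1));
    }

    /** Drops entries up to and including index after checking the documented preconditions. */
    void compact(long index, long term) {
        if (index <= snapshotIndex) {
            return; // already covered by an earlier snapshot
        }
        if (index > commitIndex || index > lastApplied) {
            throw new IllegalStateException("boundary " + index + " is not yet committed and applied");
        }
        if (termAt(index) != term) {
            throw new IllegalArgumentException("term mismatch at index " + index);
        }
        terms.subList(0, (int) (index - snapshotIndex)).clear();
        snapshotIndex = index;
        snapshotTerm = term;
    }
}
```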
-
addServer
Add a new server to the cluster (leader only).
Uses the single-server change approach from the Raft dissertation (§4). The change is committed as a special log entry; this method returns a future that completes when the change is committed.
- Parameters:
newNodeId - the ID of the new node to add
- Returns:
- a future that completes when the add is committed, or fails if not leader
-
removeServer
Remove a server from the cluster (leader only).
The removal is committed as a special log entry. This method returns a future that completes when the removal is committed. The target node no longer receives replication updates once the removal is committed.
- Parameters:
targetNodeId - the ID of the node to remove
- Returns:
- a future that completes when the removal is committed, or fails if not leader
-
canServeLocalRead
public boolean canServeLocalRead()
Check if this node can serve a linearizable read from local state.
Returns true if:
1. This node is the leader
2. The leader lease has not expired
3. All entries up to commitIndex have been applied
4. At least one entry in the current term has been committed (etcd-7331)
Condition 4 is critical: without it, a newly elected leader could serve stale reads because it doesn't know the true commit point yet. The leader must commit a no-op in its own term first to establish this.
This avoids the cost of a full quorum read-index round-trip.
- Specified by:
canServeLocalRead in interface RaftNodeApi
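The four conditions translate into a single predicate. For condition 4 the sketch relies on terms never decreasing along the log: "some entry of the current term is committed" is then equivalent to "the entry at commitIndex has the current term". LocalReadCheck and NodeView are hypothetical, and the lease comparison subtracts timestamps because System.nanoTime() values are only meaningful as differences.

```java
/** Hypothetical local-read predicate over a consistent view of node state. */
class LocalReadCheck {
    enum Role { LEADER, FOLLOWER, CANDIDATE }

    /** Fields the check needs; in a real node these would be read under the state lock. */
    record NodeView(Role role, long leaseExpiryNanos, long commitIndex, long lastApplied,
                    long currentTerm, long termAtCommitIndex) {}

    static boolean canServeLocalRead(NodeView v, long nowNanos) {
        return v.role() == Role.LEADER                       // 1. leader
                && v.leaseExpiryNanos() - nowNanos > 0       // 2. lease not expired
                && v.lastApplied() >= v.commitIndex()        // 3. applied through commitIndex
                && v.termAtCommitIndex() == v.currentTerm(); // 4. committed an entry of its own term
    }
}
```

A freshly elected leader in term 5 whose commitIndex still points at a term-4 entry fails condition 4 until its own no-op commits.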
-
isLeaseValid
public boolean isLeaseValid()
Check if the leader lease is currently valid. The role and the lease expiry are checked while holding stateLock, so leadership cannot change between the two checks (no TOCTOU race).
- Specified by:
isLeaseValid in interface RaftNodeApi
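One way to maintain such a lease, sketched under assumptions the document does not state: the lease is renewed from the time a heartbeat round was sent (not when its acks arrived) once a majority acknowledged it, and the lease duration is chosen shorter than the minimum election timeout. LeaderLease is hypothetical; reading the role and the expiry under one lock mirrors the TOCTOU note above.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical leader lease keyed on System.nanoTime()-style timestamps. */
class LeaderLease {
    private final ReentrantLock stateLock = new ReentrantLock();
    private final long leaseDurationNanos;
    private boolean leader;
    private long leaseExpiryNanos;

    LeaderLease(long leaseDurationNanos) {
        this.leaseDurationNanos = leaseDurationNanos;
    }

    /** Called once a majority acked the heartbeat round sent at roundSentAtNanos. */
    void onMajorityAck(long roundSentAtNanos) {
        stateLock.lock();
        try {
            long candidate = roundSentAtNanos + leaseDurationNanos;
            // Extend only; an ack for an older round must not shorten the lease.
            if (!leader || candidate - leaseExpiryNanos > 0) {
                leaseExpiryNanos = candidate;
            }
            leader = true; // simplification: the first majority ack marks this node as leader
        } finally {
            stateLock.unlock();
        }
    }

    void stepDown() {
        stateLock.lock();
        try {
            leader = false;
        } finally {
            stateLock.unlock();
        }
    }

    /** Role and expiry are read under the same lock, so neither changes between the checks. */
    boolean isLeaseValid(long nowNanos) {
        stateLock.lock();
        try {
            return leader && leaseExpiryNanos - nowNanos > 0;
        } finally {
            stateLock.unlock();
        }
    }
}
```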
-
getLeaderId
- Specified by:
getLeaderId in interface RaftNodeApi
-
getClusterMembers
- Specified by:
getClusterMembers in interface RaftNodeApi
-
isLeader
public boolean isLeader()
- Specified by:
isLeader in interface RaftNodeApi
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface RaftNodeApi
-
getNextIndex
-
getMatchIndex
-
getFollowerLagMetrics
Get follower replication lag metrics: per-peer elapsed time in nanoseconds since the last successful ACK. The value is computed as System.nanoTime() - lastAckTimeNanos, so high values indicate a slow or degraded follower.
- Specified by:
getFollowerLagMetrics in interface RaftNodeApi
-
getFollowerMatchIndexMetrics
Get follower match index metrics: per-peer highest replicated log index. Returns Map<peerId, matchIndex> showing replication progress on each follower. A low match index relative to the commit index means the follower is behind.
- Specified by:
getFollowerMatchIndexMetrics in interface RaftNodeApi
-
getElectionsStarted
public long getElectionsStarted()
Total elections started (including pre-vote) since node start.
- Specified by:
getElectionsStarted in interface RaftNodeApi
-
getElectionsWon
public long getElectionsWon()
Total elections won (became leader) since node start.
-
getPreVotesStarted
public long getPreVotesStarted()
Total pre-vote rounds started since node start.
-
getCommandsSubmitted
public long getCommandsSubmitted()
Total commands submitted to this leader since node start.
- Specified by:
getCommandsSubmitted in interface RaftNodeApi
-
getCommandsCommitted
public long getCommandsCommitted()
Total commands committed (replicated to majority) since node start.
- Specified by:
getCommandsCommitted in interface RaftNodeApi
-
getReplicationLag
public long getReplicationLag()
Replication lag: commit index minus last applied.
- Specified by:
getReplicationLag in interface RaftNodeApi
-
awaitReplicationDrained
Blocks until replication lag reaches zero or the timeout expires.
- Specified by:
awaitReplicationDrained in interface RaftNodeApi
- Parameters:
timeout - the maximum time to wait
unit - the time unit
- Returns:
- true if replication lag reached zero, false if timed out
- Throws:
InterruptedException - if the current thread is interrupted
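Since replication lag here means commitIndex - lastApplied, waiting for it to drain is a condition-variable wait with a deadline. ApplyProgress is a hypothetical model, not RaftNode's code; Condition.awaitNanos returns the remaining time, which keeps the total wait bounded across spurious wakeups.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical tracker of commit/apply progress with a bounded drain wait. */
class ApplyProgress {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition advanced = lock.newCondition();
    private long commitIndex;
    private long lastApplied;

    void setCommitIndex(long index) {
        lock.lock();
        try {
            commitIndex = Math.max(commitIndex, index);
            advanced.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void markApplied(long index) {
        lock.lock();
        try {
            lastApplied = Math.max(lastApplied, index);
            advanced.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** @return true if lag reached zero, false if the timeout expired first */
    boolean awaitDrained(long timeout, TimeUnit unit) throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        lock.lock();
        try {
            while (commitIndex - lastApplied > 0) {
                if (remaining <= 0) {
                    return false;
                }
                remaining = advanced.awaitNanos(remaining);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```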
-
getStateApplyErrors
public long getStateApplyErrors()
Total state machine application errors since node start.
- Specified by:
getStateApplyErrors in interface RaftNodeApi
-
getLogSize
public long getLogSize()
Total entries in Raft log (including snapshotted entries).
- Specified by:
getLogSize in interface RaftNodeApi
-
handleTimeoutNow
Handle an incoming TIMEOUT_NOW RPC from the leader. This forces the follower to start an election immediately, bypassing the normal election timeout; it is the standard Raft leadership transfer mechanism.
- Parameters:
msg - the TIMEOUT_NOW message
- Returns:
- a TIMEOUT_NOW_RESPONSE message
-
getTimeSinceLastHeartbeatMs
public long getTimeSinceLastHeartbeatMs()
Get time since last heartbeat received in milliseconds. For followers, this indicates how long it has been since the last AppendEntries RPC from the leader. For leaders and candidates, this is not meaningful and returns 0.
- Specified by:
getTimeSinceLastHeartbeatMs in interface RaftNodeApi
- Returns:
- milliseconds since last heartbeat received, or 0 if no heartbeat received
-
stepDown
public void stepDown()
Gracefully step down from the leader role (if leader). This is called during shutdown to allow another node to become leader. Non-leaders return immediately.
-
getElectionStats
Get a snapshot of current election statistics.
- Returns:
- ElectionStats record with current election metrics
-
getReplicationStats
Get a snapshot of current replication statistics.
- Returns:
- ReplicationStats record with current replication metrics
-