Class PartitionMigrationManager
Responsibilities:
- Detect membership changes (nodes joining/leaving)
- Compute migration delta: which slots move to which nodes
- Initiate partition migrations (source → target)
- Track migration progress and metrics
Migration protocol:
- Source node sends PARTITION_MIGRATE_START to target (slot = slotId)
- Source batches key-value pairs via PARTITION_MIGRATE_DATA (batch size = 1000)
- Source sends PARTITION_MIGRATE_COMPLETE when done
- All nodes broadcast PARTITION_OWNERSHIP_UPDATE with new table
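The source-side message sequence listed above can be sketched as a pure function that turns one slot's entries into the ordered frames a source would emit. This is a minimal illustration, not the class's implementation: `MigrationFrames`, `Frame`, and `framesFor` are hypothetical names, the string key/value types are an assumption, and the final PARTITION_OWNERSHIP_UPDATE broadcast is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class MigrationFrames {
    /** Hypothetical frame: message type, slot, and the entries carried (empty for START/COMPLETE). */
    record Frame(String type, int slotId, List<Map.Entry<String, String>> entries) {}

    /** Builds START, one DATA frame per batch, then COMPLETE for a single slot. */
    static List<Frame> framesFor(int slotId, List<Map.Entry<String, String>> entries, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Frame> frames = new ArrayList<>();
        frames.add(new Frame("PARTITION_MIGRATE_START", slotId, List.of()));
        for (int from = 0; from < entries.size(); from += batchSize) {
            int to = Math.min(from + batchSize, entries.size());
            // Copy the sub-list so each DATA frame owns an immutable batch.
            frames.add(new Frame("PARTITION_MIGRATE_DATA", slotId, List.copyOf(entries.subList(from, to))));
        }
        frames.add(new Frame("PARTITION_MIGRATE_COMPLETE", slotId, List.of()));
        return frames;
    }
}
```

With the documented batch size of 1000, a slot holding 2500 entries would yield three DATA frames between the START and COMPLETE frames.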
Thread safety: Uses ConcurrentHashMap for metrics. Migration is async and non-blocking.
Since: 1.0
-
Nested Class Summary
Nested Classes (Modifier and Type: Description)
- static final record
- static final record: Parsed chunk-level DATA metadata + payload.
- static final record
- static final record
- static interface: Atomically commits a full partition ownership table.
-
Field Summary
Fields -
Constructor Summary
Constructors
- PartitionMigrationManager(PartitionTable partitionTable, TcpServer tcpServer, String nodeId, int instanceNumber)
- PartitionMigrationManager(PartitionTable partitionTable, TcpServer tcpServer, String nodeId, int instanceNumber, int batchSize)
-
Method Summary
Methods (Modifier and Type, Method: Description)
- void addMigrationListener(MigrationListener listener): Register an operator listener for partition migration lifecycle events.
- boolean applyCommittedOwnershipTable(Map<Integer, String> ownership, List<String> topologyMembers): Applies an exact ownership table that has already passed the external consensus boundary.
- boolean applyCommittedOwnershipTable(Map<Integer, String> ownership, List<String> topologyMembers, long revision)
- awaitChunkAck(int slotId, String targetNodeId, int chunkSeq): Register an ACK waiter for a specific (target, slot, chunkSeq) tuple.
- boolean canApplyCommittedOwnershipTable(Map<Integer, String> ownership, List<String> topologyMembers, long revision)
- void cancelChunkAck(int slotId, String targetNodeId, int chunkSeq): Cancel an ACK waiter (used when the caller gives up, e.g. on timeout).
- boolean currentOwnershipEquals(Map<Integer, String> expectedOwnership)
- currentOwnershipSnapshot
- static int decodeAckValue(byte[] payload)
- decodeChunkedDataValue(byte[] payload): Decode a PARTITION_MIGRATE_DATA value emitted by encodeChunkedDataValue(int, int, byte[]).
- static PartitionMigrationManager.MigrationRecoveryRequest decodeMigrationRecoveryRequest(byte[] payload)
- deserializeOwnershipMembers(byte[] payload)
- static PartitionMigrationManager.CommittedOwnershipTable deserializeOwnershipTable(byte[] payload)
- static byte[] encodeAckValue(int chunkSeq)
- static byte[] encodeChunkedDataValue(int chunkSeq, int totalChunks, byte[] batchBytes): Encode the value payload for a PARTITION_MIGRATE_DATA frame with chunk metadata: [chunkSeq 4B][totalChunks 4B][batchBytes...].
- static byte[] encodeMigrationRecoveryRequest(PartitionMigrationManager.MigrationRecoveryRequest request)
- Map<Integer, PartitionMigrationManager.ActiveOutgoingMigration> getActiveOutgoingMigrations()
- getIncomingSlots: Get the set of slots currently being received BY this node.
- getMigrationProgress: Get current migration metrics.
- getOutgoingSlots: Get the set of slots currently being sent FROM this node.
- boolean hasMigrationPipeline()
- boolean hasOwnershipTable()
- boolean isMigratingSlots(): Check if any migrations are currently in progress.
- static boolean isMigrationRecoveryRequest(Message message)
- static boolean isSerializedOwnershipTable(byte[] payload)
- long nextOwnershipRevision()
- boolean onCommittedOwnershipUpdate(byte[] payload): Applies an ownership update that has already been committed by consensus.
- boolean onMembershipChange(List<String> oldMembers, List<String> newMembers): Called when cluster membership changes (node join/leave detected).
- void onMigrateComplete(int slotId, String sourceNodeId): Handle incoming PARTITION_MIGRATE_COMPLETE message.
- void onMigrateData(int slotId, String sourceNodeId, byte[] batchData): Handle incoming PARTITION_MIGRATE_DATA message.
- void onMigrateData(int slotId, String sourceNodeId, int chunkSeq, int totalChunks, byte[] batchData): Handle incoming PARTITION_MIGRATE_DATA with chunk metadata.
- void onMigrateDataAck(int slotId, String targetNodeId, int chunkSeq): Handle incoming PARTITION_MIGRATE_DATA_ACK on the SOURCE side.
- void onMigrateStart(int slotId, String sourceNodeId): Handle incoming PARTITION_MIGRATE_START message.
- boolean onMigrationRecoveryRequest(int slotId, String requesterNodeId, byte[] payload, List<String> liveMembers)
- boolean onOwnershipUpdate(byte[] payload, List<String> expectedMembers)
- int outgoingHighWaterMark(int slotId, String targetNodeId)
- void recordOutgoingMigrationComplete(int slotId, String targetNodeId)
- void recordOutgoingMigrationFailure(int slotId, String targetNodeId, @Nullable String failureMessage)
- void recordOutgoingMigrationStart(int slotId, String targetNodeId, int highWaterMark)
- boolean removeMigrationListener(MigrationListener listener): Remove a previously registered migration listener.
- void restoreOwnershipTable(@Nullable Object snapshotValue)
- boolean sendCurrentOwnershipUpdate(String targetNodeId, List<String> topologyMembers): Unicast the current partition ownership topology to a single peer as anti-entropy.
- static byte[] serializeOwnershipMembers(List<String> members)
- static byte[] serializeOwnershipTable
- static byte[] serializeOwnershipTable
- void setLoomMetrics(@Nullable LoomMetrics loomMetrics)
- void setMigratableDataPresentSupplier(@Nullable BooleanSupplier supplier): Installs an optional predicate used to detect whether this node currently holds user data that would need migration during an ownership cutover.
- void setMigrationPipeline(@Nullable PartitionMigrationPipeline pipeline): Set the migration pipeline for real data movement during topology changes.
- void setMigrationsAllowedSupplier(@Nullable BooleanSupplier supplier): Installs an optional predicate that controls whether membership changes may start partition migrations.
- void setOwnershipTableCommitter(@Nullable PartitionMigrationManager.OwnershipTableCommitter committer): Installs the consensus-backed ownership-table committer used for migration cutovers.
- Optional<byte[]> snapshotOwnershipTable
-
Field Details
-
DEFAULT_BATCH_SIZE
public static final int DEFAULT_BATCH_SIZE
-
MIGRATION_RECOVERY_REQUEST
-
-
Constructor Details
-
PartitionMigrationManager
public PartitionMigrationManager(PartitionTable partitionTable, TcpServer tcpServer, String nodeId, int instanceNumber)
-
PartitionMigrationManager
public PartitionMigrationManager(PartitionTable partitionTable, TcpServer tcpServer, String nodeId, int instanceNumber, int batchSize)
-
-
Method Details
-
setMigrationPipeline
Set the migration pipeline for real data movement during topology changes. When set, migrations will use the pipeline's PREPARE/COPY/VERIFY/COMMIT/CLEANUP stages instead of the placeholder protocol.
Parameters:
pipeline - the migration pipeline to use
-
hasMigrationPipeline
public boolean hasMigrationPipeline()
-
addMigrationListener
Register an operator listener for partition migration lifecycle events.
Parameters:
listener - listener to add
-
removeMigrationListener
Remove a previously registered migration listener.
Parameters:
listener - listener to remove
Returns:
true if the listener was registered
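The add/remove contract described above, including the boolean result of removal, can be illustrated with a small registry. This is a sketch under assumptions: `ListenerRegistry` and its nested `Listener` interface are hypothetical stand-ins (the methods of the real MigrationListener are not shown on this page), and `CopyOnWriteArrayList` is only one reasonable choice for a listener list that is read far more often than it is changed.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

final class ListenerRegistry {
    /** Hypothetical stand-in for MigrationListener. */
    interface Listener {
        void onEvent(String event);
    }

    // Iteration works on a snapshot, so listeners may be added or removed while events fire.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void add(Listener listener) {
        listeners.add(Objects.requireNonNull(listener, "listener"));
    }

    /** Returns true only if the listener was registered, mirroring the documented return value. */
    boolean remove(Listener listener) {
        return listeners.remove(listener);
    }

    void fire(String event) {
        for (Listener listener : listeners) {
            listener.onEvent(event);
        }
    }
}
```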
-
setMigratableDataPresentSupplier
Installs an optional predicate used to detect whether this node currently holds user data that would need migration during an ownership cutover.
Without a predicate, cutovers remain rejected by default when no migration pipeline is configured. Nodes may opt into bootstrap-time cutovers by reporting that their registries are still empty.
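One possible reading of the rule above, written as a standalone decision function. Everything here is an assumption made for illustration: `CutoverPolicy` and `cutoverAllowed` are hypothetical names, and treating a configured pipeline as sufficient to allow a cutover is an interpretation of the text rather than documented behavior.

```java
import java.util.function.BooleanSupplier;

final class CutoverPolicy {
    /**
     * @param hasPipeline           whether a migration pipeline is configured
     * @param migratableDataPresent optional predicate; null means none was installed
     */
    static boolean cutoverAllowed(boolean hasPipeline, BooleanSupplier migratableDataPresent) {
        if (hasPipeline) {
            return true; // assumption: a pipeline can move any data the node still holds
        }
        if (migratableDataPresent == null) {
            return false; // no pipeline and no predicate: rejected by default
        }
        // Bootstrap-time opt-in: allowed only while the node reports that it holds no user data.
        return !migratableDataPresent.getAsBoolean();
    }
}
```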
-
setMigrationsAllowedSupplier
Installs an optional predicate that controls whether membership changes may start partition migrations. When absent, migrations are allowed.
-
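The "absent means allowed" default can be sketched as follows. `MigrationGate` and `mayStartMigrations` are hypothetical names used only for this illustration; the real class may store and consult the supplier differently.

```java
import java.util.function.BooleanSupplier;

final class MigrationGate {
    // null means no predicate has been installed.
    private volatile BooleanSupplier migrationsAllowed;

    void setMigrationsAllowedSupplier(BooleanSupplier supplier) {
        this.migrationsAllowed = supplier;
    }

    boolean mayStartMigrations() {
        // Read the field once so a concurrent setter cannot null it between the check and the call.
        BooleanSupplier supplier = migrationsAllowed;
        return supplier == null || supplier.getAsBoolean();
    }
}
```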
setOwnershipTableCommitter
public void setOwnershipTableCommitter(@Nullable PartitionMigrationManager.OwnershipTableCommitter committer)
Installs the consensus-backed ownership-table committer used for migration cutovers. If absent, the manager retains the legacy local-table plus acknowledged-broadcast behavior used by isolated unit tests.
-
setLoomMetrics
-
onMembershipChange
Called when cluster membership changes (node join/leave detected).
Algorithm:
- Capture old partition table
- Call partitionTable.reassignSlots(newMembers)
- Compute delta: which slots this node must send/receive
- Initiate migrations (in background)
Parameters:
oldMembers - previous cluster members (node IDs)
newMembers - new cluster members (node IDs) after topology change
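The "compute delta" step of the algorithm above can be sketched as a comparison of two ownership maps from the local node's point of view. The sketch assumes ownership is a slot-to-node-ID map, as the Map<Integer, String> parameters elsewhere in this class suggest; `MigrationDelta`, `Delta`, and `compute` are hypothetical names, and slot reassignment itself (partitionTable.reassignSlots) is not modeled.

```java
import java.util.Map;
import java.util.TreeMap;

final class MigrationDelta {
    /** Slots to send (slot -> new owner) and slots to receive (slot -> previous owner). */
    record Delta(Map<Integer, String> outgoing, Map<Integer, String> incoming) {}

    static Delta compute(String localNodeId, Map<Integer, String> oldOwners, Map<Integer, String> newOwners) {
        Map<Integer, String> outgoing = new TreeMap<>();
        Map<Integer, String> incoming = new TreeMap<>();
        for (Map.Entry<Integer, String> entry : newOwners.entrySet()) {
            int slot = entry.getKey();
            String newOwner = entry.getValue();
            String oldOwner = oldOwners.get(slot);
            if (oldOwner == null || oldOwner.equals(newOwner)) {
                continue; // previously unassigned, or ownership unchanged: nothing to move
            }
            if (oldOwner.equals(localNodeId)) {
                outgoing.put(slot, newOwner); // this node is the migration source
            } else if (newOwner.equals(localNodeId)) {
                incoming.put(slot, oldOwner); // this node is the migration target
            }
        }
        return new Delta(outgoing, incoming);
    }
}
```

Slots that move between two other nodes appear in neither map, since the local node is not involved in those migrations.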
-
onMigrateStart
Handle incoming PARTITION_MIGRATE_START message. Called by CacheNode when this node receives migration data.
Parameters:
slotId - slot ID being migrated
sourceNodeId - the source node sending data
-
onMigrateData
Handle incoming PARTITION_MIGRATE_DATA message. Receives a batch of key-value pairs for a slot from a source node.
Legacy signature (no chunk metadata): treats the entire payload as chunk 0 of a single-chunk migration. Preserved for tests that exercise the v2.0 non-chunked wire.
-
onMigrateData
public void onMigrateData(int slotId, String sourceNodeId, int chunkSeq, int totalChunks, byte[] batchData)
Handle incoming PARTITION_MIGRATE_DATA with chunk metadata.
Session 10 ESC-12.1/12.2: the value payload now carries the chunk sequence and the total chunk count inline so the target can:
- Dedup re-delivered chunks (at-least-once + idempotent apply),
- Send back PARTITION_MIGRATE_DATA_ACK so the source can advance its chunk window or detect stalls and retry.
Parameters:
slotId - slot ID being migrated
sourceNodeId - the source node
chunkSeq - 0-based chunk sequence
totalChunks - total chunks expected for this slot (≥ 1)
batchData - serialized key-value pairs for THIS chunk only
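The deduplication described above (at-least-once delivery combined with idempotent apply) can be sketched with a per-slot tracker of already-applied chunk sequences. `ChunkReceiver` and `accept` are hypothetical names; the suggestion that duplicates are acknowledged again is a design assumption, not something this page states.

```java
import java.util.BitSet;

final class ChunkReceiver {
    private final BitSet applied = new BitSet();
    private final int totalChunks;
    private int appliedCount;

    ChunkReceiver(int totalChunks) {
        if (totalChunks < 1) {
            throw new IllegalArgumentException("totalChunks must be >= 1");
        }
        this.totalChunks = totalChunks;
    }

    /**
     * Applies the chunk at most once. Returns true if it was applied by this call and
     * false if it was a re-delivery. A caller would typically ACK in both cases
     * (assumption), so a source whose earlier ACK was lost can still advance.
     */
    synchronized boolean accept(int chunkSeq, Runnable apply) {
        if (chunkSeq < 0 || chunkSeq >= totalChunks) {
            throw new IllegalArgumentException("chunkSeq out of range: " + chunkSeq);
        }
        if (applied.get(chunkSeq)) {
            return false; // duplicate delivery: skip the apply step
        }
        apply.run();
        applied.set(chunkSeq); // mark only after a successful apply
        appliedCount++;
        return true;
    }

    synchronized boolean isComplete() {
        return appliedCount == totalChunks;
    }
}
```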
-
onMigrateDataAck
Handle incoming PARTITION_MIGRATE_DATA_ACK on the SOURCE side. Completes the CompletableFuture registered via awaitChunkAck(int, String, int) for the given chunk.
Parameters:
slotId - slot the ACK is for
targetNodeId - the node sending the ACK (the migration target)
chunkSeq - chunk sequence being acknowledged
-
awaitChunkAck
Register an ACK waiter for a specific (target, slot, chunkSeq) tuple. Must be called BEFORE the DATA chunk is sent, so that an ACK racing with registration still completes the future predictably.
Parameters:
slotId - slot being migrated
targetNodeId - target of the migration
chunkSeq - chunk sequence the caller is about to send
Returns:
a future that completes with chunkSeq when the target ACKs
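The register-before-send pattern described above can be sketched with a map of pending futures keyed by the (target, slot, chunkSeq) tuple. `ChunkAckRegistry` and its method names are hypothetical; the sketch only mirrors the documented roles of awaitChunkAck, onMigrateDataAck, and cancelChunkAck and makes no claim about how the class stores its waiters.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ChunkAckRegistry {
    record Key(String targetNodeId, int slotId, int chunkSeq) {}

    private final ConcurrentMap<Key, CompletableFuture<Integer>> waiters = new ConcurrentHashMap<>();

    /** Registers (or returns the existing) waiter; call this before sending the DATA chunk. */
    CompletableFuture<Integer> await(int slotId, String targetNodeId, int chunkSeq) {
        return waiters.computeIfAbsent(new Key(targetNodeId, slotId, chunkSeq), k -> new CompletableFuture<>());
    }

    /** Completes the matching waiter with chunkSeq; returns false if nobody was waiting. */
    boolean onAck(int slotId, String targetNodeId, int chunkSeq) {
        CompletableFuture<Integer> future = waiters.remove(new Key(targetNodeId, slotId, chunkSeq));
        return future != null && future.complete(chunkSeq);
    }

    /** Drops the waiter, for example after the caller's timeout expired. */
    boolean cancel(int slotId, String targetNodeId, int chunkSeq) {
        CompletableFuture<Integer> future = waiters.remove(new Key(targetNodeId, slotId, chunkSeq));
        return future != null && future.cancel(false);
    }
}
```

Because the waiter is in the map before the chunk leaves the source, an ACK that arrives immediately still finds a future to complete instead of being dropped.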
-
cancelChunkAck
Cancel an ACK waiter (used when the caller gives up, e.g. on timeout).
-
encodeChunkedDataValue
public static byte[] encodeChunkedDataValue(int chunkSeq, int totalChunks, byte[] batchBytes)
Encode the value payload for a PARTITION_MIGRATE_DATA frame with chunk metadata: [chunkSeq 4B][totalChunks 4B][batchBytes...].
-
decodeChunkedDataValue
Decode a PARTITION_MIGRATE_DATA value emitted by encodeChunkedDataValue(int, int, byte[]).
-
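The [chunkSeq 4B][totalChunks 4B][batchBytes...] layout can be illustrated with a round-trip codec built on java.nio.ByteBuffer. This is a sketch under assumptions: `ChunkedDataCodec` and `Chunk` are hypothetical names, and big-endian byte order (the ByteBuffer default) is assumed because this page does not state the byte order used on the wire.

```java
import java.nio.ByteBuffer;

final class ChunkedDataCodec {
    private static final int HEADER_BYTES = 8; // two 4-byte ints

    record Chunk(int chunkSeq, int totalChunks, byte[] batchBytes) {}

    static byte[] encode(int chunkSeq, int totalChunks, byte[] batchBytes) {
        return ByteBuffer.allocate(HEADER_BYTES + batchBytes.length) // big-endian by default (assumption)
                .putInt(chunkSeq)
                .putInt(totalChunks)
                .put(batchBytes)
                .array();
    }

    static Chunk decode(byte[] payload) {
        if (payload.length < HEADER_BYTES) {
            throw new IllegalArgumentException("payload shorter than the 8-byte chunk header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        int chunkSeq = buffer.getInt();
        int totalChunks = buffer.getInt();
        byte[] batchBytes = new byte[buffer.remaining()];
        buffer.get(batchBytes); // everything after the header is the batch
        return new Chunk(chunkSeq, totalChunks, batchBytes);
    }
}
```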
encodeAckValue
public static byte[] encodeAckValue(int chunkSeq)
-
decodeAckValue
public static int decodeAckValue(byte[] payload)
-
onMigrateComplete
Handle incoming PARTITION_MIGRATE_COMPLETE message. Finalizes receipt of a migrated slot.
Parameters:
slotId - slot ID that completed migration
sourceNodeId - the source node
-
onOwnershipUpdate
-
onCommittedOwnershipUpdate
public boolean onCommittedOwnershipUpdate(byte[] payload)
Applies an ownership update that has already been committed by consensus.
Unlike network anti-entropy updates, a committed Raft log entry must be deterministic and must not depend on this node's transient local cluster-state view. The membership encoded in the committed payload is the source of truth.
-
applyCommittedOwnershipTable
public boolean applyCommittedOwnershipTable(Map<Integer, String> ownership, List<String> topologyMembers)
Applies an exact ownership table that has already passed the external consensus boundary. This is used both by the Raft state machine and by the pre-Raft bootstrap path before a cluster has enough verified peers to start elections.
-
applyCommittedOwnershipTable
-
canApplyCommittedOwnershipTable
-
nextOwnershipRevision
public long nextOwnershipRevision()
-
currentOwnershipEquals
-
currentOwnershipSnapshot
-
hasOwnershipTable
public boolean hasOwnershipTable()
-
snapshotOwnershipTable
-
restoreOwnershipTable
-
sendCurrentOwnershipUpdate
Unicast the current partition ownership topology to a single peer as anti-entropy.
This repairs peers that missed an earlier ownership broadcast but later reconnected with the same topology view.
Parameters:
targetNodeId - the peer to update
topologyMembers - the current alive topology
Returns:
true if the update was sent, or if no ownership table exists yet
-
getMigrationProgress
-
isMigratingSlots
public boolean isMigratingSlots()
Check if any migrations are currently in progress.
Returns:
true if migrations are active
-
getOutgoingSlots
-
recordOutgoingMigrationStart
-
recordOutgoingMigrationComplete
-
recordOutgoingMigrationFailure
-
outgoingHighWaterMark
-
getActiveOutgoingMigrations
public Map<Integer, PartitionMigrationManager.ActiveOutgoingMigration> getActiveOutgoingMigrations()
-
getIncomingSlots
-
onMigrationRecoveryRequest
-
isMigrationRecoveryRequest
-
encodeMigrationRecoveryRequest
public static byte[] encodeMigrationRecoveryRequest(PartitionMigrationManager.MigrationRecoveryRequest request)
-
decodeMigrationRecoveryRequest
public static PartitionMigrationManager.MigrationRecoveryRequest decodeMigrationRecoveryRequest(byte[] payload)
-
serializeOwnershipMembers
-
serializeOwnershipTable
-
serializeOwnershipTable
-
isSerializedOwnershipTable
public static boolean isSerializedOwnershipTable(byte[] payload)
-
deserializeOwnershipTable
public static PartitionMigrationManager.CommittedOwnershipTable deserializeOwnershipTable(byte[] payload)
-
deserializeOwnershipMembers
-