Class PartitionMigrationManager

java.lang.Object
com.loomcache.server.cluster.PartitionMigrationManager

public class PartitionMigrationManager extends Object
Coordinates partition migration when cluster topology changes.

Responsibilities:

  • Detect membership changes (nodes joining/leaving)
  • Compute migration delta: which slots move to which nodes
  • Initiate partition migrations (source → target)
  • Track migration progress and metrics

Migration protocol:

  1. Source node sends PARTITION_MIGRATE_START to target (slot = slotId)
  2. Source batches key-value pairs via PARTITION_MIGRATE_DATA (batch size = 1000)
  3. Source sends PARTITION_MIGRATE_COMPLETE when done
  4. All nodes broadcast PARTITION_OWNERSHIP_UPDATE with new table
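Steps 1 to 3 above can be sketched as an ordered list of frames that a source would send for one slot. This is a minimal illustration, not the class's actual implementation: the `Frame` record and `planFrames` helper are hypothetical names, and only the message type names and the default batch size of 1000 come from the description above. Step 4 (the ownership broadcast) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the source-side frame sequence for a single slot. */
final class MigrationProtocolSketch {

    /** Illustrative frame: message type, slot ID, and the entries carried (empty for control frames). */
    record Frame(String type, int slotId, List<Map.Entry<String, String>> entries) {}

    /** Builds START, then one DATA frame per batch, then COMPLETE. */
    static List<Frame> planFrames(int slotId, List<Map.Entry<String, String>> entries, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Frame> frames = new ArrayList<>();
        frames.add(new Frame("PARTITION_MIGRATE_START", slotId, List.of()));
        for (int from = 0; from < entries.size(); from += batchSize) {
            int to = Math.min(from + batchSize, entries.size());
            // Copy the sub-list so each frame owns an immutable batch.
            frames.add(new Frame("PARTITION_MIGRATE_DATA", slotId, List.copyOf(entries.subList(from, to))));
        }
        frames.add(new Frame("PARTITION_MIGRATE_COMPLETE", slotId, List.of()));
        return frames;
    }
}
```

With 2500 entries and a batch size of 1000, this yields one START frame, three DATA frames (1000, 1000 and 500 entries) and one COMPLETE frame.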

Thread safety: Uses ConcurrentHashMap for metrics. Migration is async and non-blocking.

Since:
1.0

  • Constructor Details

    • PartitionMigrationManager

      public PartitionMigrationManager(PartitionTable partitionTable, TcpServer tcpServer, String nodeId, int instanceNumber)
    • PartitionMigrationManager

      public PartitionMigrationManager(PartitionTable partitionTable, TcpServer tcpServer, String nodeId, int instanceNumber, int batchSize)
  • Method Details

    • setMigrationPipeline

      public void setMigrationPipeline(@Nullable PartitionMigrationPipeline pipeline)
      Set the migration pipeline for real data movement during topology changes. When set, migrations will use the pipeline's PREPARE/COPY/VERIFY/COMMIT/CLEANUP stages instead of the placeholder protocol.
      Parameters:
      pipeline - the migration pipeline to use
    • hasMigrationPipeline

      public boolean hasMigrationPipeline()
    • addMigrationListener

      public void addMigrationListener(MigrationListener listener)
      Register an operator listener for partition migration lifecycle events.
      Parameters:
      listener - listener to add
    • removeMigrationListener

      public boolean removeMigrationListener(MigrationListener listener)
      Remove a previously registered migration listener.
      Parameters:
      listener - listener to remove
      Returns:
      true if the listener was registered
    • setMigratableDataPresentSupplier

      public void setMigratableDataPresentSupplier(@Nullable BooleanSupplier supplier)
      Installs an optional predicate used to detect whether this node currently holds user data that would need migration during an ownership cutover.

      Without a predicate, cutovers remain rejected by default when no migration pipeline is configured. Nodes may opt into bootstrap-time cutovers by reporting that their registries are still empty.
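One possible reading of the rule above, as a sketch: a cutover goes ahead when a pipeline is configured, and otherwise only when an installed predicate reports that no migratable data is present. The `CutoverGuardSketch` class and `cutoverAllowed` method are hypothetical; the real decision logic may consider more conditions than these.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the cutover guard described above; not the actual implementation. */
final class CutoverGuardSketch {

    /**
     * @param hasPipeline whether a migration pipeline is configured
     * @param dataPresent optional predicate reporting whether this node holds migratable data (may be null)
     */
    static boolean cutoverAllowed(boolean hasPipeline, BooleanSupplier dataPresent) {
        if (hasPipeline) {
            return true; // assumption: the pipeline moves the data, so the cutover may proceed
        }
        if (dataPresent == null) {
            return false; // no predicate installed: rejected by default
        }
        return !dataPresent.getAsBoolean(); // opt-in: allowed only while registries are empty
    }
}
```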

    • setMigrationsAllowedSupplier

      public void setMigrationsAllowedSupplier(@Nullable BooleanSupplier supplier)
      Installs an optional predicate that controls whether membership changes may start partition migrations. When absent, migrations are allowed.
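The "allowed when absent" default can be sketched with a nullable `BooleanSupplier`. The `MigrationGateSketch` class below is a hypothetical stand-in used only to show the null-handling pattern; it is not part of the documented API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a nullable gate predicate with an "allowed" default. */
final class MigrationGateSketch {

    // null means no predicate is installed
    private volatile BooleanSupplier migrationsAllowed;

    void setMigrationsAllowedSupplier(BooleanSupplier supplier) {
        this.migrationsAllowed = supplier;
    }

    boolean migrationsAllowed() {
        // Read the volatile field once so a concurrent reset cannot cause a NullPointerException.
        BooleanSupplier supplier = migrationsAllowed;
        return supplier == null || supplier.getAsBoolean();
    }
}
```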
    • setOwnershipTableCommitter

      public void setOwnershipTableCommitter(@Nullable PartitionMigrationManager.OwnershipTableCommitter committer)
      Installs the consensus-backed ownership-table committer used for migration cutovers. If absent, the manager retains the legacy local-table plus acknowledged-broadcast behavior used by isolated unit tests.
    • setLoomMetrics

      public void setLoomMetrics(@Nullable LoomMetrics loomMetrics)
    • onMembershipChange

      public boolean onMembershipChange(List<String> oldMembers, List<String> newMembers)
      Called when cluster membership changes (node join/leave detected).

      Algorithm:

      1. Capture old partition table
      2. Call partitionTable.reassignSlots(newMembers)
      3. Compute delta: which slots this node must send/receive
      4. Initiate migrations (in background)
      Parameters:
      oldMembers - previous cluster members (node IDs)
      newMembers - new cluster members (node IDs) after topology change
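Step 3 of the algorithm (computing the delta) can be sketched by comparing the ownership table captured before reassignment with the one produced after it. The `Move` record and the helper names are hypothetical, and plain `Map<Integer,String>` tables stand in for `PartitionTable`, whose API is not shown here.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of the migration delta between two slot-to-owner tables. */
final class MigrationDeltaSketch {

    /** One slot move from its previous owner to its new owner. */
    record Move(String source, String target) {}

    /** Returns every slot whose owner changed, keyed by slot ID. */
    static Map<Integer, Move> computeDelta(Map<Integer, String> oldOwners, Map<Integer, String> newOwners) {
        Map<Integer, Move> delta = new TreeMap<>();
        for (Map.Entry<Integer, String> entry : newOwners.entrySet()) {
            String previous = oldOwners.get(entry.getKey());
            // Slots with no previous owner have no source to copy from in this sketch.
            if (previous != null && !previous.equals(entry.getValue())) {
                delta.put(entry.getKey(), new Move(previous, entry.getValue()));
            }
        }
        return delta;
    }

    /** Slots the given node must send. */
    static Set<Integer> outgoingSlots(Map<Integer, Move> delta, String nodeId) {
        Set<Integer> slots = new TreeSet<>();
        delta.forEach((slot, move) -> { if (move.source().equals(nodeId)) slots.add(slot); });
        return slots;
    }

    /** Slots the given node must receive. */
    static Set<Integer> incomingSlots(Map<Integer, Move> delta, String nodeId) {
        Set<Integer> slots = new TreeSet<>();
        delta.forEach((slot, move) -> { if (move.target().equals(nodeId)) slots.add(slot); });
        return slots;
    }
}
```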
    • onMigrateStart

      public void onMigrateStart(int slotId, String sourceNodeId)
      Handle incoming PARTITION_MIGRATE_START message. Called by CacheNode when this node receives migration data.
      Parameters:
      slotId - slot ID being migrated
      sourceNodeId - the source node sending data
    • onMigrateData

      public void onMigrateData(int slotId, String sourceNodeId, byte[] batchData)
      Handle incoming PARTITION_MIGRATE_DATA message. Receives a batch of key-value pairs for a slot from a source node.

      Legacy signature (no chunk metadata): treats the entire payload as chunk 0 of a single-chunk migration. Preserved for tests that exercise the v2.0 non-chunked wire format.

    • onMigrateData

      public void onMigrateData(int slotId, String sourceNodeId, int chunkSeq, int totalChunks, byte[] batchData)
      Handle incoming PARTITION_MIGRATE_DATA with chunk metadata.

      Session 10 ESC-12.1/12.2: the value payload now carries the chunk sequence and the total chunk count inline so the target can:

      1. Dedup re-delivered chunks (at-least-once + idempotent apply),
      2. Send back PARTITION_MIGRATE_DATA_ACK so the source can advance its chunk window or detect stalls and retry.
      Parameters:
      slotId - slot ID being migrated
      sourceNodeId - the source node
      chunkSeq - 0-based chunk sequence
      totalChunks - total chunks expected for this slot (≥ 1)
      batchData - serialized key-value pairs for THIS chunk only
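The dedup behaviour in point 1 can be sketched with a per-slot bitmap of applied chunk sequences. The `ChunkReceiverSketch` class is hypothetical and tracks only sequence numbers, not payloads. A receiver would typically ACK duplicates as well, so that a source whose earlier ACK was lost can still advance; whether this class does so is not stated above.

```java
import java.util.BitSet;

/** Hypothetical sketch of idempotent chunk tracking for one (source, slot) migration. */
final class ChunkReceiverSketch {

    private final BitSet applied = new BitSet();
    private final int totalChunks;
    private int appliedCount;

    ChunkReceiverSketch(int totalChunks) {
        if (totalChunks < 1) {
            throw new IllegalArgumentException("totalChunks must be >= 1");
        }
        this.totalChunks = totalChunks;
    }

    /**
     * Records a chunk delivery.
     *
     * @return true if the chunk is new and should be applied, false if it is a re-delivery
     */
    synchronized boolean accept(int chunkSeq) {
        if (chunkSeq < 0 || chunkSeq >= totalChunks) {
            throw new IllegalArgumentException("chunkSeq out of range: " + chunkSeq);
        }
        if (applied.get(chunkSeq)) {
            return false; // duplicate: skip the apply, the caller may still send an ACK
        }
        applied.set(chunkSeq);
        appliedCount++;
        return true;
    }

    /** True once every chunk in [0, totalChunks) has been accepted. */
    synchronized boolean isComplete() {
        return appliedCount == totalChunks;
    }
}
```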
    • onMigrateDataAck

      public void onMigrateDataAck(int slotId, String targetNodeId, int chunkSeq)
      Handle incoming PARTITION_MIGRATE_DATA_ACK on the SOURCE side. Completes the CompletableFuture registered via awaitChunkAck(int, String, int) for the given chunk.
      Parameters:
      slotId - slot the ACK is for
      targetNodeId - the node sending the ACK (the migration target)
      chunkSeq - chunk sequence being acknowledged
    • awaitChunkAck

      public CompletableFuture<Integer> awaitChunkAck(int slotId, String targetNodeId, int chunkSeq)
      Register an ACK waiter for a specific (target, slot, chunkSeq) tuple. Must be called BEFORE the DATA chunk is sent, so that an ACK that races with registration still completes the future predictably.
      Parameters:
      slotId - slot being migrated
      targetNodeId - target of the migration
      chunkSeq - chunk sequence the caller is about to send
      Returns:
      a future that completes with chunkSeq when target ACKs
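The register-before-send pattern can be sketched with a concurrent map of futures keyed by the (target, slot, chunkSeq) tuple. The `AckWaiterSketch` class and its `Key` record are hypothetical; the sketch only illustrates why the waiter must exist before the chunk leaves the source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of ACK waiters keyed by (target, slot, chunkSeq). */
final class AckWaiterSketch {

    record Key(String targetNodeId, int slotId, int chunkSeq) {}

    private final ConcurrentMap<Key, CompletableFuture<Integer>> waiters = new ConcurrentHashMap<>();

    /** Registers (or returns the existing) waiter. Call this before sending the DATA chunk. */
    CompletableFuture<Integer> await(int slotId, String targetNodeId, int chunkSeq) {
        return waiters.computeIfAbsent(new Key(targetNodeId, slotId, chunkSeq), k -> new CompletableFuture<>());
    }

    /** Completes the matching waiter; returns false if none was registered. */
    boolean onAck(int slotId, String targetNodeId, int chunkSeq) {
        CompletableFuture<Integer> future = waiters.remove(new Key(targetNodeId, slotId, chunkSeq));
        return future != null && future.complete(chunkSeq);
    }

    /** Drops the waiter, for example after a timeout. */
    void cancel(int slotId, String targetNodeId, int chunkSeq) {
        CompletableFuture<Integer> future = waiters.remove(new Key(targetNodeId, slotId, chunkSeq));
        if (future != null) {
            future.cancel(false);
        }
    }
}
```

If the ACK arrived before `await` was called, `onAck` in this sketch would find no waiter and the ACK would be lost, which is the situation the ordering requirement avoids.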
    • cancelChunkAck

      public void cancelChunkAck(int slotId, String targetNodeId, int chunkSeq)
      Cancel an ACK waiter (used when the caller gives up, e.g. on timeout).
    • encodeChunkedDataValue

      public static byte[] encodeChunkedDataValue(int chunkSeq, int totalChunks, byte[] batchBytes)
      Encode the value payload for a PARTITION_MIGRATE_DATA frame with chunk metadata: [chunkSeq 4B][totalChunks 4B][batchBytes...].
    • decodeChunkedDataValue

      public static PartitionMigrationManager.ChunkedDataFrame decodeChunkedDataValue(byte[] payload)
      Decode a PARTITION_MIGRATE_DATA value emitted by encodeChunkedDataValue(int, int, byte[]).
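The `[chunkSeq 4B][totalChunks 4B][batchBytes...]` layout can be sketched with `java.nio.ByteBuffer`. This is a hedged illustration only: the byte order is assumed to be big-endian (the `ByteBuffer` default), which the description above does not specify, and the `ChunkFrameSketch` class and its `Frame` record are hypothetical stand-ins for the real methods and `ChunkedDataFrame`.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the chunked DATA value layout; byte order is an assumption. */
final class ChunkFrameSketch {

    private static final int HEADER_BYTES = 8; // two 4-byte ints

    record Frame(int chunkSeq, int totalChunks, byte[] batchBytes) {}

    static byte[] encode(int chunkSeq, int totalChunks, byte[] batchBytes) {
        return ByteBuffer.allocate(HEADER_BYTES + batchBytes.length)
                .putInt(chunkSeq)      // bytes 0..3
                .putInt(totalChunks)   // bytes 4..7
                .put(batchBytes)       // remaining bytes
                .array();
    }

    static Frame decode(byte[] payload) {
        if (payload.length < HEADER_BYTES) {
            throw new IllegalArgumentException("payload shorter than the 8-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        int chunkSeq = buffer.getInt();
        int totalChunks = buffer.getInt();
        byte[] batchBytes = new byte[buffer.remaining()];
        buffer.get(batchBytes);
        return new Frame(chunkSeq, totalChunks, batchBytes);
    }
}
```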
    • encodeAckValue

      public static byte[] encodeAckValue(int chunkSeq)
    • decodeAckValue

      public static int decodeAckValue(byte[] payload)
    • onMigrateComplete

      public void onMigrateComplete(int slotId, String sourceNodeId)
      Handle incoming PARTITION_MIGRATE_COMPLETE message. Finalizes receipt of a migrated slot.
      Parameters:
      slotId - slot ID that completed migration
      sourceNodeId - the source node
    • onOwnershipUpdate

      public boolean onOwnershipUpdate(byte[] payload, List<String> expectedMembers)
    • onCommittedOwnershipUpdate

      public boolean onCommittedOwnershipUpdate(byte[] payload)
      Applies an ownership update that has already been committed by consensus.

      Unlike network anti-entropy updates, a committed Raft log entry must be deterministic and must not depend on this node's transient local cluster-state view. The membership encoded in the committed payload is the source of truth.

    • applyCommittedOwnershipTable

      public boolean applyCommittedOwnershipTable(Map<Integer,String> ownership, List<String> topologyMembers)
      Applies an exact ownership table that has already passed the external consensus boundary. This is used both by the Raft state machine and by the pre-Raft bootstrap path before a cluster has enough verified peers to start elections.
    • applyCommittedOwnershipTable

      public boolean applyCommittedOwnershipTable(Map<Integer,String> ownership, List<String> topologyMembers, long revision)
    • canApplyCommittedOwnershipTable

      public boolean canApplyCommittedOwnershipTable(Map<Integer,String> ownership, List<String> topologyMembers, long revision)
    • nextOwnershipRevision

      public long nextOwnershipRevision()
    • currentOwnershipEquals

      public boolean currentOwnershipEquals(Map<Integer,String> expectedOwnership)
    • currentOwnershipSnapshot

      public Map<Integer,String> currentOwnershipSnapshot()
    • hasOwnershipTable

      public boolean hasOwnershipTable()
    • snapshotOwnershipTable

      public Optional<byte[]> snapshotOwnershipTable()
    • restoreOwnershipTable

      public void restoreOwnershipTable(@Nullable Object snapshotValue)
    • sendCurrentOwnershipUpdate

      public boolean sendCurrentOwnershipUpdate(String targetNodeId, List<String> topologyMembers)
      Unicast the current partition ownership topology to a single peer as anti-entropy.

      This repairs peers that missed an earlier ownership broadcast but later reconnected with the same topology view.

      Parameters:
      targetNodeId - the peer to update
      topologyMembers - the current alive topology
      Returns:
      true if the update was sent, or if no ownership table exists yet
    • getMigrationProgress

      public Map<Integer,Long> getMigrationProgress()
      Get current migration metrics.
      Returns:
      map of slotId → bytes migrated
    • isMigratingSlots

      public boolean isMigratingSlots()
      Check if any migrations are currently in progress.
      Returns:
      true if migrations are active
    • getOutgoingSlots

      public Set<Integer> getOutgoingSlots()
      Get the set of slots currently being sent FROM this node.
      Returns:
      set of outgoing slot IDs
    • recordOutgoingMigrationStart

      public void recordOutgoingMigrationStart(int slotId, String targetNodeId, int highWaterMark)
    • recordOutgoingMigrationComplete

      public void recordOutgoingMigrationComplete(int slotId, String targetNodeId)
    • recordOutgoingMigrationFailure

      public void recordOutgoingMigrationFailure(int slotId, String targetNodeId, @Nullable String failureMessage)
    • outgoingHighWaterMark

      public int outgoingHighWaterMark(int slotId, String targetNodeId)
    • getActiveOutgoingMigrations

      public Map<Integer, PartitionMigrationManager.ActiveOutgoingMigration> getActiveOutgoingMigrations()
    • getIncomingSlots

      public Set<Integer> getIncomingSlots()
      Get the set of slots currently being received BY this node.
      Returns:
      set of incoming slot IDs
    • onMigrationRecoveryRequest

      public boolean onMigrationRecoveryRequest(int slotId, String requesterNodeId, byte[] payload, List<String> liveMembers)
    • isMigrationRecoveryRequest

      public static boolean isMigrationRecoveryRequest(Message message)
    • encodeMigrationRecoveryRequest

      public static byte[] encodeMigrationRecoveryRequest(PartitionMigrationManager.MigrationRecoveryRequest request)
    • decodeMigrationRecoveryRequest

      public static PartitionMigrationManager.MigrationRecoveryRequest decodeMigrationRecoveryRequest(byte[] payload)
    • serializeOwnershipMembers

      public static byte[] serializeOwnershipMembers(List<String> members)
    • serializeOwnershipTable

      public static byte[] serializeOwnershipTable(Map<Integer,String> ownership, List<String> members)
    • serializeOwnershipTable

      public static byte[] serializeOwnershipTable(Map<Integer,String> ownership, List<String> members, long revision)
    • isSerializedOwnershipTable

      public static boolean isSerializedOwnershipTable(byte[] payload)
    • deserializeOwnershipTable

      public static PartitionMigrationManager.CommittedOwnershipTable deserializeOwnershipTable(byte[] payload)
    • deserializeOwnershipMembers

      public static List<String> deserializeOwnershipMembers(byte[] payload)