Class WanConsumer

java.lang.Object
com.loomcache.server.wan.WanConsumer

public class WanConsumer extends Object
Receives and applies remote WAN replication events to local data structures.

Handles conflict resolution for ACTIVE_ACTIVE mode using a pluggable WanConsumer.ConflictResolver strategy (LWW, LOCAL_WINS, REMOTE_WINS, or custom).

Idempotency: Tracks the last applied sequence per source cluster. Events with sequence numbers at or below the last applied sequence are skipped as duplicates, providing exactly-once semantics.

Thread Safety: Uses ConcurrentHashMap for sequence tracking and atomic counters for statistics.

Consistency Model: WAN events are applied directly to local data structures and acknowledgments are produced after local application only. This path is eventually consistent and does not guarantee that an event has been Raft-committed or durably persisted. Deployments that require stronger cross-cluster guarantees should route WAN updates through the Raft command path.

Since:
2.0
  • Constructor Details

    • WanConsumer

      public WanConsumer(@NonNull DataStructureRegistry registry, @NonNull WanConsumer.ConflictResolver conflictResolver)
      Create a WAN consumer.
      Parameters:
      registry - the data structure registry for applying events
      conflictResolver - the conflict resolution strategy
    • WanConsumer

      public WanConsumer(@NonNull DataStructureRegistry registry, @NonNull WanConsumer.ConflictResolver conflictResolver, long gapTimeoutMs)
      Create a WAN consumer with configurable gap timeout.
      Parameters:
      registry - the data structure registry for applying events
      conflictResolver - the conflict resolution strategy
      gapTimeoutMs - max time (ms) to buffer out-of-order events before warning about a stalled gap
  • Method Details

    • setMergePolicy

      public void setMergePolicy(@Nullable MergePolicy<String,String> mergePolicy)
      Install an optional MergePolicy override.

      When set, the merge policy takes precedence over the WanConsumer.ConflictResolver for PUT events. When unset (default), the existing conflict-resolver path is used. Passing null clears any previously-set policy.

      Parameters:
      mergePolicy - the merge policy to use for PUT conflicts, or null to clear
    • getMergePolicy

      public @Nullable MergePolicy<String,String> getMergePolicy()
      Returns:
      the current merge policy, or null if the default conflict-resolver path is used
    • applyBatch

      public @NonNull WanConsumer.WanAck applyBatch(@NonNull String sourceClusterId, @NonNull List<WanPublisher.WanEvent> events)
      Apply a batch of replication events from a remote cluster.
      Parameters:
      sourceClusterId - the originating cluster identifier
      events - the events to apply (non-null, may be empty)
      Returns:
      an acknowledgment with the applied count and highest sequence; the acknowledgment confirms only local application, not Raft commit
    • getLastAppliedSequence

      public long getLastAppliedSequence(@NonNull String sourceClusterId)
      Get the last applied sequence for a source cluster.
      Parameters:
      sourceClusterId - the source cluster identifier
      Returns:
      the last applied sequence (0 if no events applied yet)
    • getTotalApplied

      public long getTotalApplied()
      Returns:
      total events applied across all source clusters
    • getTotalConflicts

      public long getTotalConflicts()
      Returns:
      total conflicts resolved across all source clusters
    • getTotalDuplicatesSkipped

      public long getTotalDuplicatesSkipped()
      Returns:
      total duplicate events skipped
    • getTotalReorderBufferDrops

      public long getTotalReorderBufferDrops()
    • toSnapshot

      public @NonNull HashMap<String,Object> toSnapshot()
      Creates a serializable snapshot of the consumer's idempotency state.

      Captures per-source sequence tracking and aggregate metrics so that duplicate detection survives node crashes.

      Returns:
      a HashMap suitable for inclusion in a Raft snapshot
    • restoreFromSnapshot

      public void restoreFromSnapshot(@NonNull HashMap<String,Object> snapshot)
      Restores consumer idempotency state from a Raft snapshot.

      Recovers per-source sequence tracking so that events already applied before the crash are not re-applied.

      Parameters:
      snapshot - the snapshot data previously produced by toSnapshot()