Class WanConsumer
Handles conflict resolution for ACTIVE_ACTIVE mode using a pluggable
WanConsumer.ConflictResolver strategy (LWW, LOCAL_WINS, REMOTE_WINS, or custom).
Idempotency: Tracks the last applied sequence per source cluster. Events with sequence numbers at or below the last applied sequence are skipped as duplicates, providing exactly-once semantics.
Thread Safety: Uses ConcurrentHashMap for sequence tracking and atomic counters for statistics.
Consistency Model: WAN events are applied directly to local data structures and acknowledgments are produced after local application only. This path is eventually consistent and does not guarantee that an event has been Raft-committed or durably persisted. Deployments that require stronger cross-cluster guarantees should route WAN updates through the Raft command path.
- Since:
- 2.0
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic interfaceStrategy for resolving conflicts when both local and remote clusters have concurrent writes to the same key.static final recordAcknowledgment for a processed WAN batch. -
Constructor Summary
ConstructorsConstructorDescriptionWanConsumer(@NonNull DataStructureRegistry registry, @NonNull WanConsumer.ConflictResolver conflictResolver) Create a WAN consumer.WanConsumer(@NonNull DataStructureRegistry registry, @NonNull WanConsumer.ConflictResolver conflictResolver, long gapTimeoutMs) Create a WAN consumer with configurable gap timeout. -
Method Summary
Modifier and TypeMethodDescription@NonNull WanConsumer.WanAckapplyBatch(@NonNull String sourceClusterId, @NonNull List<WanPublisher.WanEvent> events) Apply a batch of replication events from a remote cluster.longgetLastAppliedSequence(@NonNull String sourceClusterId) Get the last applied sequence for a source cluster.@Nullable MergePolicy<String, String> longlonglonglongvoidrestoreFromSnapshot(@NonNull HashMap<String, Object> snapshot) Restores consumer idempotency state from a Raft snapshot.voidsetMergePolicy(@Nullable MergePolicy<String, String> mergePolicy) Install an optionalMergePolicyoverride.Creates a serializable snapshot of the consumer's idempotency state.
-
Constructor Details
-
WanConsumer
public WanConsumer(@NonNull DataStructureRegistry registry, @NonNull WanConsumer.ConflictResolver conflictResolver) Create a WAN consumer.- Parameters:
registry- the data structure registry for applying eventsconflictResolver- the conflict resolution strategy
-
WanConsumer
public WanConsumer(@NonNull DataStructureRegistry registry, @NonNull WanConsumer.ConflictResolver conflictResolver, long gapTimeoutMs) Create a WAN consumer with configurable gap timeout.- Parameters:
registry- the data structure registry for applying eventsconflictResolver- the conflict resolution strategygapTimeoutMs- max time (ms) to buffer out-of-order events before warning about a stalled gap
-
-
Method Details
-
setMergePolicy
Install an optionalMergePolicyoverride.When set, the merge policy takes precedence over the
WanConsumer.ConflictResolverfor PUT events. When unset (default), the existing conflict-resolver path is used. Passingnullclears any previously-set policy.- Parameters:
mergePolicy- the merge policy to use for PUT conflicts, ornullto clear
-
getMergePolicy
- Returns:
- the current merge policy, or
nullif the default conflict-resolver path is used
-
applyBatch
public @NonNull WanConsumer.WanAck applyBatch(@NonNull String sourceClusterId, @NonNull List<WanPublisher.WanEvent> events) Apply a batch of replication events from a remote cluster.- Parameters:
sourceClusterId- the originating cluster identifierevents- the events to apply (non-null, may be empty)- Returns:
- an acknowledgment with the applied count and highest sequence; the acknowledgment confirms only local application, not Raft commit
-
getLastAppliedSequence
Get the last applied sequence for a source cluster.- Parameters:
sourceClusterId- the source cluster identifier- Returns:
- the last applied sequence (0 if no events applied yet)
-
getTotalApplied
public long getTotalApplied()- Returns:
- total events applied across all source clusters
-
getTotalConflicts
public long getTotalConflicts()- Returns:
- total conflicts resolved across all source clusters
-
getTotalDuplicatesSkipped
public long getTotalDuplicatesSkipped()- Returns:
- total duplicate events skipped
-
getTotalReorderBufferDrops
public long getTotalReorderBufferDrops() -
toSnapshot
-
restoreFromSnapshot
Restores consumer idempotency state from a Raft snapshot.Recovers per-source sequence tracking so that events already applied before the crash are not re-applied.
- Parameters:
snapshot- the snapshot data previously produced bytoSnapshot()
-