Class WanReplicationManager
Manages replication of data to multiple target clusters with two modes:
WanReplicationManager.WanMode.ACTIVE_PASSIVE— Raft log shipping from primary to replicaWanReplicationManager.WanMode.ACTIVE_ACTIVE— CRDT operation shipping with conflict resolution
Thread Safety: All target management uses ConcurrentHashMap.
The running flag is an AtomicBoolean for safe start/shutdown.
- Since:
- 2.0
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic enumType of data change event replicated over WAN.static final recordSummary of a Merkle-based WAN delta-sync plan.static enumWAN replication mode.static final recordReplication statistics for a single WAN target. -
Constructor Summary
ConstructorsConstructorDescriptionWanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry) Create a WAN replication manager with default batch settings.WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs) Create a new WAN replication manager.WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs, @NonNull WanAcknowledgeType acknowledgeType) Create a new WAN replication manager.WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs, @NonNull WanAcknowledgeType acknowledgeType, boolean persistWanReplicatedData) Create a new WAN replication manager.WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, @NonNull WanAcknowledgeType acknowledgeType) Create a WAN replication manager with default batch settings.WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, @NonNull WanAcknowledgeType acknowledgeType, boolean persistWanReplicatedData) Create a WAN replication manager with default batch settings. -
Method Summary
Modifier and TypeMethodDescriptionvoidaddTarget(@NonNull String clusterId, @NonNull String endpoint, @NonNull WanReplicationManager.WanMode mode) Add a replication target cluster.@NonNull WanMerkleTreeBuild a deterministic Merkle digest for all WAN-replicated map entries.enqueueDeltaSync(@NonNull String targetClusterId, @NonNull WanMerkleTree remoteTree) Compare local map/JCache backing state with a remote Merkle tree and enqueue only the keys that differ.@NonNull WanAcknowledgeType@NonNull Map<String, WanReplicationManager.WanReplicationStats> Get replication statistics for all target clusters.@Nullable WanConsumergetConsumer(@NonNull String clusterId) Get the consumer for a specific target cluster.@NonNull StringGet the local cluster identifier.@Nullable WanPublishergetPublisher(@NonNull String clusterId) Get the publisher for a specific target cluster.Get replication statistics for a specific target cluster.intGet the number of configured targets.booleanCheck whether replication to a target is paused.booleanbooleanCheck whether the manager is currently running.voidonCrdtMerge(@NonNull String crdtName, @NonNull String crdtType, byte @NonNull [] state) Called when a CRDT state needs to be shipped to remote clusters.voidonMapChange(@NonNull String mapName, @NonNull String key, byte @Nullable [] oldValue, byte @Nullable [] newValue, @NonNull WanReplicationManager.ChangeType type) Called by the event system when a map entry changes.voidPause replication to a specific target.voidremoveTarget(@NonNull String clusterId) Remove a replication target.voidrestoreFromSnapshot(@NonNull HashMap<String, Object> snapshot) Restores WAN replication targets and their state from a Raft snapshot.voidResume replication to a specific target.voidsetAcknowledgeType(@NonNull WanAcknowledgeType acknowledgeType) voidsetMergePolicy(@NonNull String clusterId, @Nullable MergePolicy<String, String> mergePolicy) Install or clear theMergePolicyoverride on the consumer for a target.voidsetMergePolicyByClassName(@NonNull String clusterId, @NonNull String policyClassName) Load and install a WAN merge policy by class name.voidsetMergePolicyByClassName(@NonNull String clusterId, @NonNull String policyClassName, @Nullable ClassLoader classLoader) Load and install a WAN merge policy by class name with an explicit classloader.voidsetPersistWanReplicatedData(boolean persistWanReplicatedData) voidsetTransportFactory(WanPublisher.WanTransportFactory transportFactory) voidshutdown()Shut down the WAN replication manager and all publishers.voidstart()Start the WAN replication manager and all configured publishers.Creates a serializable snapshot of all WAN replication targets and their state.
-
Constructor Details
-
WanReplicationManager
public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs) Create a new WAN replication manager.- Parameters:
localClusterId- the local cluster identifier (non-null)instanceNumber- the node instance number for loggingregistry- the data structure registry for applying remote eventsdefaultBatchSize- default batch size for publishersdefaultFlushIntervalMs- default flush interval in milliseconds
-
WanReplicationManager
public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs, @NonNull WanAcknowledgeType acknowledgeType) Create a new WAN replication manager.- Parameters:
localClusterId- the local cluster identifier (non-null)instanceNumber- the node instance number for loggingregistry- the data structure registry for applying remote eventsdefaultBatchSize- default batch size for publishersdefaultFlushIntervalMs- default flush interval in millisecondsacknowledgeType- when a target acknowledges received WAN batches
-
WanReplicationManager
public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs, @NonNull WanAcknowledgeType acknowledgeType, boolean persistWanReplicatedData) Create a new WAN replication manager.- Parameters:
localClusterId- the local cluster identifier (non-null)instanceNumber- the node instance number for loggingregistry- the data structure registry for applying remote eventsdefaultBatchSize- default batch size for publishersdefaultFlushIntervalMs- default flush interval in millisecondsacknowledgeType- when a target acknowledges received WAN batchespersistWanReplicatedData- whether incoming WAN events update target MapStore
-
WanReplicationManager
public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry) Create a WAN replication manager with default batch settings.- Parameters:
localClusterId- the local cluster identifierinstanceNumber- the node instance numberregistry- the data structure registry
-
WanReplicationManager
public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, @NonNull WanAcknowledgeType acknowledgeType) Create a WAN replication manager with default batch settings.- Parameters:
localClusterId- the local cluster identifierinstanceNumber- the node instance numberregistry- the data structure registryacknowledgeType- when a target acknowledges received WAN batches
-
WanReplicationManager
public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, @NonNull WanAcknowledgeType acknowledgeType, boolean persistWanReplicatedData) Create a WAN replication manager with default batch settings.- Parameters:
localClusterId- the local cluster identifierinstanceNumber- the node instance numberregistry- the data structure registryacknowledgeType- when a target acknowledges received WAN batchespersistWanReplicatedData- whether incoming WAN events update target MapStore
-
-
Method Details
-
start
public void start()Start the WAN replication manager and all configured publishers. -
shutdown
public void shutdown()Shut down the WAN replication manager and all publishers. -
isRunning
public boolean isRunning()Check whether the manager is currently running.- Returns:
- true if running
-
getAcknowledgeType
-
setAcknowledgeType
-
isPersistWanReplicatedData
public boolean isPersistWanReplicatedData() -
setPersistWanReplicatedData
public void setPersistWanReplicatedData(boolean persistWanReplicatedData) -
setTransportFactory
-
addTarget
public void addTarget(@NonNull String clusterId, @NonNull String endpoint, @NonNull WanReplicationManager.WanMode mode) Add a replication target cluster.- Parameters:
clusterId- the target cluster identifier (non-null)endpoint- the target cluster endpoint (host:port, non-null)mode- the replication mode (non-null)- Throws:
IllegalArgumentException- if target already existsIllegalStateException- if manager is not running
-
removeTarget
Remove a replication target.- Parameters:
clusterId- the target cluster identifier (non-null)- Throws:
IllegalArgumentException- if target does not exist
-
pause
Pause replication to a specific target.- Parameters:
clusterId- the target cluster identifier (non-null)- Throws:
IllegalArgumentException- if target does not exist
-
resume
Resume replication to a specific target.- Parameters:
clusterId- the target cluster identifier (non-null)- Throws:
IllegalArgumentException- if target does not exist
-
isPaused
Check whether replication to a target is paused.- Parameters:
clusterId- the target cluster identifier- Returns:
- true if paused
-
onMapChange
public void onMapChange(@NonNull String mapName, @NonNull String key, byte @Nullable [] oldValue, byte @Nullable [] newValue, @NonNull WanReplicationManager.ChangeType type) Called by the event system when a map entry changes. Fans out the event to all active, non-paused WAN targets.- Parameters:
mapName- the map namekey- the affected keyoldValue- the previous value (null for new entries)newValue- the new value (null for deletions)type- the change type
-
onCrdtMerge
Called when a CRDT state needs to be shipped to remote clusters. Only meaningful for ACTIVE_ACTIVE mode targets.- Parameters:
crdtName- the CRDT instance namecrdtType- the CRDT type (e.g., "PN_COUNTER", "OR_SET")state- the serialized CRDT state
-
buildMerkleTree
Build a deterministic Merkle digest for all WAN-replicated map entries. Distributed JCache caches are included via their qualifiedjcache.<cacheName>backing maps.- Returns:
- the local Merkle tree
-
enqueueDeltaSync
public @NonNull WanReplicationManager.WanDeltaSyncResult enqueueDeltaSync(@NonNull String targetClusterId, @NonNull WanMerkleTree remoteTree) Compare local map/JCache backing state with a remote Merkle tree and enqueue only the keys that differ.PUT events are emitted for local keys that are missing or stale on the remote side. DELETE events are emitted for keys that exist only in the remote tree. The caller is responsible for shipping the queued events via the target publisher.
- Parameters:
targetClusterId- the target cluster identifierremoteTree- the remote cluster's Merkle tree- Returns:
- summary of the planned delta
- Throws:
IllegalArgumentException- if the target does not exist or tree bucket counts differIllegalStateException- if the thread is interrupted while enqueueing deltas
-
toSnapshot
Creates a serializable snapshot of all WAN replication targets and their state.Captures each target's configuration (endpoint, mode, paused flag) and delegates to
WanPublisher.toSnapshot()andWanConsumer.toSnapshot()for per-target replication state. This allows WAN replication to resume seamlessly after a node crash.- Returns:
- a HashMap suitable for inclusion in a Raft snapshot
-
restoreFromSnapshot
Restores WAN replication targets and their state from a Raft snapshot.Re-creates any target that is not already present and delegates publisher/consumer state restoration. Targets that existed before restore are updated in place (paused flag, publisher/consumer state).
- Parameters:
snapshot- the snapshot data previously produced bytoSnapshot()
-
getStats
Get replication statistics for a specific target cluster.- Parameters:
clusterId- the target cluster identifier- Returns:
- the replication statistics
- Throws:
IllegalArgumentException- if target does not exist
-
getAllStats
Get replication statistics for all target clusters.- Returns:
- a map of cluster ID to statistics
-
getTargetCount
public int getTargetCount()Get the number of configured targets.- Returns:
- target count
-
getLocalClusterId
Get the local cluster identifier.- Returns:
- the local cluster ID
-
getConsumer
Get the consumer for a specific target cluster. Used by the handler to apply incoming replication batches.- Parameters:
clusterId- the source cluster identifier- Returns:
- the WAN consumer, or null if not found
-
setMergePolicy
public void setMergePolicy(@NonNull String clusterId, @Nullable MergePolicy<String, String> mergePolicy) Install or clear theMergePolicyoverride on the consumer for a target.When a merge policy is set, it supersedes the default
WanConsumer.ConflictResolverpath for PUT events on that target.- Parameters:
clusterId- the source/target cluster identifier (non-null)mergePolicy- the merge policy to install, ornullto clear- Throws:
IllegalArgumentException- if the target does not exist
-
setMergePolicyByClassName
Load and install a WAN merge policy by class name.Custom policy classes must implement
CustomMergePolicyand expose either a public staticinstance()method or a public no-arg constructor.- Parameters:
clusterId- the source/target cluster identifier (non-null)policyClassName- fully qualified merge-policy class name- Throws:
IllegalArgumentException- if the target does not exist or the policy cannot be loaded
-
setMergePolicyByClassName
public void setMergePolicyByClassName(@NonNull String clusterId, @NonNull String policyClassName, @Nullable ClassLoader classLoader) Load and install a WAN merge policy by class name with an explicit classloader.- Parameters:
clusterId- the source/target cluster identifier (non-null)policyClassName- fully qualified merge-policy class nameclassLoader- classloader to load the policy from, ornullfor the default loader- Throws:
IllegalArgumentException- if the target does not exist or the policy cannot be loaded
-
getPublisher
Get the publisher for a specific target cluster. Used by the handler to process acknowledgments.- Parameters:
clusterId- the target cluster identifier- Returns:
- the WAN publisher, or null if not found
-