Class WanReplicationManager

java.lang.Object
com.loomcache.server.wan.WanReplicationManager

public class WanReplicationManager extends Object
Central orchestrator for WAN (Wide Area Network) replication.

Manages replication of data to multiple target clusters. Each target is assigned one of two replication modes when it is added (see WanReplicationManager.WanMode).

Thread Safety: All target management uses ConcurrentHashMap. The running flag is an AtomicBoolean for safe start/shutdown.
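
The thread-safety note above can be illustrated with a minimal, self-contained sketch. LifecycleSketch is a hypothetical stand-in and not the LoomCache implementation; it only assumes what the note states (a ConcurrentHashMap of targets and an AtomicBoolean running flag) plus the exceptions documented for addTarget.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for illustration only; not the real WanReplicationManager.
final class LifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Map<String, String> targets = new ConcurrentHashMap<>();

    /** Returns true only for the single caller that moved the flag from false to true. */
    boolean start() {
        return running.compareAndSet(false, true);
    }

    /** Returns true only for the single caller that moved the flag from true to false. */
    boolean shutdown() {
        return running.compareAndSet(true, false);
    }

    boolean isRunning() {
        return running.get();
    }

    /** Mirrors the documented addTarget contract: running check, then duplicate check. */
    void addTarget(String clusterId, String endpoint) {
        if (!running.get()) {
            throw new IllegalStateException("manager is not running");
        }
        // putIfAbsent is atomic, so two racing callers cannot both register the same id.
        if (targets.putIfAbsent(clusterId, endpoint) != null) {
            throw new IllegalArgumentException("target already exists: " + clusterId);
        }
    }

    int targetCount() {
        return targets.size();
    }
}
```

Using compareAndSet rather than a plain set means concurrent start() or shutdown() calls cannot both run their one-time work. Whether the real class treats a repeated start() as a no-op or as an error is not stated in this page.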

Since:
2.0
  • Constructor Details

    • WanReplicationManager

      public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs)
      Create a new WAN replication manager.
      Parameters:
      localClusterId - the local cluster identifier (non-null)
      instanceNumber - the node instance number for logging
      registry - the data structure registry for applying remote events
      defaultBatchSize - default batch size for publishers
      defaultFlushIntervalMs - default flush interval in milliseconds
    • WanReplicationManager

      public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs, @NonNull WanAcknowledgeType acknowledgeType)
      Create a new WAN replication manager.
      Parameters:
      localClusterId - the local cluster identifier (non-null)
      instanceNumber - the node instance number for logging
      registry - the data structure registry for applying remote events
      defaultBatchSize - default batch size for publishers
      defaultFlushIntervalMs - default flush interval in milliseconds
      acknowledgeType - when a target acknowledges received WAN batches
    • WanReplicationManager

      public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, int defaultBatchSize, long defaultFlushIntervalMs, @NonNull WanAcknowledgeType acknowledgeType, boolean persistWanReplicatedData)
      Create a new WAN replication manager.
      Parameters:
      localClusterId - the local cluster identifier (non-null)
      instanceNumber - the node instance number for logging
      registry - the data structure registry for applying remote events
      defaultBatchSize - default batch size for publishers
      defaultFlushIntervalMs - default flush interval in milliseconds
      acknowledgeType - when a target acknowledges received WAN batches
      persistWanReplicatedData - whether incoming WAN events update target MapStore
    • WanReplicationManager

      public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry)
      Create a WAN replication manager with default batch settings.
      Parameters:
      localClusterId - the local cluster identifier
      instanceNumber - the node instance number
      registry - the data structure registry
    • WanReplicationManager

      public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, @NonNull WanAcknowledgeType acknowledgeType)
      Create a WAN replication manager with default batch settings.
      Parameters:
      localClusterId - the local cluster identifier
      instanceNumber - the node instance number
      registry - the data structure registry
      acknowledgeType - when a target acknowledges received WAN batches
    • WanReplicationManager

      public WanReplicationManager(@NonNull String localClusterId, int instanceNumber, @NonNull DataStructureRegistry registry, @NonNull WanAcknowledgeType acknowledgeType, boolean persistWanReplicatedData)
      Create a WAN replication manager with default batch settings.
      Parameters:
      localClusterId - the local cluster identifier
      instanceNumber - the node instance number
      registry - the data structure registry
      acknowledgeType - when a target acknowledges received WAN batches
      persistWanReplicatedData - whether incoming WAN events update target MapStore
  • Method Details

    • start

      public void start()
      Start the WAN replication manager and all configured publishers.
    • shutdown

      public void shutdown()
      Shut down the WAN replication manager and all publishers.
    • isRunning

      public boolean isRunning()
      Check whether the manager is currently running.
      Returns:
      true if running
    • getAcknowledgeType

      public @NonNull WanAcknowledgeType getAcknowledgeType()
    • setAcknowledgeType

      public void setAcknowledgeType(@NonNull WanAcknowledgeType acknowledgeType)
    • isPersistWanReplicatedData

      public boolean isPersistWanReplicatedData()
    • setPersistWanReplicatedData

      public void setPersistWanReplicatedData(boolean persistWanReplicatedData)
    • setTransportFactory

      public void setTransportFactory(WanPublisher.WanTransportFactory transportFactory)
    • addTarget

      public void addTarget(@NonNull String clusterId, @NonNull String endpoint, @NonNull WanReplicationManager.WanMode mode)
      Add a replication target cluster.
      Parameters:
      clusterId - the target cluster identifier (non-null)
      endpoint - the target cluster endpoint (host:port, non-null)
      mode - the replication mode (non-null)
      Throws:
      IllegalArgumentException - if target already exists
      IllegalStateException - if manager is not running
    • removeTarget

      public void removeTarget(@NonNull String clusterId)
      Remove a replication target.
      Parameters:
      clusterId - the target cluster identifier (non-null)
      Throws:
      IllegalArgumentException - if target does not exist
    • pause

      public void pause(@NonNull String clusterId)
      Pause replication to a specific target.
      Parameters:
      clusterId - the target cluster identifier (non-null)
      Throws:
      IllegalArgumentException - if target does not exist
    • resume

      public void resume(@NonNull String clusterId)
      Resume replication to a specific target.
      Parameters:
      clusterId - the target cluster identifier (non-null)
      Throws:
      IllegalArgumentException - if target does not exist
    • isPaused

      public boolean isPaused(@NonNull String clusterId)
      Check whether replication to a target is paused.
      Parameters:
      clusterId - the target cluster identifier
      Returns:
      true if paused
    • onMapChange

      public void onMapChange(@NonNull String mapName, @NonNull String key, byte @Nullable [] oldValue, byte @Nullable [] newValue, @NonNull WanReplicationManager.ChangeType type)
      Called by the event system when a map entry changes. Fans out the event to all active, non-paused WAN targets.
      Parameters:
      mapName - the map name
      key - the affected key
      oldValue - the previous value (null for new entries)
      newValue - the new value (null for deletions)
      type - the change type
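
As a rough illustration of the fan-out described for onMapChange, the sketch below queues one event per non-paused target. All types here (FanOutSketch, Event, Target) and the PUT/DELETE constants are assumptions made for the example; the real ChangeType constants and what happens to events skipped during a pause are not specified in this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the documented fan-out; not the LoomCache implementation.
final class FanOutSketch {
    enum ChangeType { PUT, DELETE } // assumed constants

    static final class Event {
        final String mapName;
        final String key;
        final ChangeType type;

        Event(String mapName, String key, ChangeType type) {
            this.mapName = mapName;
            this.key = key;
            this.type = type;
        }
    }

    static final class Target {
        volatile boolean paused;
        // Plain list: adequate for this single-threaded sketch only.
        final List<Event> queue = new ArrayList<>();
    }

    private final Map<String, Target> targets = new ConcurrentHashMap<>();

    void addTarget(String clusterId) {
        targets.put(clusterId, new Target());
    }

    void pause(String clusterId) {
        require(clusterId).paused = true;
    }

    void resume(String clusterId) {
        require(clusterId).paused = false;
    }

    /** Queues the event on every non-paused target and returns how many received it. */
    int onMapChange(String mapName, String key, ChangeType type) {
        Event event = new Event(mapName, key, type);
        int delivered = 0;
        for (Target target : targets.values()) {
            if (!target.paused) {
                target.queue.add(event);
                delivered++;
            }
        }
        return delivered;
    }

    int queued(String clusterId) {
        return require(clusterId).queue.size();
    }

    private Target require(String clusterId) {
        Target target = targets.get(clusterId);
        if (target == null) {
            throw new IllegalArgumentException("target does not exist: " + clusterId);
        }
        return target;
    }
}
```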
    • onCrdtMerge

      public void onCrdtMerge(@NonNull String crdtName, @NonNull String crdtType, byte @NonNull [] state)
      Called when a CRDT state needs to be shipped to remote clusters. Only meaningful for ACTIVE_ACTIVE mode targets.
      Parameters:
      crdtName - the CRDT instance name
      crdtType - the CRDT type (e.g., "PN_COUNTER", "OR_SET")
      state - the serialized CRDT state
    • buildMerkleTree

      public @NonNull WanMerkleTree buildMerkleTree()
      Build a deterministic Merkle digest for all WAN-replicated map entries. Distributed JCache caches are included via their qualified jcache.<cacheName> backing maps.
      Returns:
      the local Merkle tree
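
To make "deterministic" concrete, here is a simplified two-level digest (fixed buckets plus a root), assuming SHA-256 and bucketing by String.hashCode(). This is a sketch of the general technique, not the WanMerkleTree layout, whose hash function and tree shape are not documented here. MerkleDigestSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a deterministic bucketed digest; not the WanMerkleTree format.
final class MerkleDigestSketch {

    /** One SHA-256 digest per bucket; entries are fed in sorted key order. */
    static byte[][] bucketDigests(Map<String, byte[]> entries, int bucketCount) {
        MessageDigest[] digests = new MessageDigest[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            digests[i] = sha256();
        }
        // Sorting removes any dependence on the source map's iteration order.
        Map<String, byte[]> sorted = new TreeMap<>(entries);
        for (Map.Entry<String, byte[]> e : sorted.entrySet()) {
            // String.hashCode() is specified by the JDK, so buckets match across JVMs.
            int bucket = Math.floorMod(e.getKey().hashCode(), bucketCount);
            feed(digests[bucket], e.getKey().getBytes(StandardCharsets.UTF_8));
            feed(digests[bucket], e.getValue());
        }
        byte[][] out = new byte[bucketCount][];
        for (int i = 0; i < bucketCount; i++) {
            out[i] = digests[i].digest();
        }
        return out;
    }

    /** Root digest over the bucket digests in index order. */
    static byte[] root(byte[][] buckets) {
        MessageDigest md = sha256();
        for (byte[] bucket : buckets) {
            md.update(bucket);
        }
        return md.digest();
    }

    // Length-prefix each field so ("ab","c") and ("a","bc") hash differently.
    private static void feed(MessageDigest md, byte[] data) {
        md.update(ByteBuffer.allocate(4).putInt(data.length).array());
        md.update(data);
    }

    private static MessageDigest sha256() {
        try {
            return MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }
}
```

Two clusters holding the same entries then produce the same root regardless of insertion order, and a mismatch can be narrowed to the buckets whose digests differ.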
    • enqueueDeltaSync

      public @NonNull WanReplicationManager.WanDeltaSyncResult enqueueDeltaSync(@NonNull String targetClusterId, @NonNull WanMerkleTree remoteTree)
      Compare local map/JCache backing state with a remote Merkle tree and enqueue only the keys that differ.

      PUT events are emitted for local keys that are missing or stale on the remote side. DELETE events are emitted for keys that exist only in the remote tree. The caller is responsible for shipping the queued events via the target publisher.

      Parameters:
      targetClusterId - the target cluster identifier
      remoteTree - the remote cluster's Merkle tree
      Returns:
      summary of the planned delta
      Throws:
      IllegalArgumentException - if the target does not exist or tree bucket counts differ
      IllegalStateException - if the thread is interrupted while enqueueing deltas
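
The PUT/DELETE rule above can be sketched over two plain maps of key to value digest. This is an assumption-laden simplification: the real method compares WanMerkleTree instances (and rejects mismatched bucket counts), whereas DeltaPlanSketch, a hypothetical class, compares per-key digests directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the documented delta rule; not the enqueueDeltaSync implementation.
final class DeltaPlanSketch {

    /**
     * Plans operations to bring the remote side in line with the local side.
     * Both maps go from key to a digest of the value stored under that key.
     */
    static List<String> plan(Map<String, String> local, Map<String, String> remote) {
        List<String> ops = new ArrayList<>();
        // PUT: keys that are missing remotely, or whose remote digest differs (stale).
        Map<String, String> sortedLocal = new TreeMap<>(local);
        for (Map.Entry<String, String> e : sortedLocal.entrySet()) {
            String remoteDigest = remote.get(e.getKey());
            if (!e.getValue().equals(remoteDigest)) {
                ops.add("PUT " + e.getKey());
            }
        }
        // DELETE: keys that exist only on the remote side.
        for (String key : new TreeSet<>(remote.keySet())) {
            if (!local.containsKey(key)) {
                ops.add("DELETE " + key);
            }
        }
        return ops;
    }
}
```

For example, with local digests {a=1, b=2, c=3} and remote digests {a=1, b=9, d=4}, the plan is PUT b (stale), PUT c (missing), DELETE d (remote only).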
    • toSnapshot

      public @NonNull HashMap<String,Object> toSnapshot()
      Creates a serializable snapshot of all WAN replication targets and their state.

      Captures each target's configuration (endpoint, mode, paused flag) and delegates to WanPublisher.toSnapshot() and WanConsumer.toSnapshot() for per-target replication state, so that WAN replication can resume from the recorded state after a node crash.

      Returns:
      a HashMap suitable for inclusion in a Raft snapshot
    • restoreFromSnapshot

      public void restoreFromSnapshot(@NonNull HashMap<String,Object> snapshot)
      Restores WAN replication targets and their state from a Raft snapshot.

      Re-creates any target that is not already present and delegates publisher/consumer state restoration. Targets that existed before restore are updated in place (paused flag, publisher/consumer state).

      Parameters:
      snapshot - the snapshot data previously produced by toSnapshot()
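
A minimal sketch of the snapshot round trip described above, assuming one nested map per target with the hypothetical keys "endpoint", "mode", and "paused". The actual snapshot layout and the publisher/consumer state it embeds are not documented here, so SnapshotSketch models only the create-if-absent and update-in-place behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of toSnapshot()/restoreFromSnapshot(); key names are assumptions.
final class SnapshotSketch {
    static final class Target {
        final String endpoint;
        final String mode;
        volatile boolean paused;

        Target(String endpoint, String mode, boolean paused) {
            this.endpoint = endpoint;
            this.mode = mode;
            this.paused = paused;
        }
    }

    final Map<String, Target> targets = new ConcurrentHashMap<>();

    /** Copies each target's configuration into a plain, serializable map. */
    HashMap<String, Object> toSnapshot() {
        HashMap<String, Object> snapshot = new HashMap<>();
        for (Map.Entry<String, Target> e : targets.entrySet()) {
            HashMap<String, Object> state = new HashMap<>();
            state.put("endpoint", e.getValue().endpoint);
            state.put("mode", e.getValue().mode);
            state.put("paused", e.getValue().paused);
            snapshot.put(e.getKey(), state);
        }
        return snapshot;
    }

    /** Re-creates missing targets; existing targets only get their paused flag updated. */
    void restoreFromSnapshot(HashMap<String, Object> snapshot) {
        for (Map.Entry<String, Object> e : snapshot.entrySet()) {
            @SuppressWarnings("unchecked")
            Map<String, Object> state = (Map<String, Object>) e.getValue();
            boolean paused = (Boolean) state.get("paused");
            Target existing = targets.get(e.getKey());
            if (existing == null) {
                targets.put(e.getKey(), new Target(
                        (String) state.get("endpoint"), (String) state.get("mode"), paused));
            } else {
                existing.paused = paused;
            }
        }
    }
}
```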
    • getStats

      public @NonNull WanReplicationManager.WanReplicationStats getStats(@NonNull String clusterId)
      Get replication statistics for a specific target cluster.
      Parameters:
      clusterId - the target cluster identifier
      Returns:
      the replication statistics
      Throws:
      IllegalArgumentException - if target does not exist
    • getAllStats

      public @NonNull Map<String, WanReplicationManager.WanReplicationStats> getAllStats()
      Get replication statistics for all target clusters.
      Returns:
      a map of cluster ID to statistics
    • getTargetCount

      public int getTargetCount()
      Get the number of configured targets.
      Returns:
      target count
    • getLocalClusterId

      public @NonNull String getLocalClusterId()
      Get the local cluster identifier.
      Returns:
      the local cluster ID
    • getConsumer

      public @Nullable WanConsumer getConsumer(@NonNull String clusterId)
      Get the consumer for a specific target cluster. Used by the handler to apply incoming replication batches.
      Parameters:
      clusterId - the source/target cluster identifier
      Returns:
      the WAN consumer, or null if not found
    • setMergePolicy

      public void setMergePolicy(@NonNull String clusterId, @Nullable MergePolicy<String,String> mergePolicy)
      Install or clear the MergePolicy override on the consumer for a target.

      When a merge policy is set, it supersedes the default WanConsumer.ConflictResolver path for PUT events on that target.

      Parameters:
      clusterId - the source/target cluster identifier (non-null)
      mergePolicy - the merge policy to install, or null to clear
      Throws:
      IllegalArgumentException - if the target does not exist
    • setMergePolicyByClassName

      public void setMergePolicyByClassName(@NonNull String clusterId, @NonNull String policyClassName)
      Load and install a WAN merge policy by class name.

      Custom policy classes must implement CustomMergePolicy and expose either a public static instance() method or a public no-arg constructor.

      Parameters:
      clusterId - the source/target cluster identifier (non-null)
      policyClassName - fully qualified merge-policy class name
      Throws:
      IllegalArgumentException - if the target does not exist or the policy cannot be loaded
    • setMergePolicyByClassName

      public void setMergePolicyByClassName(@NonNull String clusterId, @NonNull String policyClassName, @Nullable ClassLoader classLoader)
      Load and install a WAN merge policy by class name with an explicit classloader.
      Parameters:
      clusterId - the source/target cluster identifier (non-null)
      policyClassName - fully qualified merge-policy class name
      classLoader - classloader to load the policy from, or null for the default loader
      Throws:
      IllegalArgumentException - if the target does not exist or the policy cannot be loaded
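
The loading rule stated above (a public static instance() method, otherwise a public no-arg constructor) could be implemented with standard reflection roughly as follows. Policy is a hypothetical stand-in for CustomMergePolicy, whose actual methods are not shown in this page, and the order in which the two options are tried, as well as the fallback classloader, are assumptions.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical sketch of class-name based policy loading; not the LoomCache implementation.
final class PolicyLoaderSketch {

    /** Stand-in for CustomMergePolicy; the real interface is not documented here. */
    public interface Policy {
        String merge(String existing, String incoming);
    }

    /** Example policy exposing a public static instance() method. */
    public static final class LatestWins implements Policy {
        private static final LatestWins INSTANCE = new LatestWins();
        private LatestWins() { }
        public static LatestWins instance() { return INSTANCE; }
        @Override public String merge(String existing, String incoming) { return incoming; }
    }

    /** Example policy exposing a public no-arg constructor. */
    public static final class KeepExisting implements Policy {
        public KeepExisting() { }
        @Override public String merge(String existing, String incoming) {
            return existing != null ? existing : incoming;
        }
    }

    static Policy load(String className, ClassLoader classLoader) {
        // Assumption: a null loader falls back to the loader of this class.
        ClassLoader loader = classLoader != null
                ? classLoader : PolicyLoaderSketch.class.getClassLoader();
        try {
            Class<?> type = Class.forName(className, true, loader);
            if (!Policy.class.isAssignableFrom(type)) {
                throw new IllegalArgumentException(className + " does not implement Policy");
            }
            try {
                // Prefer a public static instance() factory when one exists.
                Method factory = type.getMethod("instance");
                if (Modifier.isStatic(factory.getModifiers())) {
                    return (Policy) factory.invoke(null);
                }
            } catch (NoSuchMethodException ignored) {
                // No instance() method: fall through to the constructor.
            }
            return (Policy) type.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Matches the documented contract: load failures surface as IllegalArgumentException.
            throw new IllegalArgumentException("cannot load merge policy " + className, e);
        }
    }
}
```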
    • getPublisher

      public @Nullable WanPublisher getPublisher(@NonNull String clusterId)
      Get the publisher for a specific target cluster. Used by the handler to process acknowledgments.
      Parameters:
      clusterId - the target cluster identifier
      Returns:
      the WAN publisher, or null if not found