Class WanPublisher

java.lang.Object
com.loomcache.server.wan.WanPublisher

public class WanPublisher extends Object
Batches and ships WAN replication events to a remote cluster.

Events are accumulated in a bounded blocking queue and periodically flushed in configurable-sized batches. Batches larger than 1024 bytes are GZIP-compressed before transmission.

Acknowledgment Tracking: Each event gets a monotonic sequence number. The remote consumer acknowledges up to a given sequence, allowing the publisher to track replication lag.

Failure Handling: Failed sends are retried with exponential backoff (capped at 30000Lms). Events remain in the queue until successfully sent.

Thread Model: A single virtual thread handles the flush loop.

Since:
2.0
  • Constructor Details

    • WanPublisher

      public WanPublisher(@NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs)
      Create a WAN publisher.
      Parameters:
      targetClusterId - the target cluster identifier
      targetEndpoint - the target cluster endpoint (host:port)
      batchSize - maximum events per batch
      flushIntervalMs - flush interval in milliseconds
    • WanPublisher

      public WanPublisher(@NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs, @NonNull WanPublisher.WanTransport transport)
    • WanPublisher

      public WanPublisher(@NonNull String sourceClusterId, @NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs, @NonNull WanPublisher.WanTransport transport)
  • Method Details

    • socketTransportFactory

      public static @NonNull WanPublisher.WanTransportFactory socketTransportFactory()
    • tlsCertificateAuthTransportFactory

      public static @NonNull WanPublisher.WanTransportFactory tlsCertificateAuthTransportFactory(@NonNull TlsConfig tlsConfig)
    • start

      public void start()
      Start the background flush thread.
    • stop

      public void stop()
      Stop the publisher and drain remaining events.
    • enqueue

      public void enqueue(@NonNull WanPublisher.WanEvent event) throws InterruptedException
      Add an event to the pending queue. Blocks if the queue is full until space becomes available — events are never silently dropped.
      Parameters:
      event - the replication event to enqueue
      Throws:
      InterruptedException - if the thread is interrupted while waiting for space
    • nextSequence

      public long nextSequence()
      Generate the next sequence number.
      Returns:
      a unique monotonically increasing sequence number
    • acknowledge

      public void acknowledge(long sequence)
      Record that the remote cluster has acknowledged events up to this sequence.
      Parameters:
      sequence - the acknowledged sequence number
    • getPendingCount

      public long getPendingCount()
      Returns:
      number of events waiting to be sent
    • getLastAckedSequence

      public long getLastAckedSequence()
      Returns:
      the last acknowledged sequence number
    • getCurrentSequence

      public long getCurrentSequence()
      Returns:
      current (latest assigned) sequence number
    • getBytesTransferred

      public long getBytesTransferred()
      Returns:
      total bytes transferred to remote cluster
    • getTotalSendFailures

      public long getTotalSendFailures()
      Returns:
      total number of failed send attempts
    • getQueueFullBackpressureCount

      public long getQueueFullBackpressureCount()
      Returns:
      total number of times enqueue blocked due to a full queue
    • getTargetClusterId

      public @NonNull String getTargetClusterId()
      Returns:
      the target cluster identifier
    • getTargetEndpoint

      public @NonNull String getTargetEndpoint()
      Returns:
      the target endpoint
    • toSnapshot

      public @NonNull HashMap<String,Object> toSnapshot()
      Creates a serializable snapshot of the publisher's replication state.

      Captures sequence counters, transfer metrics, and all pending events so that replication can resume after a node crash without losing queued events or resetting sequence tracking.

      Returns:
      a HashMap suitable for inclusion in a Raft snapshot
    • restoreFromSnapshot

      public void restoreFromSnapshot(@NonNull HashMap<String,Object> snapshot)
      Restores publisher state from a Raft snapshot.

      Recovers sequence counters, transfer metrics, and pending events. The flush thread is not restarted by this method — call start() separately after restore.

      Parameters:
      snapshot - the snapshot data previously produced by toSnapshot()
    • serializeBatch

      public static byte @NonNull [] serializeBatch(@NonNull List<WanPublisher.WanEvent> batch)
      Serialize a batch of events to a byte array.
      Parameters:
      batch - the events to serialize
      Returns:
      serialized bytes
    • deserializeBatch

      public static @NonNull List<WanPublisher.WanEvent> deserializeBatch(byte @NonNull [] data)
      Deserialize a batch of events from a byte array.
      Parameters:
      data - the serialized bytes (uncompressed)
      Returns:
      the deserialized events
    • compress

      public static byte @NonNull [] compress(byte @NonNull [] data)
      GZIP-compress data.
      Parameters:
      data - the raw data
      Returns:
      compressed data
    • decompress

      public static byte @NonNull [] decompress(byte @NonNull [] compressed)