Class WanPublisher
Events are accumulated in a bounded blocking queue and periodically flushed in configurable-sized batches. Batches larger than 1024 bytes are GZIP-compressed before transmission.
Acknowledgment Tracking: Each event gets a monotonic sequence number. The remote consumer acknowledges up to a given sequence, allowing the publisher to track replication lag.
Failure Handling: Failed sends are retried with exponential backoff (capped at 30000 ms). Events remain in the queue until successfully sent.
Thread Model: A single virtual thread handles the flush loop.
- Since:
- 2.0
-
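The retry policy described above can be sketched as follows. This is a minimal illustration, not the publisher's actual code: the class name BackoffSketch, the 100 ms initial delay, and the doubling factor are assumptions; only the 30000 ms cap comes from the description.

```java
final class BackoffSketch {
    // Cap taken from the class description (30000 ms).
    static final long MAX_BACKOFF_MS = 30_000L;
    // Assumed initial delay; the real starting value is not documented here.
    static final long INITIAL_BACKOFF_MS = 100L;

    /** Delay before retry number attempt (0-based), doubling each time up to the cap. */
    static long delayMs(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Clamp the shift so the left shift cannot overflow a long.
        int shift = Math.min(attempt, 20);
        return Math.min(INITIAL_BACKOFF_MS << shift, MAX_BACKOFF_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 12; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + delayMs(attempt) + " ms");
        }
    }
}
```

With these assumed values the delay reaches the cap on the tenth attempt (index 9) and stays there, so a long outage produces at most one send attempt every 30 seconds.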
Nested Class Summary
Nested Classes
- static final record
- static final record
- static final record: A single WAN replication event.
- static interface
- static interface
-
Constructor Summary
Constructors
- WanPublisher(@NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs): Create a WAN publisher.
- WanPublisher(@NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs, @NonNull WanPublisher.WanTransport transport)
- WanPublisher(@NonNull String sourceClusterId, @NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs, @NonNull WanPublisher.WanTransport transport)
-
Method Summary
- void acknowledge(long sequence): Record that the remote cluster has acknowledged events up to this sequence.
- static byte @NonNull [] compress(byte @NonNull [] data): GZIP-compress data.
- static byte @NonNull [] decompress(byte @NonNull [] compressed)
- static @NonNull List<WanPublisher.WanEvent> deserializeBatch(byte @NonNull [] data): Deserialize a batch of events from a byte array.
- void enqueue(@NonNull WanPublisher.WanEvent event): Add an event to the pending queue.
- long getBytesTransferred()
- long getCurrentSequence()
- long getLastAckedSequence()
- long getPendingCount()
- long getQueueFullBackpressureCount()
- @NonNull String getTargetClusterId
- @NonNull String getTargetEndpoint
- long getTotalSendFailures()
- long nextSequence(): Generate the next sequence number.
- void restoreFromSnapshot(@NonNull HashMap<String, Object> snapshot): Restores publisher state from a Raft snapshot.
- static byte @NonNull [] serializeBatch(@NonNull List<WanPublisher.WanEvent> batch): Serialize a batch of events to a byte array.
- static @NonNull WanPublisher.WanTransportFactory socketTransportFactory
- void start(): Start the background flush thread.
- void stop(): Stop the publisher and drain remaining events.
- static @NonNull WanPublisher.WanTransportFactory tlsCertificateAuthTransportFactory(@NonNull TlsConfig tlsConfig)
- toSnapshot(): Creates a serializable snapshot of the publisher's replication state.
-
Constructor Details
-
WanPublisher
public WanPublisher(@NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs)
Create a WAN publisher.
- Parameters:
targetClusterId - the target cluster identifier
targetEndpoint - the target cluster endpoint (host:port)
batchSize - maximum events per batch
flushIntervalMs - flush interval in milliseconds
-
WanPublisher
public WanPublisher(@NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs, @NonNull WanPublisher.WanTransport transport)
-
WanPublisher
public WanPublisher(@NonNull String sourceClusterId, @NonNull String targetClusterId, @NonNull String targetEndpoint, int batchSize, long flushIntervalMs, @NonNull WanPublisher.WanTransport transport)
-
-
Method Details
-
socketTransportFactory
-
tlsCertificateAuthTransportFactory
public static @NonNull WanPublisher.WanTransportFactory tlsCertificateAuthTransportFactory(@NonNull TlsConfig tlsConfig)
-
start
public void start()
Start the background flush thread.
-
stop
public void stop()
Stop the publisher and drain remaining events.
-
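The start() and stop() lifecycle above, combined with the single-virtual-thread model from the class description, might look roughly like the sketch below. Everything here is hypothetical: FlushLoopSketch, its String events, the queue capacity of 1024, and the in-memory sentBatches list standing in for the transport are all assumptions made for illustration. It requires Java 21 or later for Thread.ofVirtual().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class FlushLoopSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(1024);
    // Stand-in for the remote transport: batches are just recorded here.
    private final List<List<String>> sentBatches = new ArrayList<>();
    private final int batchSize;
    private final long flushIntervalMs;
    private volatile boolean running;
    private Thread flushThread;

    FlushLoopSketch(int batchSize, long flushIntervalMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }

    void enqueue(String event) throws InterruptedException {
        pending.put(event); // blocks while the queue is full
    }

    synchronized void start() {
        if (running) return;
        running = true;
        // One virtual thread runs the periodic flush loop.
        flushThread = Thread.ofVirtual().name("wan-flush").start(this::flushLoop);
    }

    void stop() throws InterruptedException {
        Thread t;
        synchronized (this) {
            if (!running) return;
            running = false;
            t = flushThread;
        }
        t.interrupt(); // wake the loop if it is sleeping
        t.join();
        // Drain whatever is still queued so nothing is left behind.
        while (flushOnce() > 0) { }
    }

    private void flushLoop() {
        while (running) {
            try {
                Thread.sleep(flushIntervalMs);
                flushOnce();
            } catch (InterruptedException e) {
                return; // stop() was requested
            }
        }
    }

    // Sends at most batchSize events; returns how many were sent.
    private synchronized int flushOnce() {
        List<String> batch = new ArrayList<>(batchSize);
        pending.drainTo(batch, batchSize);
        if (!batch.isEmpty()) sentBatches.add(batch);
        return batch.size();
    }

    synchronized int sentCount() {
        int total = 0;
        for (List<String> batch : sentBatches) total += batch.size();
        return total;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this sketch stop() releases its lock before joining the flush thread, so a flush already in progress can finish, and the final drain runs on the caller's thread.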
enqueue
Add an event to the pending queue. If the queue is full, blocks until space becomes available; events are never silently dropped.
- Parameters:
event - the replication event to enqueue
- Throws:
InterruptedException - if the thread is interrupted while waiting for space
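One way to get the blocking behavior described above while also counting backpressure events is to try a non-blocking offer first and fall back to a blocking put. The sketch below assumes this approach; BackpressureQueueSketch, its generic element type, and the take() helper are hypothetical and not part of WanPublisher.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class BackpressureQueueSketch<E> {
    private final BlockingQueue<E> pending;
    private final AtomicLong queueFullBackpressureCount = new AtomicLong();

    BackpressureQueueSketch(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    /** Adds an event, blocking when the queue is full; never drops the event. */
    void enqueue(E event) throws InterruptedException {
        // Fast path: there is room, so no blocking and nothing to count.
        if (pending.offer(event)) {
            return;
        }
        // Slow path: record the backpressure, then wait for a free slot.
        queueFullBackpressureCount.incrementAndGet();
        pending.put(event);
    }

    /** Consumer side, used here only to free space in the queue. */
    E take() throws InterruptedException {
        return pending.take();
    }

    long getPendingCount() {
        return pending.size();
    }

    long getQueueFullBackpressureCount() {
        return queueFullBackpressureCount.get();
    }
}
```

A caller that must not block indefinitely would need a timeout variant; the documented enqueue has no such parameter, so none is shown.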
-
nextSequence
public long nextSequence()
Generate the next sequence number.
- Returns:
- a unique monotonically increasing sequence number
-
acknowledge
public void acknowledge(long sequence)
Record that the remote cluster has acknowledged events up to this sequence.
- Parameters:
sequence - the acknowledged sequence number
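Sequence assignment and cumulative acknowledgment can be modeled with two atomic counters, with replication lag as their difference. The sketch below is illustrative only: AckTrackerSketch and lag() are hypothetical names, and it assumes sequences start at 1 and that a late or duplicate acknowledgment never moves the acknowledged position backwards.

```java
import java.util.concurrent.atomic.AtomicLong;

final class AckTrackerSketch {
    // Last sequence number handed out (0 means none yet; an assumption).
    private final AtomicLong currentSequence = new AtomicLong();
    // Highest sequence the remote side has acknowledged so far.
    private final AtomicLong lastAckedSequence = new AtomicLong();

    long nextSequence() {
        return currentSequence.incrementAndGet();
    }

    void acknowledge(long sequence) {
        // Keep the maximum so out-of-order acks cannot regress the position.
        lastAckedSequence.accumulateAndGet(sequence, Math::max);
    }

    long getCurrentSequence() {
        return currentSequence.get();
    }

    long getLastAckedSequence() {
        return lastAckedSequence.get();
    }

    /** Number of assigned events the remote cluster has not yet acknowledged. */
    long lag() {
        return currentSequence.get() - lastAckedSequence.get();
    }
}
```

With the real publisher, the same lag figure could presumably be derived from getCurrentSequence() minus getLastAckedSequence().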
-
getPendingCount
public long getPendingCount()
- Returns:
- number of events waiting to be sent
-
getLastAckedSequence
public long getLastAckedSequence()
- Returns:
- the last acknowledged sequence number
-
getCurrentSequence
public long getCurrentSequence()
- Returns:
- current (latest assigned) sequence number
-
getBytesTransferred
public long getBytesTransferred()
- Returns:
- total bytes transferred to remote cluster
-
getTotalSendFailures
public long getTotalSendFailures()
- Returns:
- total number of failed send attempts
-
getQueueFullBackpressureCount
public long getQueueFullBackpressureCount()
- Returns:
- total number of times enqueue blocked due to a full queue
-
getTargetClusterId
- Returns:
- the target cluster identifier
-
getTargetEndpoint
- Returns:
- the target endpoint
-
toSnapshot
Creates a serializable snapshot of the publisher's replication state. Captures sequence counters, transfer metrics, and all pending events so that replication can resume after a node crash without losing queued events or resetting sequence tracking.
- Returns:
- a HashMap suitable for inclusion in a Raft snapshot
-
restoreFromSnapshot
Restores publisher state from a Raft snapshot. Recovers sequence counters, transfer metrics, and pending events. This method does not restart the flush thread; call start() separately after restore.
- Parameters:
snapshot - the snapshot data previously produced by toSnapshot()
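The snapshot round trip can be pictured as copying state into a HashMap and reading it back. The sketch below is an assumption-laden illustration: SnapshotSketch, the map keys ("currentSequence", "lastAckedSequence", "bytesTransferred", "pendingEvents"), and the use of String events are all hypothetical; the real snapshot layout is not documented here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

final class SnapshotSketch {
    long currentSequence;
    long lastAckedSequence;
    long bytesTransferred;
    final List<String> pending = new ArrayList<>();

    /** Captures counters and pending events under assumed key names. */
    HashMap<String, Object> toSnapshot() {
        HashMap<String, Object> snapshot = new HashMap<>();
        snapshot.put("currentSequence", currentSequence);
        snapshot.put("lastAckedSequence", lastAckedSequence);
        snapshot.put("bytesTransferred", bytesTransferred);
        // Defensive copy so later queue changes do not alter the snapshot.
        snapshot.put("pendingEvents", new ArrayList<>(pending));
        return snapshot;
    }

    /** Restores state; does not start any background thread. */
    void restoreFromSnapshot(HashMap<String, Object> snapshot) {
        currentSequence = readLong(snapshot, "currentSequence");
        lastAckedSequence = readLong(snapshot, "lastAckedSequence");
        bytesTransferred = readLong(snapshot, "bytesTransferred");
        pending.clear();
        Object events = snapshot.get("pendingEvents");
        if (events instanceof List<?>) {
            for (Object event : (List<?>) events) {
                pending.add((String) event);
            }
        }
    }

    // Missing keys default to 0 so an older snapshot still restores.
    private static long readLong(HashMap<String, Object> snapshot, String key) {
        return ((Number) snapshot.getOrDefault(key, 0L)).longValue();
    }
}
```

Restoring the sequence counter matters because a publisher that restarted from zero would reissue sequence numbers the remote cluster has already acknowledged.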
-
serializeBatch
Serialize a batch of events to a byte array.
- Parameters:
batch - the events to serialize
- Returns:
- serialized bytes
-
deserializeBatch
Deserialize a batch of events from a byte array.
- Parameters:
data - the serialized bytes (uncompressed)
- Returns:
- the deserialized events
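The wire format used by serializeBatch and deserializeBatch is not specified on this page. As an illustration of the general idea, the sketch below uses a simple length-prefixed layout with DataOutputStream. The Event record and its fields (sequence, key, value), the class BatchCodecSketch, and the layout itself are assumptions, not the actual WanEvent format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class BatchCodecSketch {
    // Hypothetical event shape; the real WanEvent components are not shown here.
    record Event(long sequence, String key, byte[] value) { }

    /** Layout (assumed): int count, then per event: long, UTF key, int length, bytes. */
    static byte[] serializeBatch(List<Event> batch) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(batch.size());
            for (Event event : batch) {
                out.writeLong(event.sequence());
                out.writeUTF(event.key()); // writeUTF limits the key to 65535 encoded bytes
                out.writeInt(event.value().length);
                out.write(event.value());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads back exactly what serializeBatch wrote; input must be uncompressed. */
    static List<Event> deserializeBatch(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<Event> batch = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                long sequence = in.readLong();
                String key = in.readUTF();
                byte[] value = new byte[in.readInt()];
                in.readFully(value);
                batch.add(new Event(sequence, key, value));
            }
            return batch;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because deserializeBatch expects uncompressed bytes, a receiver handling a compressed batch would call decompress first and pass the result here.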
-
compress
public static byte @NonNull [] compress(byte @NonNull [] data)
GZIP-compress data.
- Parameters:
data - the raw data
- Returns:
- compressed data
-
decompress
public static byte @NonNull [] decompress(byte @NonNull [] compressed)
-
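A GZIP round trip equivalent in spirit to compress and decompress can be written with the standard java.util.zip streams. The sketch below is not the class's actual implementation: GzipSketch and shouldCompress are hypothetical names, and the threshold check simply mirrors the "larger than 1024 bytes" rule from the class description.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipSketch {
    // Threshold from the class description: only larger batches are compressed.
    static final int COMPRESSION_THRESHOLD_BYTES = 1024;

    static byte[] compress(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so read the bytes afterwards.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True when a serialized batch is strictly larger than the threshold. */
    static boolean shouldCompress(byte[] serializedBatch) {
        return serializedBatch.length > COMPRESSION_THRESHOLD_BYTES;
    }
}
```

Skipping compression for small batches is a common trade-off, since GZIP headers and trailers add a fixed overhead that can outweigh the savings on short payloads.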