Class ReliableTopic<T>
- Type Parameters:
T- the type of messages
- All Implemented Interfaces:
AutoCloseable
Unlike DistributedTopic which is fire-and-forget, ReliableTopic
stores published messages in a fixed-capacity DistributedRingbuffer,
assigns each a monotonic sequence number, and supports replay from any stored
sequence. When the ringbuffer is full, the oldest messages are overwritten.
Features:
- Ringbuffer-backed message storage with configurable capacity
- Monotonic sequence numbers for total message ordering
- Message replay from any stored sequence via
readFrom(long, int) - Reliable subscribers with automatic replay on registration
- Loss-tolerant and loss-intolerant subscriber modes
- Statistics: published, delivered, failures, delivery latency
Thread safety: Publish acquires a write lock; reads and listener dispatch use the read lock. The underlying ringbuffer provides its own lock for internal array operations.
- Since:
- 1.1
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic interfaceListener for receiving messages from a ReliableTopic.static final recordstatic interfaceReliable message listener with sequence tracking and replay support.static final recordA message stored in the ReliableTopic's ringbuffer.static enumPolicy applied when a publish arrives while the retained ringbuffer window is full.static final recordSnapshot of ReliableTopic statistics. -
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final intDefault ringbuffer capacity when not specified. -
Constructor Summary
ConstructorsConstructorDescriptionReliableTopic(String name, int instanceNumber) ReliableTopic(String name, int instanceNumber, int capacity) Create a ReliableTopic with the specified ringbuffer capacity. -
Method Summary
Modifier and TypeMethodDescriptionaddMessageListener(ReliableTopic.MessageListener<T> listener) Add a message listener that receives new messages published after registration.Add a reliable message listener with replay support.voidclose()Stop delivery workers and fail queued subscriber deliveries.intThe fixed capacity of the backing ringbuffer.longThe sequence of the oldest message still stored in the ringbuffer.longGet a snapshot of topic statistics.intThe number of messages currently stored.longThe sequence that will be assigned to the next published message.longlonglongPublish a message to the topic.longPublish a message with a publisher identifier.longpublish(T message, @Nullable String publisherId, ReliableTopic.TopicOverloadPolicy policy) readAll()Read all messages currently stored in the ringbuffer.readFrom(long fromSequence, int maxCount) Read messages starting from the given sequence (inclusive).readFromResult(long fromSequence, int maxCount) booleanremoveMessageListener(String subscriptionId) Remove a subscription by ID.longStore a message in the ringbuffer without dispatching to subscribers.longstoreOnly(T message, @Nullable String publisherId, ReliableTopic.TopicOverloadPolicy policy) intThe number of active subscribers.
-
Field Details
-
DEFAULT_CAPACITY
public static final int DEFAULT_CAPACITYDefault ringbuffer capacity when not specified.- See Also:
-
-
Constructor Details
-
ReliableTopic
-
ReliableTopic
Create a ReliableTopic with the specified ringbuffer capacity.- Parameters:
name- the topic name (must not be blank)instanceNumber- the node instance number for loggingcapacity- the maximum number of messages retained (must be > 0)
-
-
Method Details
-
publish
Publish a message to the topic.- Parameters:
message- the message payload (must not be null)- Returns:
- the assigned sequence number
-
publish
-
publish
public long publish(T message, @Nullable String publisherId, ReliableTopic.TopicOverloadPolicy policy) -
storeOnly
Store a message in the ringbuffer without dispatching to subscribers.Used by Raft followers during state machine apply: the message must be stored (to keep the ringbuffer consistent across replicas) but subscribers must NOT be notified — only the leader notifies, preventing duplicate delivery across the cluster.
- Parameters:
message- the message payload (must not be null)publisherId- optional publisher identifier for tracing- Returns:
- the assigned sequence number
-
storeOnly
public long storeOnly(T message, @Nullable String publisherId, ReliableTopic.TopicOverloadPolicy policy) -
readFrom
Read messages starting from the given sequence (inclusive).- Parameters:
fromSequence- the sequence to start reading from (inclusive)maxCount- maximum number of messages to return- Returns:
- ordered list of messages; empty if sequence is out of stored range
-
readFromResult
-
readAll
Read all messages currently stored in the ringbuffer.- Returns:
- ordered list of all stored messages (oldest to newest)
-
addMessageListener
Add a message listener that receives new messages published after registration.- Parameters:
listener- the listener to register- Returns:
- subscription ID for use with
removeMessageListener(String)
-
addReliableMessageListener
Add a reliable message listener with replay support.If
ReliableTopic.ReliableMessageListener.retrieveInitialSequence()returns a non-negative value, stored messages from that sequence onward are snapshotted under the write lock and replayed from the local snapshot after registration.- Parameters:
listener- the reliable listener to register- Returns:
- subscription ID for use with
removeMessageListener(String)
-
removeMessageListener
Remove a subscription by ID.- Parameters:
subscriptionId- the ID returned byaddMessageListener- Returns:
- true if the subscription was found and removed
-
getHeadSequence
public long getHeadSequence()The sequence of the oldest message still stored in the ringbuffer. -
getTailSequence
public long getTailSequence()The sequence that will be assigned to the next published message. -
getCapacity
public int getCapacity()The fixed capacity of the backing ringbuffer. -
getStoredMessageCount
public int getStoredMessageCount()The number of messages currently stored. -
subscriberCount
public int subscriberCount()The number of active subscribers. -
getTotalPublished
public long getTotalPublished() -
getTotalDelivered
public long getTotalDelivered() -
getListenerFailures
public long getListenerFailures() -
getStatistics
Get a snapshot of topic statistics. -
close
public void close()Stop delivery workers and fail queued subscriber deliveries. Data retained in the ringbuffer remains readable until the instance is discarded.- Specified by:
closein interfaceAutoCloseable
-