Class DistributedTopic<T>
- Type Parameters:
T - The type of message published to this topic
Features:
- Publish/subscribe messaging with message ordering guarantees
- Reliable delivery: messages can be replayed from a known sequence
- Ring buffer for message replay with configurable capacity
- FIFO message ordering per publisher
- Virtual thread-based async message dispatch
- Thread-safe with minimal lock contention
Thread safety: Uses ConcurrentHashMap for subscriber storage and
ReadWriteLock for critical section protection.
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
- static final record: A message paired with its sequence number.
- static final record: A topic message with sequence number, timestamp, payload and publisher ID.
-
Constructor Summary
Constructors (Constructor, Description):
- DistributedTopic(String topicName): Create a distributed topic with default configuration.
- DistributedTopic(String topicName, TopicConfig config): Create a distributed topic with custom configuration.
-
Method Summary
Methods (Modifier and Type, Method, Description):
- addMessageFilter(Predicate<T> filter, MessageListener<T> listener): Add a filtered subscription that only receives messages matching a predicate.
- void clearBuffer(): Clear all messages from the buffer.
- int getBufferSize(): Get the current size of the message buffer.
- getHistory(int count): Get the last N messages from the message history buffer.
- getHistoryFromSequence(long startSeq, int maxCount): Get messages from history starting from a specific sequence number with a maximum count.
- long getMessageCount(): Get the total number of messages published to this topic.
- getMessagesFrom(long fromSequence): Get messages from the buffer for replay starting from a given sequence.
- long getNextSequence(): Gets the next sequence number to be assigned.
- int getSubscriberCount(): Get the number of active subscribers on this topic (both regular and filtered).
- getTopicStats: Get statistics for this topic including message and subscriber counts.
- void publish: Publish a message to all subscribers.
- void publishBatch(Collection<T> messages): Publish multiple messages atomically to all subscribers.
- void setHistoryCapacity(int capacity): Set the history buffer capacity.
- subscribe(MessageListener<T> listener): Subscribe to messages on this topic.
- void unsubscribe(String subscriptionId): Unsubscribe a listener from this topic.
-
Constructor Details
-
DistributedTopic
Create a distributed topic with default configuration.
- Parameters:
topicName - The name of this topic
-
DistributedTopic
Create a distributed topic with custom configuration.
- Parameters:
topicName - The name of this topic
config - Topic configuration
- Throws:
IllegalArgumentException - if topicName is null or empty
-
-
Method Details
-
publish
Publish a message to all subscribers.
Messages are assigned a sequence number and dispatched asynchronously to each subscriber using virtual threads. If reliable delivery is enabled, messages are stored in the ring buffer for replay.
- Parameters:
message - The message to publish
- Throws:
IllegalArgumentException - if message is null
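The publish flow described above (validate, assign a sequence number, hand delivery to an executor) can be sketched as follows. This is a minimal illustration, not the DistributedTopic source: the class `PublishSketch`, the `BiConsumer` listener shape, the returned sequence number, and sequences starting at 0 are all assumptions. The executor is injected so the sketch runs on any JDK; on Java 21 or later a virtual-thread executor could be passed instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical stand-in for illustration; not the actual DistributedTopic class.
final class PublishSketch<T> {
    // Assumption: sequence numbers start at 0 and increase by one per message.
    private final AtomicLong nextSequence = new AtomicLong(0);
    private final List<BiConsumer<Long, T>> listeners = new CopyOnWriteArrayList<>();
    private final Executor dispatcher;

    PublishSketch(Executor dispatcher) {
        this.dispatcher = dispatcher;
    }

    void addListener(BiConsumer<Long, T> listener) {
        listeners.add(listener);
    }

    // Validates the message, assigns the next sequence number,
    // then schedules one delivery task per listener on the executor.
    long publish(T message) {
        if (message == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        long sequence = nextSequence.getAndIncrement();
        for (BiConsumer<Long, T> listener : listeners) {
            dispatcher.execute(() -> listener.accept(sequence, message));
        }
        return sequence;
    }

    public static void main(String[] args) throws InterruptedException {
        // On Java 21+, Executors.newVirtualThreadPerTaskExecutor() could be used here.
        ExecutorService executor = Executors.newCachedThreadPool();
        PublishSketch<String> topic = new PublishSketch<>(executor);
        topic.addListener((seq, msg) -> System.out.println(seq + ": " + msg));
        topic.publish("hello");
        topic.publish("world");
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

With one task per message, the order in which the two lines are printed is not guaranteed by this sketch; an implementation that promises per-publisher FIFO delivery needs additional ordering logic.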
-
publishBatch
Publish multiple messages atomically to all subscribers.
All messages in the batch are assigned consecutive sequence numbers and dispatched asynchronously. This is more efficient than calling publish() multiple times.
- Parameters:
messages - Collection of messages to publish
- Throws:
IllegalArgumentException - if messages is null or contains null
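One way to give a batch consecutive sequence numbers is to reserve the whole range with a single atomic add, so that no other publisher can interleave. The sketch below illustrates that idea only; `BatchSequenceSketch`, `Sequenced`, and `assignBatch` are hypothetical names, and the real class may achieve atomicity differently (for example with its lock).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of batch sequence assignment.
final class BatchSequenceSketch {
    // A payload paired with the sequence number it was given.
    record Sequenced<T>(long sequence, T payload) {}

    private final AtomicLong nextSequence = new AtomicLong(0);

    <T> List<Sequenced<T>> assignBatch(Collection<T> messages) {
        if (messages == null) {
            throw new IllegalArgumentException("messages must not be null");
        }
        // Checked with a loop because contains(null) throws on some collections.
        for (T message : messages) {
            if (message == null) {
                throw new IllegalArgumentException("messages must not contain null");
            }
        }
        // Reserve the whole range in one atomic step.
        long sequence = nextSequence.getAndAdd(messages.size());
        List<Sequenced<T>> result = new ArrayList<>(messages.size());
        for (T message : messages) {
            result.add(new Sequenced<>(sequence++, message));
        }
        return result;
    }

    public static void main(String[] args) {
        BatchSequenceSketch sketch = new BatchSequenceSketch();
        System.out.println(sketch.assignBatch(List.of("a", "b", "c")));
        System.out.println(sketch.assignBatch(List.of("d")));
    }
}
```

Validation happens before any sequence number is reserved, so a rejected batch leaves no gap in the sequence.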
-
subscribe
Subscribe to messages on this topic.
The listener will receive all messages published to this topic, starting from the current point. If the listener is a ReliableMessageListener, it can request message replay from a known sequence.
- Parameters:
listener - The message listener
- Returns:
- A unique subscription ID for this subscription
- Throws:
IllegalArgumentException - if listener is null
-
unsubscribe
Unsubscribe a listener from this topic.
- Parameters:
subscriptionId - The subscription ID returned by subscribe() or addMessageFilter()
- Throws:
IllegalArgumentException - if subscriptionId is null or empty
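The subscribe/unsubscribe contract (an ID handed out on subscription and later used for removal) maps naturally onto a ConcurrentHashMap keyed by that ID. The sketch below is an assumption-laden illustration: `SubscriptionRegistrySketch`, the use of `Consumer<T>` in place of MessageListener, UUID-based IDs, and the boolean return from `unsubscribe` are not taken from the documented API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical registry; Consumer<T> stands in for MessageListener<T>.
final class SubscriptionRegistrySketch<T> {
    private final Map<String, Consumer<T>> subscribers = new ConcurrentHashMap<>();

    // Registers the listener and returns the ID needed to remove it later.
    String subscribe(Consumer<T> listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        // Assumption: random UUIDs are used as subscription IDs.
        String id = UUID.randomUUID().toString();
        subscribers.put(id, listener);
        return id;
    }

    // Returns true if a subscription with this ID existed (the documented method is void).
    boolean unsubscribe(String subscriptionId) {
        if (subscriptionId == null || subscriptionId.isEmpty()) {
            throw new IllegalArgumentException("subscriptionId must not be null or empty");
        }
        return subscribers.remove(subscriptionId) != null;
    }

    int subscriberCount() {
        return subscribers.size();
    }

    public static void main(String[] args) {
        SubscriptionRegistrySketch<String> registry = new SubscriptionRegistrySketch<>();
        String id = registry.subscribe(msg -> System.out.println("got " + msg));
        System.out.println(registry.subscriberCount()); // prints 1
        registry.unsubscribe(id);
        System.out.println(registry.subscriberCount()); // prints 0
    }
}
```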
-
getNextSequence
public long getNextSequence()
Gets the next sequence number to be assigned.
- Returns:
- The sequence number that will be assigned to the next published message
-
getTopicStats
Get statistics for this topic including message and subscriber counts.
- Returns:
- TopicStats record with name, message count, subscriber count, and messages per second
-
addMessageFilter
Add a filtered subscription that only receives messages matching a predicate.
The listener will only receive messages for which the predicate returns true.
- Parameters:
filter - The predicate to filter messages
listener - The message listener
- Returns:
- A unique subscription ID for this filtered subscription
- Throws:
IllegalArgumentException - if filter or listener is null
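A filtered subscription can be thought of as an ordinary listener wrapped in a predicate check. The following sketch shows that wrapping in isolation; `FilterSketch` and `filtered` are hypothetical names, and `Consumer<T>` stands in for MessageListener, whose actual method signature is not shown in this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper showing predicate-based filtering of a listener.
final class FilterSketch {
    // Wraps a listener so it only sees messages for which the filter returns true.
    static <T> Consumer<T> filtered(Predicate<T> filter, Consumer<T> listener) {
        if (filter == null || listener == null) {
            throw new IllegalArgumentException("filter and listener must not be null");
        }
        return message -> {
            if (filter.test(message)) {
                listener.accept(message);
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        Consumer<Integer> evensOnly =
                FilterSketch.<Integer>filtered(n -> n % 2 == 0, received::add);
        for (int i = 1; i <= 6; i++) {
            evensOnly.accept(i);
        }
        System.out.println(received); // prints [2, 4, 6]
    }
}
```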
-
getMessageCount
public long getMessageCount()
Get the total number of messages published to this topic.
- Returns:
- The count of messages published
-
getSubscriberCount
public int getSubscriberCount()
Get the number of active subscribers on this topic (both regular and filtered).
- Returns:
- The count of active subscribers
-
getMessagesFrom
Get messages from the buffer for replay starting from a given sequence.
This is useful for reliable message delivery when a subscriber has fallen behind and needs to catch up.
- Parameters:
fromSequence - The starting sequence number (inclusive)
- Returns:
- A list of messages with sequence >= fromSequence
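To make the replay semantics concrete, here is a minimal sketch of a bounded buffer that evicts its oldest entry when full and returns every retained entry at or after a requested sequence. It is not the library's ring buffer: `ReplayBufferSketch`, `Entry`, and `append` are hypothetical, an ArrayDeque replaces a true circular array, and the ReadWriteLock usage only mirrors the thread-safety note at the top of this page.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical bounded replay buffer for illustration.
final class ReplayBufferSketch<T> {
    record Entry<V>(long sequence, V payload) {}

    private final Deque<Entry<T>> buffer = new ArrayDeque<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final int capacity;
    private long nextSequence = 0; // guarded by the write lock

    ReplayBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Stores the payload under the next sequence number, evicting the oldest entry if full.
    long append(T payload) {
        lock.writeLock().lock();
        try {
            if (buffer.size() == capacity) {
                buffer.removeFirst();
            }
            long sequence = nextSequence++;
            buffer.addLast(new Entry<>(sequence, payload));
            return sequence;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns all retained entries with sequence >= fromSequence, oldest first.
    List<Entry<T>> messagesFrom(long fromSequence) {
        lock.readLock().lock();
        try {
            List<Entry<T>> result = new ArrayList<>();
            for (Entry<T> entry : buffer) {
                if (entry.sequence() >= fromSequence) {
                    result.add(entry);
                }
            }
            return result;
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReplayBufferSketch<String> replay = new ReplayBufferSketch<>(3);
        for (String s : List.of("a", "b", "c", "d", "e")) {
            replay.append(s);
        }
        // Only sequences 2..4 are still retained; a subscriber asks for 3 onward.
        System.out.println(replay.messagesFrom(3));
    }
}
```

A subscriber that asks for a sequence older than the oldest retained entry simply receives what is left, so a caller that needs gap detection would compare the first returned sequence with the one it requested.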
-
getHistory
Get the last N messages from the message history buffer.
- Parameters:
count - The number of messages to retrieve
- Returns:
- A list of the last N messages (fewer if buffer has fewer than N messages)
- Throws:
IllegalArgumentException - if count is negative
-
getHistoryFromSequence
Get messages from history starting from a specific sequence number with a maximum count.
- Parameters:
startSeq - The starting sequence number (inclusive)
maxCount - The maximum number of messages to return
- Returns:
- A list of messages starting from startSeq
- Throws:
IllegalArgumentException - if maxCount is negative
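The two history queries differ only in how they select a window: one counts back from the end, the other starts at a sequence number and caps the result. The sketch below expresses both over a plain list ordered by sequence. `HistoryQuerySketch`, `Entry`, `lastN`, and `fromSequence` are hypothetical names; the actual return type of getHistory and getHistoryFromSequence is not shown in this page.

```java
import java.util.List;

// Hypothetical illustration of the two history window semantics.
final class HistoryQuerySketch {
    record Entry(long sequence, String payload) {}

    // Last `count` entries, or the whole history if it is shorter than `count`.
    static List<Entry> lastN(List<Entry> history, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative");
        }
        int from = Math.max(0, history.size() - count);
        return List.copyOf(history.subList(from, history.size()));
    }

    // Up to `maxCount` entries whose sequence is >= startSeq, oldest first.
    static List<Entry> fromSequence(List<Entry> history, long startSeq, int maxCount) {
        if (maxCount < 0) {
            throw new IllegalArgumentException("maxCount must not be negative");
        }
        return history.stream()
                .filter(entry -> entry.sequence() >= startSeq)
                .limit(maxCount)
                .toList();
    }

    public static void main(String[] args) {
        List<Entry> history = List.of(
                new Entry(10, "a"), new Entry(11, "b"),
                new Entry(12, "c"), new Entry(13, "d"));
        System.out.println(lastN(history, 2));            // entries 12 and 13
        System.out.println(fromSequence(history, 11, 2)); // entries 11 and 12
    }
}
```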
-
setHistoryCapacity
public void setHistoryCapacity(int capacity)
Set the history buffer capacity.
- Parameters:
capacity - The maximum number of messages to keep in history
- Throws:
IllegalArgumentException - if capacity is not positive
-
getBufferSize
public int getBufferSize()
Get the current size of the message buffer.
- Returns:
- The number of messages currently in the ring buffer
-
clearBuffer
public void clearBuffer()
Clear all messages from the buffer. This is useful for testing or when dropping old messages.
-