Class DistributedTopic<T>

java.lang.Object
com.loomcache.server.topic.DistributedTopic<T>
Type Parameters:
T - The type of message published to this topic

public class DistributedTopic<T> extends Object
A distributed publish-subscribe topic implementation using virtual threads for async dispatch.

Features:

  • Publish/subscribe messaging with FIFO message ordering per publisher
  • Reliable delivery: messages can be replayed from a known sequence number
  • Ring buffer with configurable capacity that backs message replay
  • Virtual thread-based async message dispatch
  • Thread-safe with minimal lock contention

Thread safety: Uses ConcurrentHashMap for subscriber storage and ReadWriteLock for critical section protection.
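
The read/write locking mentioned above can be illustrated with a minimal sketch. GuardedBufferSketch and its methods are hypothetical names, not part of DistributedTopic; the sketch only shows the general pattern in which mutations take the write lock and readers copy the state under the read lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in, not the DistributedTopic source.
class GuardedBufferSketch<T> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<T> buffer = new ArrayList<>();

    // Mutations take the exclusive write lock.
    void append(T item) {
        lock.writeLock().lock();
        try {
            buffer.add(item);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and return a defensive copy.
    List<T> snapshot() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(buffer);
        } finally {
            lock.readLock().unlock();
        }
    }

    void clear() {
        lock.writeLock().lock();
        try {
            buffer.clear();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Because snapshot() returns a copy, callers can iterate the result without holding any lock.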

  • Constructor Details

    • DistributedTopic

      public DistributedTopic(String topicName)
      Create a distributed topic with default configuration.
      Parameters:
      topicName - The name of this topic
    • DistributedTopic

      public DistributedTopic(String topicName, TopicConfig config)
      Create a distributed topic with custom configuration.
      Parameters:
      topicName - The name of this topic
      config - Topic configuration
      Throws:
      IllegalArgumentException - if topicName is null or empty
  • Method Details

    • publish

      public void publish(T message)
      Publish a message to all subscribers.

      Messages are assigned a sequence number and dispatched asynchronously to each subscriber using virtual threads. If reliable delivery is enabled, messages are stored in the ring buffer for replay.

      Parameters:
      message - The message to publish
      Throws:
      IllegalArgumentException - if message is null
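
      The publish flow described above (validate, assign a sequence number, hand each delivery to an executor) can be sketched roughly as follows. PublishSketch is a hypothetical stand-in, not the actual class: it starts sequences at 0, uses Consumer<T> in place of MessageListener<T>, and returns the assigned sequence, all of which are assumptions made for illustration. The executor is injected so the sketch runs on any JDK; on Java 21 or later, Executors.newVirtualThreadPerTaskExecutor() would give one virtual thread per delivery.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in, not the DistributedTopic source.
class PublishSketch<T> {
    private final AtomicLong nextSequence = new AtomicLong(0); // assumed to start at 0
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private final Executor executor;

    PublishSketch(Executor executor) {
        this.executor = executor;
    }

    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Returns the assigned sequence for illustration; the documented publish() returns void.
    long publish(T message) {
        if (message == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        long sequence = nextSequence.getAndIncrement();
        for (Consumer<T> listener : listeners) {
            // Each delivery is submitted as its own task.
            executor.execute(() -> listener.accept(message));
        }
        return sequence;
    }
}
```

      Passing Runnable::run as the executor makes delivery synchronous, which is convenient in tests.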
    • publishBatch

      public void publishBatch(Collection<T> messages)
      Publish multiple messages atomically to all subscribers.

      All messages are assigned consecutive sequence numbers and dispatched asynchronously. This is more efficient than calling publish() once per message.

      Parameters:
      messages - Collection of messages to publish
      Throws:
      IllegalArgumentException - if messages is null or contains null
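
      One way a batch can receive consecutive sequence numbers is to validate the whole batch first and then reserve a contiguous range with a single atomic add. The sketch below assumes this approach; BatchSequencerSketch and its methods are hypothetical, and the real publishBatch() may work differently.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in, not the DistributedTopic source.
class BatchSequencerSketch {
    private final AtomicLong nextSequence = new AtomicLong(0); // assumed to start at 0

    // Validates every element before reserving, so a rejected batch consumes no sequences.
    List<Long> assign(Collection<?> messages) {
        if (messages == null) {
            throw new IllegalArgumentException("messages must not be null");
        }
        for (Object message : messages) {
            if (message == null) {
                throw new IllegalArgumentException("messages must not contain null");
            }
        }
        // One atomic step reserves the whole range [first, first + size).
        long first = nextSequence.getAndAdd(messages.size());
        List<Long> sequences = new ArrayList<>(messages.size());
        for (int i = 0; i < messages.size(); i++) {
            sequences.add(first + i);
        }
        return sequences;
    }

    long next() {
        return nextSequence.get();
    }
}
```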
    • subscribe

      public String subscribe(MessageListener<T> listener)
      Subscribe to messages on this topic.

      The listener receives every message published to this topic after it subscribes. If the listener is a ReliableMessageListener, it can request message replay from a known sequence number.

      Parameters:
      listener - The message listener
      Returns:
      A unique subscription ID for this subscription
      Throws:
      IllegalArgumentException - if listener is null
    • unsubscribe

      public void unsubscribe(String subscriptionId)
      Unsubscribe a listener from this topic.
      Parameters:
      subscriptionId - The subscription ID returned by subscribe() or addMessageFilter()
      Throws:
      IllegalArgumentException - if subscriptionId is null or empty
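
      The subscribe/unsubscribe pairing can be pictured as a registry keyed by subscription ID. The sketch below is a hypothetical illustration: SubscriptionRegistrySketch, the use of random UUIDs as IDs, and the boolean return from unsubscribe are assumptions, since this page does not specify the ID format or what happens for an unknown ID.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in, not the DistributedTopic source.
class SubscriptionRegistrySketch<T> {
    private final Map<String, Consumer<T>> subscribers = new ConcurrentHashMap<>();

    String subscribe(Consumer<T> listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        String id = UUID.randomUUID().toString(); // assumed ID format
        subscribers.put(id, listener);
        return id;
    }

    // Returns true if a subscription was removed (an assumption; the documented method returns void).
    boolean unsubscribe(String subscriptionId) {
        if (subscriptionId == null || subscriptionId.isEmpty()) {
            throw new IllegalArgumentException("subscriptionId must not be null or empty");
        }
        return subscribers.remove(subscriptionId) != null;
    }

    int size() {
        return subscribers.size();
    }
}
```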
    • getNextSequence

      public long getNextSequence()
      Get the next sequence number to be assigned.
      Returns:
      The sequence number that the next published message will receive
    • getTopicStats

      public TopicStats getTopicStats()
      Get statistics for this topic including message and subscriber counts.
      Returns:
      A TopicStats record containing the topic name, message count, subscriber count, and messages per second
    • addMessageFilter

      public String addMessageFilter(Predicate<T> filter, MessageListener<T> listener)
      Add a filtered subscription that only receives messages matching a predicate.

      The listener will only receive messages for which the predicate returns true.

      Parameters:
      filter - The predicate to filter messages
      listener - The message listener
      Returns:
      A unique subscription ID for this filtered subscription
      Throws:
      IllegalArgumentException - if filter or listener is null
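
      Conceptually, a filtered subscription behaves like a listener wrapped in a predicate check. The following sketch shows that idea with standard java.util.function types; FilteredListenerSketch is a hypothetical helper and Consumer<T> stands in for MessageListener<T>.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper, not part of the DistributedTopic API.
class FilteredListenerSketch {
    // Wraps a listener so it only sees messages for which the filter returns true.
    static <T> Consumer<T> filtered(Predicate<T> filter, Consumer<T> listener) {
        if (filter == null || listener == null) {
            throw new IllegalArgumentException("filter and listener must not be null");
        }
        return message -> {
            if (filter.test(message)) {
                listener.accept(message);
            }
        };
    }
}
```

      For example, wrapping a listener with the predicate n -> n % 2 == 0 and feeding it 1, 2, 3, 4 would deliver only 2 and 4.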
    • getMessageCount

      public long getMessageCount()
      Get the total number of messages published to this topic.
      Returns:
      The count of messages published
    • getSubscriberCount

      public int getSubscriberCount()
      Get the number of active subscribers on this topic (both regular and filtered).
      Returns:
      The count of active subscribers
    • getMessagesFrom

      public List<DistributedTopic.MessageWithSequence<T>> getMessagesFrom(long fromSequence)
      Get messages from the buffer for replay starting from a given sequence.

      This is useful for reliable message delivery when a subscriber has fallen behind and needs to catch up.

      Parameters:
      fromSequence - The starting sequence number (inclusive)
      Returns:
      A list of messages with sequence >= fromSequence
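
      Replay from a bounded buffer can be sketched as below. ReplayBufferSketch is a hypothetical stand-in that assumes sequences start at 0 and that the oldest entry is evicted once capacity is reached; under those assumptions, a request for a sequence that has already been evicted simply returns whatever newer messages remain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in, not the DistributedTopic source.
class ReplayBufferSketch<T> {
    static final class Entry<T> {
        final long sequence;
        final T payload;

        Entry(long sequence, T payload) {
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    private final Deque<Entry<T>> buffer = new ArrayDeque<>();
    private final int capacity;
    private long nextSequence = 0; // assumed to start at 0

    ReplayBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    synchronized long append(T payload) {
        if (buffer.size() == capacity) {
            buffer.removeFirst(); // evict the oldest entry when full
        }
        long sequence = nextSequence++;
        buffer.addLast(new Entry<>(sequence, payload));
        return sequence;
    }

    // Returns buffered entries with sequence >= fromSequence, oldest first.
    synchronized List<Entry<T>> messagesFrom(long fromSequence) {
        List<Entry<T>> result = new ArrayList<>();
        for (Entry<T> entry : buffer) {
            if (entry.sequence >= fromSequence) {
                result.add(entry);
            }
        }
        return result;
    }
}
```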
    • getHistory

      public List<DistributedTopic.TopicMessage<T>> getHistory(int count)
      Get the last N messages from the message history buffer.
      Parameters:
      count - The number of messages to retrieve
      Returns:
      A list of the last N messages (fewer if the buffer holds fewer than N messages)
      Throws:
      IllegalArgumentException - if count is negative
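
      The "last N, or fewer" contract can be expressed as a clamped sublist. HistoryTailSketch is a hypothetical helper operating on a plain List, shown only to make the boundary cases concrete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the DistributedTopic API.
class HistoryTailSketch {
    // Returns a copy of the last `count` elements, or the whole list if it is shorter.
    static <T> List<T> lastN(List<T> history, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative");
        }
        int from = Math.max(0, history.size() - count);
        return new ArrayList<>(history.subList(from, history.size()));
    }
}
```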
    • getHistoryFromSequence

      public List<DistributedTopic.TopicMessage<T>> getHistoryFromSequence(long startSeq, int maxCount)
      Get messages from history starting from a specific sequence number with a maximum count.
      Parameters:
      startSeq - The starting sequence number (inclusive)
      maxCount - The maximum number of messages to return
      Returns:
      A list of at most maxCount messages starting from startSeq
      Throws:
      IllegalArgumentException - if maxCount is negative
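
      A bounded query like this lends itself to paging: a caller that has fallen behind can fetch one page, advance its cursor past the last sequence seen, and repeat until a page comes back empty. The sketch below shows that loop under simplifying assumptions: CatchUpSketch is hypothetical, fetchPage stands in for a call such as getHistoryFromSequence(startSeq, maxCount), and each message is reduced to its sequence number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical caller-side helper, not part of the DistributedTopic API.
class CatchUpSketch {
    // fetchPage.apply(startSeq, maxCount) is assumed to return sequences in ascending order.
    static List<Long> drain(BiFunction<Long, Integer, List<Long>> fetchPage, long startSeq, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Long> seen = new ArrayList<>();
        long cursor = startSeq;
        while (true) {
            List<Long> page = fetchPage.apply(cursor, pageSize);
            if (page.isEmpty()) {
                break; // nothing newer than the cursor
            }
            seen.addAll(page);
            // Resume just after the last sequence received.
            cursor = page.get(page.size() - 1) + 1;
        }
        return seen;
    }
}
```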
    • setHistoryCapacity

      public void setHistoryCapacity(int capacity)
      Set the history buffer capacity.
      Parameters:
      capacity - The maximum number of messages to keep in history
      Throws:
      IllegalArgumentException - if capacity is not positive
    • getBufferSize

      public int getBufferSize()
      Get the current size of the message buffer.
      Returns:
      The number of messages currently in the ring buffer
    • clearBuffer

      public void clearBuffer()
      Clear all messages from the buffer. This is useful for testing or for discarding old messages.