Class ReliableTopic<T>

java.lang.Object
com.loomcache.server.datastructures.ReliableTopic<T>
Type Parameters:
T - the type of messages
All Implemented Interfaces:
AutoCloseable

public class ReliableTopic<T> extends Object implements AutoCloseable
Reliable Topic — Ringbuffer-backed pub/sub with message replay.

Unlike DistributedTopic, which is fire-and-forget, ReliableTopic stores published messages in a fixed-capacity DistributedRingbuffer, assigns each a monotonic sequence number, and supports replay from any stored sequence. When the ringbuffer is full, the oldest messages are overwritten.
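
The overwrite behaviour described above can be pictured with a small in-memory model. This is only a sketch, not the LoomCache implementation: the class name SequencedRing and its methods are hypothetical, and it assumes sequences start at 0 and are contiguous.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; SequencedRing is a hypothetical name, not a LoomCache class. */
final class SequencedRing<T> {
    private final Object[] slots;
    private long head = 0; // sequence of the oldest message still stored
    private long tail = 0; // sequence the next message will receive (assumed to start at 0)

    SequencedRing(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        this.slots = new Object[capacity];
    }

    /** Stores the message, overwriting the oldest slot when full, and returns its sequence. */
    long add(T message) {
        long sequence = tail++;
        slots[(int) (sequence % slots.length)] = message;
        if (tail - head > slots.length) {
            head = tail - slots.length; // the oldest message was just overwritten
        }
        return sequence;
    }

    long headSequence() { return head; }

    long tailSequence() { return tail; }

    /** Number of messages currently stored. */
    int size() { return (int) (tail - head); }

    /** Copies the stored messages, oldest to newest. */
    @SuppressWarnings("unchecked")
    List<T> snapshot() {
        List<T> out = new ArrayList<>();
        for (long s = head; s < tail; s++) {
            out.add((T) slots[(int) (s % slots.length)]);
        }
        return out;
    }
}
```

With a capacity of 3, adding five messages leaves the last three in the model: the head sequence has advanced to 2 and the tail sequence is 5, so the stored count is tail minus head.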

Features:

  • Ringbuffer-backed message storage with configurable capacity
  • Monotonic sequence numbers for total message ordering
  • Message replay from any stored sequence via readFrom(long, int)
  • Reliable subscribers with automatic replay on registration
  • Loss-tolerant and loss-intolerant subscriber modes
  • Statistics: published, delivered, failures, delivery latency

Thread safety: Publish acquires a write lock; reads and listener dispatch use the read lock. The underlying ringbuffer provides its own lock for internal array operations.
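
The locking discipline described above resembles the standard read/write lock pattern. The following is a minimal sketch using java.util.concurrent.locks.ReentrantReadWriteLock; the class LockedLog is hypothetical, it uses an unbounded list instead of a ringbuffer, and whether ReliableTopic uses this exact lock type is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Illustrative only; LockedLog is a hypothetical stand-in for the topic's storage. */
final class LockedLog<T> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<T> messages = new ArrayList<>();

    /** Writers are exclusive: the sequence is assigned while holding the write lock. */
    long publish(T message) {
        lock.writeLock().lock();
        try {
            messages.add(message);
            return messages.size() - 1;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Readers may run concurrently with each other, but never with a writer. */
    List<T> readAll() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(messages); // copy so callers never observe later writes
        } finally {
            lock.readLock().unlock();
        }
    }
}
```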

Since:
1.1
  • Field Details

    • DEFAULT_CAPACITY

      public static final int DEFAULT_CAPACITY
      Default ringbuffer capacity when not specified.
  • Constructor Details

    • ReliableTopic

      public ReliableTopic(String name, int instanceNumber)
      Create a ReliableTopic with the default capacity (10000).
      Parameters:
      name - the topic name (must not be blank)
      instanceNumber - the node instance number for logging
    • ReliableTopic

      public ReliableTopic(String name, int instanceNumber, int capacity)
      Create a ReliableTopic with the specified ringbuffer capacity.
      Parameters:
      name - the topic name (must not be blank)
      instanceNumber - the node instance number for logging
      capacity - the maximum number of messages retained (must be > 0)
  • Method Details

    • publish

      public long publish(T message)
      Publish a message to the topic.
      Parameters:
      message - the message payload (must not be null)
      Returns:
      the assigned sequence number
    • publish

      public long publish(T message, @Nullable String publisherId)
      Publish a message with a publisher identifier.
      Parameters:
      message - the message payload (must not be null)
      publisherId - optional publisher identifier for tracing
      Returns:
      the assigned sequence number
    • publish

      public long publish(T message, @Nullable String publisherId, ReliableTopic.TopicOverloadPolicy policy)
    • storeOnly

      public long storeOnly(T message, @Nullable String publisherId)
      Store a message in the ringbuffer without dispatching to subscribers.

      Used by Raft followers during state machine apply: the message must be stored (to keep the ringbuffer consistent across replicas), but subscribers must NOT be notified. Only the leader notifies, which prevents duplicate delivery across the cluster.

      Parameters:
      message - the message payload (must not be null)
      publisherId - optional publisher identifier for tracing
      Returns:
      the assigned sequence number
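
The leader/follower split described for storeOnly can be illustrated with a small model. This is a sketch under stated assumptions: ReplicaSketch, apply, and the isLeader flag are hypothetical names, and the real state machine wiring is not shown in this documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only; models one replica applying committed entries. */
final class ReplicaSketch {
    private final List<String> log = new ArrayList<>();
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /**
     * Applies one committed entry. Every replica stores it, so the logs stay
     * identical, but only the leader dispatches to subscribers.
     */
    long apply(String message, boolean isLeader) {
        log.add(message);
        long sequence = log.size() - 1;
        if (isLeader) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
        return sequence;
    }

    List<String> stored() {
        return List.copyOf(log);
    }
}
```

In this model the leader path corresponds to publish and the follower path to storeOnly: both return the same sequence for the same entry, and only one node in the cluster delivers it.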
    • storeOnly

      public long storeOnly(T message, @Nullable String publisherId, ReliableTopic.TopicOverloadPolicy policy)
    • readFrom

      public List<ReliableTopic.TopicMessage<T>> readFrom(long fromSequence, int maxCount)
      Read messages starting from the given sequence (inclusive).
      Parameters:
      fromSequence - the sequence to start reading from (inclusive)
      maxCount - maximum number of messages to return
      Returns:
      ordered list of messages; empty if sequence is out of stored range
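
The read semantics above (inclusive start, bounded count, empty result when the sequence is outside the stored range) can be sketched over a plain list. ReadFromSketch is a hypothetical helper, not part of the API; treating a non-positive maxCount as an empty read is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; models the documented readFrom contract over a plain list. */
final class ReadFromSketch {
    /**
     * @param stored messages holding sequences [head, head + stored.size())
     * @param head   sequence of the first element of {@code stored}
     */
    static <T> List<T> readFrom(List<T> stored, long head, long fromSequence, int maxCount) {
        long tail = head + stored.size();
        // Out of the stored range (already overwritten or not yet published): empty result.
        if (fromSequence < head || fromSequence >= tail || maxCount <= 0) {
            return List.of();
        }
        int start = (int) (fromSequence - head);
        int end = (int) Math.min(stored.size(), (long) start + maxCount);
        return new ArrayList<>(stored.subList(start, end));
    }
}
```

For example, with messages c, d, e stored at sequences 2 to 4, reading from sequence 3 returns d and e, while reading from sequence 0 or 5 returns an empty list.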
    • readFromResult

      public ReliableTopic.ReadResult<T> readFromResult(long fromSequence, int maxCount)
    • readAll

      public List<ReliableTopic.TopicMessage<T>> readAll()
      Read all messages currently stored in the ringbuffer.
      Returns:
      ordered list of all stored messages (oldest to newest)
    • addMessageListener

      public String addMessageListener(ReliableTopic.MessageListener<T> listener)
      Add a message listener that receives new messages published after registration.
      Parameters:
      listener - the listener to register
      Returns:
      subscription ID for use with removeMessageListener(String)
    • addReliableMessageListener

      public String addReliableMessageListener(ReliableTopic.ReliableMessageListener<T> listener)
      Add a reliable message listener with replay support.

      If ReliableTopic.ReliableMessageListener.retrieveInitialSequence() returns a non-negative value, stored messages from that sequence onward are snapshotted under the write lock and replayed from the local snapshot after registration.

      Parameters:
      listener - the reliable listener to register
      Returns:
      subscription ID for use with removeMessageListener(String)
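
The snapshot-then-replay registration described above can be sketched as follows. This is a simplified model, not the library's code: ReplaySketch and its methods are hypothetical, a synchronized block stands in for the write lock, nothing is ever evicted, and ordering between the replay and concurrently published messages is not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Illustrative only; list index equals sequence and nothing is ever overwritten. */
final class ReplaySketch {
    private final List<String> stored = new ArrayList<>();
    private final List<BiConsumer<Long, String>> listeners = new ArrayList<>();

    synchronized long publish(String message) {
        stored.add(message);
        long sequence = stored.size() - 1;
        for (BiConsumer<Long, String> listener : listeners) {
            listener.accept(sequence, message);
        }
        return sequence;
    }

    /** A negative initialSequence means "new messages only" (no replay). */
    void addReliableListener(long initialSequence, BiConsumer<Long, String> listener) {
        List<String> snapshot = List.of();
        long first = 0;
        synchronized (this) {
            // Snapshot and registration happen under the same lock, so no message
            // published in between can be missed.
            if (initialSequence >= 0 && initialSequence < stored.size()) {
                first = initialSequence;
                snapshot = new ArrayList<>(stored.subList((int) first, stored.size()));
            }
            listeners.add(listener);
        }
        // Replay from the local snapshot after registration, outside the lock.
        for (int i = 0; i < snapshot.size(); i++) {
            listener.accept(first + i, snapshot.get(i));
        }
    }
}
```

A listener registered with initial sequence 1 after a, b, c have been published first receives b and c from the snapshot, then any message published afterwards.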
    • removeMessageListener

      public boolean removeMessageListener(String subscriptionId)
      Remove a subscription by ID.
      Parameters:
      subscriptionId - the ID returned by addMessageListener or addReliableMessageListener
      Returns:
      true if the subscription was found and removed
    • getHeadSequence

      public long getHeadSequence()
      The sequence of the oldest message still stored in the ringbuffer.
    • getTailSequence

      public long getTailSequence()
      The sequence that will be assigned to the next published message.
    • getCapacity

      public int getCapacity()
      The fixed capacity of the backing ringbuffer.
    • getStoredMessageCount

      public int getStoredMessageCount()
      The number of messages currently stored.
    • subscriberCount

      public int subscriberCount()
      The number of active subscribers.
    • getTotalPublished

      public long getTotalPublished()
    • getTotalDelivered

      public long getTotalDelivered()
    • getListenerFailures

      public long getListenerFailures()
    • getStatistics

      public ReliableTopic.TopicStatistics getStatistics()
      Get a snapshot of topic statistics.
    • close

      public void close()
      Stop delivery workers and fail queued subscriber deliveries. Data retained in the ringbuffer remains readable until the instance is discarded.
      Specified by:
      close in interface AutoCloseable