Class DistributedQueue<E>

java.lang.Object
com.loomcache.server.datastructures.DistributedQueue<E>
Type Parameters:
E - the type of elements held in this queue

public class DistributedQueue<E> extends Object
Distributed Queue — FIFO queue backed by ConcurrentLinkedQueue.

Operations:

  • offer: add an item to the tail
  • poll: remove and return item from the head
  • peek: view the head without removing
  • size: number of items
  • drain: remove all items and return them

Supports item listeners (onItemAdded, onItemRemoved). In a cluster, the queue is owned by a single partition leader and replicated to a backup via the ReplicationManager.
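
The listener hooks can be pictured with a minimal single-node sketch. ListenerQueueSketch and its nested ItemListener interface are hypothetical stand-ins built only on JDK classes. Only the callback names onItemAdded and onItemRemoved come from the description above; the real listener signature and the replication path may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in, not the LoomCache class: a FIFO queue that notifies listeners.
class ListenerQueueSketch<E> {
    // Assumed listener shape; only the two method names come from the documentation.
    interface ItemListener<E> {
        void onItemAdded(E item);
        void onItemRemoved(E item);
    }

    private final ConcurrentLinkedQueue<E> items = new ConcurrentLinkedQueue<>();
    private final List<ItemListener<E>> listeners = new CopyOnWriteArrayList<>();

    void addItemListener(ItemListener<E> listener) { listeners.add(listener); }

    boolean offer(E item) {
        boolean added = items.offer(item);            // append at the tail
        if (added) listeners.forEach(l -> l.onItemAdded(item));
        return added;
    }

    E poll() {
        E head = items.poll();                        // null when the queue is empty
        if (head != null) listeners.forEach(l -> l.onItemRemoved(head));
        return head;
    }

    E peek() { return items.peek(); }                 // view the head without removing it

    // Records the events fired by two offers followed by one poll.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        ListenerQueueSketch<String> q = new ListenerQueueSketch<>();
        q.addItemListener(new ItemListener<String>() {
            @Override public void onItemAdded(String item) { events.add("added:" + item); }
            @Override public void onItemRemoved(String item) { events.add("removed:" + item); }
        });
        q.offer("a");
        q.offer("b");
        q.poll();
        return events;                                // [added:a, added:b, removed:a]
    }
}
```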

Since:
1.0
  • Field Details

    • UNBOUNDED

      public static final int UNBOUNDED
      Unbounded capacity — no limit enforced.
    • MAX_SNAPSHOT_ITEMS

      public static final int MAX_SNAPSHOT_ITEMS
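      Maximum number of items returned by capped, user-facing snapshot views such as toList(). snapshotForPersistence() bypasses this limit.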
  • Constructor Details

    • DistributedQueue

      public DistributedQueue(String name, int instanceNumber)
      Create a DistributedQueue with unbounded capacity.
      Parameters:
      name - the queue name (must not be null)
      instanceNumber - the node instance number
      Throws:
      NullPointerException - if name is null
    • DistributedQueue

      public DistributedQueue(String name, int instanceNumber, int maxCapacity)
      Create a DistributedQueue with a specified maximum capacity.
      Parameters:
      name - the queue name (must not be null)
      instanceNumber - the node instance number
      maxCapacity - maximum capacity, or UNBOUNDED for no limit
      Throws:
      NullPointerException - if name is null
    • DistributedQueue

      public DistributedQueue(String name, int instanceNumber, int maxCapacity, long emptyQueueTtlMillis)
      Create a DistributedQueue with a specified maximum capacity and empty-queue TTL.
      Parameters:
      name - the queue name (must not be null)
      instanceNumber - the node instance number
      maxCapacity - maximum capacity, or UNBOUNDED for no limit
      emptyQueueTtlMillis - milliseconds an empty queue may remain before removal, or -1 to disable
      Throws:
      NullPointerException - if name is null
  • Method Details

    • getTransactionLock

      public ReentrantLock getTransactionLock()
    • setQueueStore

      public void setQueueStore(QueueStore<E> store)
      Wire a persistent backing store for this queue.

      The store must be configured before user items are enqueued. When the in-memory queue is empty, persisted keys are loaded in the store-provided iteration order and new item IDs continue after the largest loaded key.

      Parameters:
      store - the backing store
      Throws:
      IllegalStateException - if a store is already configured or the queue is non-empty
    • getQueueStore

      public @Nullable QueueStore<E> getQueueStore()
      Get the configured backing store, or null when none is wired.
      Returns:
      configured queue store, or null
    • offer

      public boolean offer(E item)
    • poll

      public @Nullable E poll()
    • remainingCapacity

      public int remainingCapacity()
      Returns the remaining capacity, or Integer.MAX_VALUE if unbounded.
    • peek

      public @Nullable E peek()
    • size

      public int size()
      Returns the tracked size (single source of truth for capacity enforcement).
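
One way to read "tracked size" is a counter kept next to the ConcurrentLinkedQueue, so that capacity checks never call the queue's O(n) size(). The sketch below assumes that reading. TrackedSizeSketch is a hypothetical class, and the value chosen for UNBOUNDED is an assumption because the real constant's value is not shown on this page.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: capacity is enforced against a tracked counter.
class TrackedSizeSketch<E> {
    static final int UNBOUNDED = -1;   // assumed sentinel value, for illustration only

    private final ConcurrentLinkedQueue<E> items = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int maxCapacity;

    TrackedSizeSketch(int maxCapacity) { this.maxCapacity = maxCapacity; }

    boolean offer(E item) {
        if (maxCapacity == UNBOUNDED) {
            size.incrementAndGet();
            return items.offer(item);
        }
        // Reserve a slot with a CAS loop so concurrent offers cannot overshoot the cap.
        while (true) {
            int current = size.get();
            if (current >= maxCapacity) return false;          // full: reject
            if (size.compareAndSet(current, current + 1)) return items.offer(item);
        }
    }

    E poll() {
        E head = items.poll();
        if (head != null) size.decrementAndGet();              // only count real removals
        return head;
    }

    int size() { return size.get(); }

    int remainingCapacity() {
        return maxCapacity == UNBOUNDED
                ? Integer.MAX_VALUE
                : Math.max(0, maxCapacity - size.get());
    }
}
```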
    • isEmpty

      public boolean isEmpty()
    • toList

      public List<E> toList()
      Returns a non-destructive, thread-safe snapshot of the queue contents in FIFO order, capped at MAX_SNAPSHOT_ITEMS. Items are not removed from the queue.
      Returns:
      an unmodifiable list of queue contents
    • drain

      public List<E> drain()
      Drain all items from the queue. Returns the items in FIFO order. Thread-safe: uses updateAndGet to prevent size from going negative under concurrent drain.
    • drain

      public List<E> drain(int maxItems)
      Drain up to maxItems from the queue. Thread-safe: uses updateAndGet to prevent size from going negative under concurrent drain.
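
The updateAndGet remark can be illustrated with a small sketch. How the real class computes the new size is not shown here, so clamping with Math.max is an assumption, and DrainSketch is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: drain with a size counter that is never allowed below zero.
class DrainSketch<E> {
    private final ConcurrentLinkedQueue<E> items = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    void offer(E item) {
        size.incrementAndGet();        // count first, then publish the item
        items.offer(item);
    }

    List<E> drain(int maxItems) {
        List<E> drained = new ArrayList<>();
        E item;
        while (drained.size() < maxItems && (item = items.poll()) != null) {
            drained.add(item);         // poll() hands each item to exactly one drainer
        }
        int removed = drained.size();
        // One atomic update per drain; the clamp is a defensive floor at zero.
        size.updateAndGet(current -> Math.max(0, current - removed));
        return drained;
    }

    List<E> drain() { return drain(Integer.MAX_VALUE); }

    int size() { return size.get(); }
}
```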
    • offerDirect

      public void offerDirect(E item)
      Direct offer without listener notification. Used for backup replication.
    • offerAll

      public int offerAll(Collection<? extends E> elements)
      Bulk enqueue — all-or-nothing. Either every element is enqueued or none are. Operates as a single logical Raft entry for consistency.

      If the queue is bounded and the full batch would not fit, the call rejects the entire batch and returns 0. Callers that want partial-prefix semantics must loop with offer(Object) and react to each boolean.

      Parameters:
      elements - the elements to enqueue
      Returns:
      the number of elements successfully added — either 0 (rejected) or elements.size() (fully accepted)
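
The difference between the all-or-nothing contract and a caller-side partial-prefix loop can be sketched as follows. AllOrNothingSketch and offerPrefix are hypothetical names. The sketch uses a plain lock and an ArrayDeque, and leaves out the Raft entry mentioned above.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in: a bounded queue whose bulk offer accepts a batch whole or not at all.
class AllOrNothingSketch<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxCapacity;

    AllOrNothingSketch(int maxCapacity) { this.maxCapacity = maxCapacity; }

    int offerAll(Collection<? extends E> elements) {
        for (E e : elements) Objects.requireNonNull(e);   // validate before mutating anything
        lock.lock();
        try {
            if (items.size() + elements.size() > maxCapacity) return 0;  // reject the whole batch
            items.addAll(elements);
            return elements.size();
        } finally {
            lock.unlock();
        }
    }

    boolean offer(E item) {
        lock.lock();
        try {
            if (items.size() >= maxCapacity) return false;
            items.addLast(item);
            return true;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try { return items.size(); } finally { lock.unlock(); }
    }

    // Caller-side partial-prefix semantics: stop at the first rejected element.
    static <T> int offerPrefix(AllOrNothingSketch<T> queue, List<T> batch) {
        int accepted = 0;
        for (T item : batch) {
            if (!queue.offer(item)) break;
            accepted++;
        }
        return accepted;
    }
}
```

With a capacity of 3 and two items already queued, offerAll of a two-element batch returns 0 and leaves the queue untouched, while offerPrefix on the same batch enqueues one element and returns 1.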
    • pollN

      public List<E> pollN(int count)
      Bulk dequeue: atomically removes up to count elements from the head of the queue and returns them in FIFO order.
      Parameters:
      count - the maximum number of elements to poll
      Returns:
      a list of up to count elements
    • pollNIf

      public @Nullable List<E> pollNIf(int count, Predicate<List<E>> permitRemoval)
      Peeks at up to count head elements and removes them only if the caller accepts that exact snapshot while the queue lock is still held.
      Parameters:
      count - the maximum number of elements to preview and remove
      permitRemoval - returns true to remove the previewed items, false to leave the queue unchanged
      Returns:
      the removed items when accepted, or null when removal was rejected
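
The preview-then-remove contract can be sketched with a single lock held across both steps, so the predicate decides on exactly the items that would be removed. PollNIfSketch is a hypothetical class, and passing the predicate an unmodifiable view is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical stand-in: conditional bulk removal under one lock.
class PollNIfSketch<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();

    void offer(E item) {
        lock.lock();
        try { items.addLast(item); } finally { lock.unlock(); }
    }

    List<E> pollNIf(int count, Predicate<List<E>> permitRemoval) {
        lock.lock();
        try {
            // Step 1: copy up to count head elements without removing them.
            List<E> preview = new ArrayList<>();
            Iterator<E> it = items.iterator();
            while (preview.size() < count && it.hasNext()) preview.add(it.next());

            // Step 2: ask the caller while the lock is still held.
            if (!permitRemoval.test(Collections.unmodifiableList(preview))) return null;

            // Step 3: remove exactly the previewed elements.
            for (int i = 0; i < preview.size(); i++) items.pollFirst();
            return preview;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try { return items.size(); } finally { lock.unlock(); }
    }
}
```

Because the predicate runs while the lock is held, it should be short and must not call back into the queue from another thread.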
    • drainTo

      public int drainTo(Collection<? super E> target, int maxElements)
      Drain into a target collection atomically. Removes elements and adds them to the target collection.
      Parameters:
      target - the collection to add drained elements to (must not be null)
      maxElements - the maximum number of elements to drain (0 = unlimited)
      Returns:
      the number of elements drained
      Throws:
      NullPointerException - if target is null
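
A minimal sketch of the maxElements convention, where 0 means "drain everything". DrainToSketch is a hypothetical class. Treating a negative maxElements as "drain nothing" is an assumption, since the documentation does not say what the real method does in that case.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in: drain into a caller-supplied collection under one lock.
class DrainToSketch<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();

    void offer(E item) {
        lock.lock();
        try { items.addLast(item); } finally { lock.unlock(); }
    }

    int drainTo(Collection<? super E> target, int maxElements) {
        Objects.requireNonNull(target, "target");
        lock.lock();
        try {
            // 0 means unlimited; negative values drain nothing (assumption).
            int limit = maxElements == 0
                    ? items.size()
                    : Math.max(0, Math.min(maxElements, items.size()));
            for (int i = 0; i < limit; i++) target.add(items.pollFirst());
            return limit;
        } finally {
            lock.unlock();
        }
    }
}
```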
    • contains

      public boolean contains(@Nullable E element)
      Checks if the queue contains a specific element. Thread-safe snapshot check.
      Parameters:
      element - the element to check (may be null)
      Returns:
      true if the element exists in the queue, false otherwise
    • pollBatch

      public List<E> pollBatch(int maxItems)
      Polls up to maxItems elements from the queue. Returns them in FIFO order. Thread-safe.
      Parameters:
      maxItems - the maximum number of items to poll
      Returns:
      a list of up to maxItems elements
    • peekBatch

      public List<E> peekBatch(int maxItems)
      Peeks at up to maxItems elements without removing them. Returns them in FIFO order. Thread-safe snapshot.
      Parameters:
      maxItems - the maximum number of items to peek at
      Returns:
      a list of up to maxItems elements
    • clear

      public void clear()
      Clears all elements from the queue. Thread-safe removal with listener notifications.
    • getQueueStats

      public DistributedQueue.QueueStats getQueueStats()
      Returns a snapshot of the current queue statistics.
      Returns:
      a QueueStats record with current metrics
    • take

      public E take() throws InterruptedException
      Blocking take: retrieves and removes the head of the queue, waiting if necessary until an element becomes available. Waits on a Condition variable so that blocking is safe on virtual threads.
      Returns:
      the item at the head of the queue
      Throws:
      InterruptedException - if interrupted while waiting
    • put

      public void put(E item) throws InterruptedException
      Blocking put — inserts an element into the queue, waiting if necessary for space to become available. Signals waiting takers.
      Parameters:
      item - the item to insert
      Throws:
      InterruptedException - if interrupted while waiting
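
The blocking take/put pair follows the classic two-condition bounded-buffer pattern. The sketch below assumes a ReentrantLock with notEmpty and notFull conditions. Those names and the BlockingSketch class are hypothetical, and the real class may organise its locking differently.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in: blocking put/take built on a lock and two conditions.
class BlockingSketch<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final int maxCapacity;

    BlockingSketch(int maxCapacity) { this.maxCapacity = maxCapacity; }

    void put(E item) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() >= maxCapacity) notFull.await();  // wait for space
            items.addLast(item);
            notEmpty.signal();                                    // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) notEmpty.await();             // wait for an element
            E head = items.pollFirst();
            notFull.signal();                                     // wake one waiting putter
            return head;
        } finally {
            lock.unlock();
        }
    }

    // A producer thread hands two items through a capacity-1 queue.
    static String demo() {
        BlockingSketch<String> q = new BlockingSketch<>(1);
        Thread producer = new Thread(() -> {
            try {
                q.put("a");
                q.put("b");                                       // blocks until "a" is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String result = q.take() + q.take();
            producer.join();
            return result;                                        // "ab"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```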
    • offer

      public boolean offer(E item, long timeout, TimeUnit unit) throws InterruptedException
      Timed offer — attempts to insert an element, waiting up to the specified timeout.
      Parameters:
      item - the item to insert
      timeout - the maximum time to wait
      unit - the time unit
      Returns:
      true if the item was added, false if timeout expired
      Throws:
      InterruptedException - if interrupted while waiting
    • poll

      public @Nullable E poll(long timeout, TimeUnit unit) throws InterruptedException
      Timed poll — retrieves and removes the head, waiting up to the specified timeout if the queue is empty.
      Parameters:
      timeout - the maximum time to wait
      unit - the time unit
      Returns:
      the item at the head, or null if timeout expires
      Throws:
      InterruptedException - if interrupted while waiting
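
The timed variants can be sketched with Condition.awaitNanos, which returns the remaining wait budget so the loop can give up once it reaches zero. TimedSketch is a hypothetical class built only on JDK primitives and is not the real implementation.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in: timed offer/poll on a bounded queue.
class TimedSketch<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final int maxCapacity;

    TimedSketch(int maxCapacity) { this.maxCapacity = maxCapacity; }

    boolean offer(E item, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.size() >= maxCapacity) {
                if (nanos <= 0L) return false;        // budget exhausted: give up
                nanos = notFull.awaitNanos(nanos);    // returns the time still remaining
            }
            items.addLast(item);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                if (nanos <= 0L) return null;         // timed out on an empty queue
                nanos = notEmpty.awaitNanos(nanos);
            }
            E head = items.pollFirst();
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    // Single-threaded walk through success and timeout on a capacity-1 queue.
    static String demo() {
        TimedSketch<String> q = new TimedSketch<>(1);
        try {
            boolean first = q.offer("a", 10, TimeUnit.MILLISECONDS);   // fits
            boolean second = q.offer("b", 10, TimeUnit.MILLISECONDS);  // full, times out
            String head = q.poll(10, TimeUnit.MILLISECONDS);           // "a"
            String none = q.poll(10, TimeUnit.MILLISECONDS);           // empty, times out
            return first + "," + second + "," + head + "," + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```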
    • addAll

      public void addAll(@Nullable Collection<? extends E> items) throws InterruptedException
      Bulk add: adds every element of a collection using put semantics. If the queue is bounded and adding an element would exceed capacity, the call blocks until space is available.
      Parameters:
      items - the elements to add (may be null or empty)
      Throws:
      InterruptedException - if interrupted while waiting
      NullPointerException - if any element in items is null
    • removeIf

      public int removeIf(Predicate<? super E> filter)
      Conditional removal — removes all elements matching the predicate. Thread-safe using lock.
      Parameters:
      filter - the predicate to test elements
      Returns:
      the number of elements removed
    • forEach

      public void forEach(Consumer<? super E> action)
      Iterate and perform an action on each element without removing. Thread-safe snapshot iteration.
      Parameters:
      action - the action to perform on each element
    • toArray

      public Object[] toArray()
      Return a snapshot of the queue as an array in FIFO order.
      Returns:
      an array containing all elements
    • toArray

      public <T> T[] toArray(T[] a)
      Return a snapshot of the queue as a typed array in FIFO order.
      Type Parameters:
      T - the type of the array
      Parameters:
      a - the array to populate
      Returns:
      the array (same instance or larger)
    • isEmptyQueueExpired

      public boolean isEmptyQueueExpired(long nowMillis)
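
This method has no description here, but the emptyQueueTtlMillis constructor parameter suggests how it is meant to be read. The sketch below assumes the queue records when it last became empty and reports expiry once that moment is at least emptyQueueTtlMillis in the past. EmptyTtlSketch, the emptySinceMillis field and the inclusive comparison are all assumptions, not the real implementation.

```java
// Hypothetical stand-in: empty-queue TTL bookkeeping with caller-supplied clock values.
class EmptyTtlSketch {
    private final long emptyQueueTtlMillis;   // -1 disables expiry, as in the parameter description
    private long emptySinceMillis;            // assumed bookkeeping: when the queue last became empty
    private int size;

    EmptyTtlSketch(long emptyQueueTtlMillis, long createdAtMillis) {
        this.emptyQueueTtlMillis = emptyQueueTtlMillis;
        this.emptySinceMillis = createdAtMillis;   // a new queue starts out empty
    }

    void offer() { size++; }

    void poll(long nowMillis) {
        // Restart the empty-since clock when the last item leaves.
        if (size > 0 && --size == 0) emptySinceMillis = nowMillis;
    }

    boolean isEmptyQueueExpired(long nowMillis) {
        if (emptyQueueTtlMillis < 0 || size > 0) return false;   // disabled, or not empty
        return nowMillis - emptySinceMillis >= emptyQueueTtlMillis;
    }
}
```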
    • snapshotForPersistence

      public List<E> snapshotForPersistence()
      Unbounded FIFO snapshot intended solely for Raft/state-machine persistence. Bypasses MAX_SNAPSHOT_ITEMS because the snapshot path must faithfully capture the full queue state regardless of size; user-facing collection views remain capped via toList().
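
The contrast between the capped user-facing view and the uncapped persistence snapshot can be sketched as follows. SnapshotSketch is a hypothetical class. The cap of 3 is an illustrative value, not the real MAX_SNAPSHOT_ITEMS, and keeping the head of the queue when truncating is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in: capped view for users, full copy for persistence.
class SnapshotSketch<E> {
    static final int MAX_SNAPSHOT_ITEMS = 3;   // illustrative value only; the real constant is not shown

    private final ConcurrentLinkedQueue<E> items = new ConcurrentLinkedQueue<>();

    void offer(E item) { items.offer(item); }

    // User-facing view: stops copying once the cap is reached.
    List<E> toList() {
        List<E> copy = new ArrayList<>();
        for (E item : items) {
            if (copy.size() >= MAX_SNAPSHOT_ITEMS) break;
            copy.add(item);
        }
        return Collections.unmodifiableList(copy);
    }

    // Persistence view: copies everything, regardless of size.
    List<E> snapshotForPersistence() {
        return new ArrayList<>(items);
    }
}
```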
    • addItemListener

      public void addItemListener(DistributedQueue.ItemListener<E> listener)
    • removeItemListener

      public void removeItemListener(DistributedQueue.ItemListener<E> listener)
    • beginBufferedSideEffects

      public void beginBufferedSideEffects()
    • flushBufferedSideEffects

      public void flushBufferedSideEffects()
    • discardBufferedSideEffects

      public void discardBufferedSideEffects()
    • truncateTailForRollback

      public void truncateTailForRollback(long targetSize)
      Rollback-only: truncate the queue back down to targetSize from the tail without touching the cumulative totalAdded/totalPolled/peakSize statistics. The batch atomicLock guarantees no concurrent mutations interleave.

      ConcurrentLinkedQueue has no native tail removal, so we snapshot and rebuild the underlying queue directly (bypassing poll() / offer(Object) so stats counters stay at their pre-batch values). This preserves observational atomicity: a rolled-back batch leaves the queue identical in size AND stats.

      Parameters:
      targetSize - the pre-offer size to restore
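
The snapshot-and-rebuild step can be sketched as below. RollbackSketch is a hypothetical class, and the caller is assumed to hold the batch lock already. stageUncounted is an invented helper that stands in for however the real batch path appends items without touching statistics, which this page does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in: tail truncation that leaves cumulative counters alone.
class RollbackSketch<E> {
    private final ConcurrentLinkedQueue<E> items = new ConcurrentLinkedQueue<>();
    private final AtomicLong totalAdded = new AtomicLong();
    private final AtomicLong totalPolled = new AtomicLong();

    boolean offer(E item) {
        totalAdded.incrementAndGet();
        return items.offer(item);
    }

    E poll() {
        E head = items.poll();
        if (head != null) totalPolled.incrementAndGet();
        return head;
    }

    // Invented helper: append during a batch without bumping statistics.
    void stageUncounted(E item) { items.offer(item); }

    // Caller is assumed to hold the batch lock, so no other mutation interleaves.
    void truncateTailForRollback(long targetSize) {
        List<E> snapshot = new ArrayList<>(items);
        if (targetSize < 0 || targetSize >= snapshot.size()) return;   // nothing to cut
        // No tail removal on ConcurrentLinkedQueue: rebuild from the kept prefix.
        items.clear();
        items.addAll(snapshot.subList(0, (int) targetSize));
        // Counters are deliberately not touched here.
    }

    List<E> contents() { return new ArrayList<>(items); }
    long totalAdded() { return totalAdded.get(); }
    long totalPolled() { return totalPolled.get(); }
}
```

Removing the tail by calling poll() in a loop would instead increment totalPolled and take items from the wrong end, which is why the rebuild goes around the public operations.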