Class DistributedQueue<E>
- Type Parameters:
E- the type of elements held in this queue
Operations:
- offer: add an item to the tail
- poll: remove and return item from the head
- peek: view the head without removing
- size: number of items
- drain: remove all items and return them
Supports item listeners (onItemAdded, onItemRemoved). In a cluster, the queue is owned by a single partition leader and replicated to a backup via the ReplicationManager.
- Since:
- 1.0
- See Also:
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic interfacestatic final recordQueue statistics record — tracks cumulative and current queue metrics. -
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final intstatic final intUnbounded capacity — no limit enforced. -
Constructor Summary
ConstructorsConstructorDescriptionDistributedQueue(String name, int instanceNumber) Create a DistributedQueue with unbounded capacity.DistributedQueue(String name, int instanceNumber, int maxCapacity) Create a DistributedQueue with a specified maximum capacity.DistributedQueue(String name, int instanceNumber, int maxCapacity, long emptyQueueTtlMillis) Create a DistributedQueue with a specified maximum capacity and empty-queue TTL. -
Method Summary
Modifier and TypeMethodDescriptionvoidaddAll(@Nullable Collection<? extends E> items) Bulk add — adds all elements from a collection using put semantics.voidaddItemListener(DistributedQueue.ItemListener<E> listener) voidvoidclear()Clears all elements from the queue.booleanChecks if the queue contains a specific element.voiddrain()Drain all items from the queue.drain(int maxItems) Drain up to maxItems from the queue.intdrainTo(Collection<? super E> target, int maxElements) Drain into a target collection atomically.voidvoidIterate and perform an action on each element without removing.Returns a snapshot of the current queue statistics.@Nullable QueueStore<E> Get the configured backing store, ornullwhen none is wired.booleanisEmpty()booleanisEmptyQueueExpired(long nowMillis) booleanbooleanTimed offer — attempts to insert an element, waiting up to the specified timeout.intofferAll(Collection<? extends E> elements) Bulk enqueue — all-or-nothing.voidofferDirect(E item) Direct offer without listener notification.@Nullable Epeek()peekBatch(int maxItems) Peeks at up to maxItems elements without removing them.@Nullable Epoll()@Nullable ETimed poll — retrieves and removes the head, waiting up to the specified timeout if the queue is empty.pollBatch(int maxItems) Polls up to maxItems elements from the queue.pollN(int count) Bulk dequeue — removes up to N elements atomically from the queue.Peeks at up tocounthead elements and removes them only if the caller accepts that exact snapshot while the queue lock is still held.voidBlocking put — inserts an element into the queue, waiting if necessary for space to become available.intReturns the remaining capacity, or Integer.MAX_VALUE if unbounded.intConditional removal — removes all elements matching the predicate.voidremoveItemListener(DistributedQueue.ItemListener<E> listener) voidsetQueueStore(QueueStore<E> store) Wire a persistent backing store for this queue.intsize()Returns the tracked size (single source of truth for capacity enforcement).Unbounded FIFO snapshot intended solely for Raft/state-machine persistence.take()Blocking take — retrieves and removes the head of the queue, waiting if necessary until an element becomes available.Object[]toArray()Return a snapshot of the queue as an array in FIFO order.<T> T[]toArray(T[] a) Return a snapshot of the queue as a typed array in FIFO order.toList()Returns a non-destructive copy of all items in the queue (FIFO order).voidtruncateTailForRollback(long targetSize) Rollback-only: truncate the queue back down totargetSizefrom the tail without touching the cumulativetotalAdded/totalPolled/peakSizestatistics.
-
Field Details
-
UNBOUNDED
public static final int UNBOUNDEDUnbounded capacity — no limit enforced.- See Also:
-
MAX_SNAPSHOT_ITEMS
public static final int MAX_SNAPSHOT_ITEMS- See Also:
-
-
Constructor Details
-
DistributedQueue
Create a DistributedQueue with unbounded capacity.- Parameters:
name- the queue name (must not be null)instanceNumber- the node instance number- Throws:
NullPointerException- if name is null
-
DistributedQueue
Create a DistributedQueue with a specified maximum capacity.- Parameters:
name- the queue name (must not be null)instanceNumber- the node instance numbermaxCapacity- maximum capacity, or UNBOUNDED for no limit- Throws:
NullPointerException- if name is null
-
DistributedQueue
Create a DistributedQueue with a specified maximum capacity and empty-queue TTL.- Parameters:
name- the queue name (must not be null)instanceNumber- the node instance numbermaxCapacity- maximum capacity, or UNBOUNDED for no limitemptyQueueTtlMillis- milliseconds an empty queue may remain before removal, or -1 to disable- Throws:
NullPointerException- if name is null
-
-
Method Details
-
getTransactionLock
-
setQueueStore
Wire a persistent backing store for this queue.The store must be configured before user items are enqueued. When the in-memory queue is empty, persisted keys are loaded in the store-provided iteration order and new item IDs continue after the largest loaded key.
- Parameters:
store- the backing store- Throws:
IllegalStateException- if a store is already configured or the queue is non-empty
-
getQueueStore
Get the configured backing store, ornullwhen none is wired.- Returns:
- configured queue store, or null
-
offer
-
poll
-
remainingCapacity
public int remainingCapacity()Returns the remaining capacity, or Integer.MAX_VALUE if unbounded. -
peek
-
size
public int size()Returns the tracked size (single source of truth for capacity enforcement). -
isEmpty
public boolean isEmpty() -
toList
-
drain
-
drain
-
offerDirect
Direct offer without listener notification. Used for backup replication. -
offerAll
Bulk enqueue — all-or-nothing. Either every element is enqueued or none are. Operates as a single logical Raft entry for consistency.If the queue is bounded and the full batch would not fit, the call rejects the entire batch and returns
0. Callers that want partial-prefix semantics must loop withoffer(Object)and react to each boolean.- Parameters:
elements- the elements to enqueue- Returns:
- the number of elements successfully added — either
0(rejected) orelements.size()(fully accepted)
-
pollN
-
pollNIf
Peeks at up tocounthead elements and removes them only if the caller accepts that exact snapshot while the queue lock is still held.- Parameters:
count- the maximum number of elements to preview and removepermitRemoval- returns true to remove the previewed items, false to leave the queue unchanged- Returns:
- the removed items when accepted, or null when removal was rejected
-
drainTo
Drain into a target collection atomically. Removes elements and adds them to the target collection.- Parameters:
target- the collection to add drained elements to (must not be null)maxElements- the maximum number of elements to drain (0 = unlimited)- Returns:
- the number of elements drained
- Throws:
NullPointerException- if target is null
-
contains
Checks if the queue contains a specific element. Thread-safe snapshot check.- Parameters:
element- the element to check (may be null)- Returns:
- true if the element exists in the queue, false otherwise
-
pollBatch
-
peekBatch
-
clear
public void clear()Clears all elements from the queue. Thread-safe removal with listener notifications. -
getQueueStats
Returns a snapshot of the current queue statistics.- Returns:
- a QueueStats record with current metrics
-
take
Blocking take — retrieves and removes the head of the queue, waiting if necessary until an element becomes available. Uses a Condition variable for virtual thread safety.- Returns:
- the item at the head of the queue
- Throws:
InterruptedException- if interrupted while waiting
-
put
Blocking put — inserts an element into the queue, waiting if necessary for space to become available. Signals waiting takers.- Parameters:
item- the item to insert- Throws:
InterruptedException- if interrupted while waiting
-
offer
Timed offer — attempts to insert an element, waiting up to the specified timeout.- Parameters:
item- the item to inserttimeout- the maximum time to waitunit- the time unit- Returns:
- true if the item was added, false if timeout expired
- Throws:
InterruptedException- if interrupted while waiting
-
poll
Timed poll — retrieves and removes the head, waiting up to the specified timeout if the queue is empty.- Parameters:
timeout- the maximum time to waitunit- the time unit- Returns:
- the item at the head, or null if timeout expires
- Throws:
InterruptedException- if interrupted while waiting
-
addAll
Bulk add — adds all elements from a collection using put semantics. If the queue is bounded and filling would exceed capacity, blocks until space is available.- Parameters:
items- the elements to add (may be null or empty)- Throws:
InterruptedException- if interrupted while waitingNullPointerException- if any element in items is null
-
removeIf
-
forEach
-
toArray
Return a snapshot of the queue as an array in FIFO order.- Returns:
- an array containing all elements
-
toArray
public <T> T[] toArray(T[] a) Return a snapshot of the queue as a typed array in FIFO order.- Type Parameters:
T- the type of the array- Parameters:
a- the array to populate- Returns:
- the array (same instance or larger)
-
isEmptyQueueExpired
public boolean isEmptyQueueExpired(long nowMillis) -
snapshotForPersistence
Unbounded FIFO snapshot intended solely for Raft/state-machine persistence. BypassesMAX_SNAPSHOT_ITEMSbecause the snapshot path must faithfully capture the full queue state regardless of size; user-facing collection views remain capped viatoList(). -
addItemListener
-
removeItemListener
-
beginBufferedSideEffects
public void beginBufferedSideEffects() -
flushBufferedSideEffects
public void flushBufferedSideEffects() -
discardBufferedSideEffects
public void discardBufferedSideEffects() -
truncateTailForRollback
public void truncateTailForRollback(long targetSize) Rollback-only: truncate the queue back down totargetSizefrom the tail without touching the cumulativetotalAdded/totalPolled/peakSizestatistics. The batch atomicLock guarantees no concurrent mutations interleave.ConcurrentLinkedQueue has no native tail removal, so we snapshot and rebuild the underlying queue directly (bypassing
poll()/offer(Object)so stats counters stay at their pre-batch values). This preserves observational atomicity: a rolled-back batch leaves the queue identical in size AND stats.- Parameters:
targetSize- the pre-offer size to restore
-