Class DistributedSet<E>
- Type Parameters:
E - the type of elements in this set
Backed by ConcurrentHashMap.newKeySet(), giving expected O(1) add, remove, and
contains operations under high concurrency without external locking.
Features:
- add, remove, contains — core set operations
- addAll — bulk add with count of new elements
- size, isEmpty, clear
- toSet — snapshot as unmodifiable Set
- Entry listeners (onAdded, onRemoved)
- addDirect/removeDirect — replication-safe, no listener notification
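Thread safety: all operations are thread-safe via the underlying ConcurrentHashMap key set. Listener dispatch is fail-safe: each listener is invoked in its own try-catch, so an exception thrown by one listener does not prevent delivery to the others.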
- Since:
- 1.1
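The backing-store and listener behaviour described above can be sketched with JDK classes only. This is a minimal illustration, not the DistributedSet source: the class ListenerSetSketch, its Listener interface, and the demo() helper are hypothetical names, and silently swallowing listener exceptions is an assumption (the real class may log them).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for illustration; not the DistributedSet source.
final class ListenerSetSketch<E> {

    // Hypothetical listener shape mirroring the onAdded/onRemoved callbacks.
    interface Listener<T> {
        void onAdded(T element);
        void onRemoved(T element);
    }

    private final Set<E> store = ConcurrentHashMap.newKeySet();
    private final List<Listener<E>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener<E> listener) {
        listeners.add(listener);
    }

    // Returns true only if the element was newly inserted; listeners fire only then.
    boolean add(E element) {
        boolean added = store.add(element);
        if (added) {
            for (Listener<E> listener : listeners) {
                try {
                    listener.onAdded(element);
                } catch (RuntimeException ex) {
                    // Assumption: swallow so one faulty listener cannot block the rest.
                }
            }
        }
        return added;
    }

    // Returns true only if the element was present; listeners fire only then.
    boolean remove(E element) {
        boolean removed = store.remove(element);
        if (removed) {
            for (Listener<E> listener : listeners) {
                try {
                    listener.onRemoved(element);
                } catch (RuntimeException ex) {
                    // Same isolation as in add().
                }
            }
        }
        return removed;
    }

    // Replication-style path: mutate the store without notifying listeners.
    void addDirect(E element) {
        store.add(element);
    }

    boolean contains(E element) {
        return store.contains(element);
    }

    // Runs a small scenario and returns the events seen by the healthy listener.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        ListenerSetSketch<String> set = new ListenerSetSketch<>();
        set.addListener(new Listener<String>() {
            public void onAdded(String e) { throw new IllegalStateException("faulty listener"); }
            public void onRemoved(String e) { }
        });
        set.addListener(new Listener<String>() {
            public void onAdded(String e) { events.add("added:" + e); }
            public void onRemoved(String e) { events.add("removed:" + e); }
        });
        set.add("a");       // first listener throws, second is still notified
        set.add("a");       // duplicate: no notification
        set.addDirect("b"); // direct path: no notification
        set.remove("a");    // notifies
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the duplicate add and the direct add produce no events, which is the distinction the feature list draws between add/remove and addDirect/removeDirect.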
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static interface | DistributedSet.SetListener<E> |
static final record | DistributedSet.SetStatistics | SetStatistics record for capturing aggregate metrics.
-
Field Summary
Fields
Modifier and Type | Field | Description
static final int | MAX_SNAPSHOT_ITEMS |
-
Constructor Summary
Constructors
Constructor | Description
DistributedSet(String name, int instanceNumber) | Create a DistributedSet with the given name.
-
Method Summary
Modifier and Type | Method | Description
boolean | add | Add an element to the set.
int | addAll(Collection<E> elements) | Add all elements from the given collection.
void | addDirect | Direct add without listener notification.
void | addSetListener(DistributedSet.SetListener<E> listener) |
void | beginBufferedSideEffects() |
void | clear() |
boolean | contains | Check if the set contains the given element.
 | difference(DistributedSet<E> other) | Return the difference of this set minus another set (elements in this but not other).
void | discardBufferedSideEffects() |
void | flushBufferedSideEffects() |
void | forEach | Iterate over all elements and apply the given action.
void | forEachPage(int pageSize, Consumer<List<E>> action) | Iterate the whole set in bounded pages, applying action per page.
 | getSetStatistics | Get a snapshot of current set statistics.
 | intersection(DistributedSet<E> other) | Return the intersection of this set with another set (elements in both).
boolean | isEmpty() |
boolean | isSubsetOf(DistributedSet<E> other) | Check if this set is a subset of another set (all elements in this are in other).
boolean | isSupersetOf(DistributedSet<E> other) | Check if this set is a superset of another set (all elements in other are in this).
 | page(int offset, int pageSize) | Read a page of elements without materializing the whole set.
@Nullable E | pop() | Remove and return a random member (pop operation).
@Nullable E | randomMember | Return a random member from the set, or null if empty.
 | randomMembers(int count) | Return N random members from the set (without replacement).
boolean | remove | Remove an element from the set.
int | removeAll(Collection<E> elements) | Remove all elements that are also in the given collection.
void | removeDirect(E element) | Direct remove without listener notification.
void | removeSetListener(DistributedSet.SetListener<E> listener) |
int | retainAll(Collection<E> elements) | Retain only elements that are also in the given collection.
 | scan(long, String, int) | Scan the set with cursor-based iteration (Redis-like SCAN).
int | size() |
 | snapshotForPersistence() | Unbounded snapshot intended solely for Raft/state-machine persistence.
 | srandmember(int count) | Redis-style SRANDMEMBER with count parameter.
 | stream() | Return a Stream over the set elements.
 | symmetricDifference(DistributedSet<E> other) | Return the symmetric difference (elements in either but not both).
Object[] | toArray() | Return set contents as Object array.
 | toSet() | Return an unmodifiable snapshot of the current set contents.
 | toString() |
 | union(DistributedSet<E> other) | Return the union of this set with another set (all elements in either).
-
Field Details
-
MAX_SNAPSHOT_ITEMS
public static final int MAX_SNAPSHOT_ITEMS
- See Also:
-
-
Constructor Details
-
DistributedSet
Create a DistributedSet with the given name.
- Parameters:
name - the name of the set (must not be null)
instanceNumber - the node instance number
- Throws:
NullPointerException - if name is null
-
-
Method Details
-
add
Add an element to the set.
- Returns:
- true if the element was added (was not already present)
-
remove
Remove an element from the set.
- Returns:
- true if the element was removed (was present)
-
contains
Check if the set contains the given element. -
addAll
Add all elements from the given collection.
- Returns:
- the number of new elements added (not already present)
-
size
public int size() -
isEmpty
public boolean isEmpty() -
clear
public void clear() -
toSet
-
union
Return the union of this set with another set (all elements in either). Does NOT modify this set — returns a new snapshot. Takes a snapshot of other.store to ensure consistency against concurrent mutations. -
intersection
Return the intersection of this set with another set (elements in both). Does NOT modify this set — returns a new snapshot. Takes a snapshot of other.store to ensure consistency against concurrent mutations. -
difference
Return the difference of this set minus another set (elements in this but not other). Does NOT modify this set — returns a new snapshot. Takes a snapshot of other.store to ensure consistency against concurrent mutations. -
symmetricDifference
Return the symmetric difference (elements in either but not both). Does NOT modify this set — returns a new snapshot. -
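The snapshot semantics shared by union, intersection, difference, and symmetricDifference can be sketched as follows. This is an illustration over plain java.util.Set values, not the DistributedSet implementation: SetAlgebraSketch and its static helpers are hypothetical, and copying the other set into a HashSet stands in for the "snapshot of other.store" step.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration; neither input set is modified.
final class SetAlgebraSketch {

    static <E> Set<E> union(Set<E> self, Set<E> other) {
        Set<E> result = new HashSet<>(self);
        result.addAll(new HashSet<>(other)); // copy other first, then merge
        return Collections.unmodifiableSet(result);
    }

    static <E> Set<E> intersection(Set<E> self, Set<E> other) {
        Set<E> result = new HashSet<>(self);
        result.retainAll(new HashSet<>(other));
        return Collections.unmodifiableSet(result);
    }

    static <E> Set<E> difference(Set<E> self, Set<E> other) {
        Set<E> result = new HashSet<>(self);
        result.removeAll(new HashSet<>(other));
        return Collections.unmodifiableSet(result);
    }

    static <E> Set<E> symmetricDifference(Set<E> self, Set<E> other) {
        // Snapshot both sides once so the two halves are computed from the same view.
        Set<E> left = new HashSet<>(self);
        Set<E> right = new HashSet<>(other);
        Set<E> result = new HashSet<>(left);
        result.addAll(right);          // everything in either set
        Set<E> common = new HashSet<>(left);
        common.retainAll(right);       // everything in both sets
        result.removeAll(common);      // either, but not both
        return Collections.unmodifiableSet(result);
    }

    public static void main(String[] args) {
        Set<Integer> a = ConcurrentHashMap.newKeySet();
        a.addAll(List.of(1, 2, 3));
        Set<Integer> b = ConcurrentHashMap.newKeySet();
        b.addAll(List.of(3, 4));
        System.out.println(union(a, b));
        System.out.println(symmetricDifference(a, b));
    }
}
```

Working on copies means a concurrent writer can change either input while the result is being built without corrupting it; the result simply reflects whatever each copy captured.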
isSubsetOf
Check if this set is a subset of another set (all elements in this are in other).
- Returns:
- true if this is a subset of other
-
isSupersetOf
Check if this set is a superset of another set (all elements in other are in this).
- Returns:
- true if this is a superset of other
-
randomMember
Return a random member from the set, or null if empty.
- Returns:
- a random element, or null if set is empty
-
randomMembers
-
pop
Remove and return a random member (pop operation).
- Returns:
- a random element that was removed, or null if set is empty
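One way a random pop can stay correct under concurrent writers is to pick a candidate and then rely on the boolean result of remove to decide who wins. The sketch below assumes that approach; it is not taken from the DistributedSet source, and PopSketch, its pop signature, and the explicit Random parameter are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration only.
final class PopSketch {

    // Removes and returns a random element, or null when the set is empty.
    static <E> E pop(Set<E> store, Random rng) {
        while (true) {
            int size = store.size();
            if (size == 0) {
                return null;
            }
            // Walk the iterator to a random position (O(n) in this sketch).
            int target = rng.nextInt(size);
            E candidate = null;
            Iterator<E> it = store.iterator();
            for (int i = 0; i <= target && it.hasNext(); i++) {
                candidate = it.next();
            }
            // remove() returns true for exactly one caller, so a lost race just retries.
            if (candidate != null && store.remove(candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        Set<String> store = ConcurrentHashMap.newKeySet();
        store.addAll(List.of("a", "b", "c"));
        System.out.println(pop(store, new Random()));
        System.out.println(store.size());
    }
}
```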
-
srandmember
Redis-style SRANDMEMBER with count parameter. Positive count: return up to count unique random elements. Negative count: return |count| random elements, allowing duplicates.
- Parameters:
count - positive = unique, negative = allow duplicates
- Returns:
- list of random elements
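The positive/negative count rule can be sketched as below. This is an illustration of the documented semantics, not the actual method: SrandmemberSketch is a hypothetical class, the Random parameter is added for testability, and returning an empty list for count 0 or an empty set is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical helper for illustration only.
final class SrandmemberSketch {

    static <E> List<E> srandmember(Set<E> source, int count, Random rng) {
        List<E> snapshot = new ArrayList<>(source);
        if (snapshot.isEmpty() || count == 0) {
            return List.of(); // assumption: nothing to sample
        }
        if (count > 0) {
            // Unique elements: shuffle the snapshot and take a prefix, capped at the set size.
            Collections.shuffle(snapshot, rng);
            return new ArrayList<>(snapshot.subList(0, Math.min(count, snapshot.size())));
        }
        // Negative count: |count| independent draws, so repeats are allowed.
        long draws = -(long) count;
        List<E> picks = new ArrayList<>();
        for (long i = 0; i < draws; i++) {
            picks.add(snapshot.get(rng.nextInt(snapshot.size())));
        }
        return picks;
    }

    public static void main(String[] args) {
        Set<String> tags = Set.of("red", "green", "blue");
        Random rng = new Random();
        System.out.println(srandmember(tags, 2, rng));  // two distinct tags
        System.out.println(srandmember(tags, -5, rng)); // five tags, repeats possible
    }
}
```

A positive count larger than the set size returns every element once, while a negative count always returns exactly |count| elements as long as the set is non-empty.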
-
removeAll
Remove all elements that are also in the given collection.
- Returns:
- the number of elements removed
-
retainAll
Retain only elements that are also in the given collection.
- Returns:
- the number of elements removed
-
toArray
Return set contents as Object array.
- Returns:
- array snapshot of current elements
-
stream
-
snapshotForPersistence
Unbounded snapshot intended solely for Raft/state-machine persistence. Bypasses MAX_SNAPSHOT_ITEMS because the snapshot path must faithfully capture the full set state regardless of size; user-facing collection views remain capped via toSet().
page
Read a page of elements without materializing the whole set.
Pairs with toSet(), toArray(), and stream() as the paginated alternative when the set grows beyond MAX_SNAPSHOT_ITEMS.
Ordering is weakly consistent, not stable. This method walks the underlying ConcurrentHashMap key-set iterator, whose order can change between calls as the table resizes or entries are added/removed. Offset-based pagination can therefore skip or duplicate elements across successive calls. Callers that need stable, sorted iteration must use scan(long, String, int) for String sets, or snapshot via snapshotForPersistence() and page the returned immutable view themselves.
- Parameters:
offset - starting index (0-based); clamped to size
pageSize - page size (must be positive and ≤ MAX_SNAPSHOT_ITEMS)
- Returns:
- an immutable list of at most pageSize elements
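Offset-based paging over a ConcurrentHashMap key set can be sketched as a skip-then-collect walk of the iterator. The code below is an illustration under assumptions, not the real page method: PageSketch is a hypothetical class, negative offsets are rejected here (the source only says the offset is clamped to size), and the MAX_SNAPSHOT_ITEMS upper bound on pageSize is omitted because its value is not given.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration only.
final class PageSketch {

    static <E> List<E> page(Set<E> store, int offset, int pageSize) {
        if (offset < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("offset must be >= 0 and pageSize > 0");
        }
        Iterator<E> it = store.iterator();
        // Skip `offset` elements; running out early acts like clamping the offset to size.
        for (int i = 0; i < offset && it.hasNext(); i++) {
            it.next();
        }
        List<E> out = new ArrayList<>();
        while (out.size() < pageSize && it.hasNext()) {
            out.add(it.next());
        }
        return Collections.unmodifiableList(out);
    }

    public static void main(String[] args) {
        Set<Integer> store = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 10; i++) {
            store.add(i);
        }
        // With no concurrent writers, successive pages cover the set exactly once.
        for (int offset = 0; offset < store.size(); offset += 4) {
            System.out.println(page(store, offset, 4));
        }
    }
}
```

Each call starts a fresh iterator, which is why an insert or remove between two calls can shift elements across the offset boundary and produce the skips or duplicates the description warns about.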
-
forEachPage
Iterate the whole set in bounded pages, applying action per page. Arbitrarily large sets can be consumed this way without violating MAX_SNAPSHOT_ITEMS. Each page is a snapshot taken on the live ConcurrentHashMap iterator; elements added or removed mid-iteration follow CHM's weakly-consistent iterator semantics.
- Parameters:
pageSize - page size (must be positive and ≤ MAX_SNAPSHOT_ITEMS)
action - callback invoked once per non-empty page
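A single-pass version of page-wise iteration might look like the sketch below, which buffers elements from one live iterator and hands each full buffer to the callback as an immutable copy. ForEachPageSketch is a hypothetical class, and the MAX_SNAPSHOT_ITEMS bound on pageSize is again left out because its value is not stated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper for illustration only.
final class ForEachPageSketch {

    static <E> void forEachPage(Set<E> store, int pageSize, Consumer<List<E>> action) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<E> buffer = new ArrayList<>();
        // One iterator for the whole walk, so memory stays bounded by pageSize.
        for (E element : store) {
            buffer.add(element);
            if (buffer.size() == pageSize) {
                action.accept(Collections.unmodifiableList(new ArrayList<>(buffer)));
                buffer.clear();
            }
        }
        // The final partial page, if any; empty pages are never delivered.
        if (!buffer.isEmpty()) {
            action.accept(Collections.unmodifiableList(new ArrayList<>(buffer)));
        }
    }

    public static void main(String[] args) {
        Set<Integer> store = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 10; i++) {
            store.add(i);
        }
        forEachPage(store, 3, page -> System.out.println(page.size() + " elements: " + page));
    }
}
```

Because only one iterator is used, this form avoids the repeated skipping that offset-based paging needs, at the cost of holding the iteration open while the callback runs.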
-
forEach
-
addDirect
Direct add without listener notification. Used for backup replication. -
removeDirect
Direct remove without listener notification. Used for backup replication. -
scan
Scan the set with cursor-based iteration (Redis-like SCAN).
Each invocation snapshots the current set into an immutable sorted list before applying the cursor window. This gives deterministic pagination for that call while retaining Redis-style weak consistency: concurrent writes between SCAN calls may still cause later pages to reflect newer data.
Type contract: this method is wire-protocol-facing and only supports DistributedSet<String>. Non-String element types are rejected with IllegalStateException rather than silently flattening distinct elements through toString() collisions.
- Parameters:
cursor - starting position (0 = start from beginning)
pattern - optional glob pattern (* = any chars, ? = single char), or null to match all
count - maximum number of elements to return
- Returns:
- ScanResult containing next cursor and matched elements
- Throws:
IllegalStateException - if the set contains non-String elements
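The sorted-snapshot-plus-cursor-window idea can be sketched as follows. Several details here are assumptions rather than documented behaviour: the cursor is treated as an index into the sorted snapshot, a returned cursor of 0 signals the end of the scan, count bounds the window that is examined before the pattern filter is applied, and ScanSketch with its ScanPage record is a hypothetical stand-in for the real method and its ScanResult type. The sketch accepts only Set<String>, so the IllegalStateException path does not arise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only.
final class ScanSketch {

    // Hypothetical result holder: next cursor (0 = finished) plus the matches.
    record ScanPage(long nextCursor, List<String> elements) { }

    // Translates a glob (* = any chars, ? = single char) into a regex.
    static Pattern globToRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    static ScanPage scan(Set<String> store, long cursor, String pattern, int count) {
        if (cursor < 0 || count <= 0) {
            throw new IllegalArgumentException("cursor must be >= 0 and count > 0");
        }
        // Snapshot and sort so the window is deterministic for this call.
        List<String> sorted = new ArrayList<>(store);
        Collections.sort(sorted);
        int from = (int) Math.min(cursor, sorted.size());
        int to = (int) Math.min((long) from + count, sorted.size());
        Pattern compiled = pattern == null ? null : globToRegex(pattern);
        List<String> matched = new ArrayList<>();
        for (String element : sorted.subList(from, to)) {
            if (compiled == null || compiled.matcher(element).matches()) {
                matched.add(element);
            }
        }
        long next = to >= sorted.size() ? 0 : to;
        return new ScanPage(next, Collections.unmodifiableList(matched));
    }

    public static void main(String[] args) {
        Set<String> store = ConcurrentHashMap.newKeySet();
        store.addAll(List.of("user:1", "user:2", "order:1", "user:30"));
        long cursor = 0;
        do {
            ScanPage page = scan(store, cursor, "user:?", 2);
            System.out.println(page.elements());
            cursor = page.nextCursor();
        } while (cursor != 0);
    }
}
```

With this windowing choice a page can contain fewer than count matches, or none, while the scan is still in progress, which mirrors how Redis SCAN behaves with a MATCH filter.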
-
getSetStatistics
Get a snapshot of current set statistics.
- Returns:
- SetStatistics record with aggregated stats
-
addSetListener
-
removeSetListener
-
beginBufferedSideEffects
public void beginBufferedSideEffects() -
flushBufferedSideEffects
public void flushBufferedSideEffects() -
discardBufferedSideEffects
public void discardBufferedSideEffects() -
toString
-