Class DistributedMap<K,V>
- All Implemented Interfaces:
Snapshotable
Each node holds entries for keys it owns (primary or backup). Writes are replicated synchronously to the backup before ack.
Features:
- get, put, delete, containsKey, size, keys, values, clear
- putIfAbsent, replace
- getAll, putAll (batch operations)
- put with TTL (time-to-live) — entries auto-expire after a duration
- Map change listeners (add/update/remove events)
- Integrates with ReplicationManager for primary-backup sync
- Integrates with CacheStore for write-through persistence
- Configurable via DistributedMapConfig (max-size, eviction, metrics)
- LRU eviction when max-size is reached
- Cache metrics: hit/miss ratio, operation counts, eviction counts
- Atomic compare-and-set (CAS) for safe concurrent mutations
Thread safety: ConcurrentHashMap as backing store. TTL expiration is lazy (checked on access) to avoid background-thread overhead. LRU tracking uses ConcurrentLinkedDeque for O(1) append and concurrent access.
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic final recordstatic interfacestatic final recordStatistics snapshot of map operations and performance.static interfacestatic final recordPer-key metadata describing which node/time last wrote the value.Nested classes/interfaces inherited from interface Snapshotable
Snapshotable.SnapshotCapture -
Constructor Summary
ConstructorsConstructorDescriptionDistributedMap(String name, int instanceNumber) Create a DistributedMap with the given name and default configuration.DistributedMap(String name, int instanceNumber, DistributedMapConfig config) Create a DistributedMap with the given name and custom configuration. -
Method Summary
Modifier and TypeMethodDescriptionvoidaddInterceptor(MapInterceptor interceptor) Register a map interceptor.<R> Raggregate(Aggregator<K, V, R> aggregator) Aggregate all visible entries using a single aggregator instance.<R> Raggregate(Supplier<? extends Aggregator<K, V, R>> aggregatorFactory) Aggregate all visible entries with two-stage map-reduce combine semantics.<R> Raggregate(Supplier<? extends Aggregator<K, V, R>> aggregatorFactory, MapPredicate<K, V> predicate) Aggregate visible entries matchingpredicatewith two-stage map-reduce combine semantics.voidassertProductionMutationAllowed(K key, String operation) Preflight a key before multi-entry mutations start applying partial state.voidCapture a delta snapshot and atomically advance dirty tracking for the captured state.Capture the exact current state for a single key so callers can restore it without advancing revisions or replaying user-visible side effects.Capture a full snapshot and atomically advance dirty tracking for the captured state.voidclear()voidClear the dirty-tracking state after a delta snapshot has been persisted.voidclearWriteMetadata(K key) Remove per-entry metadata forkey.booleancompareAndDelete(K key, V expectedValue) Atomically removeskeyonly when the current value equalsexpectedValue.booleancompareAndSet(K key, V expectedValue, V newValue) Atomically sets the value forkeytonewValueonly if the current value equalsexpectedValue.@Nullable VAtomically compute a new value forkeyusing the given function.@Nullable VcomputeIfAbsent(K key, BiFunction<? super K, ? super @Nullable V, ? extends V> mappingFunction) Compute and insert only if the key is absent.@Nullable VcomputeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) Compute and update only if the key is present.booleancontainsKey(K key) @Nullable VvoiddeleteDirect(K key) Direct delete without write-through or listener notification.voidderegisterMapListener(DistributedMap.MapChangeListener<K, V> listener) voidGet all entries with keys matching a given prefix.entrySet()Returns an unmodifiable view of all entries in this map.entrySet(MapPredicate<K, V> predicate) Returns an unmodifiable filtered view of visible entries.intBulk eviction of all expired entries.booleanevictIfUnchanged(K key, V expectedValue, @Nullable Long expectedExpiryNanos) Evict a visible entry only if it still matches the value/expiry observed by an eviction scan.executeOnAll(CacheEntryProcessor<K, V, @NonNull R> processor) Execute a processor on all entries in the map in parallel using virtual threads.<R extends @Nullable Object>
@Nullable RexecuteWithEntryLock(K key, long timeoutMillis, Supplier<@Nullable R> action) Execute entry-processor work while holding the map-owned per-key processor gate and the transaction lock that serializes map mutations.voidfireEntryMerged(K key, @Nullable V oldValue, V newValue) Fire a MERGED event — used by WAN replication when two divergent histories are reconciled and a single canonical value is chosen.voidvoidForce a synchronous flush of the write-behind queue.@Nullable VBatch get — returns a map of key→value for all keys that exist and are not expired.longGet the current memory usage in bytes (estimated).longGet the default TTL setting.longReturn the local monotonic expiry deadline for migration verification.longgetExpirationTimeMillis(K key) longgetHitCount(K key) longgetLastAccessTimeMillis(K key) @Nullable StringGet a snapshot of current map statistics.Get the wiredMapStore, ornullif none has been configured.@Nullable MapStoreConfigGet the activeMapStoreConfig, ornullif no store is wired.getOrDefault(K key, V defaultValue) Get the value for a key, or return the default value if the key is absent or expired.longgetRemainingTtlMillis(K key) Return the remaining TTL for a key in milliseconds.longgetRemainingTtlSeconds(K key) Return the remaining TTL for a key in seconds.longgetTimeToLive(K key) Get the remaining time-to-live for a key.@Nullable VgetWithTTL(K key) Get a value only if it exists and has not expired.@Nullable DistributedMap.WriteMetadatagetWriteMetadata(K key) booleanWhether any data has changed since the last snapshot.booleanisEmpty()keysByPattern(String pattern) Get all keys matching a given glob/regex pattern.keySet()keySet(MapPredicate<K, V> predicate) voidmaybeAdvanceWriteMetadata(K key, long timestampMillis, String writerNodeId) Advance the recorded timestamp forkeyIFF the incoming tuple is strictly newer.@Nullable VMerge a value into the map for the given key.voidoverrideExpirationTimeMillis(K key, long expirationTimeMillis) Override TTL metadata after a WAN merge winner is applied.voidoverrideHitCount(K key, long hitCount) Override read-hit metadata after a WAN merge winner is applied.voidoverrideLastAccessTimeMillis(K key, long accessTimeMillis) Override access-time metadata after a WAN merge winner is applied.voidoverrideWriteMetadata(K key, long timestampMillis, String writerNodeId) Overwrite the metadata forkeyafter a WAN merge has selected a remote-sourced winner.processEntries(Set<K> keys, CacheEntryProcessor<K, V, @NonNull R> processor) Execute a processor on multiple keys in parallel using virtual threads.<R extends @Nullable Object>
@Nullable RprocessEntry(K key, CacheEntryProcessor<K, V, @NonNull R> processor) Execute a processor on a single key atomically.@Nullable V@Nullable VPut a key-value pair with a time-to-live.voidvoidDirect put without write-through or listener notification.voidputForMigration(K key, V value) Write a migrated entry into this map without following write redirects or firing side effects.voidputForMigration(K key, V value, long expirationTimeMillis) Write a migrated entry into this map while preserving an absolute expiration timestamp captured on the source node.booleanputForMigrationIfAbsent(K key, V value) Write a migrated entry only when the target has not already accepted a newer local/redirected write for the key.booleanputForMigrationIfAbsent(K key, V value, long expirationTimeMillis) Write a migrated entry only when absent while preserving the absolute expiration timestamp captured on the source node.booleanputForMigrationIfValueMatches(K key, @Nullable V expectedCurrentValue, V value, long expirationTimeMillis) Write a migrated entry only when the target still contains the value that an earlier migration pass copied.@Nullable VputForWanMerge(K key, V newValue) WAN-merge put: stores the value and firesfireEntryMerged(K, V, V)instead of ADDED/UPDATED.@Nullable VputIfAbsent(K key, V value) @Nullable VputIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) Put only if absent, with TTL.@Nullable VputWithExpirationTimeMillis(K key, V value, long expirationTimeMillis) Put a key-value pair with an absolute wall-clock expiration.@Nullable VputWithTTL(K key, V value, long ttlMillis) Store a key-value pair with a TTL (Time-To-Live) in milliseconds.voidRecord metadata for a local write.booleanrefreshTTL(K key, long newTtlMillis) Refresh the TTL for an existing key without changing its value.voidRegister a listener that fires only for events originating on THIS node.voidregisterLocalMapListener(DistributedMap.MapChangeListener<K, V> listener, EntryPredicate<K, V> predicate) Register a local-only listener filtered by a predicate.voidregisterMapListener(DistributedMap.MapChangeListener<K, V> listener) voidregisterMapListener(DistributedMap.MapChangeListener<K, V> listener, EntryPredicate<K, V> predicate) intremoveAll(Collection<K> keys) Remove all entries for the given keys.booleanremoveForMigrationIfValueMatches(K key, @Nullable V expectedValue) Remove a migrated source entry only if it still contains the value captured in the migration snapshot.booleanremoveInterceptor(MapInterceptor interceptor) Remove a previously registered map interceptor.intremoveKeysForMigration(Collection<K> keys) Remove source-side copies during partition migration without emitting delete side effects.booleanRemove the TTL from a key (persist it).@Nullable VbooleanReplace a value only if the key exists and the old value matches.booleanreplaceIfPresent(K key, V oldValue, V newValue) Replace a value only if the key exists and the old value matches.voidrestoreEntryRollbackState(K key, DistributedMap.EntryRollbackState<V> state) Restore a previously captured entry state without incrementing revisions, invoking write-through callbacks, or notifying listeners.voidrestoreSnapshot(byte[] data) Restore this data structure from a previously serialized state.voidrollbackDirtyState(@Nullable Object dirtyStateToken) Restore dirty-tracking state that was detached during snapshot capture.voidRun a mutation without invoking configured MapStore write-through/write-behind callbacks.Scan the map with cursor-based iteration (Redis-like SCAN).booleanscanVisibleEntries(int maxVisibleEntries, DistributedMap.VisibleEntryVisitor<K, V> visitor) Streams visible entries without materializing a fullentrySet()snapshot.voidsetDefaultTTL(long ttlMillis) Set a default TTL for all new entries added via put() and putIfAbsent().voidsetDeleteThrough(@Nullable Consumer<K> deleteThrough) voidsetKryoSerializer(KryoSerializer serializer) Wire the Kryo serializer used for map snapshots.voidSet the node-local writer identifier used when recording new write metadata.voidsetMapStore(MapStore<K, V> store, MapStoreConfig config) Wire a pluggableMapStoreas the backing store for this map.voidsetMaxMemoryBytes(long maxBytes) Set the maximum memory limit for this map.voidsetWriteThrough(@Nullable BiConsumer<K, V> writeThrough) voidshutdown()Shutdown the distributed map and release TTL cleanup resources.intsize()Get a unique identifier for this data structure.voidStop the background TTL cleanup thread.booleanWhether this data structure supports delta (incremental) snapshots.byte[]Serialize only the changed state since the last snapshot into a byte array.byte[]Serialize the current state of this data structure into a byte array.toString()intReturn the number of entries with TTL set (for diagnostics).booleanUpdate the TTL of an existing entry without changing its value.values()values(MapPredicate<K, V> predicate)
-
Constructor Details
-
DistributedMap
Create a DistributedMap with the given name and default configuration.- Parameters:
name- the map name (must not be null)instanceNumber- the node instance number- Throws:
NullPointerException- if name is null
-
DistributedMap
Create a DistributedMap with the given name and custom configuration.- Parameters:
name- the map name (must not be null)instanceNumber- the node instance numberconfig- the map configuration (must not be null)- Throws:
NullPointerException- if name or config is null
-
-
Method Details
-
putForMigration
-
putForMigration
-
putForMigrationIfAbsent
-
putForMigrationIfAbsent
-
putForMigrationIfValueMatches
public boolean putForMigrationIfValueMatches(K key, @Nullable V expectedCurrentValue, V value, long expirationTimeMillis) Write a migrated entry only when the target still contains the value that an earlier migration pass copied. This lets a rebalance sweep repair source writes that raced the first copy without overwriting a newer write already redirected to the target partition. -
removeKeysForMigration
Remove source-side copies during partition migration without emitting delete side effects. -
removeForMigrationIfValueMatches
Remove a migrated source entry only if it still contains the value captured in the migration snapshot. This keeps cleanup from deleting a newer write that arrived before the ownership cutover fully quiesced on this node. -
executeWithEntryLock
-
get
-
put
-
put
-
putWithExpirationTimeMillis
-
putIfAbsent
-
putIfAbsent
-
updateTtl
-
replace
-
replace
Replace a value only if the key exists and the old value matches. This is a CAS (compare-and-swap) variant of replace.- Parameters:
key- the key to replaceoldValue- the expected current valuenewValue- the new value to set- Returns:
- true if the replacement was successful, false if the key doesn't exist or value doesn't match
-
delete
-
evictIfUnchanged
Evict a visible entry only if it still matches the value/expiry observed by an eviction scan.Eviction is distinct from delete: it must not invoke delete-through MapStore callbacks, remove interceptors, or removed-entry listeners. It records eviction metrics, marks snapshot state dirty, clears local metadata, and emits an eviction event.
-
containsKey
-
size
public int size() -
isEmpty
public boolean isEmpty() -
keySet
-
keySet
-
values
-
values
-
entrySet
-
entrySet
Returns an unmodifiable filtered view of visible entries. If the predicate is aPagingPredicate, the predicate's page state and comparator determine the returned slice. -
scanVisibleEntries
public boolean scanVisibleEntries(int maxVisibleEntries, DistributedMap.VisibleEntryVisitor<K, V> visitor) Streams visible entries without materializing a fullentrySet()snapshot.- Returns:
truewhen all visible entries were visited or the visitor stopped early,falsewhenmaxVisibleEntrieswas reached before the map was exhausted
-
getAll
-
putAll
-
clear
public void clear() -
scan
Scan the map with cursor-based iteration (Redis-like SCAN).- Parameters:
cursor- starting position (0 = start from beginning)pattern- optional glob pattern (* = any chars, ? = single char), null = match allcount- maximum number of keys to return- Returns:
- ScanResult containing next cursor and matched keys
-
shutdown
public void shutdown()Shutdown the distributed map and release TTL cleanup resources. Call this when the map is no longer needed to release resources.When a write-behind MapStore is wired, pending writes are flushed synchronously before the scheduler is stopped so no buffered entries are lost on orderly shutdown.
-
putDirect
-
runWithoutMapStorePersistence
Run a mutation without invoking configured MapStore write-through/write-behind callbacks.Used by WAN replication when
persist-wan-replicated-data=false: the target map still changes in memory and emits normal listener events, but remote WAN traffic does not update the external backing store. -
setLocalWriterId
Set the node-local writer identifier used when recording new write metadata. Typically called once at map construction withclusterId + ":" + nodeId.- Parameters:
id- deterministic writer identifier (non-null)
-
getLocalWriterId
- Returns:
- the node-local writer identifier, or
nullif not yet set
-
getWriteMetadata
- Returns:
- the write metadata for
key, ornullif the key has no recorded metadata (either unwritten locally or cleared)
-
getHitCount
- Returns:
- number of successful reads recorded for
key
-
overrideHitCount
Override read-hit metadata after a WAN merge winner is applied.- Parameters:
key- key whose hit count should be replacedhitCount- non-negative hit count from the winning entry
-
getLastAccessTimeMillis
- Returns:
- epoch millis of the last successful read recorded for
key, or 0 if never accessed
-
overrideLastAccessTimeMillis
Override access-time metadata after a WAN merge winner is applied.- Parameters:
key- key whose access time should be replacedaccessTimeMillis- non-negative epoch millis from the winning entry
-
getExpirationTimeMillis
- Returns:
- epoch millis when
keyexpires,Long.MAX_VALUEwhen it does not expire, or 0 when the key is absent or already expired
-
getExpirationDeadlineNanos
Return the local monotonic expiry deadline for migration verification.Long.MAX_VALUEmeans the entry has no TTL, and0means the entry is absent or already expired. This value is only stable on the source node and must not be used as the target-side expiry time. -
overrideExpirationTimeMillis
Override TTL metadata after a WAN merge winner is applied.- Parameters:
key- key whose expiration should be replacedexpirationTimeMillis- epoch millis when the key expires,Long.MAX_VALUEfor no TTL, or 0 to mark it expired
-
recordLocalWriteMetadata
Record metadata for a local write. Called byputInternal(K, V, Long)and related write paths AFTER the store update succeeds. UsesSystem.currentTimeMillis()as the write timestamp and the node-local writer id set viasetLocalWriterId(String).No-op if
localWriterIdhas not been set — tests and pre- wired registries pay no cost. -
overrideWriteMetadata
-
maybeAdvanceWriteMetadata
Advance the recorded timestamp forkeyIFF the incoming tuple is strictly newer. Called by WanConsumer on the no-op case — chose local value — so subsequent merge decisions see the most recent "last observed write" across WAN. -
clearWriteMetadata
Remove per-entry metadata forkey. Called from local and WAN delete paths to keep metadata maps bounded alongside the store. -
putForWanMerge
WAN-merge put: stores the value and firesfireEntryMerged(K, V, V)instead of ADDED/UPDATED. Used byWanConsumerwhen aMergePolicyresolves divergent cross-cluster histories to a canonical value.The MERGED event signals to listeners that the new value came from cross-cluster conflict resolution, not from a local write.
- Parameters:
key- the keynewValue- the merged value (not null)- Returns:
- the previous value, or
nullif absent
-
deleteDirect
Direct delete without write-through or listener notification. Used for backup replication. -
compareAndSet
Atomically sets the value forkeytonewValueonly if the current value equalsexpectedValue.This is the distributed-cache equivalent of
ConcurrentHashMap.replace(K, V, V). Essential for safe concurrent state transitions without distributed locks.- Returns:
- true if the value was replaced, false if the current value didn't match
-
compareAndDelete
-
compute
public @Nullable V compute(K key, BiFunction<? super K, ? super @Nullable V, ? extends @Nullable V> remappingFunction) Atomically compute a new value forkeyusing the given function.The function receives the current value (or null if absent) and returns the new value (or null to remove the entry). The function is called under ConcurrentHashMap's segment lock, ensuring atomicity.
- Returns:
- the new value associated with the key, or null if removed
-
computeIfAbsent
public @Nullable V computeIfAbsent(K key, BiFunction<? super K, ? super @Nullable V, ? extends V> mappingFunction) Compute and insert only if the key is absent. If the key already exists, returns the existing value unchanged. The function is only called if the key is not present.This is the equivalent of
ConcurrentHashMap.computeIfAbsent.- Returns:
- the value associated with the key after operation (either existing or newly computed)
-
computeIfPresent
public @Nullable V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) Compute and update only if the key is present. If the key doesn't exist, returns null and the function is not called. If the function returns null, the entry is removed.This is the equivalent of
ConcurrentHashMap.computeIfPresent.- Returns:
- the new value associated with the key, or null if the key is absent or removed
-
merge
Merge a value into the map for the given key. If the key is absent, the value is inserted directly. If the key is present, the remapper function is called with the current value and the provided value. If the remapper returns null, the entry is removed.- Parameters:
key- the keyvalue- the value to merge inremapper- function to merge values- Returns:
- the final value associated with the key, or null if removed
-
getOrDefault
Get the value for a key, or return the default value if the key is absent or expired. This is the equivalent ofMap.getOrDefault.- Parameters:
key- the keydefaultValue- the value to return if key is absent- Returns:
- the value for the key, or defaultValue if absent
-
removeAll
Remove all entries for the given keys. Entries that don't exist are silently ignored.- Parameters:
keys- the keys to remove- Returns:
- the number of entries actually removed
-
replaceIfPresent
Replace a value only if the key exists and the old value matches. This is likereplace(K, V, V)but with an alternative signature.- Parameters:
key- the keyoldValue- the expected old valuenewValue- the new value to set- Returns:
- true if replaced successfully, false otherwise
-
entries
-
keysByPattern
-
assertProductionMutationAllowed
Preflight a key before multi-entry mutations start applying partial state.Single-key mutation methods call the same production guard internally. Bulk callers use this method to validate every target key first, so a later TTL/max-idle rejection cannot leave earlier keys committed without their invalidations.
-
ttlEntryCount
public int ttlEntryCount()Return the number of entries with TTL set (for diagnostics). -
getRemainingTtlSeconds
Return the remaining TTL for a key in seconds. Returns -1 if the key exists but has no TTL, -2 if the key does not exist. -
getRemainingTtlMillis
Return the remaining TTL for a key in milliseconds. Returns -1 if the key exists but has no TTL, -2 if the key does not exist. -
removeTtl
Remove the TTL from a key (persist it).- Returns:
- true if the key had a TTL that was removed
-
putWithTTL
Store a key-value pair with a TTL (Time-To-Live) in milliseconds. After the TTL expires, reads treat the entry as absent until cleanup removes it.- Parameters:
key- the keyvalue- the valuettlMillis- TTL duration in milliseconds (0 or negative = no TTL)- Returns:
- the previous value associated with the key, or null if there was none
-
getWithTTL
-
refreshTTL
Refresh the TTL for an existing key without changing its value. Equivalent to updateTtl().- Parameters:
key- the keynewTtlMillis- new TTL duration in milliseconds (0 or negative = remove TTL)- Returns:
- true if the key exists and TTL was updated, false if key doesn't exist
-
setDefaultTTL
public void setDefaultTTL(long ttlMillis) Set a default TTL for all new entries added via put() and putIfAbsent(). Entries with explicit TTL (via putWithTTL or put with TimeUnit) are not affected.- Parameters:
ttlMillis- default TTL in milliseconds (0 = disable default TTL)
-
getDefaultTTL
public long getDefaultTTL()Get the default TTL setting.- Returns:
- default TTL in milliseconds, 0 if not set
-
getTimeToLive
Get the remaining time-to-live for a key.- Parameters:
key- the key- Returns:
- remaining TTL in milliseconds, -1 if no TTL is set or key doesn't exist
-
evictExpired
public int evictExpired()Bulk eviction of all expired entries. Scans the entire map and removes entries whose TTL has passed. This is an explicit cleanup operation; regular reads only filter expired entries.- Returns:
- the number of entries evicted
-
getMapStatistics
Get a snapshot of current map statistics. Includes hit/miss counts, operation counts, and eviction counts.- Returns:
- a MapStatistics record with current metrics
-
setWriteThrough
-
setDeleteThrough
-
setMapStore
Wire a pluggableMapStoreas the backing store for this map.Supersedes the legacy single-purpose
setWriteThrough(BiConsumer)/setDeleteThrough(Consumer)pair: internally, this method wires both callbacks to adapters that invokestore.store(k,v)andstore.delete(k). Write-through MapStore calls are fail-close: persistence failures are rethrown and the entry state is restored before callers observe success.Modes:
- writeThrough — synchronous: every local write calls
mapStore.store(key, value)/.delete(key)before emitting listener side effects; failures abort the visible write. - writeBehind — asynchronous: writes are coalesced into a bounded per-key queue and flushed on a scheduled virtual-thread executor. Same-key updates before flush collapse into a single store/delete call.
Load mode:
- DISABLED — no reads are issued to the backing store.
- LAZY — values are fetched from the store on
get()miss and inserted into the map (cache-aside). - EAGER —
MapStore.loadAllKeys()is invoked asynchronously on a virtual thread and each returned key is fetched and inserted.
- Parameters:
store- the backing store (not null)config- wire-up options (not null)- Throws:
IllegalStateException- if a MapStore is already wired
- writeThrough — synchronous: every local write calls
-
getMapStore
-
getMapStoreConfig
Get the activeMapStoreConfig, ornullif no store is wired.- Returns:
- the map-store config, or null
-
flushWriteBehindNow
public void flushWriteBehindNow()Force a synchronous flush of the write-behind queue. Intended for tests and graceful-shutdown paths. -
addInterceptor
Register a map interceptor. Interceptors execute in registration order before and after get/put/remove operations.- Parameters:
interceptor- the interceptor to register (must not be null)
-
removeInterceptor
Remove a previously registered map interceptor.- Parameters:
interceptor- the interceptor to remove (must not be null)- Returns:
- true if the interceptor was found and removed
-
registerMapListener
-
registerMapListener
public void registerMapListener(DistributedMap.MapChangeListener<K, V> listener, EntryPredicate<K, V> predicate) -
registerLocalMapListener
Register a listener that fires only for events originating on THIS node.A "local" event is one produced by a direct call to
put,remove,compute, eviction, or TTL expiration on this instance — i.e. the current thread is the one mutating the store. Events replayed via Raft apply / backup replication (putDirect(K, V)/deleteDirect(K)) are NOT local and do not fire local listeners.Useful when you need to react exactly once per cluster write (on the leader) — cluster-wide listeners fire on every replica, which causes duplicate reactions for side-effectful handlers.
- Parameters:
listener- the listener to register
-
registerLocalMapListener
public void registerLocalMapListener(DistributedMap.MapChangeListener<K, V> listener, EntryPredicate<K, V> predicate) Register a local-only listener filtered by a predicate. SeeregisterLocalMapListener(MapChangeListener)andregisterMapListener(MapChangeListener, EntryPredicate).- Parameters:
listener- the listener to registerpredicate- event filter
-
deregisterMapListener
-
fireEntryMerged
Fire a MERGED event — used by WAN replication when two divergent histories are reconciled and a single canonical value is chosen. Unlike UPDATED, this signals that the new value came from a cross-cluster merge rather than a local write.Fires on cluster-wide listeners only (MERGED is never a local-originated event — it always comes from WAN apply).
- Parameters:
key- the merged key (not null)oldValue- the value before the merge, ornullwhen this node had nonenewValue- the winning value after the merge (not null)
-
beginBufferedSideEffects
public void beginBufferedSideEffects() -
flushBufferedSideEffects
public void flushBufferedSideEffects() -
discardBufferedSideEffects
public void discardBufferedSideEffects() -
captureEntryRollbackState
Capture the exact current state for a single key so callers can restore it without advancing revisions or replaying user-visible side effects. -
restoreEntryRollbackState
Restore a previously captured entry state without incrementing revisions, invoking write-through callbacks, or notifying listeners. -
toString
-
stopCleanup
public void stopCleanup()Stop the background TTL cleanup thread. -
setMaxMemoryBytes
public void setMaxMemoryBytes(long maxBytes) Set the maximum memory limit for this map. -
getCurrentMemoryBytes
public long getCurrentMemoryBytes()Get the current memory usage in bytes (estimated). -
processEntry
public <R extends @Nullable Object> @Nullable R processEntry(K key, CacheEntryProcessor<K, V, @NonNull R> processor) Execute a processor on a single key atomically.The processor has exclusive access to the entry while executing.
- Type Parameters:
R- the result type- Parameters:
key- the key to processprocessor- the processor to apply- Returns:
- the result of processing
-
processEntries
public <R extends @Nullable Object> Map<K,R> processEntries(Set<K> keys, CacheEntryProcessor<K, V, @NonNull R> processor) Execute a processor on multiple keys in parallel using virtual threads.Each key is processed independently and concurrently.
- Type Parameters:
R- the result type- Parameters:
keys- the keys to processprocessor- the processor to apply- Returns:
- a map of keys to processor results
-
executeOnAll
public <R extends @Nullable Object> Map<K,R> executeOnAll(CacheEntryProcessor<K, V, @NonNull R> processor) Execute a processor on all entries in the map in parallel using virtual threads.All entries are processed independently and concurrently.
- Type Parameters:
R- the result type- Parameters:
processor- the processor to apply- Returns:
- a map of all keys to processor results
-
aggregate
Aggregate all visible entries using a single aggregator instance.- Type Parameters:
R- the result type- Parameters:
aggregator- the aggregator to accumulate into- Returns:
- the aggregated result
-
aggregate
Aggregate all visible entries with two-stage map-reduce combine semantics.- Type Parameters:
R- the result type- Parameters:
aggregatorFactory- creates a fresh aggregator for each map stage- Returns:
- the aggregated result
-
aggregate
public <R> R aggregate(Supplier<? extends Aggregator<K, V, R>> aggregatorFactory, MapPredicate<K, V> predicate) Aggregate visible entries matchingpredicatewith two-stage map-reduce combine semantics.- Type Parameters:
R- the result type- Parameters:
aggregatorFactory- creates a fresh aggregator for each map stagepredicate- filters entries before accumulation- Returns:
- the aggregated result
-
takeSnapshot
public byte[] takeSnapshot()Description copied from interface:SnapshotableSerialize the current state of this data structure into a byte array.- Specified by:
takeSnapshotin interfaceSnapshotable- Returns:
- byte array containing the serialized state
-
captureSnapshot
Description copied from interface:SnapshotableCapture a full snapshot and atomically advance dirty tracking for the captured state.Implementations that detach dirty state must return a token so the caller can restore it if snapshot serialization or persistence later fails.
- Specified by:
captureSnapshotin interfaceSnapshotable- Returns:
- captured snapshot data and optional rollback token
-
restoreSnapshot
public void restoreSnapshot(byte[] data) Description copied from interface:SnapshotableRestore this data structure from a previously serialized state.Must restore the state atomically, clearing existing data and populating with the snapshot contents.
- Specified by:
restoreSnapshotin interfaceSnapshotable- Parameters:
data- the serialized state to restore from
-
snapshotId
Description copied from interface:SnapshotableGet a unique identifier for this data structure.Used to identify which data structure a snapshot belongs to in a full state machine snapshot.
- Specified by:
snapshotIdin interfaceSnapshotable- Returns:
- unique identifier for this data structure
-
supportsDeltaSnapshot
public boolean supportsDeltaSnapshot()Description copied from interface:SnapshotableWhether this data structure supports delta (incremental) snapshots.Structures that return
truemust also implementSnapshotable.takeDeltaSnapshot()andSnapshotable.clearDirtyState().- Specified by:
supportsDeltaSnapshotin interfaceSnapshotable- Returns:
- true if delta snapshots are supported
-
takeDeltaSnapshot
public byte[] takeDeltaSnapshot()Description copied from interface:SnapshotableSerialize only the changed state since the last snapshot into a byte array.Only called when
Snapshotable.supportsDeltaSnapshot()returnstrue. The default implementation falls back to a full snapshot.- Specified by:
takeDeltaSnapshotin interfaceSnapshotable- Returns:
- byte array containing only changed state, or empty if no changes
-
captureDeltaSnapshot
Description copied from interface:SnapshotableCapture a delta snapshot and atomically advance dirty tracking for the captured state.Implementations that detach dirty state must return a token so the caller can restore it if snapshot serialization or persistence later fails.
- Specified by:
captureDeltaSnapshotin interfaceSnapshotable- Returns:
- captured delta data and optional rollback token
-
clearDirtyState
public void clearDirtyState()Description copied from interface:SnapshotableClear the dirty-tracking state after a delta snapshot has been persisted.Called after a successful delta snapshot to reset change tracking. Default implementation is a no-op.
- Specified by:
clearDirtyStatein interfaceSnapshotable
-
rollbackDirtyState
Description copied from interface:SnapshotableRestore dirty-tracking state that was detached during snapshot capture.Called only when snapshot persistence fails after a capture completed successfully.
- Specified by:
rollbackDirtyStatein interfaceSnapshotable- Parameters:
dirtyStateToken- implementation-specific token returned by captureSnapshot/captureDeltaSnapshot
-
hasDirtyState
public boolean hasDirtyState()Description copied from interface:SnapshotableWhether any data has changed since the last snapshot.Used by the snapshot scheduler to skip structures with no changes during delta snapshots.
- Specified by:
hasDirtyStatein interfaceSnapshotable- Returns:
- true if changes exist since last clearDirtyState() call
-
setKryoSerializer
Wire the Kryo serializer used for map snapshots. Called byDataStructureRegistryso that snapshot serialization shares the same registered types as the rest of the server.
-