Class DataStructureRegistry
Provides lazy creation (get-or-create) semantics: requesting a map/queue/topic by name either returns the existing instance or creates a new one.
Thread Safety: Uses ConcurrentHashMap storage with synchronized creation per registry type so instance-cap checks stay atomic across names. Memory limits and listener manager are volatile for visibility across threads.
- Since:
- 1.0
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final intstatic final StringType identifier for continuous query caches.static final StringType identifier for executor services.static final StringType identifier for grow-only set CRDTs.static final StringType identifier for Snowflake ID generators.static final StringType identifier for lists.static final StringType identifier for last-writer-wins register CRDTs.static final StringType identifier for maps — seeDistributedObjectListenerdocs.static final StringType identifier for multimaps.static final StringType identifier for observed-remove set CRDTs.static final StringType identifier for PN-counters.static final StringType identifier for priority queues.static final StringType identifier for queues.static final StringType identifier for reliable topics.static final StringType identifier for ringbuffers.static final StringType identifier for sets.static final StringType identifier for topics. -
Constructor Summary
ConstructorsConstructorDescriptionDataStructureRegistry(int instanceNumber) Create a new data structure registry.DataStructureRegistry(int instanceNumber, int idGeneratorNodeId) DataStructureRegistry(int instanceNumber, int idGeneratorNodeId, KryoSerializer kryoSerializer) DataStructureRegistry(int instanceNumber, KryoSerializer kryoSerializer) Create a new data structure registry. -
Method Summary
Modifier and TypeMethodDescriptionintapplyDeclarativeIndexes(Map<String, List<IndexConfig>> mapIndexes) longbindPNCounterSession(String name, String sessionId) intcqcCount()createContinuousQueryCache(String cqcName, String mapName, MapPredicate<String, String> predicate) Create a continuous query cache on a named map with the given predicate.booleanDestroy a CQC by name: deregisters its listener and removes it from the registry.voidensurePartitionWriteRedirect(int partitionId, int sourceGroup, PartitionRouter router, Function<Integer, DataStructureRegistry> registryLookup) intReturns the number of executor services in the registry.void@Nullable ContinuousQueryCache<String, String> @Nullable DistributedExecutorServicegetExecutor(String name) Get an existing executor service by name, or null if it does not exist.Returns the set of all executor names in the registry.@Nullable SnowflakeIdGeneratorgetIdGenerator(String name) @Nullable DistributedList<String> @Nullable LWWRegister<String> getLWWRegister(String name) @Nullable DistributedMap<String, String> Returns the set of all map names in the registry.@Nullable DistributedMultiMapgetMultiMap(String name) getOrCreateExecutor(String name) Get or create a distributed executor service with the given name.getOrCreateGSet(String name) getOrCreateIdGenerator(String name) getOrCreateList(String name) Get or create a distributed list with the default CP (linearizable) consistency mode.getOrCreateList(String name, DistributedList.ConsistencyMode mode) Get or create a distributed list with the specified consistency mode.getOrCreateLWWRegister(String name) getOrCreateMap(String name) getOrCreateMultiMap(String name) getOrCreateORSet(String name) getOrCreatePNCounter(String name) getOrCreateQueue(String name) Get or create a reliable topic with default capacity.getOrCreateReliableTopic(String name, int capacity) Get or create a reliable topic with specified ringbuffer capacity.getOrCreateRingbuffer(String name, int capacity) getOrCreateSet(String name) getOrCreateTopic(String name) @Nullable PNCountergetPNCounter(String name) @Nullable LonggetPNCounterSessionEpoch(String name, String sessionId) @Nullable DistributedPriorityQueue<String> getPriorityQueue(String name) @Nullable DistributedQueue<String> Returns the set of all queue names in the registry.@Nullable ReliableTopic<String> getReliableTopic(String name) @Nullable DistributedRingbuffer<String> getRingbuffer(String name) @Nullable DistributedSet<String> Returns the set of all set names in the registry.getSqlIndex(String name) getSqlMapping(String name) getSqlType(String name) getSqlView(String name) @Nullable DistributedTopic<String> Returns the set of all topic names in the registry.intbooleanReturns whether this registry currently stores user data that would need to move during a partition ownership cutover.intbooleanisPNCounterSessionBound(String name, String sessionId, long epoch) intintintmapCount()mapNames()Returns the set of all map names in the registry.intReturns non-map structure types that currently hold user data.intintintbooleanputSqlIndex(SqlIndex index, boolean replaceExisting) booleanputSqlMapping(SqlMapping mapping, boolean replaceExisting) booleanputSqlType(SqlType type, boolean replaceExisting) booleanputSqlView(SqlView view, boolean replaceExisting) intvoidRegister a listener to be notified when distributed objects are created or destroyed on this node.voidregisterGSet(String name, GSet<String> gSet) voidregisterIdGenerator(String name, SnowflakeIdGenerator generator) voidregisterList(String name, DistributedList<String> list) voidregisterLWWRegister(String name, LWWRegister<String> register) voidregisterMap(String name, DistributedMap<String, String> map) voidregisterMultiMap(String name, DistributedMultiMap multiMap) voidregisterORSet(String name, ORSet<String> orSet) voidregisterPriorityQueue(String name, DistributedPriorityQueue<String> priorityQueue) voidregisterQueue(String name, DistributedQueue<String> queue) voidregisterReliableTopic(String name, ReliableTopic<String> topic) voidregisterRingbuffer(String name, DistributedRingbuffer<String> ringbuffer) voidregisterSet(String name, DistributedSet<String> set) voidregisterTopic(String name, DistributedTopic<String> topic) intbooleanremoveMapIfSame(String name, DistributedMap<String, String> map) booleanremoveQueueIfSame(String name, DistributedQueue<String> queue) booleanremoveSetIfSame(String name, DistributedSet<String> set) voidrestoreFromSnapshot(HashMap<String, Object> snapshot) Restores all data structures from a snapshot.intvoidrunWithoutWanReplication(Runnable action) Run a local mutation without emitting it back into WAN replication.intsetCount()voidsetDataConnectionConfigs(Map<String, DataConnectionConfig> configs) voidvoidsetExecutorMemberSupplier(Supplier<Collection<NodeInfo>> executorMemberSupplier) voidsetGenericMapStoreConfigs(Map<String, GenericMapStoreConfig> configs) voidsetKryoSerializer(KryoSerializer kryoSerializer) voidvoidsetMaxExecutorServices(int max) voidsetMaxInstancesPerType(int max) voidsetMaxMemoryBytesPerMap(long maxBytes) Set the maximum memory limit for all maps in this registry.voidsetPartitionRouter(PartitionRouter partitionRouter) Set the partition router.voidsetPriorityQueueConfigs(Map<String, PriorityQueueConfig> configs) voidsetQueueConfigs(Map<String, QueueConfig> configs) voidshutdown()Shut down registry-owned resources without changing their logical data.Creates a serializable snapshot of all data structures.sqlTypes()sqlViews()intvoidvoidUnregister a previously-registered listener.
-
Field Details
-
DEFAULT_MAX_EXECUTOR_SERVICES
public static final int DEFAULT_MAX_EXECUTOR_SERVICES- See Also:
-
TYPE_MAP
Type identifier for maps — seeDistributedObjectListenerdocs.- See Also:
-
TYPE_QUEUE
-
TYPE_TOPIC
-
TYPE_SET
-
TYPE_MULTIMAP
-
TYPE_RINGBUFFER
-
TYPE_LIST
-
TYPE_PRIORITY_QUEUE
-
TYPE_RELIABLE_TOPIC
-
TYPE_PN_COUNTER
-
TYPE_G_SET
-
TYPE_OR_SET
-
TYPE_LWW_REGISTER
Type identifier for last-writer-wins register CRDTs.- See Also:
-
TYPE_ID_GENERATOR
-
TYPE_CQC
-
TYPE_EXECUTOR
-
-
Constructor Details
-
DataStructureRegistry
public DataStructureRegistry(int instanceNumber) Create a new data structure registry.- Parameters:
instanceNumber- the node instance number for logging
-
DataStructureRegistry
public DataStructureRegistry(int instanceNumber, int idGeneratorNodeId) -
DataStructureRegistry
Create a new data structure registry.- Parameters:
instanceNumber- the node instance number for loggingkryoSerializer- serializer used by distributed executors
-
DataStructureRegistry
public DataStructureRegistry(int instanceNumber, int idGeneratorNodeId, KryoSerializer kryoSerializer)
-
-
Method Details
-
setPartitionRouter
Set the partition router. Called byCacheNodeat startup whenloomcache.server.sharding.enabled=true. When unset, the registry operates in single-group mode (current v1.x behavior).- Parameters:
partitionRouter- the router (non-null)- Throws:
NullPointerException- ifpartitionRouteris null
-
setKryoSerializer
-
setExecutorMemberSupplier
-
getKryoSerializer
-
setMaxMemoryBytesPerMap
public void setMaxMemoryBytesPerMap(long maxBytes) Set the maximum memory limit for all maps in this registry.The limit is applied to new maps created after this call and to existing maps via setMaxMemoryBytes().
- Parameters:
maxBytes- the maximum bytes per map (must be > 0)- Throws:
IllegalArgumentException- if maxBytes invalid input: '<'= 0
-
setDefaultMapConfig
-
setMaxInstancesPerType
public void setMaxInstancesPerType(int max) -
setMaxExecutorServices
public void setMaxExecutorServices(int max) -
runWithoutWanReplication
Run a local mutation without emitting it back into WAN replication. -
registerDistributedObjectListener
Register a listener to be notified when distributed objects are created or destroyed on this node. Listeners fire on:- First successful
getOrCreate*call for a given name (create) - Explicit
destroy*orremove*IfSamecall (destroy)
- Parameters:
listener- the listener to register (not null)
- First successful
-
unregisterDistributedObjectListener
Unregister a previously-registered listener. No-op when the listener was not registered.- Parameters:
listener- the listener to remove (not null)
-
getOrCreateMap
-
getMap
-
putSqlMapping
-
getSqlMapping
-
sqlMappings
-
putSqlIndex
-
getSqlIndex
-
sqlIndexes
-
applyDeclarativeIndexes
-
putSqlView
-
getSqlView
-
sqlViews
-
putSqlType
-
getSqlType
-
sqlTypes
-
registerMap
-
setLocalWriterId
-
removeMapIfSame
-
ensurePartitionWriteRedirect
public void ensurePartitionWriteRedirect(int partitionId, int sourceGroup, PartitionRouter router, Function<Integer, DataStructureRegistry> registryLookup) -
freezeMapCreation
public void freezeMapCreation() -
unfreezeMapCreation
public void unfreezeMapCreation() -
getOrCreateQueue
-
getQueue
-
registerQueue
-
setQueueConfigs
-
getQueueConfigs
-
setDataConnectionConfigs
-
getDataConnectionConfigs
-
setGenericMapStoreConfigs
-
getGenericMapStoreConfigs
-
removeQueueIfSame
-
getOrCreateTopic
-
getTopic
-
registerTopic
-
getOrCreateSet
-
getSet
-
registerSet
-
removeSetIfSame
-
getOrCreateMultiMap
-
getMultiMap
-
registerMultiMap
-
getOrCreateRingbuffer
-
getRingbuffer
-
registerRingbuffer
-
getOrCreateList
Get or create a distributed list with the default CP (linearizable) consistency mode.- Parameters:
name- the list name (must not be blank)- Returns:
- the existing or newly created list
-
getOrCreateList
Get or create a distributed list with the specified consistency mode.If a list with this name already exists, the existing instance is returned regardless of the requested mode. The consistency mode is immutable after creation.
- Parameters:
name- the list name (must not be blank)mode- the consistency mode (CP or AP)- Returns:
- the existing or newly created list
-
getList
-
registerList
-
listCount
public int listCount() -
getOrCreatePriorityQueue
-
getPriorityQueue
-
registerPriorityQueue
-
setPriorityQueueConfigs
-
getPriorityQueueConfigs
-
priorityQueueCount
public int priorityQueueCount() -
getOrCreateReliableTopic
Get or create a reliable topic with default capacity.- Parameters:
name- the topic name (must not be blank)- Returns:
- the existing or newly created reliable topic
-
getOrCreateReliableTopic
Get or create a reliable topic with specified ringbuffer capacity.If a reliable topic with this name already exists, the existing instance is returned regardless of the requested capacity.
- Parameters:
name- the topic name (must not be blank)capacity- the ringbuffer capacity for new topics- Returns:
- the existing or newly created reliable topic
-
getReliableTopic
-
registerReliableTopic
-
reliableTopicCount
public int reliableTopicCount() -
getOrCreatePNCounter
-
getPNCounter
-
bindPNCounterSession
-
getPNCounterSessionEpoch
-
isPNCounterSessionBound
-
pnCounterCount
public int pnCounterCount() -
getOrCreateGSet
-
getGSet
-
registerGSet
-
gSetCount
public int gSetCount() -
getOrCreateORSet
-
getORSet
-
registerORSet
-
orSetCount
public int orSetCount() -
getOrCreateLWWRegister
-
getLWWRegister
-
registerLWWRegister
-
lwwRegisterCount
public int lwwRegisterCount() -
getOrCreateIdGenerator
-
getIdGenerator
-
registerIdGenerator
-
idGeneratorCount
public int idGeneratorCount() -
createContinuousQueryCache
public ContinuousQueryCache<String,String> createContinuousQueryCache(String cqcName, String mapName, MapPredicate<String, String> predicate) Create a continuous query cache on a named map with the given predicate.If a CQC with this name already exists, the existing instance is returned. The CQC registers as a listener on the source map and auto-populates.
- Parameters:
cqcName- the CQC name (must not be blank)mapName- the source map name (must exist or be auto-created)predicate- the filter predicate- Returns:
- the existing or newly created CQC
-
getContinuousQueryCache
-
destroyContinuousQueryCache
Destroy a CQC by name: deregisters its listener and removes it from the registry.- Parameters:
name- the CQC name- Returns:
- true if the CQC was found and destroyed
-
shutdown
public void shutdown()Shut down registry-owned resources without changing their logical data.CacheNode invokes this during orderly shutdown after request draining. Maps need this to flush write-behind MapStore queues and close stores; executors need it to stop their worker and scheduler pools.
-
cqcCount
public int cqcCount() -
getOrCreateExecutor
Get or create a distributed executor service with the given name.If an executor with this name already exists, the existing instance is returned. Uses the registry's configured
KryoSerializerfor task/result serialization.- Parameters:
name- the executor name (must not be blank)- Returns:
- the existing or newly created executor service
-
getExecutor
Get an existing executor service by name, or null if it does not exist.- Parameters:
name- the executor name- Returns:
- the executor service, or null if not found
-
getMapNames
-
getQueueNames
-
getSetNames
-
getTopicNames
-
getExecutorNames
-
executorCount
public int executorCount()Returns the number of executor services in the registry.- Returns:
- the executor count
-
mapNames
-
mapCount
public int mapCount() -
queueCount
public int queueCount() -
topicCount
public int topicCount() -
setCount
public int setCount() -
multiMapCount
public int multiMapCount() -
ringbufferCount
public int ringbufferCount() -
hasMigratableData
public boolean hasMigratableData()Returns whether this registry currently stores user data that would need to move during a partition ownership cutover.This intentionally ignores control-plane metadata such as partition-router and CP-subsystem snapshots. Empty structures do not count as migratable data.
-
nonMapMigratableDataTypes
Returns non-map structure types that currently hold user data.The partition rebalancer currently has map-specific copy/sweep/delete semantics. A non-empty result means a map-only partition ownership cutover would leave some routed state behind in the old group.
-
snapshotAllData
Creates a serializable snapshot of all data structures. Returns a HashMap containing all maps, queues, sets, sorted sets, locks, and counters — suitable for Kryo serialization.This is called during Raft auto-compaction to persist the full state machine state. On recovery, the snapshot is deserialized and replayed to restore state without replaying the entire WAL.
- Returns:
- a Map representing the full state of all data structures
-
restoreFromSnapshot
Restores all data structures from a snapshot. Clears existing data and populates all structures from the provided snapshot map.This is called during recovery to restore the state machine from a persisted snapshot, typically after deserializing a Kryo-encoded snapshot from disk.
- Parameters:
snapshot- a Map containing the snapshot data (expected keys: maps, queues, sets, sortedSets, counters)
-