Class DataStructureRegistry

java.lang.Object
com.loomcache.server.datastructures.DataStructureRegistry

public class DataStructureRegistry extends Object
Central registry for all distributed data structures on a node.

Provides lazy creation (get-or-create) semantics: requesting a map/queue/topic by name either returns the existing instance or creates a new one.

Thread Safety: Uses ConcurrentHashMap storage, with creation synchronized per registry type so that instance-cap checks stay atomic across names. The memory-limit and listener-manager fields are volatile for visibility across threads.
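
The get-or-create and instance-cap behavior described above can be sketched roughly as follows. This is a minimal illustration, not the LoomCache source: the class `CappedRegistrySketch`, the single `creationLock`, and the choice of `IllegalStateException` when the cap is reached are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for one registry type (for example, all maps).
final class CappedRegistrySketch<T> {
    private final Map<String, T> instances = new ConcurrentHashMap<>();
    private final Object creationLock = new Object();
    private volatile int maxInstances;

    CappedRegistrySketch(int maxInstances) {
        this.maxInstances = maxInstances;
    }

    T getOrCreate(String name, Supplier<T> factory) {
        // Fast path: an existing instance is returned without locking.
        T existing = instances.get(name);
        if (existing != null) {
            return existing;
        }
        // Slow path: one lock per type, so the size check and the insert
        // are atomic even when different names are requested concurrently.
        synchronized (creationLock) {
            existing = instances.get(name);
            if (existing != null) {
                return existing;
            }
            if (instances.size() >= maxInstances) {
                // Assumed failure mode; the real registry may signal this differently.
                throw new IllegalStateException("instance cap reached: " + maxInstances);
            }
            T created = factory.get();
            instances.put(name, created);
            return created;
        }
    }

    int count() {
        return instances.size();
    }
}
```

A per-name `computeIfAbsent` alone would not be enough here, because two threads creating different names could both pass the size check before either inserts.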

Since:
1.0
  • Constructor Details

    • DataStructureRegistry

      public DataStructureRegistry(int instanceNumber)
      Create a new data structure registry.
      Parameters:
      instanceNumber - the node instance number for logging
    • DataStructureRegistry

      public DataStructureRegistry(int instanceNumber, int idGeneratorNodeId)
    • DataStructureRegistry

      public DataStructureRegistry(int instanceNumber, KryoSerializer kryoSerializer)
      Create a new data structure registry.
      Parameters:
      instanceNumber - the node instance number for logging
      kryoSerializer - serializer used by distributed executors
    • DataStructureRegistry

      public DataStructureRegistry(int instanceNumber, int idGeneratorNodeId, KryoSerializer kryoSerializer)
  • Method Details

    • setPartitionRouter

      public void setPartitionRouter(PartitionRouter partitionRouter)
      Set the partition router. Called by CacheNode at startup when loomcache.server.sharding.enabled=true. When unset, the registry operates in single-group mode (current v1.x behavior).
      Parameters:
      partitionRouter - the router (non-null)
      Throws:
      NullPointerException - if partitionRouter is null
    • setKryoSerializer

      public void setKryoSerializer(KryoSerializer kryoSerializer)
    • setExecutorMemberSupplier

      public void setExecutorMemberSupplier(Supplier<Collection<NodeInfo>> executorMemberSupplier)
    • getKryoSerializer

      public KryoSerializer getKryoSerializer()
    • setMaxMemoryBytesPerMap

      public void setMaxMemoryBytesPerMap(long maxBytes)
      Set the per-map memory limit, in bytes, for every map in this registry.

      The limit is applied to new maps created after this call and to existing maps via setMaxMemoryBytes().

      Parameters:
      maxBytes - the maximum bytes per map (must be > 0)
      Throws:
      IllegalArgumentException - if maxBytes <= 0
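
A rough sketch of the documented contract (validate, remember the limit for future maps, push it to existing maps) might look like this. `MemoryLimitSketch` and `LimitedMap` are hypothetical stand-ins, and the initial limit of `Long.MAX_VALUE` is an assumption, not a documented default.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class MemoryLimitSketch {
    // Hypothetical stand-in for DistributedMap; only the limit is modeled.
    static final class LimitedMap {
        private volatile long maxMemoryBytes;

        LimitedMap(long maxMemoryBytes) {
            this.maxMemoryBytes = maxMemoryBytes;
        }

        void setMaxMemoryBytes(long maxBytes) {
            this.maxMemoryBytes = maxBytes;
        }

        long maxMemoryBytes() {
            return maxMemoryBytes;
        }
    }

    private final Map<String, LimitedMap> maps = new ConcurrentHashMap<>();
    // volatile so that a limit set on one thread is visible to creators on another.
    private volatile long maxMemoryBytesPerMap = Long.MAX_VALUE; // assumed default

    void setMaxMemoryBytesPerMap(long maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be > 0, got " + maxBytes);
        }
        this.maxMemoryBytesPerMap = maxBytes;
        // Existing maps receive the new limit as well.
        for (LimitedMap map : maps.values()) {
            map.setMaxMemoryBytes(maxBytes);
        }
    }

    LimitedMap getOrCreateMap(String name) {
        // New maps pick up whatever limit is current at creation time.
        return maps.computeIfAbsent(name, n -> new LimitedMap(maxMemoryBytesPerMap));
    }
}
```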
    • setDefaultMapConfig

      public void setDefaultMapConfig(DistributedMapConfig config)
    • setMaxInstancesPerType

      public void setMaxInstancesPerType(int max)
    • setMaxExecutorServices

      public void setMaxExecutorServices(int max)
    • runWithoutWanReplication

      public void runWithoutWanReplication(Runnable action)
      Run a local mutation without emitting it back into WAN replication.
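
One common way to get this kind of suppression is a thread-local flag that the mutation path checks before publishing. The sketch below assumes that approach; the source does not say how the registry implements it, and `WanSuppressionSketch`, `put`, and `wanOutbox` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

final class WanSuppressionSketch {
    // True while the current thread is inside runWithoutWanReplication.
    private final ThreadLocal<Boolean> suppressed = ThreadLocal.withInitial(() -> false);
    // Hypothetical stand-in for the outbound WAN replication queue.
    private final List<String> wanOutbox = new ArrayList<>();

    void runWithoutWanReplication(Runnable action) {
        boolean previous = suppressed.get();
        suppressed.set(true);
        try {
            action.run();
        } finally {
            // Restore the previous value so nested calls behave correctly.
            suppressed.set(previous);
        }
    }

    void put(String key, String value) {
        // The local mutation itself would happen here.
        if (!suppressed.get()) {
            wanOutbox.add(key + "=" + value);
        }
    }

    List<String> wanOutbox() {
        return wanOutbox;
    }
}
```

Under this assumption, a node applying an update that arrived from a remote cluster would wrap the local write in `runWithoutWanReplication` so the update is not echoed back to its origin.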
    • registerDistributedObjectListener

      public void registerDistributedObjectListener(DistributedObjectListener listener)
      Register a listener to be notified when distributed objects are created or destroyed on this node. Listeners fire on:
      • First successful getOrCreate* call for a given name (create)
      • Explicit destroy* or remove*IfSame call (destroy)
      Parameters:
      listener - the listener to register (not null)
    • unregisterDistributedObjectListener

      public void unregisterDistributedObjectListener(DistributedObjectListener listener)
      Unregister a previously-registered listener. No-op when the listener was not registered.
      Parameters:
      listener - the listener to remove (not null)
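
The firing rules above (once on first creation, once on destroy, nothing after unregistering) can be illustrated with a small stand-in. The `ObjectListener` interface and its two callbacks are hypothetical; the real `DistributedObjectListener` may have a different shape.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class ListenerSketch {
    // Hypothetical callback shape, used only for this example.
    interface ObjectListener {
        void onCreated(String name);

        void onDestroyed(String name);
    }

    private final Map<String, Object> objects = new ConcurrentHashMap<>();
    private final List<ObjectListener> listeners = new CopyOnWriteArrayList<>();

    void register(ObjectListener listener) {
        listeners.add(Objects.requireNonNull(listener));
    }

    void unregister(ObjectListener listener) {
        // remove() is a no-op when the listener was never registered.
        listeners.remove(Objects.requireNonNull(listener));
    }

    synchronized Object getOrCreate(String name) {
        Object existing = objects.get(name);
        if (existing != null) {
            return existing; // not the first call for this name: no event
        }
        Object created = new Object();
        objects.put(name, created);
        for (ObjectListener listener : listeners) {
            listener.onCreated(name);
        }
        return created;
    }

    synchronized boolean destroy(String name) {
        if (objects.remove(name) == null) {
            return false;
        }
        for (ObjectListener listener : listeners) {
            listener.onDestroyed(name);
        }
        return true;
    }
}
```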
    • getOrCreateMap

      public DistributedMap<String,String> getOrCreateMap(String name)
    • getMap

      public @Nullable DistributedMap<String,String> getMap(String name)
    • putSqlMapping

      public boolean putSqlMapping(SqlMapping mapping, boolean replaceExisting)
    • getSqlMapping

      public Optional<SqlMapping> getSqlMapping(String name)
    • sqlMappings

      public Map<String, SqlMapping> sqlMappings()
    • putSqlIndex

      public boolean putSqlIndex(SqlIndex index, boolean replaceExisting)
    • getSqlIndex

      public Optional<SqlIndex> getSqlIndex(String name)
    • sqlIndexes

      public Map<String,SqlIndex> sqlIndexes()
    • applyDeclarativeIndexes

      public int applyDeclarativeIndexes(Map<String, List<IndexConfig>> mapIndexes)
    • putSqlView

      public boolean putSqlView(SqlView view, boolean replaceExisting)
    • getSqlView

      public Optional<SqlView> getSqlView(String name)
    • sqlViews

      public Map<String,SqlView> sqlViews()
    • putSqlType

      public boolean putSqlType(SqlType type, boolean replaceExisting)
    • getSqlType

      public Optional<SqlType> getSqlType(String name)
    • sqlTypes

      public Map<String,SqlType> sqlTypes()
    • registerMap

      public void registerMap(String name, DistributedMap<String,String> map)
    • setLocalWriterId

      public void setLocalWriterId(String id)
    • removeMapIfSame

      public boolean removeMapIfSame(String name, DistributedMap<String,String> map)
    • ensurePartitionWriteRedirect

      public void ensurePartitionWriteRedirect(int partitionId, int sourceGroup, PartitionRouter router, Function<Integer, DataStructureRegistry> registryLookup)
    • freezeMapCreation

      public void freezeMapCreation()
    • unfreezeMapCreation

      public void unfreezeMapCreation()
    • getOrCreateQueue

      public DistributedQueue<String> getOrCreateQueue(String name)
    • getQueue

      public @Nullable DistributedQueue<String> getQueue(String name)
    • registerQueue

      public void registerQueue(String name, DistributedQueue<String> queue)
    • setQueueConfigs

      public void setQueueConfigs(Map<String, QueueConfig> configs)
    • getQueueConfigs

      public Map<String, QueueConfig> getQueueConfigs()
    • setDataConnectionConfigs

      public void setDataConnectionConfigs(Map<String, DataConnectionConfig> configs)
    • getDataConnectionConfigs

      public Map<String, DataConnectionConfig> getDataConnectionConfigs()
    • setGenericMapStoreConfigs

      public void setGenericMapStoreConfigs(Map<String, GenericMapStoreConfig> configs)
    • getGenericMapStoreConfigs

      public Map<String, GenericMapStoreConfig> getGenericMapStoreConfigs()
    • removeQueueIfSame

      public boolean removeQueueIfSame(String name, DistributedQueue<String> queue)
    • getOrCreateTopic

      public DistributedTopic<String> getOrCreateTopic(String name)
    • getTopic

      public @Nullable DistributedTopic<String> getTopic(String name)
    • registerTopic

      public void registerTopic(String name, DistributedTopic<String> topic)
    • getOrCreateSet

      public DistributedSet<String> getOrCreateSet(String name)
    • getSet

      public @Nullable DistributedSet<String> getSet(String name)
    • registerSet

      public void registerSet(String name, DistributedSet<String> set)
    • removeSetIfSame

      public boolean removeSetIfSame(String name, DistributedSet<String> set)
    • getOrCreateMultiMap

      public DistributedMultiMap getOrCreateMultiMap(String name)
    • getMultiMap

      public @Nullable DistributedMultiMap getMultiMap(String name)
    • registerMultiMap

      public void registerMultiMap(String name, DistributedMultiMap multiMap)
    • getOrCreateRingbuffer

      public DistributedRingbuffer<String> getOrCreateRingbuffer(String name, int capacity)
    • getRingbuffer

      public @Nullable DistributedRingbuffer<String> getRingbuffer(String name)
    • registerRingbuffer

      public void registerRingbuffer(String name, DistributedRingbuffer<String> ringbuffer)
    • getOrCreateList

      public DistributedList<String> getOrCreateList(String name)
      Get or create a distributed list with the default CP (linearizable) consistency mode.
      Parameters:
      name - the list name (must not be blank)
      Returns:
      the existing or newly created list
    • getOrCreateList

      public DistributedList<String> getOrCreateList(String name, DistributedList.ConsistencyMode mode)
      Get or create a distributed list with the specified consistency mode.

      If a list with this name already exists, the existing instance is returned regardless of the requested mode. The consistency mode is immutable after creation.

      Parameters:
      name - the list name (must not be blank)
      mode - the consistency mode (CP or AP)
      Returns:
      the existing or newly created list
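
The "first creator wins" rule for the consistency mode can be shown with a small stand-in. `ListModeSketch` and `ListHandle` are hypothetical, and throwing `IllegalArgumentException` for a blank name is an assumption; the source only states that the name must not be blank.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ListModeSketch {
    enum ConsistencyMode { CP, AP }

    // Hypothetical stand-in for DistributedList; the mode is final, so it
    // cannot change after creation.
    static final class ListHandle {
        private final String name;
        private final ConsistencyMode mode;

        ListHandle(String name, ConsistencyMode mode) {
            this.name = name;
            this.mode = mode;
        }

        String name() {
            return name;
        }

        ConsistencyMode mode() {
            return mode;
        }
    }

    private final Map<String, ListHandle> lists = new ConcurrentHashMap<>();

    ListHandle getOrCreateList(String name) {
        return getOrCreateList(name, ConsistencyMode.CP); // CP is the documented default
    }

    ListHandle getOrCreateList(String name, ConsistencyMode mode) {
        if (name == null || name.isBlank()) {
            throw new IllegalArgumentException("name must not be blank"); // assumed exception type
        }
        // If the name already exists, the requested mode is ignored.
        return lists.computeIfAbsent(name, n -> new ListHandle(n, mode));
    }
}
```

A caller that depends on a particular mode would therefore need to check the mode of the returned instance rather than rely on the argument it passed.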
    • getList

      public @Nullable DistributedList<String> getList(String name)
    • registerList

      public void registerList(String name, DistributedList<String> list)
    • listCount

      public int listCount()
    • getOrCreatePriorityQueue

      public DistributedPriorityQueue<String> getOrCreatePriorityQueue(String name)
    • getPriorityQueue

      public @Nullable DistributedPriorityQueue<String> getPriorityQueue(String name)
    • registerPriorityQueue

      public void registerPriorityQueue(String name, DistributedPriorityQueue<String> priorityQueue)
    • setPriorityQueueConfigs

      public void setPriorityQueueConfigs(Map<String, PriorityQueueConfig> configs)
    • getPriorityQueueConfigs

      public Map<String, PriorityQueueConfig> getPriorityQueueConfigs()
    • priorityQueueCount

      public int priorityQueueCount()
    • getOrCreateReliableTopic

      public ReliableTopic<String> getOrCreateReliableTopic(String name)
      Get or create a reliable topic with default capacity.
      Parameters:
      name - the topic name (must not be blank)
      Returns:
      the existing or newly created reliable topic
    • getOrCreateReliableTopic

      public ReliableTopic<String> getOrCreateReliableTopic(String name, int capacity)
      Get or create a reliable topic with specified ringbuffer capacity.

      If a reliable topic with this name already exists, the existing instance is returned regardless of the requested capacity.

      Parameters:
      name - the topic name (must not be blank)
      capacity - the ringbuffer capacity for new topics
      Returns:
      the existing or newly created reliable topic
    • getReliableTopic

      public @Nullable ReliableTopic<String> getReliableTopic(String name)
    • registerReliableTopic

      public void registerReliableTopic(String name, ReliableTopic<String> topic)
    • reliableTopicCount

      public int reliableTopicCount()
    • getOrCreatePNCounter

      public PNCounter getOrCreatePNCounter(String name)
    • getPNCounter

      public @Nullable PNCounter getPNCounter(String name)
    • bindPNCounterSession

      public long bindPNCounterSession(String name, String sessionId)
    • getPNCounterSessionEpoch

      public @Nullable Long getPNCounterSessionEpoch(String name, String sessionId)
    • isPNCounterSessionBound

      public boolean isPNCounterSessionBound(String name, String sessionId, long epoch)
    • pnCounterCount

      public int pnCounterCount()
    • getOrCreateGSet

      public GSet<String> getOrCreateGSet(String name)
    • getGSet

      public @Nullable GSet<String> getGSet(String name)
    • registerGSet

      public void registerGSet(String name, GSet<String> gSet)
    • gSetCount

      public int gSetCount()
    • getOrCreateORSet

      public ORSet<String> getOrCreateORSet(String name)
    • getORSet

      public @Nullable ORSet<String> getORSet(String name)
    • registerORSet

      public void registerORSet(String name, ORSet<String> orSet)
    • orSetCount

      public int orSetCount()
    • getOrCreateLWWRegister

      public LWWRegister<String> getOrCreateLWWRegister(String name)
    • getLWWRegister

      public @Nullable LWWRegister<String> getLWWRegister(String name)
    • registerLWWRegister

      public void registerLWWRegister(String name, LWWRegister<String> register)
    • lwwRegisterCount

      public int lwwRegisterCount()
    • getOrCreateIdGenerator

      public SnowflakeIdGenerator getOrCreateIdGenerator(String name)
    • getIdGenerator

      public @Nullable SnowflakeIdGenerator getIdGenerator(String name)
    • registerIdGenerator

      public void registerIdGenerator(String name, SnowflakeIdGenerator generator)
    • idGeneratorCount

      public int idGeneratorCount()
    • createContinuousQueryCache

      public ContinuousQueryCache<String,String> createContinuousQueryCache(String cqcName, String mapName, MapPredicate<String,String> predicate)
      Create a continuous query cache on a named map with the given predicate.

      If a CQC with this name already exists, the existing instance is returned. The CQC registers as a listener on the source map and auto-populates.

      Parameters:
      cqcName - the CQC name (must not be blank)
      mapName - the source map name (must exist or be auto-created)
      predicate - the filter predicate
      Returns:
      the existing or newly created CQC
    • getContinuousQueryCache

      public @Nullable ContinuousQueryCache<String,String> getContinuousQueryCache(String name)
    • destroyContinuousQueryCache

      public boolean destroyContinuousQueryCache(String name)
      Destroy a CQC by name: deregisters its listener and removes it from the registry.
      Parameters:
      name - the CQC name
      Returns:
      true if the CQC was found and destroyed
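
The CQC lifecycle described in the two entries above (register a listener on the source map, auto-populate, deregister on destroy) can be sketched as follows. All types here are hypothetical stand-ins: `BiPredicate` replaces `MapPredicate`, `SourceMap` replaces `DistributedMap`, and the sketch assumes a missing source map is auto-created.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

final class CqcSketch {
    // Hypothetical source map that notifies listeners on every put.
    static final class SourceMap {
        final Map<String, String> data = new ConcurrentHashMap<>();
        final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();

        void put(String key, String value) {
            data.put(key, value);
            for (BiConsumer<String, String> listener : listeners) {
                listener.accept(key, value);
            }
        }
    }

    static final class QueryCache {
        final Map<String, String> view = new ConcurrentHashMap<>();
        private final SourceMap source;
        private final BiConsumer<String, String> listener;

        QueryCache(SourceMap source, BiPredicate<String, String> predicate) {
            this.source = source;
            // Keep matching entries; drop entries that stop matching.
            this.listener = (k, v) -> {
                if (predicate.test(k, v)) {
                    view.put(k, v);
                } else {
                    view.remove(k);
                }
            };
            source.listeners.add(listener);
            source.data.forEach(listener); // auto-populate from current contents
        }

        void destroy() {
            source.listeners.remove(listener);
        }
    }

    private final Map<String, SourceMap> maps = new ConcurrentHashMap<>();
    private final Map<String, QueryCache> caches = new ConcurrentHashMap<>();

    SourceMap getOrCreateMap(String name) {
        return maps.computeIfAbsent(name, n -> new SourceMap());
    }

    QueryCache createContinuousQueryCache(String cqcName, String mapName,
                                          BiPredicate<String, String> predicate) {
        // An existing CQC with this name is returned unchanged.
        return caches.computeIfAbsent(cqcName,
                n -> new QueryCache(getOrCreateMap(mapName), predicate));
    }

    boolean destroyContinuousQueryCache(String name) {
        QueryCache cache = caches.remove(name);
        if (cache == null) {
            return false;
        }
        cache.destroy(); // deregister the listener from the source map
        return true;
    }
}
```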
    • shutdown

      public void shutdown()
      Shut down registry-owned resources without changing their logical data.

      CacheNode invokes this during orderly shutdown after request draining. Maps need this to flush write-behind MapStore queues and close stores; executors need it to stop their worker and scheduler pools.

    • cqcCount

      public int cqcCount()
    • getOrCreateExecutor

      public DistributedExecutorService getOrCreateExecutor(String name)
      Get or create a distributed executor service with the given name.

      If an executor with this name already exists, the existing instance is returned. Uses the registry's configured KryoSerializer for task/result serialization.

      Parameters:
      name - the executor name (must not be blank)
      Returns:
      the existing or newly created executor service
    • getExecutor

      public @Nullable DistributedExecutorService getExecutor(String name)
      Get an existing executor service by name, or null if it does not exist.
      Parameters:
      name - the executor name
      Returns:
      the executor service, or null if not found
    • getMapNames

      public Set<String> getMapNames()
      Returns the set of all map names in the registry.
      Returns:
      an unmodifiable set of map names
    • getQueueNames

      public Set<String> getQueueNames()
      Returns the set of all queue names in the registry.
      Returns:
      an unmodifiable set of queue names
    • getSetNames

      public Set<String> getSetNames()
      Returns the set of all set names in the registry.
      Returns:
      an unmodifiable set of set names
    • getTopicNames

      public Set<String> getTopicNames()
      Returns the set of all topic names in the registry.
      Returns:
      an unmodifiable set of topic names
    • getExecutorNames

      public Set<String> getExecutorNames()
      Returns the set of all executor names in the registry.
      Returns:
      an unmodifiable set of executor names
    • executorCount

      public int executorCount()
      Returns the number of executor services in the registry.
      Returns:
      the executor count
    • mapNames

      public Set<String> mapNames()
      Returns the set of all map names in the registry.
      Returns:
      an unmodifiable set of map names
    • mapCount

      public int mapCount()
    • queueCount

      public int queueCount()
    • topicCount

      public int topicCount()
    • setCount

      public int setCount()
    • multiMapCount

      public int multiMapCount()
    • ringbufferCount

      public int ringbufferCount()
    • hasMigratableData

      public boolean hasMigratableData()
      Returns whether this registry currently stores user data that would need to move during a partition ownership cutover.

      This intentionally ignores control-plane metadata such as partition-router and CP-subsystem snapshots. Empty structures do not count as migratable data.

    • nonMapMigratableDataTypes

      public Set<String> nonMapMigratableDataTypes()
      Returns non-map structure types that currently hold user data.

      The partition rebalancer currently has map-specific copy/sweep/delete semantics. A non-empty result means a map-only partition ownership cutover would leave some routed state behind in the old group.
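
A caller could use this result as a guard before a map-only cutover. The sketch below models only the "empty structures do not count" rule over a hypothetical per-type element count; the type-name strings and the `MigrationGuardSketch` helper are assumptions, not part of the documented API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class MigrationGuardSketch {
    // Input: structure type name -> total number of user elements held.
    // Output: non-map types that currently hold at least one element.
    static Set<String> nonMapTypesWithData(Map<String, Integer> elementCountByType) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : elementCountByType.entrySet()) {
            boolean isMap = entry.getKey().equals("map"); // assumed type label
            if (!isMap && entry.getValue() > 0) {
                result.add(entry.getKey());
            }
        }
        return Collections.unmodifiableSet(result);
    }

    // A map-only cutover leaves nothing behind only if no other type holds data.
    static boolean mapOnlyCutoverIsSafe(Map<String, Integer> elementCountByType) {
        return nonMapTypesWithData(elementCountByType).isEmpty();
    }
}
```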

    • snapshotAllData

      public HashMap<String,Object> snapshotAllData()
      Creates a serializable snapshot of all data structures. Returns a HashMap containing all maps, queues, sets, sorted sets, locks, and counters, suitable for Kryo serialization.

      This is called during Raft auto-compaction to persist the full state machine state. On recovery, the snapshot is deserialized and replayed to restore state without replaying the entire WAL.

      Returns:
      a Map representing the full state of all data structures
    • restoreFromSnapshot

      public void restoreFromSnapshot(HashMap<String,Object> snapshot)
      Restores all data structures from a snapshot. Clears existing data and populates all structures from the provided snapshot map.

      This is called during recovery to restore the state machine from a persisted snapshot, typically after deserializing a Kryo-encoded snapshot from disk.

      Parameters:
      snapshot - a Map containing the snapshot data (expected keys: maps, queues, sets, sortedSets, counters)
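
The snapshot and restore pair can be pictured as a copy-out and clear-then-copy-in round trip. The sketch below models only two of the expected keys (`maps` and `queues`) with plain JDK collections; the value layout under each key is an assumption, since the source lists the keys but not their contents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class SnapshotSketch {
    private final Map<String, Map<String, String>> maps = new ConcurrentHashMap<>();
    private final Map<String, Queue<String>> queues = new ConcurrentHashMap<>();

    Map<String, String> getOrCreateMap(String name) {
        return maps.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
    }

    Queue<String> getOrCreateQueue(String name) {
        return queues.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }

    Set<String> mapNames() {
        return Set.copyOf(maps.keySet());
    }

    HashMap<String, Object> snapshotAllData() {
        // Copy into plain collections so later mutations do not leak into the snapshot.
        HashMap<String, Map<String, String>> mapCopy = new HashMap<>();
        maps.forEach((name, m) -> mapCopy.put(name, new HashMap<>(m)));
        HashMap<String, List<String>> queueCopy = new HashMap<>();
        queues.forEach((name, q) -> queueCopy.put(name, new ArrayList<>(q)));

        HashMap<String, Object> snapshot = new HashMap<>();
        snapshot.put("maps", mapCopy);
        snapshot.put("queues", queueCopy);
        return snapshot;
    }

    @SuppressWarnings("unchecked")
    void restoreFromSnapshot(HashMap<String, Object> snapshot) {
        // Existing data is cleared first, as the method description states.
        maps.clear();
        queues.clear();
        Map<String, Map<String, String>> savedMaps =
                (Map<String, Map<String, String>>) snapshot.getOrDefault("maps", Map.of());
        savedMaps.forEach((name, m) -> getOrCreateMap(name).putAll(m));
        Map<String, List<String>> savedQueues =
                (Map<String, List<String>>) snapshot.getOrDefault("queues", Map.of());
        savedQueues.forEach((name, q) -> getOrCreateQueue(name).addAll(q));
    }
}
```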