Class ConsistencySubsystem

java.lang.Object
com.loomcache.server.cp.ConsistencySubsystem

public final class ConsistencySubsystem extends Object
Consistency Subsystem — manages Raft-based strongly consistent distributed primitives.

Purpose: The Consistency Subsystem provides linearizable (strongly consistent) operations for distributed primitives like locks, semaphores, atomic values, and latches. All operations are replicated via Raft consensus, ensuring global ordering and consistency.

Primitives Supported:

  • LinearizableLock - distributed lock with fencing tokens
  • LinearizableSemaphore - distributed semaphore (session-aware or sessionless)
  • LinearizableAtomicLong - linearizable atomic long
  • LinearizableAtomicReference - linearizable atomic reference
  • LinearizableLatch - distributed countdown latch

Architecture:

  • Each primitive type is managed in its own namespace (locks, semaphores, etc.).
  • Primitives are routed to Raft groups via RaftGroupManager.
  • Sessions are managed for session-aware primitives.
  • Primitives are created lazily on first access.

Thread Safety: This class is thread-safe. Multiple threads can safely create and use primitives concurrently.
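
The lazy, thread-safe creation described above is commonly built on an atomic get-or-create map. The following is a minimal sketch of that pattern, not the subsystem's actual implementation; LazyRegistrySketch and CounterHandle are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a named primitive; not a LoomCache class.
final class CounterHandle {
    final String name;
    final AtomicLong value;

    CounterHandle(String name, long initialValue) {
        this.name = name;
        this.value = new AtomicLong(initialValue);
    }
}

// Sketch of a lazy, thread-safe get-or-create registry for one namespace.
final class LazyRegistrySketch {
    private final ConcurrentMap<String, CounterHandle> counters = new ConcurrentHashMap<>();

    CounterHandle getOrCreate(String name, long initialValue) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        if (name.isEmpty()) {
            throw new IllegalArgumentException("name must not be empty");
        }
        // computeIfAbsent applies the factory at most once per key, so concurrent
        // callers get the same instance; initialValue matters only on creation.
        return counters.computeIfAbsent(name, n -> new CounterHandle(n, initialValue));
    }

    // Removes the handle; returns true only if it existed.
    boolean destroy(String name) {
        return counters.remove(name) != null;
    }

    int size() {
        return counters.size();
    }
}
```

Calling getOrCreate("counter", 5) and then getOrCreate("counter", 99) on this sketch returns the same handle, still holding 5, which mirrors the "initial value is ignored if the primitive already exists" rule stated for the getters below.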

Example Usage:

ConsistencySubsystem cp = new ConsistencySubsystem(raftGroupManager);

// Distributed lock
LinearizableLock lock = cp.getLock("my-lock");
long fence = lock.lock();
try {
    doWork();
} finally {
    lock.unlock();
}

// Distributed semaphore
LinearizableSemaphore sem = cp.getSemaphore("rate-limit", 10, false);
sem.acquire(1);
try {
    limitedResource();
} finally {
    sem.release(1);
}

// Atomic long
LinearizableAtomicLong counter = cp.getAtomicLong("counter", 0);
counter.incrementAndGet();
Since:
1.0
  • Field Details

    • DEFAULT_SESSION_TIMEOUT_MS

      public static final long DEFAULT_SESSION_TIMEOUT_MS
  • Constructor Details

    • ConsistencySubsystem

      public ConsistencySubsystem(RaftGroupManagerApi raftGroupManager)
      Creates a new ConsistencySubsystem with the given Raft group manager using the Hazelcast-compatible default five-minute CP session timeout.
      Parameters:
      raftGroupManager - the RaftGroupManager for routing operations (must not be null)
      Throws:
      NullPointerException - if raftGroupManager is null
    • ConsistencySubsystem

      public ConsistencySubsystem(RaftGroupManagerApi raftGroupManager, long sessionTimeoutMs)
      Creates a new ConsistencySubsystem with the given Raft group manager and session timeout.
      Parameters:
      raftGroupManager - the RaftGroupManager for routing operations (must not be null)
      sessionTimeoutMs - the session timeout in milliseconds (must be positive)
      Throws:
      NullPointerException - if raftGroupManager is null
      IllegalArgumentException - if sessionTimeoutMs <= 0
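
The argument checks documented for the two constructors can be sketched as follows. This is an illustration under stated assumptions, not the class's source: SessionTimeoutConfigSketch is a hypothetical name, the manager is typed as Object so the sketch compiles on its own, and the 300000 ms constant is only assumed to equal DEFAULT_SESSION_TIMEOUT_MS because the text describes the default as five minutes.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the documented constructor argument checks.
final class SessionTimeoutConfigSketch {
    // Five minutes in milliseconds; assumed to match the documented default.
    static final long ASSUMED_DEFAULT_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(5);

    final Object raftGroupManager;
    final long sessionTimeoutMs;

    // One-argument form delegates to the two-argument form with the default timeout.
    SessionTimeoutConfigSketch(Object raftGroupManager) {
        this(raftGroupManager, ASSUMED_DEFAULT_TIMEOUT_MS);
    }

    SessionTimeoutConfigSketch(Object raftGroupManager, long sessionTimeoutMs) {
        // Null manager -> NullPointerException, as documented.
        this.raftGroupManager = Objects.requireNonNull(raftGroupManager, "raftGroupManager");
        // Zero or negative timeout -> IllegalArgumentException, as documented.
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("sessionTimeoutMs must be positive");
        }
        this.sessionTimeoutMs = sessionTimeoutMs;
    }
}
```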
  • Method Details

    • getLock

      public LinearizableLock getLock(String lockName)
      Gets or creates a LinearizableLock with the given name.

      If the lock already exists, the existing instance is returned. Lock creation is lazy and thread-safe.

      Parameters:
      lockName - the name of the lock (must not be null or empty)
      Returns:
      the LinearizableLock instance (never null)
      Throws:
      NullPointerException - if lockName is null
      IllegalArgumentException - if lockName is empty
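
The class overview describes LinearizableLock as a lock with fencing tokens, and the usage example stores the value returned by lock() in a variable named fence. The sketch below shows one way a downstream resource might use such a token to reject writes from a stale lock holder. FencedStore is hypothetical, and the sketch assumes tokens grow with each successful acquisition, which is the usual contract for fencing tokens but is not stated on this page.

```java
// Hypothetical downstream resource that rejects writes carrying a stale fencing token.
final class FencedStore {
    private long highestFenceSeen = Long.MIN_VALUE;
    private String value;

    // Accepts the write only if the token is at least as new as any seen so far.
    synchronized boolean write(long fence, String newValue) {
        if (fence < highestFenceSeen) {
            return false; // a newer lock holder has already written
        }
        highestFenceSeen = fence;
        value = newValue;
        return true;
    }

    synchronized String read() {
        return value;
    }
}
```

With this guard in place, a client that paused after acquiring the lock and resumed after its session expired cannot overwrite data written by the next holder, because its token is lower than the one the store has already seen.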
    • getSemaphore

      public LinearizableSemaphore getSemaphore(String semaphoreName, int permitCount, boolean sessionAware)
      Gets or creates a LinearizableSemaphore with the given name and permit count.

      If the semaphore already exists, the permitCount is ignored and the existing semaphore is returned.

      Parameters:
      semaphoreName - the name of the semaphore (must not be null or empty)
      permitCount - the initial number of permits (must be non-negative)
      sessionAware - true for session-aware mode, false for sessionless
      Returns:
      the LinearizableSemaphore instance
      Throws:
      NullPointerException - if semaphoreName is null
      IllegalArgumentException - if semaphoreName is empty or permitCount < 0
    • getSemaphoreIfExists

      public @Nullable LinearizableSemaphore getSemaphoreIfExists(String semaphoreName)
      Returns an existing semaphore without creating one.
    • getAtomicLong

      public LinearizableAtomicLong getAtomicLong(String atomicName, long initialValue)
      Gets or creates a LinearizableAtomicLong with the given name and initial value.

      If the atomic long already exists, the initialValue is ignored.

      Parameters:
      atomicName - the name of the atomic long (must not be null or empty)
      initialValue - the initial value
      Returns:
      the LinearizableAtomicLong instance
      Throws:
      NullPointerException - if atomicName is null
      IllegalArgumentException - if atomicName is empty
    • setExternalLinearizableReadRaftNode

      public void setExternalLinearizableReadRaftNode(@Nullable RaftNodeApi raftNode)
    • getAtomicReference

      public <T> LinearizableAtomicReference<T> getAtomicReference(String refName, @Nullable T initialValue)
      Gets or creates a LinearizableAtomicReference with the given name and initial value.

      If the atomic reference already exists, the initialValue is ignored.

      Type Parameters:
      T - the type of the reference
      Parameters:
      refName - the name of the atomic reference (must not be null or empty)
      initialValue - the initial value (can be null)
      Returns:
      the LinearizableAtomicReference instance
      Throws:
      NullPointerException - if refName is null
      IllegalArgumentException - if refName is empty
    • getCountDownLatch

      public LinearizableLatch getCountDownLatch(String latchName, long initialCount)
      Gets or creates a LinearizableLatch with the given name and initial count.

      If the latch already exists, the initialCount is ignored.

      Parameters:
      latchName - the name of the latch (must not be null or empty)
      initialCount - the initial count (must be non-negative)
      Returns:
      the LinearizableLatch instance
      Throws:
      NullPointerException - if latchName is null
      IllegalArgumentException - if latchName is empty or initialCount < 0
    • createSession

      public ConsistencySession createSession()
      Creates a new ConsistencySession for a client.
      Returns:
      a new ConsistencySession with the configured timeout
    • getSession

      public @Nullable ConsistencySession getSession(String sessionId)
      Gets a session by ID.
      Parameters:
      sessionId - the session ID (must not be null)
      Returns:
      the ConsistencySession, or null if not found
      Throws:
      NullPointerException - if sessionId is null
    • closeSession

      public void closeSession(String sessionId)
      Closes a session.
      Parameters:
      sessionId - the session ID (must not be null)
      Throws:
      NullPointerException - if sessionId is null
    • forceCloseSession

      public boolean forceCloseSession(String sessionId)
      Force-closes a managed CP session for operator recovery.
      Parameters:
      sessionId - the session ID (must not be null or blank)
      Returns:
      true if an active managed session or compatibility wrapper was closed, false otherwise
    • heartbeatSession

      public boolean heartbeatSession(String sessionId)
      Records a heartbeat for a managed session.
      Parameters:
      sessionId - the session ID (must not be null)
      Returns:
      true if the heartbeat was recorded, false if the session is not found or inactive
      Throws:
      NullPointerException - if sessionId is null
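
A client that holds a session typically sends heartbeats well inside the session timeout. The sketch below schedules heartbeats at one third of the timeout and records when heartbeatSession reports the session as gone. HeartbeatTarget is a hypothetical interface that mirrors only the documented heartbeatSession signature, and the one-third ratio is an assumption chosen for illustration, not a value taken from this API.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical interface mirroring the documented heartbeatSession signature.
interface HeartbeatTarget {
    boolean heartbeatSession(String sessionId);
}

final class HeartbeatSketch {
    // Heartbeat interval: one third of the session timeout, at least 1 ms (assumed ratio).
    static long intervalMs(long sessionTimeoutMs) {
        return Math.max(1L, sessionTimeoutMs / 3);
    }

    // One heartbeat attempt; flags the session as lost when the call returns false.
    static Runnable heartbeatTask(HeartbeatTarget target, String sessionId, AtomicBoolean sessionLost) {
        return () -> {
            if (!sessionLost.get() && !target.heartbeatSession(sessionId)) {
                sessionLost.set(true);
            }
        };
    }

    // Schedules the heartbeat task at a fixed rate on the supplied scheduler.
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler, HeartbeatTarget target,
                                    String sessionId, long sessionTimeoutMs, AtomicBoolean sessionLost) {
        long interval = intervalMs(sessionTimeoutMs);
        return scheduler.scheduleAtFixedRate(
                heartbeatTask(target, sessionId, sessionLost), interval, interval, TimeUnit.MILLISECONDS);
    }
}
```

Callers would cancel the returned ScheduledFuture when the session is closed and check sessionLost before relying on session-bound resources such as session-aware semaphore permits.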
    • tryAcquireSemaphore

      public boolean tryAcquireSemaphore(String semaphoreName, int initialPermits, int permits, @Nullable String sessionId, long timeout, TimeUnit unit) throws InterruptedException
      Attempts to acquire permits from a CP semaphore.
      Parameters:
      semaphoreName - the semaphore name
      initialPermits - the initial permit count used only if the semaphore does not yet exist
      permits - the number of permits to acquire
      sessionId - the optional session ID for session-aware acquisition
      timeout - the timeout value
      unit - the timeout unit
      Returns:
      true if permits were acquired, false if the timeout expired
      Throws:
      InterruptedException - if interrupted while waiting
    • releaseSemaphore

      public void releaseSemaphore(String semaphoreName, int initialPermits, int permits, @Nullable String sessionId)
      Releases permits back to a CP semaphore.
      Parameters:
      semaphoreName - the semaphore name
      initialPermits - the initial permit count used only if the semaphore does not yet exist
      permits - the number of permits to release
      sessionId - the optional session ID for session-aware release
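
The acquire and release calls above pair naturally in a try/finally block so that permits are returned even when the guarded work throws. The sketch below is a hedged illustration: SemaphoreOps is a hypothetical interface that copies the two documented signatures so the code compiles without the library, and runWithPermit is a helper invented for this example.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical interface mirroring the two documented method signatures.
interface SemaphoreOps {
    boolean tryAcquireSemaphore(String semaphoreName, int initialPermits, int permits,
                                String sessionId, long timeout, TimeUnit unit) throws InterruptedException;

    void releaseSemaphore(String semaphoreName, int initialPermits, int permits, String sessionId);
}

final class SemaphoreGuardSketch {
    // Runs the task while holding one permit; returns false if no permit was obtained.
    static boolean runWithPermit(SemaphoreOps cp, String name, int initialPermits,
                                 String sessionId, long timeoutMs, Runnable task) {
        boolean acquired;
        try {
            acquired = cp.tryAcquireSemaphore(name, initialPermits, 1, sessionId,
                    timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status for callers
            return false;
        }
        if (!acquired) {
            return false; // timeout expired before a permit became available
        }
        try {
            task.run();
            return true;
        } finally {
            // Always return the permit, even if the task threw.
            cp.releaseSemaphore(name, initialPermits, 1, sessionId);
        }
    }
}
```

Passing the same initialPermits to both calls keeps the two sides consistent, since the documentation says that value is used only if the semaphore does not yet exist.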
    • getSessionStats

      public SessionStats getSessionStats()
      Gets statistics about managed sessions.
      Returns:
      SessionStats with current metrics
    • getActiveManagedSessions

      public Collection<SessionManager.ManagedSession> getActiveManagedSessions()
      Gets all active managed sessions.
      Returns:
      a collection of active ManagedSession objects
    • listSessions

      public List<ConsistencySubsystem.CpSessionInfo> listSessions()
      Lists active CP sessions for administrative inspection.
      Returns:
      immutable session summaries sorted by session ID
    • getSessionManagerForTesting

      public SessionManager getSessionManagerForTesting()
      Gets the SessionManager for direct access (tests only).
      Returns:
      the SessionManager instance
    • getTokenGeneratorForTesting

      public FencingTokenGenerator getTokenGeneratorForTesting()
      Gets the FencingTokenGenerator for direct access (tests only).
      Returns:
      the FencingTokenGenerator instance
    • getLockCount

      public int getLockCount()
      Gets the number of locks currently managed.
      Returns:
      the number of locks
    • getSemaphoreCount

      public int getSemaphoreCount()
      Gets the number of semaphores currently managed.
      Returns:
      the number of semaphores
    • getAtomicLongCount

      public int getAtomicLongCount()
      Gets the number of atomic longs currently managed.
      Returns:
      the number of atomic longs
    • getAtomicReferenceCount

      public int getAtomicReferenceCount()
      Gets the number of atomic references currently managed.
      Returns:
      the number of atomic references
    • getCountDownLatchCount

      public int getCountDownLatchCount()
      Gets the number of countdown latches currently managed.
      Returns:
      the number of latches
    • getSessionCount

      public int getSessionCount()
      Gets the number of active sessions.
      Returns:
      the number of sessions
    • destroyLock

      public boolean destroyLock(String lockName)
      Destroys a lock by name.
      Parameters:
      lockName - the name of the lock (must not be null)
      Returns:
      true if the lock was removed, false if it didn't exist
      Throws:
      NullPointerException - if lockName is null
    • destroySemaphore

      public boolean destroySemaphore(String semaphoreName)
      Destroys a semaphore by name.
      Parameters:
      semaphoreName - the name of the semaphore (must not be null)
      Returns:
      true if the semaphore was removed, false if it didn't exist
      Throws:
      NullPointerException - if semaphoreName is null
    • destroyAtomicLong

      public boolean destroyAtomicLong(String atomicName)
      Destroys an atomic long by name.
      Parameters:
      atomicName - the name of the atomic long (must not be null)
      Returns:
      true if the atomic long was removed, false if it didn't exist
      Throws:
      NullPointerException - if atomicName is null
    • destroyAtomicReference

      public boolean destroyAtomicReference(String refName)
      Destroys an atomic reference by name.
      Parameters:
      refName - the name of the atomic reference (must not be null)
      Returns:
      true if the atomic reference was removed, false if it didn't exist
      Throws:
      NullPointerException - if refName is null
    • destroyCountDownLatch

      public boolean destroyCountDownLatch(String latchName)
      Destroys a countdown latch by name.
      Parameters:
      latchName - the name of the latch (must not be null)
      Returns:
      true if the latch was removed, false if it didn't exist
      Throws:
      NullPointerException - if latchName is null
    • reset

      Resets all local CP subsystem state without touching AP data structures.

      This is intended for operator recovery when CP metadata/state must be discarded. Existing primitive handles are invalidated where supported; callers should recreate CP proxies after the reset completes.

      Returns:
      counts of CP resources removed by the reset
    • takeSnapshot

      public HashMap<String,Object> takeSnapshot()
      Creates a serializable snapshot of all CP primitive state.

      Captures atomic long names and values; lock names and held state; semaphore names with available permits, initial permits, and session awareness; latch names and counts; and the fencing token generator counter.

      Returns:
      a HashMap representing the full CP state, suitable for Kryo serialization
    • restoreFromSnapshot

      public void restoreFromSnapshot(HashMap<String,Object> snapshot)
      Restores all CP primitive state from a snapshot.

      Clears existing primitives and recreates them from the snapshot data. Held lock state is restored from the snapshot's session/fence metadata so linearizable lock ownership survives snapshot replay. Original thread identity is JVM-local and is not preserved across restore.

      Parameters:
      snapshot - the snapshot data (must not be null)
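
The snapshot and restore pair can be pictured with a toy state holder. The sketch below is not the subsystem's snapshot format: SnapshotSketch and the map keys "atomicLongs" and "fenceCounter" are hypothetical, and only two of the documented state categories are modeled. It illustrates clearing existing state on restore and carrying the fencing counter across, so tokens issued after a restore continue from the snapshot value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy state holder; keys and layout are assumptions, not the real snapshot format.
final class SnapshotSketch {
    private final Map<String, Long> atomicLongs = new HashMap<>();
    private long fenceCounter;

    void setAtomicLong(String name, long value) {
        atomicLongs.put(name, value);
    }

    Long getAtomicLong(String name) {
        return atomicLongs.get(name);
    }

    long nextFence() {
        return ++fenceCounter;
    }

    HashMap<String, Object> takeSnapshot() {
        HashMap<String, Object> snapshot = new HashMap<>();
        // Copy the map so later mutations do not leak into the snapshot.
        snapshot.put("atomicLongs", new HashMap<>(atomicLongs));
        snapshot.put("fenceCounter", fenceCounter);
        return snapshot;
    }

    @SuppressWarnings("unchecked")
    void restoreFromSnapshot(HashMap<String, Object> snapshot) {
        Objects.requireNonNull(snapshot, "snapshot");
        // Existing state is discarded before the snapshot contents are applied.
        atomicLongs.clear();
        atomicLongs.putAll((Map<String, Long>) snapshot.get("atomicLongs"));
        fenceCounter = (Long) snapshot.get("fenceCounter");
    }
}
```

The sketch is single-threaded for brevity; a real implementation would need to take and apply snapshots consistently with concurrent operations.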
    • shutdown

      public void shutdown()
      Shuts down the consistency subsystem, stopping the session expiration thread. Must be called during server shutdown to prevent resource leaks.
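
One way to make sure shutdown() runs during server shutdown is a JVM shutdown hook. The sketch below assumes nothing about the subsystem beyond the documented shutdown() method: Stoppable is a hypothetical interface standing in for it, and ShutdownSketch is an illustrative helper.

```java
// Hypothetical interface standing in for anything with a shutdown() method.
interface Stoppable {
    void shutdown();
}

final class ShutdownSketch {
    // Builds (but does not register) a hook thread that calls shutdown() when run.
    static Thread newShutdownHook(Stoppable subsystem) {
        return new Thread(subsystem::shutdown, "cp-subsystem-shutdown");
    }

    // Registers the hook so the JVM runs it during orderly termination.
    static void register(Stoppable subsystem) {
        Runtime.getRuntime().addShutdownHook(newShutdownHook(subsystem));
    }
}
```

Since ConsistencySubsystem exposes a public no-argument shutdown(), a caller could pass cp::shutdown to register. Servers that already have an explicit lifecycle would more likely call shutdown() directly from their own stop sequence.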
    • toString

      public String toString()
      Overrides:
      toString in class Object