Class ConsistencySubsystem
Purpose: The Consistency Subsystem provides linearizable (strongly consistent) operations for distributed primitives like locks, semaphores, atomic values, and latches. All operations are replicated via Raft consensus, ensuring global ordering and consistency.
Primitives Supported:
- LinearizableLock — distributed lock with fencing tokens
- LinearizableSemaphore — distributed semaphore (session-aware or sessionless)
- LinearizableAtomicLong — Linearizable atomic long
- LinearizableAtomicReference — Linearizable atomic reference
- LinearizableLatch — distributed countdown latch
Architecture:
- Each primitive type is managed in its own namespace (locks, semaphores, etc.).
- Primitives are routed to Raft groups via RaftGroupManager.
- Session management for session-aware primitives.
- Lazy creation of primitives on first access.
Thread Safety: This class is thread-safe. Multiple threads can safely create and use primitives concurrently.
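The lazy, per-namespace creation described above can be pictured with a small in-memory sketch. NamespacedRegistrySketch and its methods are hypothetical stand-ins, not the internals of ConsistencySubsystem, and the sketch leaves out Raft routing entirely; it only illustrates how one map per primitive type plus computeIfAbsent could give thread-safe get-or-create behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in, not the real ConsistencySubsystem internals.
final class NamespacedRegistrySketch {
    // One map per primitive type ("locks", "semaphores", ...), so names cannot collide across types.
    private final Map<String, Map<String, Object>> namespaces = new ConcurrentHashMap<>();

    /** Returns the existing primitive, or creates it on first access. */
    @SuppressWarnings("unchecked")
    <T> T getOrCreate(String namespace, String name, Supplier<T> factory) {
        Map<String, Object> primitives =
                namespaces.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>());
        // computeIfAbsent runs the factory at most once per key, even under contention.
        return (T) primitives.computeIfAbsent(name, n -> factory.get());
    }

    /** Number of primitives currently held in one namespace. */
    int count(String namespace) {
        Map<String, Object> primitives = namespaces.get(namespace);
        return primitives == null ? 0 : primitives.size();
    }

    public static void main(String[] args) {
        NamespacedRegistrySketch registry = new NamespacedRegistrySketch();
        Object first = registry.getOrCreate("locks", "shared-name", Object::new);
        Object second = registry.getOrCreate("locks", "shared-name", Object::new);
        Object other = registry.getOrCreate("semaphores", "shared-name", Object::new);
        System.out.println(first == second); // true: same namespace and name
        System.out.println(first == other);  // false: separate namespaces
    }
}
```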
Example Usage:
ConsistencySubsystem cp = new ConsistencySubsystem(raftGroupManager);

// Distributed lock
LinearizableLock lock = cp.getLock("my-lock");
long fence = lock.lock();
try {
    doWork();
} finally {
    lock.unlock();
}

// Distributed semaphore
LinearizableSemaphore sem = cp.getSemaphore("rate-limit", 10, false);
sem.acquire(1);
try {
    limitedResource();
} finally {
    sem.release(1);
}

// Atomic long
LinearizableAtomicLong counter = cp.getAtomicLong("counter", 0);
counter.incrementAndGet();
Since: 1.0
-
Nested Class Summary
Nested Classes
- static final record
- static final record
-
Field Summary
Fields
- static final long DEFAULT_SESSION_TIMEOUT_MS
-
Constructor Summary
Constructors
- ConsistencySubsystem(RaftGroupManagerApi raftGroupManager): Creates a new ConsistencySubsystem with the given Raft group manager using the Hazelcast-compatible default five-minute CP session timeout.
- ConsistencySubsystem(RaftGroupManagerApi raftGroupManager, long sessionTimeoutMs): Creates a new ConsistencySubsystem with the given Raft group manager and session timeout.
-
Method Summary
Methods (modifier and type, method, description)
- void closeSession(String sessionId): Closes a session.
- ConsistencySession createSession(): Creates a new ConsistencySession for a client.
- boolean destroyAtomicLong(String atomicName): Destroys an atomic long by name.
- boolean destroyAtomicReference(String refName): Destroys an atomic reference by name.
- boolean destroyCountDownLatch(String latchName): Destroys a count down latch by name.
- boolean destroyLock(String lockName): Destroys a lock by name.
- boolean destroySemaphore(String semaphoreName): Destroys a semaphore by name.
- boolean forceCloseSession(String sessionId): Force-closes a managed CP session for operator recovery.
- getActiveManagedSessions(): Gets all active managed sessions.
- LinearizableAtomicLong getAtomicLong(String atomicName, long initialValue): Gets or creates a LinearizableAtomicLong with the given name and initial value.
- int getAtomicLongCount(): Gets the number of atomic longs currently managed.
- <T> LinearizableAtomicReference<T> getAtomicReference(String refName, @Nullable T initialValue): Gets or creates a LinearizableAtomicReference with the given name and initial value.
- int getAtomicReferenceCount(): Gets the number of atomic references currently managed.
- LinearizableLatch getCountDownLatch(String latchName, long initialCount): Gets or creates a LinearizableLatch with the given name and initial count.
- int getCountDownLatchCount(): Gets the number of count down latches currently managed.
- LinearizableLock getLock(String lockName): Gets or creates a LinearizableLock with the given name.
- int getLockCount(): Gets the number of locks currently managed.
- LinearizableSemaphore getSemaphore(String semaphoreName, int permitCount, boolean sessionAware): Gets or creates a LinearizableSemaphore with the given name and permit count.
- int getSemaphoreCount(): Gets the number of semaphores currently managed.
- @Nullable LinearizableSemaphore getSemaphoreIfExists(String semaphoreName): Returns an existing semaphore without creating one.
- @Nullable ConsistencySession getSession(String sessionId): Gets a session by ID.
- int getSessionCount(): Gets the number of active sessions.
- SessionManager getSessionManagerForTesting(): Gets the SessionManager for direct access (tests only).
- SessionStats getSessionStats(): Gets statistics about managed sessions.
- FencingTokenGenerator getTokenGeneratorForTesting(): Gets the FencingTokenGenerator for direct access (tests only).
- boolean heartbeatSession(String sessionId): Records a heartbeat for a managed session.
- listSessions(): Lists active CP sessions for administrative inspection.
- void releaseSemaphore(String semaphoreName, int initialPermits, int permits, @Nullable String sessionId): Releases permits back to a CP semaphore.
- reset(): Resets all local CP subsystem state without touching AP data structures.
- void restoreFromSnapshot(HashMap<String, Object> snapshot): Restores all CP primitive state from a snapshot.
- void setExternalLinearizableReadRaftNode(@Nullable RaftNodeApi raftNode)
- void shutdown(): Shuts down the consistency subsystem, stopping the session expiration thread.
- takeSnapshot(): Creates a serializable snapshot of all CP primitive state.
- String toString()
- boolean tryAcquireSemaphore(String semaphoreName, int initialPermits, int permits, @Nullable String sessionId, long timeout, TimeUnit unit): Attempts to acquire permits from a CP semaphore.
-
Field Details
-
DEFAULT_SESSION_TIMEOUT_MS
public static final long DEFAULT_SESSION_TIMEOUT_MS
-
-
Constructor Details
-
ConsistencySubsystem
Creates a new ConsistencySubsystem with the given Raft group manager using the Hazelcast-compatible default five-minute CP session timeout.
- Parameters:
raftGroupManager - the RaftGroupManager for routing operations (must not be null)
- Throws:
NullPointerException - if raftGroupManager is null
-
ConsistencySubsystem
Creates a new ConsistencySubsystem with the given Raft group manager and session timeout.
- Parameters:
raftGroupManager - the RaftGroupManager for routing operations (must not be null)
sessionTimeoutMs - the session timeout in milliseconds (must be positive)
- Throws:
NullPointerException - if raftGroupManager is null
IllegalArgumentException - if sessionTimeoutMs <= 0
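The contract of the two constructors can be sketched as plain constructor delegation with argument checks. TimeoutConfigSketch is a hypothetical stand-in that takes an Object where the real class takes a RaftGroupManagerApi, and the value assigned to DEFAULT_SESSION_TIMEOUT_MS is an assumption derived from the documented five-minute default.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in; the real class takes a RaftGroupManagerApi rather than Object.
final class TimeoutConfigSketch {
    // Assumed value: five minutes in milliseconds, matching the documented default.
    static final long DEFAULT_SESSION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(5);

    private final Object raftGroupManager;
    private final long sessionTimeoutMs;

    TimeoutConfigSketch(Object raftGroupManager) {
        this(raftGroupManager, DEFAULT_SESSION_TIMEOUT_MS);
    }

    TimeoutConfigSketch(Object raftGroupManager, long sessionTimeoutMs) {
        // Null check first, then the range check, mirroring the documented Throws list.
        this.raftGroupManager = Objects.requireNonNull(raftGroupManager, "raftGroupManager");
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("sessionTimeoutMs must be positive: " + sessionTimeoutMs);
        }
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    long sessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(new TimeoutConfigSketch(new Object()).sessionTimeoutMs()); // 300000
    }
}
```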
-
-
Method Details
-
getLock
Gets or creates a LinearizableLock with the given name. If the lock already exists, the existing instance is returned. Lock creation is lazy and thread-safe.
- Parameters:
lockName - the name of the lock (must not be null or empty)
- Returns:
the LinearizableLock instance (never null)
- Throws:
NullPointerException - if lockName is null
IllegalArgumentException - if lockName is empty
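Since the lock carries fencing tokens, it may help to see how a protected resource could use one. The sketch below assumes that tokens grow with each acquisition and that the resource itself checks them; FencedResourceSketch is hypothetical and not part of this API.

```java
// Hypothetical resource that rejects writes carrying an older fencing token.
final class FencedResourceSketch {
    private long highestTokenSeen = -1;
    private String value = "";

    /** Applies the write only if the token is at least as new as any token seen so far. */
    synchronized boolean write(long fencingToken, String newValue) {
        if (fencingToken < highestTokenSeen) {
            return false; // stale holder, for example one that paused past its session timeout
        }
        highestTokenSeen = fencingToken;
        value = newValue;
        return true;
    }

    synchronized String read() {
        return value;
    }

    public static void main(String[] args) {
        FencedResourceSketch resource = new FencedResourceSketch();
        System.out.println(resource.write(1, "from first holder"));  // true
        System.out.println(resource.write(2, "from second holder")); // true
        System.out.println(resource.write(1, "late write"));         // false: token 1 is stale
        System.out.println(resource.read());                         // from second holder
    }
}
```

In the usage example at the top of this page, the value returned by lock.lock() would play the role of fencingToken under this assumption.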
-
getSemaphore
public LinearizableSemaphore getSemaphore(String semaphoreName, int permitCount, boolean sessionAware)
Gets or creates a LinearizableSemaphore with the given name and permit count. If the semaphore already exists, permitCount is ignored and the existing semaphore is returned.
- Parameters:
semaphoreName - the name of the semaphore (must not be null or empty)
permitCount - the initial number of permits (must be non-negative)
sessionAware - true for session-aware mode, false for sessionless
- Returns:
the LinearizableSemaphore instance
- Throws:
NullPointerException - if semaphoreName is null
IllegalArgumentException - if semaphoreName is empty or permitCount < 0
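The "permitCount is ignored if the semaphore exists" rule and the documented argument checks can be illustrated locally. The sketch uses the JDK's java.util.concurrent.Semaphore as a single-JVM stand-in for LinearizableSemaphore and omits the sessionAware flag; SemaphoreLookupSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical lookup table; a JDK Semaphore stands in for LinearizableSemaphore.
final class SemaphoreLookupSketch {
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    Semaphore getSemaphore(String semaphoreName, int permitCount) {
        Objects.requireNonNull(semaphoreName, "semaphoreName");
        if (semaphoreName.isEmpty()) {
            throw new IllegalArgumentException("semaphoreName must not be empty");
        }
        if (permitCount < 0) {
            throw new IllegalArgumentException("permitCount must be non-negative: " + permitCount);
        }
        // permitCount is only read when this call is the one that creates the semaphore.
        return semaphores.computeIfAbsent(semaphoreName, name -> new Semaphore(permitCount));
    }

    public static void main(String[] args) {
        SemaphoreLookupSketch cp = new SemaphoreLookupSketch();
        Semaphore first = cp.getSemaphore("rate-limit", 10);
        Semaphore second = cp.getSemaphore("rate-limit", 3); // 3 is ignored
        System.out.println(first == second);           // true
        System.out.println(second.availablePermits()); // 10
    }
}
```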
-
getSemaphoreIfExists
Returns an existing semaphore without creating one.
-
getAtomicLong
Gets or creates a LinearizableAtomicLong with the given name and initial value. If the atomic long already exists, initialValue is ignored.
- Parameters:
atomicName - the name of the atomic long (must not be null or empty)
initialValue - the initial value
- Returns:
the LinearizableAtomicLong instance
- Throws:
NullPointerException - if atomicName is null
IllegalArgumentException - if atomicName is empty
-
setExternalLinearizableReadRaftNode
-
getAtomicReference
public <T> LinearizableAtomicReference<T> getAtomicReference(String refName, @Nullable T initialValue)
Gets or creates a LinearizableAtomicReference with the given name and initial value. If the atomic reference already exists, initialValue is ignored.
- Type Parameters:
T - the type of the reference
- Parameters:
refName - the name of the atomic reference (must not be null or empty)
initialValue - the initial value (can be null)
- Returns:
the LinearizableAtomicReference instance
- Throws:
NullPointerException - if refName is null
IllegalArgumentException - if refName is empty
-
getCountDownLatch
Gets or creates a LinearizableLatch with the given name and initial count. If the latch already exists, initialCount is ignored.
- Parameters:
latchName - the name of the latch (must not be null or empty)
initialCount - the initial count (must be non-negative)
- Returns:
the LinearizableLatch instance
- Throws:
NullPointerException - if latchName is null
IllegalArgumentException - if latchName is empty or initialCount < 0
-
createSession
Creates a new ConsistencySession for a client.
- Returns:
a new ConsistencySession with the configured timeout
-
getSession
Gets a session by ID.
- Parameters:
sessionId - the session ID (must not be null)
- Returns:
the ConsistencySession, or null if not found
- Throws:
NullPointerException - if sessionId is null
-
closeSession
Closes a session.
- Parameters:
sessionId - the session ID (must not be null)
- Throws:
NullPointerException - if sessionId is null
-
forceCloseSession
Force-closes a managed CP session for operator recovery.
- Parameters:
sessionId - the session ID (must not be null or blank)
- Returns:
true when an active managed session or compatibility wrapper was closed
-
heartbeatSession
Records a heartbeat for a managed session.
- Parameters:
sessionId - the session ID (must not be null)
- Returns:
true if the heartbeat was recorded, false if the session is not found or inactive
- Throws:
NullPointerException - if sessionId is null
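The return contract of heartbeatSession can be pictured with a minimal timestamp table. HeartbeatSketch, its open method, and the injected clock are all hypothetical; the sketch assumes that "inactive" means no heartbeat arrived within the session timeout, which this reference does not spell out.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical session table; assumes a session goes inactive once its timeout elapses.
final class HeartbeatSketch {
    private final Map<String, Long> lastHeartbeatMs = new ConcurrentHashMap<>();
    private final long timeoutMs;
    private final LongSupplier clock; // injected so the example is deterministic

    HeartbeatSketch(long timeoutMs, LongSupplier clock) {
        this.timeoutMs = timeoutMs;
        this.clock = clock;
    }

    void open(String sessionId) {
        lastHeartbeatMs.put(Objects.requireNonNull(sessionId, "sessionId"), clock.getAsLong());
    }

    /** Returns true if the heartbeat was recorded, false for unknown or expired sessions. */
    boolean heartbeat(String sessionId) {
        Objects.requireNonNull(sessionId, "sessionId");
        long now = clock.getAsLong();
        // computeIfPresent skips unknown sessions; returning null removes an expired one.
        Long updated = lastHeartbeatMs.computeIfPresent(sessionId,
                (id, last) -> now - last > timeoutMs ? null : now);
        return updated != null;
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        HeartbeatSketch sessions = new HeartbeatSketch(1_000, now::get);
        sessions.open("client-1");
        now.set(500);
        System.out.println(sessions.heartbeat("client-1")); // true: within the timeout
        now.set(2_000);
        System.out.println(sessions.heartbeat("client-1")); // false: 1500 ms since last heartbeat
        System.out.println(sessions.heartbeat("unknown"));  // false: no such session
    }
}
```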
-
tryAcquireSemaphore
public boolean tryAcquireSemaphore(String semaphoreName, int initialPermits, int permits, @Nullable String sessionId, long timeout, TimeUnit unit) throws InterruptedException
Attempts to acquire permits from a CP semaphore.
- Parameters:
semaphoreName - the semaphore name
initialPermits - the initial permit count used only if the semaphore does not yet exist
permits - the number of permits to acquire
sessionId - the optional session ID for session-aware acquisition
timeout - the timeout value
unit - the timeout unit
- Returns:
true if permits were acquired, false if the timeout expired
- Throws:
InterruptedException - if interrupted while waiting
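A timed acquire followed by a release might behave as in the sketch below. It is a single-JVM analogy built on java.util.concurrent.Semaphore, it drops the sessionId parameter, and TimedAcquireSketch and demo are hypothetical names; the replicated implementation is not shown in this reference.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical single-JVM analogy; sessionId handling is left out.
final class TimedAcquireSketch {
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    boolean tryAcquire(String name, int initialPermits, int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        // initialPermits only matters when this call creates the semaphore.
        Semaphore sem = semaphores.computeIfAbsent(name, n -> new Semaphore(initialPermits));
        return sem.tryAcquire(permits, timeout, unit);
    }

    void release(String name, int initialPermits, int permits) {
        semaphores.computeIfAbsent(name, n -> new Semaphore(initialPermits)).release(permits);
    }

    /** Runs acquire, contended acquire, release, acquire and reports each acquire result. */
    static boolean[] demo() {
        TimedAcquireSketch cp = new TimedAcquireSketch();
        try {
            boolean first = cp.tryAcquire("rate-limit", 2, 2, 10, TimeUnit.MILLISECONDS);
            // The semaphore already exists, so initialPermits = 5 is ignored and this times out.
            boolean second = cp.tryAcquire("rate-limit", 5, 1, 10, TimeUnit.MILLISECONDS);
            cp.release("rate-limit", 5, 1);
            boolean third = cp.tryAcquire("rate-limit", 5, 1, 10, TimeUnit.MILLISECONDS);
            return new boolean[] {first, second, third};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [true, false, true]
    }
}
```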
-
releaseSemaphore
public void releaseSemaphore(String semaphoreName, int initialPermits, int permits, @Nullable String sessionId)
Releases permits back to a CP semaphore.
- Parameters:
semaphoreName - the semaphore name
initialPermits - the initial permit count used only if the semaphore does not yet exist
permits - the number of permits to release
sessionId - the optional session ID for session-aware release
-
getSessionStats
Gets statistics about managed sessions.
- Returns:
SessionStats with current metrics
-
getActiveManagedSessions
Gets all active managed sessions.
- Returns:
a collection of active ManagedSession objects
-
listSessions
Lists active CP sessions for administrative inspection.
- Returns:
immutable session summaries sorted by session ID
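An immutable, ID-sorted listing of this kind can be produced with a stream over the session table. SessionListingSketch and the SessionSummary shape below are hypothetical, since the real summary type and its fields are not shown in this reference.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

final class SessionListingSketch {
    /** Hypothetical summary shape; the real summary type is not shown in this reference. */
    static final class SessionSummary {
        final String sessionId;
        final long lastHeartbeatMs;

        SessionSummary(String sessionId, long lastHeartbeatMs) {
            this.sessionId = sessionId;
            this.lastHeartbeatMs = lastHeartbeatMs;
        }
    }

    private final Map<String, Long> lastHeartbeats = new ConcurrentHashMap<>();

    void track(String sessionId, long lastHeartbeatMs) {
        lastHeartbeats.put(sessionId, lastHeartbeatMs);
    }

    /** Returns an unmodifiable copy, sorted by session ID, detached from the live table. */
    List<SessionSummary> listSessions() {
        return lastHeartbeats.entrySet().stream()
                .map(e -> new SessionSummary(e.getKey(), e.getValue()))
                .sorted(Comparator.comparing((SessionSummary s) -> s.sessionId))
                .collect(Collectors.toUnmodifiableList());
    }

    public static void main(String[] args) {
        SessionListingSketch sessions = new SessionListingSketch();
        sessions.track("session-b", 20L);
        sessions.track("session-a", 10L);
        for (SessionSummary s : sessions.listSessions()) {
            System.out.println(s.sessionId + " " + s.lastHeartbeatMs);
        }
    }
}
```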
-
getSessionManagerForTesting
Gets the SessionManager for direct access (tests only).
- Returns:
the SessionManager instance
-
getTokenGeneratorForTesting
Gets the FencingTokenGenerator for direct access (tests only).
- Returns:
the FencingTokenGenerator instance
-
getLockCount
public int getLockCount()
Gets the number of locks currently managed.
- Returns:
the number of locks
-
getSemaphoreCount
public int getSemaphoreCount()
Gets the number of semaphores currently managed.
- Returns:
the number of semaphores
-
getAtomicLongCount
public int getAtomicLongCount()
Gets the number of atomic longs currently managed.
- Returns:
the number of atomic longs
-
getAtomicReferenceCount
public int getAtomicReferenceCount()
Gets the number of atomic references currently managed.
- Returns:
the number of atomic references
-
getCountDownLatchCount
public int getCountDownLatchCount()
Gets the number of count down latches currently managed.
- Returns:
the number of latches
-
getSessionCount
public int getSessionCount()
Gets the number of active sessions.
- Returns:
the number of sessions
-
destroyLock
Destroys a lock by name.
- Parameters:
lockName - the name of the lock (must not be null)
- Returns:
true if the lock was removed, false if it didn't exist
- Throws:
NullPointerException - if lockName is null
-
destroySemaphore
Destroys a semaphore by name.
- Parameters:
semaphoreName - the name of the semaphore (must not be null)
- Returns:
true if the semaphore was removed, false if it didn't exist
- Throws:
NullPointerException - if semaphoreName is null
-
destroyAtomicLong
Destroys an atomic long by name.
- Parameters:
atomicName - the name of the atomic long (must not be null)
- Returns:
true if the atomic long was removed, false if it didn't exist
- Throws:
NullPointerException - if atomicName is null
-
destroyAtomicReference
Destroys an atomic reference by name.
- Parameters:
refName - the name of the atomic reference (must not be null)
- Returns:
true if the atomic reference was removed, false if it didn't exist
- Throws:
NullPointerException - if refName is null
-
destroyCountDownLatch
Destroys a count down latch by name.
- Parameters:
latchName - the name of the latch (must not be null)
- Returns:
true if the latch was removed, false if it didn't exist
- Throws:
NullPointerException - if latchName is null
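All five destroy methods share one contract: true when something was removed, false when the name was unknown, NullPointerException on a null name. A minimal local sketch of that contract, with the hypothetical DestroySketch class and its create helper standing in for the real registry, could look like this.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry showing only the destroy/count contract.
final class DestroySketch {
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    void create(String lockName) {
        locks.putIfAbsent(lockName, new Object());
    }

    /** Returns true if the lock was removed, false if it did not exist. */
    boolean destroyLock(String lockName) {
        Objects.requireNonNull(lockName, "lockName");
        // Map.remove returns the previous value, or null when nothing was mapped.
        return locks.remove(lockName) != null;
    }

    int getLockCount() {
        return locks.size();
    }

    public static void main(String[] args) {
        DestroySketch cp = new DestroySketch();
        cp.create("my-lock");
        System.out.println(cp.destroyLock("my-lock")); // true
        System.out.println(cp.destroyLock("my-lock")); // false: already removed
        System.out.println(cp.getLockCount());         // 0
    }
}
```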
-
reset
Resets all local CP subsystem state without touching AP data structures. This is intended for operator recovery when CP metadata/state must be discarded. Existing primitive handles are invalidated where supported; callers should recreate CP proxies after the reset completes.
- Returns:
counts of CP resources removed by the reset
-
takeSnapshot
Creates a serializable snapshot of all CP primitive state. The snapshot captures atomic long names and values; lock names and held state; semaphore names, available permits, initial permits, and session awareness; latch names and counts; and the fencing token generator counter.
- Returns:
a HashMap representing the full CP state, suitable for Kryo serialization
-
restoreFromSnapshot
Restores all CP primitive state from a snapshot. Clears existing primitives and recreates them from the snapshot data. Held lock state is restored from the snapshot's session/fence metadata so linearizable lock ownership survives snapshot replay. Original thread identity is JVM-local and is not preserved across restore.
- Parameters:
snapshot - the snapshot data (must not be null)
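The clear-then-recreate behaviour of a snapshot round trip can be sketched for two of the captured items, atomic longs and the fencing counter. SnapshotSketch and the map keys "atomicLongs" and "fencingCounter" are hypothetical; the real snapshot layout is not documented here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical snapshot layout covering only atomic longs and the fencing counter.
final class SnapshotSketch {
    private final Map<String, AtomicLong> atomicLongs = new ConcurrentHashMap<>();
    private final AtomicLong fencingCounter = new AtomicLong();

    AtomicLong getAtomicLong(String name, long initialValue) {
        return atomicLongs.computeIfAbsent(name, n -> new AtomicLong(initialValue));
    }

    long nextFencingToken() {
        return fencingCounter.incrementAndGet();
    }

    HashMap<String, Object> takeSnapshot() {
        HashMap<String, Long> values = new HashMap<>();
        atomicLongs.forEach((name, value) -> values.put(name, value.get()));
        HashMap<String, Object> snapshot = new HashMap<>();
        snapshot.put("atomicLongs", values); // key names are illustrative only
        snapshot.put("fencingCounter", fencingCounter.get());
        return snapshot;
    }

    @SuppressWarnings("unchecked")
    void restoreFromSnapshot(HashMap<String, Object> snapshot) {
        Objects.requireNonNull(snapshot, "snapshot");
        atomicLongs.clear(); // existing primitives are discarded before recreation
        Map<String, Long> values = (Map<String, Long>) snapshot.get("atomicLongs");
        values.forEach((name, value) -> atomicLongs.put(name, new AtomicLong(value)));
        fencingCounter.set((Long) snapshot.get("fencingCounter"));
    }

    public static void main(String[] args) {
        SnapshotSketch source = new SnapshotSketch();
        source.getAtomicLong("counter", 0).set(7);
        source.nextFencingToken();

        SnapshotSketch target = new SnapshotSketch();
        target.restoreFromSnapshot(source.takeSnapshot());
        System.out.println(target.getAtomicLong("counter", 99).get()); // 7
        System.out.println(target.nextFencingToken());                 // 2
    }
}
```

Restoring the fencing counter along with the primitives keeps tokens issued after the restore larger than any issued before the snapshot, under the assumption that tokens come from a single increasing counter.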
-
shutdown
public void shutdown()
Shuts down the consistency subsystem, stopping the session expiration thread. Must be called during server shutdown to prevent resource leaks.
-
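Why a missed shutdown leaks resources can be seen with any background scheduler. The sketch assumes the expiration work runs on a ScheduledExecutorService, which this reference does not confirm; ShutdownSketch and its methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical owner of a background task standing in for the session expiration thread.
final class ShutdownSketch {
    private final ScheduledExecutorService expirer = Executors.newSingleThreadScheduledExecutor();

    ShutdownSketch() {
        // Placeholder task; a real implementation would expire idle sessions here.
        expirer.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.SECONDS);
    }

    void shutdown() {
        // Without this call the executor's non-daemon worker thread keeps the JVM alive.
        expirer.shutdownNow();
    }

    boolean isShutdown() {
        return expirer.isShutdown();
    }

    public static void main(String[] args) {
        ShutdownSketch subsystem = new ShutdownSketch();
        try {
            System.out.println(subsystem.isShutdown()); // false
        } finally {
            subsystem.shutdown(); // always release the background thread
        }
        System.out.println(subsystem.isShutdown()); // true
    }
}
```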
toString
-