Class WatchManager
Clients can watch a single key or a prefix pattern (e.g., "user:*").
When a watched key or prefix is mutated, all registered watchers receive
a WatchEvent via their WatchListener callback.
Watch lifecycle:
1. Call watch(String, WatchListener) to watch a specific key
2. Call watchPrefix(String, WatchListener) to watch all keys matching a prefix
3. Call cancel(long) with the returned watchId to cancel the watch
Thread-safe: uses ConcurrentHashMap for all tracking structures.
Event notification is performed asynchronously using virtual threads to avoid
blocking the mutation operation.
- Since:
- 1.5
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic enumEnumeration of event types for streaming events.static final recordStatistics snapshot for event streaming. -
Constructor Summary
ConstructorsConstructorDescriptionCreates a new WatchManager with default buffer and queue settings.WatchManager(int maxBufferSize, int maxQueueSizePerWatcher) Creates a new WatchManager with configurable buffer and queue sizes. -
Method Summary
Modifier and TypeMethodDescriptionbooleancancel(long watchId) Cancel a watch.voidcancelAllWatchesForConnection(String connectionId) Cancel all watches associated with a specific connection.booleancancelWatch(long watchId) Alias forcancel(long)for semantic clarity.intcompactHistory(long beforeRevision) Compact event history by removing events older than a specified revision.Get the list of active stream IDs.intGet the number of active watches.intGet the current size of the event buffer.getBufferedEvents(long fromRevision) Retrieves all buffered events with revision >= fromRevision.getBufferedEvents(String keyOrPrefix, long fromRevision) Retrieves buffered watch events from a given revision onward.longGet the oldest available revision in the event history.longGet the latest revision in the event history.Get statistics about active event streams.intGet the number of keys being watched (exact matches).intGet the number of prefixes being watched.intGet the number of range watches.voidnotifyWatchers(WatchEvent event) Notify all watchers that a key has been mutated.voidshutdown()Shuts down the notification executor.booleanstopStream(String streamId) Stop a specific event stream.streamEvents(String prefix, Consumer<WatchEvent> consumer) Start streaming all events matching a prefix to the given consumer.longwatch(String key, WatchListener listener) Register a watch on a single key.longwatch(String key, WatchListener listener, @Nullable String connectionId) Register a watch on a single key, associated with a connection.longwatchFromRevision(String key, long startRevision, WatchListener listener) Register a watch on a single key, resuming from a specific revision.longwatchPrefix(String prefix, WatchListener listener) Register a watch on all keys matching a prefix.longwatchPrefix(String prefix, WatchListener listener, @Nullable String connectionId) Register a watch on all keys matching a prefix, associated with a connection.longwatchRange(String startKey, String endKey, WatchListener listener) Register a watch on a range of keys [startKey, endKey).
-
Constructor Details
-
WatchManager
public WatchManager()Creates a new WatchManager with default buffer and queue settings. Default: 1000-event buffer, 500-event per-watcher queue. -
WatchManager
public WatchManager(int maxBufferSize, int maxQueueSizePerWatcher) Creates a new WatchManager with configurable buffer and queue sizes.- Parameters:
maxBufferSize- maximum number of events to buffer (must be positive)maxQueueSizePerWatcher- maximum queue size per watcher (must be positive)- Throws:
IllegalArgumentException- if sizes are not positive
-
-
Method Details
-
watch
Register a watch on a single key.- Parameters:
key- the key to watch (must not be null)listener- callback to invoke on mutations (must not be null)- Returns:
- a unique watch ID; pass this to
cancel(long)to stop watching - Throws:
NullPointerException- if key or listener is null
-
watch
Register a watch on a single key, associated with a connection.- Parameters:
key- the key to watch (must not be null)listener- callback to invoke on mutations (must not be null)connectionId- the connection ID to associate with this watch (may be null)- Returns:
- a unique watch ID; pass this to
cancel(long)to stop watching - Throws:
NullPointerException- if key or listener is null
-
watchPrefix
Register a watch on all keys matching a prefix.A prefix of "user:" will match keys like "user:123", "user:456", etc. Matching is done by simple string prefix, not glob patterns.
- Parameters:
prefix- the key prefix to watch (e.g., "user:", must not be null)listener- callback to invoke on mutations (must not be null)- Returns:
- a unique watch ID; pass this to
cancel(long)to stop watching - Throws:
NullPointerException- if prefix or listener is null
-
watchPrefix
Register a watch on all keys matching a prefix, associated with a connection.- Parameters:
prefix- the key prefix to watch (must not be null)listener- callback to invoke on mutations (must not be null)connectionId- the connection ID to associate with this watch (may be null)- Returns:
- a unique watch ID; pass this to
cancel(long)to stop watching - Throws:
NullPointerException- if prefix or listener is null
-
watchFromRevision
Register a watch on a single key, resuming from a specific revision.Replays buffered events from the given revision, then continues watching for new events. This enables clients to resume watching from where they left off.
- Parameters:
key- the key to watch (must not be null)startRevision- the minimum revision to replay fromlistener- callback to invoke on events- Returns:
- a unique watch ID
- Throws:
NullPointerException- if key or listener is null
-
watchRange
Register a watch on a range of keys [startKey, endKey).A range watch receives events for all keys k where startKey invalid input: '<'= k invalid input: '<' endKey. This matches etcd's range watch semantics.
- Parameters:
startKey- the beginning of the range (inclusive, must not be null)endKey- the end of the range (exclusive, must not be null)listener- callback to invoke on mutations (must not be null)- Returns:
- a unique watch ID; pass this to
cancel(long)to stop watching - Throws:
NullPointerException- if startKey, endKey, or listener is nullIllegalArgumentException- if startKey >= endKey
-
cancel
public boolean cancel(long watchId) Cancel a watch.The watch will no longer receive events after this call returns.
- Parameters:
watchId- the watch ID returned bywatch(String, WatchListener)orwatchPrefix(String, WatchListener)- Returns:
- true if the watch existed and was canceled; false if it didn't exist
-
notifyWatchers
Notify all watchers that a key has been mutated.This method is called by
DistributedMapafter a put or delete operation. Events are delivered asynchronously using virtual threads to avoid blocking the mutation operation.Events are also buffered for history replay.
- Parameters:
event- the watch event to deliver (must not be null)- Throws:
NullPointerException- if event is null
-
getBufferedEvents
Retrieves buffered watch events from a given revision onward.Returns events that match the given key/prefix pattern and have revision >= fromRevision. Useful for replaying history when a watcher reconnects.
- Parameters:
keyOrPrefix- the key or prefix to filter byfromRevision- minimum revision to include- Returns:
- an unmodifiable list of matching buffered events
-
getBufferedEvents
Retrieves all buffered events with revision >= fromRevision.- Parameters:
fromRevision- minimum revision to include- Returns:
- an unmodifiable list of all buffered events matching the revision
-
getCompactRevision
public long getCompactRevision()Get the oldest available revision in the event history.This is the compact revision: the earliest revision that can still be replayed. Events before this revision have been evicted from the buffer.
- Returns:
- the oldest revision available in buffer, or 0 if buffer is empty
-
getLatestRevision
public long getLatestRevision()Get the latest revision in the event history.This is the most recent revision that has been recorded.
- Returns:
- the latest revision available in buffer, or 0 if buffer is empty
-
cancelWatch
public boolean cancelWatch(long watchId) Alias forcancel(long)for semantic clarity. Cancels a watch with the given ID.- Parameters:
watchId- the watch ID to cancel- Returns:
- true if the watch existed and was canceled; false otherwise
-
getActiveWatchCount
public int getActiveWatchCount()Get the number of active watches. -
getWatchedKeyCount
public int getWatchedKeyCount()Get the number of keys being watched (exact matches). -
getWatchedPrefixCount
public int getWatchedPrefixCount()Get the number of prefixes being watched. -
getWatchedRangeCount
public int getWatchedRangeCount()Get the number of range watches. -
getBufferedEventCount
public int getBufferedEventCount()Get the current size of the event buffer. -
cancelAllWatchesForConnection
Cancel all watches associated with a specific connection. Must be called when a client disconnects to prevent subscription leaks.- Parameters:
connectionId- the connection identifier whose watches should be cancelled
-
shutdown
public void shutdown()Shuts down the notification executor. Should be called when the WatchManager is no longer needed. -
streamEvents
Start streaming all events matching a prefix to the given consumer.Creates a streaming watch that continuously delivers events matching the prefix. Events are delivered asynchronously using virtual threads. The caller must eventually call
stopStream(String)to clean up resources.- Parameters:
prefix- the key prefix to match (e.g., "user:", must not be null)consumer- callback to invoke for each event (must not be null)- Returns:
- a unique stream ID that can be used with
stopStream(String) - Throws:
NullPointerException- if prefix or consumer is null
-
stopStream
Stop a specific event stream.Cancels the underlying watch and marks the stream as inactive. Subsequent events will not be delivered to the consumer.
- Parameters:
streamId- the stream ID returned bystreamEvents(String, Consumer)- Returns:
- true if the stream existed and was stopped; false if it didn't exist
-
getActiveStreams
-
getStreamStats
Get statistics about active event streams.Returns a snapshot of current streaming statistics including total events processed, active stream count, event rate, and average delivery latency.
- Returns:
- a WatchStreamStats snapshot
-
compactHistory
public int compactHistory(long beforeRevision) Compact event history by removing events older than a specified revision.This method removes buffered events with revision less than beforeRevision, allowing the system to reclaim memory from old events that are no longer needed for replay. Use this in conjunction with checkpoint/snapshot mechanisms to ensure clients don't rely on very old event history.
- Parameters:
beforeRevision- the revision threshold; events with revision invalid input: '<' beforeRevision are removed- Returns:
- the number of events removed
-