Class WatchManager

java.lang.Object
com.loomcache.server.watch.WatchManager

public class WatchManager extends Object
Manages active watches on keys and key prefixes.

Clients can watch a single key or a prefix pattern (e.g., "user:*"). When a watched key or prefix is mutated, all registered watchers receive a WatchEvent via their WatchListener callback.

Watch lifecycle: 1. Call watch(String, WatchListener) to watch a specific key 2. Call watchPrefix(String, WatchListener) to watch all keys matching a prefix 3. Call cancel(long) with the returned watchId to cancel the watch

Thread-safe: uses ConcurrentHashMap for all tracking structures. Event notification is performed asynchronously using virtual threads to avoid blocking the mutation operation.

Since:
1.5
  • Constructor Details

    • WatchManager

      public WatchManager()
      Creates a new WatchManager with default buffer and queue settings. Default: 1000-event buffer, 500-event per-watcher queue.
    • WatchManager

      public WatchManager(int maxBufferSize, int maxQueueSizePerWatcher)
      Creates a new WatchManager with configurable buffer and queue sizes.
      Parameters:
      maxBufferSize - maximum number of events to buffer (must be positive)
      maxQueueSizePerWatcher - maximum queue size per watcher (must be positive)
      Throws:
      IllegalArgumentException - if sizes are not positive
  • Method Details

    • watch

      public long watch(String key, WatchListener listener)
      Register a watch on a single key.
      Parameters:
      key - the key to watch (must not be null)
      listener - callback to invoke on mutations (must not be null)
      Returns:
      a unique watch ID; pass this to cancel(long) to stop watching
      Throws:
      NullPointerException - if key or listener is null
    • watch

      public long watch(String key, WatchListener listener, @Nullable String connectionId)
      Register a watch on a single key, associated with a connection.
      Parameters:
      key - the key to watch (must not be null)
      listener - callback to invoke on mutations (must not be null)
      connectionId - the connection ID to associate with this watch (may be null)
      Returns:
      a unique watch ID; pass this to cancel(long) to stop watching
      Throws:
      NullPointerException - if key or listener is null
    • watchPrefix

      public long watchPrefix(String prefix, WatchListener listener)
      Register a watch on all keys matching a prefix.

      A prefix of "user:" will match keys like "user:123", "user:456", etc. Matching is done by simple string prefix, not glob patterns.

      Parameters:
      prefix - the key prefix to watch (e.g., "user:", must not be null)
      listener - callback to invoke on mutations (must not be null)
      Returns:
      a unique watch ID; pass this to cancel(long) to stop watching
      Throws:
      NullPointerException - if prefix or listener is null
    • watchPrefix

      public long watchPrefix(String prefix, WatchListener listener, @Nullable String connectionId)
      Register a watch on all keys matching a prefix, associated with a connection.
      Parameters:
      prefix - the key prefix to watch (must not be null)
      listener - callback to invoke on mutations (must not be null)
      connectionId - the connection ID to associate with this watch (may be null)
      Returns:
      a unique watch ID; pass this to cancel(long) to stop watching
      Throws:
      NullPointerException - if prefix or listener is null
    • watchFromRevision

      public long watchFromRevision(String key, long startRevision, WatchListener listener)
      Register a watch on a single key, resuming from a specific revision.

      Replays buffered events from the given revision, then continues watching for new events. This enables clients to resume watching from where they left off.

      Parameters:
      key - the key to watch (must not be null)
      startRevision - the minimum revision to replay from
      listener - callback to invoke on events
      Returns:
      a unique watch ID
      Throws:
      NullPointerException - if key or listener is null
    • watchRange

      public long watchRange(String startKey, String endKey, WatchListener listener)
      Register a watch on a range of keys [startKey, endKey).

      A range watch receives events for all keys k where startKey invalid input: '<'= k invalid input: '<' endKey. This matches etcd's range watch semantics.

      Parameters:
      startKey - the beginning of the range (inclusive, must not be null)
      endKey - the end of the range (exclusive, must not be null)
      listener - callback to invoke on mutations (must not be null)
      Returns:
      a unique watch ID; pass this to cancel(long) to stop watching
      Throws:
      NullPointerException - if startKey, endKey, or listener is null
      IllegalArgumentException - if startKey >= endKey
    • cancel

      public boolean cancel(long watchId)
      Cancel a watch.

      The watch will no longer receive events after this call returns.

      Parameters:
      watchId - the watch ID returned by watch(String, WatchListener) or watchPrefix(String, WatchListener)
      Returns:
      true if the watch existed and was canceled; false if it didn't exist
    • notifyWatchers

      public void notifyWatchers(WatchEvent event)
      Notify all watchers that a key has been mutated.

      This method is called by DistributedMap after a put or delete operation. Events are delivered asynchronously using virtual threads to avoid blocking the mutation operation.

      Events are also buffered for history replay.

      Parameters:
      event - the watch event to deliver (must not be null)
      Throws:
      NullPointerException - if event is null
    • getBufferedEvents

      public List<WatchEvent> getBufferedEvents(String keyOrPrefix, long fromRevision)
      Retrieves buffered watch events from a given revision onward.

      Returns events that match the given key/prefix pattern and have revision >= fromRevision. Useful for replaying history when a watcher reconnects.

      Parameters:
      keyOrPrefix - the key or prefix to filter by
      fromRevision - minimum revision to include
      Returns:
      an unmodifiable list of matching buffered events
    • getBufferedEvents

      public List<WatchEvent> getBufferedEvents(long fromRevision)
      Retrieves all buffered events with revision >= fromRevision.
      Parameters:
      fromRevision - minimum revision to include
      Returns:
      an unmodifiable list of all buffered events matching the revision
    • getCompactRevision

      public long getCompactRevision()
      Get the oldest available revision in the event history.

      This is the compact revision: the earliest revision that can still be replayed. Events before this revision have been evicted from the buffer.

      Returns:
      the oldest revision available in buffer, or 0 if buffer is empty
    • getLatestRevision

      public long getLatestRevision()
      Get the latest revision in the event history.

      This is the most recent revision that has been recorded.

      Returns:
      the latest revision available in buffer, or 0 if buffer is empty
    • cancelWatch

      public boolean cancelWatch(long watchId)
      Alias for cancel(long) for semantic clarity. Cancels a watch with the given ID.
      Parameters:
      watchId - the watch ID to cancel
      Returns:
      true if the watch existed and was canceled; false otherwise
    • getActiveWatchCount

      public int getActiveWatchCount()
      Get the number of active watches.
    • getWatchedKeyCount

      public int getWatchedKeyCount()
      Get the number of keys being watched (exact matches).
    • getWatchedPrefixCount

      public int getWatchedPrefixCount()
      Get the number of prefixes being watched.
    • getWatchedRangeCount

      public int getWatchedRangeCount()
      Get the number of range watches.
    • getBufferedEventCount

      public int getBufferedEventCount()
      Get the current size of the event buffer.
    • cancelAllWatchesForConnection

      public void cancelAllWatchesForConnection(String connectionId)
      Cancel all watches associated with a specific connection. Must be called when a client disconnects to prevent subscription leaks.
      Parameters:
      connectionId - the connection identifier whose watches should be cancelled
    • shutdown

      public void shutdown()
      Shuts down the notification executor. Should be called when the WatchManager is no longer needed.
    • streamEvents

      public String streamEvents(String prefix, Consumer<WatchEvent> consumer)
      Start streaming all events matching a prefix to the given consumer.

      Creates a streaming watch that continuously delivers events matching the prefix. Events are delivered asynchronously using virtual threads. The caller must eventually call stopStream(String) to clean up resources.

      Parameters:
      prefix - the key prefix to match (e.g., "user:", must not be null)
      consumer - callback to invoke for each event (must not be null)
      Returns:
      a unique stream ID that can be used with stopStream(String)
      Throws:
      NullPointerException - if prefix or consumer is null
    • stopStream

      public boolean stopStream(String streamId)
      Stop a specific event stream.

      Cancels the underlying watch and marks the stream as inactive. Subsequent events will not be delivered to the consumer.

      Parameters:
      streamId - the stream ID returned by streamEvents(String, Consumer)
      Returns:
      true if the stream existed and was stopped; false if it didn't exist
    • getActiveStreams

      public List<String> getActiveStreams()
      Get the list of active stream IDs.
      Returns:
      an unmodifiable list of currently active stream IDs
    • getStreamStats

      public WatchManager.WatchStreamStats getStreamStats()
      Get statistics about active event streams.

      Returns a snapshot of current streaming statistics including total events processed, active stream count, event rate, and average delivery latency.

      Returns:
      a WatchStreamStats snapshot
    • compactHistory

      public int compactHistory(long beforeRevision)
      Compact event history by removing events older than a specified revision.

      This method removes buffered events with revision less than beforeRevision, allowing the system to reclaim memory from old events that are no longer needed for replay. Use this in conjunction with checkpoint/snapshot mechanisms to ensure clients don't rely on very old event history.

      Parameters:
      beforeRevision - the revision threshold; events with revision invalid input: '<' beforeRevision are removed
      Returns:
      the number of events removed