Class BackpressureController

java.lang.Object
com.loomcache.server.network.BackpressureController

public class BackpressureController extends Object
Backpressure and flow control mechanism for distributed cache networking.

Implements watermark-based backpressure on a per-peer basis to prevent overwhelming individual peers with outbound messages. When a peer's outbound queue depth reaches the high watermark, canSend(String) returns false until the queue drains back to the low watermark.

Watermark-based flow control:

  • High watermark (default 1000 messages): stop accepting new messages
  • Low watermark (default 500 messages): resume accepting messages
  • Once the high watermark is hit, canSend(peerId) keeps returning false until the queue drains to the low watermark, which prevents thrashing
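The hysteresis rule above can be modelled as a two-state machine per peer. The sketch below is a hypothetical single-peer illustration, not LoomCache source; the class name and the reading of "drains to the low watermark" as depth <= low are assumptions.

```java
// Hypothetical single-peer model of high/low watermark hysteresis.
// Not LoomCache code: names and the "depth <= low" recovery test are assumptions.
final class WatermarkHysteresis {
    private final int high;        // enter backpressure at or above this depth
    private final int low;         // leave backpressure at or below this depth
    private int depth;             // outbound messages enqueued but not yet acked
    private boolean backpressured; // sticky flag that prevents thrashing

    WatermarkHysteresis(int high, int low) {
        if (high <= low) throw new IllegalArgumentException("high must exceed low");
        this.high = high;
        this.low = low;
    }

    synchronized boolean canSend() {
        if (!backpressured && depth >= high) {
            backpressured = true;  // crossed the high watermark
        } else if (backpressured && depth <= low) {
            backpressured = false; // drained back to the low watermark
        }
        return !backpressured;
    }

    synchronized void messageSent()  { depth++; }
    synchronized void messageAcked() { if (depth > 0) depth--; }
}
```

Between the two thresholds the answer depends on the sticky flag rather than on the depth alone, so a peer hovering near one threshold does not flip state on every message.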

Metrics tracked per peer:

  • Current queue depth (message count)
  • Pending bytes (secondary metric)
  • Total messages throttled (dropped or rejected)
  • Peak queue depth
  • Throttle state (in-backpressure or recovering)

Thread safety: All operations are thread-safe using concurrent data structures (ConcurrentHashMap, AtomicInteger, AtomicLong) and a ReadWriteLock for peer metadata. Suitable for high-concurrency environments with virtual threads.
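A minimal sketch of how the concurrent structures named above can fit together: a ConcurrentHashMap keyed by peer ID whose values are AtomicInteger depth counters. The class is hypothetical, and a fixed thread pool stands in for virtual threads so the example also runs on JDKs older than 21.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-peer depth registry built from ConcurrentHashMap + AtomicInteger.
final class PeerDepthRegistry {
    private final ConcurrentHashMap<String, AtomicInteger> depths = new ConcurrentHashMap<>();

    // computeIfAbsent installs at most one counter per peer, even under contention;
    // incrementAndGet then updates it without taking a lock.
    void increment(String peerId) {
        depths.computeIfAbsent(peerId, id -> new AtomicInteger()).incrementAndGet();
    }

    int depth(String peerId) {
        AtomicInteger d = depths.get(peerId);
        return d == null ? 0 : d.get();
    }

    // Increments one peer from several threads and returns the final depth.
    static int demo(int threads, int perThread) {
        PeerDepthRegistry registry = new PeerDepthRegistry();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    registry.increment("peer-1");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return registry.depth("peer-1");
    }
}
```

No update is lost even though no explicit lock is taken, which is the property the per-peer counters rely on.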

Logging:

  • INFO when peer enters backpressure
  • INFO when peer recovers from backpressure
  • WARN if messages are dropped due to backpressure
Since:
1.0
  • Constructor Details

    • BackpressureController

      public BackpressureController()
      Creates a BackpressureController with default watermarks.

      Default watermarks:

      • High: 1000 messages
      • Low: 500 messages
    • BackpressureController

      public BackpressureController(int highWatermark, int lowWatermark)
      Creates a BackpressureController with custom watermarks.
      Parameters:
      highWatermark - queue depth at which to begin backpressure
      lowWatermark - queue depth at which to resume accepting messages
      Throws:
      IllegalArgumentException - if watermarks are invalid
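      The constructor only states that it rejects "invalid" watermarks. The setter contracts further down imply that the high watermark must stay strictly above the low one, so a plausible validation rule is sketched here; the non-negative lower bound and the class name are assumptions, not documented behavior.

```java
// Hypothetical validation for BackpressureController(int highWatermark, int lowWatermark).
// Only "high > low" is implied by the setter contracts; "low >= 0" is an added assumption.
final class WatermarkConfig {
    final int highWatermark;
    final int lowWatermark;

    WatermarkConfig(int highWatermark, int lowWatermark) {
        if (lowWatermark < 0) {
            throw new IllegalArgumentException("lowWatermark must be >= 0, got " + lowWatermark);
        }
        if (highWatermark <= lowWatermark) {
            throw new IllegalArgumentException("highWatermark (" + highWatermark
                    + ") must be greater than lowWatermark (" + lowWatermark + ")");
        }
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    // Convenience check for callers that read watermarks from configuration.
    static boolean isValid(int highWatermark, int lowWatermark) {
        try {
            new WatermarkConfig(highWatermark, lowWatermark);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```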
  • Method Details

    • canSend

      public boolean canSend(String peerId)
      Check if we can send a message to a specific peer.

      Returns false if the peer is in backpressure (queue depth >= high watermark). Once backpressure is triggered, it stays in effect (this method keeps returning false) until the queue drains back to the low watermark, preventing thrashing.

      Parameters:
      peerId - the peer identifier (non-null)
      Returns:
      true if we can safely send, false if peer is backpressured
      Throws:
      NullPointerException - if peerId is null
    • tryReserveSendSlot

      public boolean tryReserveSendSlot(String peerId)
      Atomically reserve one outbound send slot for a peer.

      This combines the admission check and queue-depth increment into a single per-peer critical section so concurrent senders cannot overshoot the high watermark by racing canSend(String) and messageSent(String) separately.

      Parameters:
      peerId - the peer identifier
      Returns:
      true if the slot was reserved, false if the peer is currently backpressured
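      One way to obtain the single per-peer critical section described above is to lock the peer's state object, re-evaluate the hysteresis flag, and increment the depth before releasing the lock, so no other sender can slip in between check and increment. This is a hypothetical sketch; whether LoomCache uses synchronized, ConcurrentHashMap.compute, or another lock is not documented here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: admission check and depth increment under one per-peer lock.
final class SlotReserver {
    private static final class PeerState {
        int depth;
        boolean backpressured;
    }

    private final ConcurrentHashMap<String, PeerState> peers = new ConcurrentHashMap<>();
    private final int high;
    private final int low;

    SlotReserver(int high, int low) {
        this.high = high;
        this.low = low;
    }

    boolean tryReserveSendSlot(String peerId) {
        PeerState s = peers.computeIfAbsent(peerId, id -> new PeerState());
        synchronized (s) {
            if (s.backpressured && s.depth <= low) s.backpressured = false;
            if (!s.backpressured && s.depth >= high) s.backpressured = true;
            if (s.backpressured) return false;
            // In this sketch the reservation itself is the increment,
            // so no separate messageSent-style call follows it.
            s.depth++;
            return true;
        }
    }

    void messageAcked(String peerId) {
        PeerState s = peers.get(peerId);
        if (s == null) return;
        synchronized (s) { if (s.depth > 0) s.depth--; }
    }

    int depth(String peerId) {
        PeerState s = peers.get(peerId);
        if (s == null) return 0;
        synchronized (s) { return s.depth; }
    }

    // Many threads race for slots on one peer; the depth never passes the high watermark.
    static int raceDemo(int threads, int attemptsPerThread, int high) {
        SlotReserver reserver = new SlotReserver(high, high / 2);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < attemptsPerThread; i++) {
                    reserver.tryReserveSendSlot("peer-1");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reserver.depth("peer-1");
    }
}
```

      With separate canSend and messageSent calls, two threads could both observe depth high - 1, both pass, and push the depth to high + 1; holding the lock across both steps rules that out.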
    • checkAndThrowIfBackpressured

      public void checkAndThrowIfBackpressured(String peerId)
      Checks if a peer is backpressured and throws BackpressureException if so.

      This is a convenience method that combines canSend(String) with exception throwing. Use this when you want to fail fast with an exception rather than checking a boolean return value.

      Parameters:
      peerId - the peer identifier
      Throws:
      BackpressureException - if the peer is currently backpressured
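      A sketch of the fail-fast style, in which inner replication code throws and a single request boundary maps the exception to a busy reply. BackpressureException is declared here as a hypothetical stand-in and is assumed to be unchecked, since the documented signature declares no throws clause; the Predicate plays the role of canSend(String).

```java
import java.util.function.Predicate;

// Hypothetical stand-in: assumed unchecked because the documented signature
// of checkAndThrowIfBackpressured declares no throws clause.
class BackpressureException extends RuntimeException {
    BackpressureException(String peerId) {
        super("peer " + peerId + " is backpressured");
    }
}

final class FailFastSender {
    private final Predicate<String> canSend; // plays the role of canSend(String)

    FailFastSender(Predicate<String> canSend) {
        this.canSend = canSend;
    }

    // Same contract as the documented method: return quietly or throw.
    void checkAndThrowIfBackpressured(String peerId) {
        if (!canSend.test(peerId)) {
            throw new BackpressureException(peerId);
        }
    }

    // Inner replication step: no boolean plumbing, it simply fails fast.
    String replicate(String peerId, String key) {
        checkAndThrowIfBackpressured(peerId);
        return "replicated " + key + " to " + peerId;
    }

    // Request boundary: the single place that turns the exception into a reply.
    String handle(String peerId, String key) {
        try {
            return replicate(peerId, key);
        } catch (BackpressureException e) {
            return "BUSY: " + e.getMessage();
        }
    }
}
```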
    • messageSent

      public void messageSent(String peerId)
      Record that a message has been sent to a peer.

      Increments the outbound queue depth for this peer. Should be called immediately after a message is enqueued for transmission (before it's actually written to the socket), to maintain accurate backpressure state.

      Parameters:
      peerId - the peer identifier
    • messageAcked

      public void messageAcked(String peerId)
      Record that a message sent to a peer has been acknowledged.

      Decrements the outbound queue depth for this peer when the remote peer acknowledges receipt. This allows the queue to drain and eventually exit backpressure.

      Parameters:
      peerId - the peer identifier
    • messageDropped

      public void messageDropped(String peerId)
      Record that a message to a peer has been dropped due to backpressure.

      Increments the throttled message counter for this peer and globally. Logs a warning to alert operators of flow control events.

      Parameters:
      peerId - the peer identifier
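      The sketch below shows where a drop would be recorded during a replication fan-out: peers that fail the admission check are skipped and counted, per peer and globally. The class and its Set-based stand-in for canSend(peerId) == false are hypothetical; only the two-level counting mirrors the documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical fan-out that skips backpressured peers and records each skip as a drop.
final class FanOutSender {
    private final Set<String> backpressured; // stand-in for canSend(peerId) == false
    private final Map<String, AtomicLong> throttledByPeer = new ConcurrentHashMap<>();
    private final AtomicLong totalThrottled = new AtomicLong();

    FanOutSender(Set<String> backpressured) {
        this.backpressured = backpressured;
    }

    // Mirrors messageDropped: bump the per-peer and the global counter.
    // The documented method also logs a WARN at this point.
    void messageDropped(String peerId) {
        throttledByPeer.computeIfAbsent(peerId, id -> new AtomicLong()).incrementAndGet();
        totalThrottled.incrementAndGet();
    }

    // Returns the peers the message was actually handed to.
    List<String> broadcast(List<String> peers) {
        List<String> delivered = new ArrayList<>();
        for (String peer : peers) {
            if (backpressured.contains(peer)) {
                messageDropped(peer);
            } else {
                delivered.add(peer);
            }
        }
        return delivered;
    }

    long throttled(String peerId) {
        AtomicLong n = throttledByPeer.get(peerId);
        return n == null ? 0 : n.get();
    }

    long totalThrottled() {
        return totalThrottled.get();
    }
}
```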
    • addPendingBytes

      public void addPendingBytes(String peerId, long bytes)
      Record pending bytes for a peer (secondary metric).

      Tracks the total size of pending outbound data to this peer. This metric is useful for monitoring memory consumption but does not directly affect backpressure decisions (which use message count).

      Parameters:
      peerId - the peer identifier
      bytes - number of bytes to add to the pending count
    • clearPendingBytes

      public void clearPendingBytes(String peerId, long bytes)
      Subtract flushed bytes from a peer's pending-byte count (e.g., after a flush).
      Parameters:
      peerId - the peer identifier
      bytes - number of bytes to subtract from the pending count
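      A hypothetical sketch of the pending-byte gauge as a per-peer AtomicLong. It is purely informational, matching the note that byte counts do not drive backpressure decisions. Clamping at zero on subtraction is an assumption of this sketch, not documented behavior.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-peer pending-byte gauge. Informational only:
// nothing here feeds into the admission decision, which uses message count.
final class PendingBytes {
    private final ConcurrentHashMap<String, AtomicLong> pending = new ConcurrentHashMap<>();

    void addPendingBytes(String peerId, long bytes) {
        pending.computeIfAbsent(peerId, id -> new AtomicLong()).addAndGet(bytes);
    }

    // Subtracts flushed bytes. Clamping at zero (an assumption) guards against
    // a late clear arriving after the counter was reset.
    void clearPendingBytes(String peerId, long bytes) {
        AtomicLong counter = pending.get(peerId);
        if (counter != null) {
            counter.updateAndGet(v -> Math.max(0L, v - bytes));
        }
    }

    long pendingBytes(String peerId) {
        AtomicLong counter = pending.get(peerId);
        return counter == null ? 0L : counter.get();
    }
}
```

      In practice the add on enqueue and the clear on flush should use the same serialized size, otherwise the gauge drifts.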
    • isOverloaded

      public boolean isOverloaded()
      Check if this node is globally overloaded.

      Returns true if any tracked peer is currently in backpressure, indicating the node cannot keep up with outbound replication. This is a node-level check suitable for rejecting incoming write commands at the network layer.

      Returns:
      true if the node is under backpressure
    • forceOverloadedForTests

      public void forceOverloadedForTests(boolean overloaded)
      Deterministically forces the node into the globally overloaded state so that integration tests can exercise the real RESPONSE_SERVER_BUSY path without having to saturate a peer queue.
    • isForcedOverloadedForTests

      public boolean isForcedOverloadedForTests()
      Returns whether global overload is currently being forced via forceOverloadedForTests(boolean).
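      A hypothetical sketch of the node-level overload check: the node counts as overloaded if any tracked peer is in backpressure, and incoming writes are rejected at the network layer while it is. The sketch assumes the test override feeds into the same check; "RESPONSE_SERVER_BUSY" is used only as a label, since the real response encoding is not shown on this page.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical node-level overload gate; not the LoomCache implementation.
final class OverloadGate {
    private final ConcurrentHashMap<String, Boolean> backpressured = new ConcurrentHashMap<>();
    private volatile boolean forcedForTests;

    void setBackpressured(String peerId, boolean value) {
        backpressured.put(peerId, value);
    }

    void forceOverloadedForTests(boolean overloaded) {
        forcedForTests = overloaded;
    }

    // Overloaded if any tracked peer is in backpressure, or if the
    // test override is on (assumed to feed this check).
    boolean isOverloaded() {
        return forcedForTests || backpressured.containsValue(Boolean.TRUE);
    }

    // Write admission at the network layer.
    String handleWrite(String key) {
        return isOverloaded() ? "RESPONSE_SERVER_BUSY" : "OK " + key;
    }
}
```

      In an integration test the override would typically be switched off in a finally block (or a JUnit @AfterEach method) so later tests see real peer state.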
    • peerRemoved

      public void peerRemoved(String peerId)
      Clean up tracking for a removed peer.

      Should be called when a peer disconnects or is removed from the cluster to release tracking resources.

      Parameters:
      peerId - the peer identifier
    • getStats

      public @Nullable BackpressureController.BackpressureStats getStats(String peerId)
      Get comprehensive backpressure statistics for a specific peer.
      Parameters:
      peerId - the peer identifier (non-null)
      Returns:
      a BackpressureStats object containing metrics, or null if peer is unknown
      Throws:
      NullPointerException - if peerId is null
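      Because getStats returns null for unknown peers (for example, one just passed to peerRemoved), callers should branch on null. The sketch below uses a hypothetical PeerStatsSnapshot whose field names are guesses based on the metrics listed in the class description; the real BackpressureStats accessors are not shown on this page.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for BackpressureController.BackpressureStats;
// field names are guesses based on the metrics listed in the class description.
final class PeerStatsSnapshot {
    final int queueDepth;
    final int peakQueueDepth;
    final long pendingBytes;
    final long messagesThrottled;
    final boolean inBackpressure;

    PeerStatsSnapshot(int queueDepth, int peakQueueDepth, long pendingBytes,
                      long messagesThrottled, boolean inBackpressure) {
        this.queueDepth = queueDepth;
        this.peakQueueDepth = peakQueueDepth;
        this.pendingBytes = pendingBytes;
        this.messagesThrottled = messagesThrottled;
        this.inBackpressure = inBackpressure;
    }
}

// Single-threaded stand-in (plain HashMap) for the getStats contract.
final class StatsView {
    private final Map<String, PeerStatsSnapshot> stats = new HashMap<>();

    void put(String peerId, PeerStatsSnapshot s) {
        stats.put(peerId, s);
    }

    // Same contract as getStats: null peerId throws, unknown peer yields null.
    PeerStatsSnapshot getStats(String peerId) {
        Objects.requireNonNull(peerId, "peerId");
        return stats.get(peerId);
    }

    String describe(String peerId) {
        PeerStatsSnapshot s = getStats(peerId);
        if (s == null) {
            return peerId + ": not tracked";
        }
        return peerId + ": depth=" + s.queueDepth + " peak=" + s.peakQueueDepth
                + " pendingBytes=" + s.pendingBytes + " throttled=" + s.messagesThrottled
                + (s.inBackpressure ? " [BACKPRESSURE]" : "");
    }
}
```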
    • setHighWatermark

      public void setHighWatermark(int newHighWatermark)
      Update the high watermark threshold.

      Changes take effect immediately. Peers currently at or above the new threshold will enter backpressure on the next canSend(String) check.

      Parameters:
      newHighWatermark - the new high watermark threshold
      Throws:
      IllegalArgumentException - if less than or equal to the current low watermark
    • setLowWatermark

      public void setLowWatermark(int newLowWatermark)
      Update the low watermark threshold.

      Changes take effect immediately. Peers in backpressure will check against the new threshold on the next canSend(String) call.

      Parameters:
      newLowWatermark - the new low watermark threshold
      Throws:
      IllegalArgumentException - if greater than or equal to the current high watermark
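      The two setter constraints make call order matter when both watermarks move. Going from (1000, 500) to (200, 100), calling setHighWatermark(200) first is rejected because 200 <= 500, the current low watermark. The hypothetical single-threaded model below reproduces both documented checks and adds an update helper (an illustration, not part of the documented API) that raises the ceiling first when the band moves up and lowers the floor first when it moves down.

```java
// Hypothetical single-threaded model of the documented setter constraints.
final class Watermarks {
    private int high;
    private int low;

    Watermarks(int high, int low) {
        if (high <= low) throw new IllegalArgumentException("high must exceed low");
        this.high = high;
        this.low = low;
    }

    void setHighWatermark(int newHigh) {
        if (newHigh <= low) {
            throw new IllegalArgumentException("high " + newHigh + " <= low " + low);
        }
        high = newHigh;
    }

    void setLowWatermark(int newLow) {
        if (newLow >= high) {
            throw new IllegalArgumentException("low " + newLow + " >= high " + high);
        }
        low = newLow;
    }

    // Orders the two calls so that low < high holds after each step.
    void update(int newHigh, int newLow) {
        if (newHigh <= newLow) throw new IllegalArgumentException("high must exceed low");
        if (newHigh > high) {
            setHighWatermark(newHigh); // newHigh > old high > old low
            setLowWatermark(newLow);   // newLow < newHigh
        } else {
            setLowWatermark(newLow);   // newLow < newHigh <= old high
            setHighWatermark(newHigh); // newHigh > newLow
        }
    }

    int high() { return high; }
    int low()  { return low; }
}
```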
    • getTotalMessagesSent

      public long getTotalMessagesSent()
      Get total messages sent across all peers since startup.
      Returns:
      total message count
    • getTotalMessagesThrottled

      public long getTotalMessagesThrottled()
      Get total messages dropped due to backpressure across all peers since startup.
      Returns:
      total throttled message count
    • getPeerCount

      public int getPeerCount()
      Get the number of currently tracked peers.
      Returns:
      peer count
    • resetGlobalMetrics

      public void resetGlobalMetrics()
      Reset all global metrics (but not per-peer state).

      Useful for benchmarking or diagnostic resets. Per-peer queue depths and backpressure states are not affected.
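      getTotalMessagesSent() and getTotalMessagesThrottled() are cumulative since startup, and resetGlobalMetrics() can zero them. A dashboard that wants per-interval figures might sample a counter and take differences, treating a drop in value as a reset. The sampler below is a hypothetical sketch; it accepts any LongSupplier, so a method reference such as controller::getTotalMessagesThrottled could be plugged in.

```java
import java.util.function.LongSupplier;

// Hypothetical sampler that turns a cumulative counter into per-interval deltas.
final class ThrottleSampler {
    private final LongSupplier counter;
    private long previous;

    ThrottleSampler(LongSupplier counter) {
        this.counter = counter;
        this.previous = counter.getAsLong();
    }

    // Events since the last call. A value below the previous sample means the
    // counter was reset, so the whole current value is counted as new.
    long sample() {
        long current = counter.getAsLong();
        long delta = current >= previous ? current - previous : current;
        previous = current;
        return delta;
    }
}
```

      The reset heuristic misses a reset if the counter climbs back above its previous value before the next sample, so short sampling intervals keep it reliable.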

    • resetAllMetrics

      public void resetAllMetrics()
      Reset all per-peer metrics and backpressure state.

      Use with caution: this clears all tracking data and may cause message-accounting errors if peers are still sending or receiving.