Class BackpressureController
Implements watermark-based backpressure on a per-peer basis to prevent
overwhelming individual peers with outbound messages. When a peer's outbound
queue depth exceeds the high watermark, canSend(String) returns false
until the queue drains back to the low watermark.
Watermark-based flow control:
- High watermark (default 1000 messages): stop accepting new messages
- Low watermark (default 500 messages): resume accepting messages
- Once high watermark is hit,
canSend(peerId)returns false until queue drains to low watermark, preventing thrashing
Metrics tracked per peer:
- Current queue depth (message count)
- Pending bytes (secondary metric)
- Total messages throttled (dropped or rejected)
- Peak queue depth
- Throttle state (in-backpressure or recovering)
Thread safety: All operations are thread-safe using concurrent data structures (ConcurrentHashMap, AtomicInteger, AtomicLong) and a ReadWriteLock for peer metadata. Suitable for high-concurrency environments with virtual threads.
Logging:
- INFO when peer enters backpressure
- INFO when peer recovers from backpressure
- WARN if messages are dropped due to backpressure
- Since:
- 1.0
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic final recordImmutable snapshot of backpressure statistics for a single peer. -
Constructor Summary
ConstructorsConstructorDescriptionCreates a BackpressureController with default watermarks.BackpressureController(int highWatermark, int lowWatermark) Creates a BackpressureController with custom watermarks. -
Method Summary
Modifier and TypeMethodDescriptionvoidaddPendingBytes(String peerId, long bytes) Record pending bytes for a peer (secondary metric).booleanCheck if we can send a message to a specific peer.voidcheckAndThrowIfBackpressured(String peerId) Checks if a peer is backpressured and throws BackpressureException if so.voidclearPendingBytes(String peerId, long bytes) Clear pending bytes for a peer (e.g., after flush).voidforceOverloadedForTests(boolean overloaded) Deterministically force global overload for integration tests exercising the real RESPONSE_SERVER_BUSY path without manufacturing peer-queue saturation.intGet the number of currently tracked peers.@Nullable BackpressureController.BackpressureStatsGet comprehensive backpressure statistics for a specific peer.longGet total messages sent across all peers since startup.longGet total messages dropped due to backpressure across all peers since startup.booleanbooleanCheck if this node is globally overloaded.voidmessageAcked(String peerId) Record that a message sent to a peer has been acknowledged.voidmessageDropped(String peerId) Record that a message to a peer has been dropped due to backpressure.voidmessageSent(String peerId) Record that a message has been sent to a peer.voidpeerRemoved(String peerId) Clean up tracking for a removed peer.voidReset all per-peer metrics and backpressure state.voidReset all global metrics (but not per-peer state).voidsetHighWatermark(int newHighWatermark) Update the high watermark threshold.voidsetLowWatermark(int newLowWatermark) Update the low watermark threshold.booleantryReserveSendSlot(String peerId) Atomically reserve one outbound send slot for a peer.
-
Constructor Details
-
BackpressureController
public BackpressureController()Creates a BackpressureController with default watermarks.Default watermarks:
- High: 1000 messages
- Low: 500 messages
-
BackpressureController
public BackpressureController(int highWatermark, int lowWatermark) Creates a BackpressureController with custom watermarks.- Parameters:
highWatermark- queue depth at which to begin backpressurelowWatermark- queue depth at which to resume accepting messages- Throws:
IllegalArgumentException- if watermarks are invalid
-
-
Method Details
-
canSend
Check if we can send a message to a specific peer.Returns false if the peer is in backpressure (queue >= high watermark). Once backpressure is triggered, it remains true until the queue drains back to the low watermark, preventing thrashing.
- Parameters:
peerId- the peer identifier (non-null)- Returns:
- true if we can safely send, false if peer is backpressured
- Throws:
NullPointerException- if peerId is null
-
tryReserveSendSlot
Atomically reserve one outbound send slot for a peer.This combines the admission check and queue-depth increment into a single per-peer critical section so concurrent senders cannot overshoot the high watermark by racing
canSend(String)andmessageSent(String)separately.- Parameters:
peerId- the peer identifier- Returns:
- true if the slot was reserved, false if the peer is currently backpressured
-
checkAndThrowIfBackpressured
Checks if a peer is backpressured and throws BackpressureException if so.This is a convenience method that combines
canSend(String)with exception throwing. Use this when you want to fail fast with an exception rather than checking a boolean return value.- Parameters:
peerId- the peer identifier- Throws:
BackpressureException- if the peer is currently backpressured
-
messageSent
Record that a message has been sent to a peer.Increments the outbound queue depth for this peer. Should be called immediately after a message is enqueued for transmission (before it's actually written to the socket), to maintain accurate backpressure state.
- Parameters:
peerId- the peer identifier
-
messageAcked
Record that a message sent to a peer has been acknowledged.Decrements the outbound queue depth for this peer when the remote peer acknowledges receipt. This allows the queue to drain and eventually exit backpressure.
- Parameters:
peerId- the peer identifier
-
messageDropped
Record that a message to a peer has been dropped due to backpressure.Increments the throttled message counter for this peer and globally. Logs a warning to alert operators of flow control events.
- Parameters:
peerId- the peer identifier
-
addPendingBytes
Record pending bytes for a peer (secondary metric).Tracks the total size of pending outbound data to this peer. This metric is useful for monitoring memory consumption but does not directly affect backpressure decisions (which use message count).
- Parameters:
peerId- the peer identifierbytes- number of bytes to add to the pending count
-
clearPendingBytes
Clear pending bytes for a peer (e.g., after flush).- Parameters:
peerId- the peer identifierbytes- number of bytes to subtract from the pending count
-
isOverloaded
public boolean isOverloaded()Check if this node is globally overloaded.Returns true if any tracked peer is currently in backpressure, indicating the node cannot keep up with outbound replication. This is a node-level check suitable for rejecting incoming write commands at the network layer.
- Returns:
- true if the node is under backpressure
-
forceOverloadedForTests
public void forceOverloadedForTests(boolean overloaded) Deterministically force global overload for integration tests exercising the real RESPONSE_SERVER_BUSY path without manufacturing peer-queue saturation. -
isForcedOverloadedForTests
public boolean isForcedOverloadedForTests() -
peerRemoved
Clean up tracking for a removed peer.Should be called when a peer disconnects or is removed from the cluster to release tracking resources.
- Parameters:
peerId- the peer identifier
-
getStats
Get comprehensive backpressure statistics for a specific peer.- Parameters:
peerId- the peer identifier (non-null)- Returns:
- a BackpressureStats object containing metrics, or null if peer is unknown
- Throws:
NullPointerException- if peerId is null
-
setHighWatermark
public void setHighWatermark(int newHighWatermark) Update the high watermark threshold.Changes take effect immediately. Peers currently at or above the new threshold will enter backpressure on the next
canSend(String)check.- Parameters:
newHighWatermark- the new high watermark threshold- Throws:
IllegalArgumentException- if less than or equal to the current low watermark
-
setLowWatermark
public void setLowWatermark(int newLowWatermark) Update the low watermark threshold.Changes take effect immediately. Peers in backpressure will check against the new threshold on the next
canSend(String)call.- Parameters:
newLowWatermark- the new low watermark threshold- Throws:
IllegalArgumentException- if greater than or equal to the current high watermark
-
getTotalMessagesSent
public long getTotalMessagesSent()Get total messages sent across all peers since startup.- Returns:
- total message count
-
getTotalMessagesThrottled
public long getTotalMessagesThrottled()Get total messages dropped due to backpressure across all peers since startup.- Returns:
- total throttled message count
-
getPeerCount
public int getPeerCount()Get the number of currently tracked peers.- Returns:
- peer count
-
resetGlobalMetrics
public void resetGlobalMetrics()Reset all global metrics (but not per-peer state).Useful for benchmarking or diagnostic resets. Per-peer queue depths and backpressure states are not affected.
-
resetAllMetrics
public void resetAllMetrics()Reset all per-peer metrics and backpressure state.Use with caution—this clears all tracking data and may cause message accounting issues if peers are still sending/receiving.
-