Class TcpServer
java.lang.Object
com.loomcache.server.network.TcpServer
TCP server using Java 25 virtual threads.
Architecture: - One virtual thread per connection (blocking I/O is fine — vthreads unmount on block) - ServerSocket.accept() in a loop on a virtual thread - Each connection gets its own read loop, dispatching messages to a handler - Outbound writes are synchronized per-connection to prevent interleaving
Tuning: - TCP_NODELAY=true (disable Nagle — low latency) - SO_SNDBUF/SO_RCVBUF=256KB (high throughput) - SO_KEEPALIVE=true (detect dead connections)
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic final recordBandwidth statistics for the server.static final recordInformation about a single connection.static final recordServer health status snapshot.static final recordServer-wide statistics snapshot. -
Constructor Summary
ConstructorsConstructorDescriptionTcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler) TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, @Nullable TlsConfig tlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads, @Nullable io.micrometer.core.instrument.MeterRegistry meterRegistry) TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, EndpointTlsConfig endpointTlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads) TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, TlsConfig tlsConfig) TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, TlsConfig tlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads) TcpServer(@Nullable String bindHost, int port, String nodeId, int instanceNumber, MessageHandler messageHandler, EndpointTlsConfig endpointTlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads) -
Method Summary
Modifier and TypeMethodDescriptionvoidaddSocketInterceptor(SocketInterceptor interceptor) Register an inbound socket interceptor invoked after accept/socket configuration and before connection registration.voidblockOutboundTo(String peerId) Block outbound messages to a specific peer.voidvoidclearClientResponseDrops(MessageType requestType) voidclearClientResponseDrops(MessageType requestType, @Nullable MessageType responseType) voidvoidcloseConnection(String peerId) Close a connection by peer identifier.intvoidInitiate an outbound connection to another node (no cluster ID).voidInitiate an outbound connection to another node.voiddelayNextOutboundTo(String peerId, MessageType messageType, long delayMs) Delay the next outbound peer message of the given type to the target peer.intdisconnectClient(String clientAddress) Force disconnect all connections from a specific client IP.voiddisconnectPeer(String peerId) Disconnect a peer transport and remove its canonical connection mapping.booleandrain(long timeoutMs) Wait for in-flight requests to complete with a timeout.voiddropNextClientResponseForRequest(MessageType requestType) Drop the next client response emitted for the given request type.voiddropNextClientResponseForRequest(MessageType requestType, MessageType responseType) Drop the next client response emitted for the given request and response types.voiddropNextClientResponsesForRequest(MessageType requestType, int count) Drop the nextcountclient responses emitted for the given request type.voiddropNextClientResponsesForRequest(MessageType requestType, @Nullable MessageType responseType, int count) Drop the nextcountclient responses emitted for the given request and response types.voiddropNextOutboundTo(String peerId, MessageType messageType) Drop the next outbound peer message of the given type to the target peer.voidduplicateNextOutboundTo(String peerId, MessageType messageType) Duplicate the next outbound peer message of the given type to the target peer.voidforceClientBackpressureForTests(boolean forced) Deterministically force or clear client-facing backpressure responses for tests.intCurrent number of active connections.Get current bandwidth statistics.longgetConnectionAge(String connectionId) Get the age of a connection in milliseconds since it was established.Get information about all active connections.getConnectionsByClient(String clientAddress) Get all connections from a specific client IP address.intCurrent number of idle connections (idle >= configured idle timeout).intgetPort()Get a comprehensive health status snapshot of the server.getStats()Get comprehensive server statistics snapshot.booleanCheck if the server is currently accepting new connections.booleanisCurrentConnection(String connectionId, long createdAtNanos) Check whether the current live connection for a peer matches the supplied session token.booleanCheck if the server is in draining state (no new connections accepted, but existing connections allowed to continue).booleanCheck if the server is healthy and accepting connections.booleanisOutboundBlocked(String peerId) Check if outbound messages to a specific peer are currently blocked.booleanbooleanvoidPause accepting new connections without stopping the server.booleanpromoteConnectionIdentity(ConnectionContext ctx, String peerId) Promote a temporary connection identity to a validated peer ID after the cluster handshake succeeds.voidrecordBytesReceived(long bytes) Record bytes received on a connection.voidrecordBytesSent(long bytes) Record bytes sent on a connection.voidrecordRequest(long latencyNanos) Record a request processed by the server.intremainingClientResponseDropsForRequest(MessageType requestType, @Nullable MessageType responseType) booleanremoveSocketInterceptor(SocketInterceptor interceptor) Remove a previously registered inbound socket interceptor.voidResume accepting new connections after pausing.booleansendMessage(String peerId, Message msg) Send a message to a specific connected peer.voidsetConnectionLimit(int maxConnections) Dynamically adjust the maximum number of connections the server will accept.voidsetGracefulShutdownTimeoutMs(long timeoutMs) Set the timeout for graceful shutdown (connection draining).voidsetIdGeneratorNodeId(int idGeneratorNodeId) voidsetIdleTimeoutMs(long timeoutMs) Set the idle timeout used by read timeouts and the idle sweep.voidsetLocalProtocolFeatures(EnumSet<ProtocolFeatures> localProtocolFeatures) voidsetMaxConnections(int max) Set the maximum number of concurrent connections.voidsetReadTimeoutMs(int timeoutMs) Set the socket read timeout used for accepted connections.voidsetSlowOperationDetector(@Nullable SlowOperationDetector detector) voidsetStaticRaftBootstrapFingerprint(@Nullable String staticRaftBootstrapFingerprint) voidstart()Start the TCP server.voidstop()Stop the server gracefully.voidStop accepting new client connections and requests, but allow existing connections to continue processing in-flight requests.longTotal inbound connections accepted since start.longTotal messages rejected (legacy placeholder, always returns 0).longTotal connections closed since start.longTotal connections closed due to idle timeout.longTotal messages received since start.longTotal messages sent since start.longTotal outbound connections established since start.longTotal send errors since start.voidRestore outbound messages to all peers, clearing all blocked links.voidunblockOutboundTo(String peerId) Restore outbound messages to a specific peer.int
-
Constructor Details
-
TcpServer
public TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, TlsConfig tlsConfig) -
TcpServer
-
TcpServer
public TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, TlsConfig tlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads) -
TcpServer
public TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, EndpointTlsConfig endpointTlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads) -
TcpServer
public TcpServer(@Nullable String bindHost, int port, String nodeId, int instanceNumber, MessageHandler messageHandler, EndpointTlsConfig endpointTlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads) -
TcpServer
public TcpServer(int port, String nodeId, int instanceNumber, MessageHandler messageHandler, @Nullable TlsConfig tlsConfig, boolean pipelinedExecution, int commandQueueCapacity, int commandExecutorThreads, @Nullable io.micrometer.core.instrument.MeterRegistry meterRegistry)
-
-
Method Details
-
setIdGeneratorNodeId
public void setIdGeneratorNodeId(int idGeneratorNodeId) -
setStaticRaftBootstrapFingerprint
-
setLocalProtocolFeatures
-
getPort
public int getPort() -
getBindAddress
-
setSlowOperationDetector
-
start
Start the TCP server. Non-blocking — returns immediately. The accept loop runs on a virtual thread.- Throws:
IOException
-
stop
public void stop()Stop the server gracefully. Closes all connections. -
sendMessage
-
connectTo
-
connectTo
-
isRunning
public boolean isRunning() -
promoteConnectionIdentity
Promote a temporary connection identity to a validated peer ID after the cluster handshake succeeds. -
isDraining
public boolean isDraining()Check if the server is in draining state (no new connections accepted, but existing connections allowed to continue). -
stopAcceptingNewConnections
public void stopAcceptingNewConnections()Stop accepting new client connections and requests, but allow existing connections to continue processing in-flight requests. This is used during graceful shutdown. -
drain
public boolean drain(long timeoutMs) Wait for in-flight requests to complete with a timeout. Returns immediately if there are no connections.- Parameters:
timeoutMs- the timeout in milliseconds to wait for requests to complete- Returns:
- true if all connections were closed cleanly, false if timeout expired
-
connectionCount
public int connectionCount() -
verifiedClusterConnectionCount
public int verifiedClusterConnectionCount() -
isTlsEnabled
public boolean isTlsEnabled() -
addSocketInterceptor
Register an inbound socket interceptor invoked after accept/socket configuration and before connection registration. -
removeSocketInterceptor
Remove a previously registered inbound socket interceptor. -
closeConnection
Close a connection by peer identifier. Intended for external health-monitor callbacks that need to tear down the transport. -
disconnectPeer
Disconnect a peer transport and remove its canonical connection mapping. -
totalAcceptedConnections
public long totalAcceptedConnections()Total inbound connections accepted since start. -
totalOutboundConnections
public long totalOutboundConnections()Total outbound connections established since start. -
totalClosedConnections
public long totalClosedConnections()Total connections closed since start. -
totalIdleClosedConnections
public long totalIdleClosedConnections()Total connections closed due to idle timeout. -
totalSendErrors
public long totalSendErrors()Total send errors since start. -
totalMessagesSent
public long totalMessagesSent()Total messages sent since start. -
totalMessagesReceived
public long totalMessagesReceived()Total messages received since start. -
getActiveConnectionCount
public int getActiveConnectionCount()Current number of active connections. -
getIdleConnectionCount
public int getIdleConnectionCount()Current number of idle connections (idle >= configured idle timeout). -
totalCircuitOpenRejections
public long totalCircuitOpenRejections()Total messages rejected (legacy placeholder, always returns 0). -
getStats
Get comprehensive server statistics snapshot. Includes connection counts, bytes transferred, requests, latency, and uptime.- Returns:
- ServerStats record with current server metrics
-
getConnectionInfo
Get information about all active connections. Returns a snapshot list of ConnectionInfo records.- Returns:
- list of ConnectionInfo for active connections
-
isHealthy
public boolean isHealthy()Check if the server is healthy and accepting connections. Returns false if the server is not running, is draining, or has reached max connections.- Returns:
- true if server can accept new connections, false otherwise
-
setMaxConnections
public void setMaxConnections(int max) Set the maximum number of concurrent connections. Enforced in acceptLoop when accepting new connections.- Parameters:
max- maximum concurrent connections
-
setReadTimeoutMs
public void setReadTimeoutMs(int timeoutMs) Set the socket read timeout used for accepted connections.- Parameters:
timeoutMs- timeout in milliseconds
-
setIdleTimeoutMs
public void setIdleTimeoutMs(long timeoutMs) Set the idle timeout used by read timeouts and the idle sweep.- Parameters:
timeoutMs- timeout in milliseconds
-
setGracefulShutdownTimeoutMs
public void setGracefulShutdownTimeoutMs(long timeoutMs) Set the timeout for graceful shutdown (connection draining). During graceful shutdown, the server waits up to this duration for active connections to complete processing before forcefully closing them.- Parameters:
timeoutMs- timeout in milliseconds
-
recordBytesReceived
public void recordBytesReceived(long bytes) Record bytes received on a connection. Called by ConnectionContext or message processing to track total bytes.- Parameters:
bytes- number of bytes received
-
recordBytesSent
public void recordBytesSent(long bytes) Record bytes sent on a connection. Called by ConnectionContext when sending messages.- Parameters:
bytes- number of bytes sent
-
recordRequest
public void recordRequest(long latencyNanos) Record a request processed by the server. Optionally provide latency for average calculation.- Parameters:
latencyNanos- request processing latency in nanoseconds
-
getConnectionsByClient
-
disconnectClient
Force disconnect all connections from a specific client IP.- Parameters:
clientAddress- the client IP address to disconnect- Returns:
- number of connections disconnected
-
setConnectionLimit
public void setConnectionLimit(int maxConnections) Dynamically adjust the maximum number of connections the server will accept. Must be positive. Setting this to a lower value does not disconnect existing connections.- Parameters:
maxConnections- the new maximum connection limit (must be > 0)
-
getBandwidthStats
Get current bandwidth statistics.- Returns:
- BandwidthStats record with total and per-second rates
-
getConnectionAge
Get the age of a connection in milliseconds since it was established.- Parameters:
connectionId- the connection ID (peer ID)- Returns:
- age in milliseconds, or -1 if connection not found
-
isCurrentConnection
Check whether the current live connection for a peer matches the supplied session token.- Parameters:
connectionId- the connection ID (peer ID)createdAtNanos- the connection creation timestamp token- Returns:
trueif the peer is still connected via that exact session
-
isAcceptingConnections
public boolean isAcceptingConnections()Check if the server is currently accepting new connections.- Returns:
- true if accepting connections, false if paused or draining
-
pauseAccepting
public void pauseAccepting()Pause accepting new connections without stopping the server. Existing connections remain active. Useful for maintenance or load shedding. -
resumeAccepting
public void resumeAccepting()Resume accepting new connections after pausing. -
getServerHealthStatus
Get a comprehensive health status snapshot of the server.- Returns:
- ServerHealthStatus record with uptime, connections, and bandwidth stats
-
blockOutboundTo
Block outbound messages to a specific peer. Messages sent viasendMessage(String, Message)to this peer will be silently dropped. The TCP connection remains alive; this simulates a transport-level fault without process death.- Parameters:
peerId- the peer ID to block outbound messages to
-
unblockOutboundTo
Restore outbound messages to a specific peer.- Parameters:
peerId- the peer ID to unblock
-
unblockAllOutbound
public void unblockAllOutbound()Restore outbound messages to all peers, clearing all blocked links. -
isOutboundBlocked
Check if outbound messages to a specific peer are currently blocked.- Parameters:
peerId- the peer ID to check- Returns:
- true if outbound messages to this peer are being dropped
-
dropNextClientResponseForRequest
Drop the next client response emitted for the given request type. -
dropNextClientResponseForRequest
Drop the next client response emitted for the given request and response types. -
dropNextClientResponsesForRequest
Drop the nextcountclient responses emitted for the given request type. -
dropNextClientResponsesForRequest
public void dropNextClientResponsesForRequest(MessageType requestType, @Nullable MessageType responseType, int count) Drop the nextcountclient responses emitted for the given request and response types. -
clearClientResponseDrops
public void clearClientResponseDrops() -
clearClientResponseDrops
-
clearClientResponseDrops
-
remainingClientResponseDropsForRequest
public int remainingClientResponseDropsForRequest(MessageType requestType, @Nullable MessageType responseType) -
dropNextOutboundTo
Drop the next outbound peer message of the given type to the target peer. -
duplicateNextOutboundTo
Duplicate the next outbound peer message of the given type to the target peer. -
delayNextOutboundTo
Delay the next outbound peer message of the given type to the target peer. -
clearOutboundMessageFaults
public void clearOutboundMessageFaults() -
forceClientBackpressureForTests
public void forceClientBackpressureForTests(boolean forced) Deterministically force or clear client-facing backpressure responses for tests.
-