Skip to content

Commit 63e4635

Browse files
committed
Fix NIO heartbeat
1 parent d71ad4c commit 63e4635

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void run() {
7373

7474
for (SelectionKey selectionKey : selector.keys()) {
7575
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment();
76-
if (state.getConnection() != null && state.getConnection().getHeartbeat() > 0) {
76+
if (state.getConnection() != null && state.getHeartbeatNanoSeconds() > 0) {
7777
long now = System.nanoTime();
7878
if ((now - state.getLastActivity()) > state.getHeartbeatNanoSeconds() * 2) {
7979
try {

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.net.InetAddress;
2626
import java.net.SocketException;
27+
import java.time.Duration;
2728

2829
/**
2930
*
@@ -61,6 +62,9 @@ public int getPort() {
6162
@Override
6263
public void setTimeout(int timeoutMs) throws SocketException {
6364
state.getChannel().socket().setSoTimeout(timeoutMs);
65+
if (state.getConnection() != null) {
66+
state.setHeartbeat(Duration.ofSeconds(state.getConnection().getHeartbeat()));
67+
}
6468
}
6569

6670
@Override

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class SocketChannelFrameHandlerState {
4343
private final NioQueue writeQueue;
4444

4545
private volatile AMQConnection connection;
46-
private volatile long heartbeat;
46+
private volatile long heartbeatNanoSeconds = -1;
4747

4848
/** should be used only in the NIO read thread */
4949
private long lastActivity;
@@ -156,7 +156,10 @@ public AMQConnection getConnection() {
156156

157157
public void setConnection(AMQConnection connection) {
158158
this.connection = connection;
159-
this.heartbeat = Duration.ofSeconds(connection.getHeartbeat()).toNanos();
159+
}
160+
161+
void setHeartbeat(Duration ht) {
162+
this.heartbeatNanoSeconds = ht.toNanos();
160163
}
161164

162165
public void setLastActivity(long lastActivity) {
@@ -168,7 +171,7 @@ public long getLastActivity() {
168171
}
169172

170173
long getHeartbeatNanoSeconds() {
171-
return this.heartbeat;
174+
return this.heartbeatNanoSeconds;
172175
}
173176

174177
void prepareForWriteSequence() {

0 commit comments

Comments
 (0)