22
22
import io .rsocket .frame .KeepAliveFrameFlyweight ;
23
23
import io .rsocket .resume .ResumeStateHolder ;
24
24
import java .time .Duration ;
25
- import java .util .Optional ;
26
25
import java .util .concurrent .atomic .AtomicReference ;
27
26
import reactor .core .Disposable ;
28
27
import reactor .core .Disposables ;
32
31
import reactor .core .publisher .UnicastProcessor ;
33
32
34
33
abstract class KeepAliveHandler implements Disposable {
35
- protected final ByteBufAllocator allocator ;
34
+ final ByteBufAllocator allocator ;
36
35
private final Duration keepAlivePeriod ;
37
- private final Duration keepAliveTimeout ;
38
- private volatile Optional < ResumeStateHolder > resumeStateHolder = Optional . empty () ;
36
+ private final long keepAliveTimeout ;
37
+ private volatile ResumeStateHolder resumeStateHolder ;
39
38
private final UnicastProcessor <ByteBuf > sent = UnicastProcessor .create ();
40
39
private final MonoProcessor <KeepAlive > timeout = MonoProcessor .create ();
41
40
private final AtomicReference <Disposable > intervalDisposable = new AtomicReference <>();
@@ -55,7 +54,7 @@ private KeepAliveHandler(
55
54
ByteBufAllocator allocator , Duration keepAlivePeriod , Duration keepAliveTimeout ) {
56
55
this .allocator = allocator ;
57
56
this .keepAlivePeriod = keepAlivePeriod ;
58
- this .keepAliveTimeout = keepAliveTimeout ;
57
+ this .keepAliveTimeout = keepAliveTimeout . toMillis () ;
59
58
}
60
59
61
60
public void start () {
@@ -90,11 +89,11 @@ public long receive(ByteBuf keepAliveFrame) {
90
89
}
91
90
92
91
public void resumeState (ResumeStateHolder resumeStateHolder ) {
93
- this .resumeStateHolder = Optional . of ( resumeStateHolder ) ;
92
+ this .resumeStateHolder = resumeStateHolder ;
94
93
}
95
94
96
95
public boolean hasResumeState () {
97
- return resumeStateHolder . isPresent () ;
96
+ return resumeStateHolder != null ;
98
97
}
99
98
100
99
public Flux <ByteBuf > send () {
@@ -113,13 +112,13 @@ void doSend(ByteBuf frame) {
113
112
114
113
void doCheckTimeout () {
115
114
long now = System .currentTimeMillis ();
116
- if (now - lastReceivedMillis >= keepAliveTimeout . toMillis () ) {
117
- timeout .onNext (new KeepAlive (keepAlivePeriod .toMillis (), keepAliveTimeout . toMillis () ));
115
+ if (now - lastReceivedMillis >= keepAliveTimeout ) {
116
+ timeout .onNext (new KeepAlive (keepAlivePeriod .toMillis (), keepAliveTimeout ));
118
117
}
119
118
}
120
119
121
- Long obtainLastReceivedPos () {
122
- return resumeStateHolder . map ( ResumeStateHolder :: impliedPosition ). orElse ( 0L ) ;
120
+ long obtainLastReceivedPos () {
121
+ return resumeStateHolder != null ? resumeStateHolder . impliedPosition () : 0 ;
123
122
}
124
123
125
124
private static class Server extends KeepAliveHandler {
0 commit comments