Skip to content

Commit e355d53

Browse files
committed
cleanups
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent b76e58a commit e355d53

File tree

5 files changed

+32
-50
lines changed

5 files changed

+32
-50
lines changed

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,7 @@ public boolean resumableFrameReceived(ByteBuf frame) {
151151
}
152152
}
153153

154-
@Override
155-
public void pauseImplied() {
154+
void pauseImplied() {
156155
for (; ; ) {
157156
final long impliedPosition = this.impliedPosition;
158157

@@ -163,8 +162,7 @@ public void pauseImplied() {
163162
}
164163
}
165164

166-
@Override
167-
public void resumeImplied() {
165+
void resumeImplied() {
168166
for (; ; ) {
169167
final long impliedPosition = this.impliedPosition;
170168

@@ -269,6 +267,7 @@ public void request(long n) {}
269267

270268
@Override
271269
public void cancel() {
270+
pauseImplied();
272271
state = 0;
273272
}
274273

@@ -285,6 +284,7 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
285284
}
286285

287286
this.actual = actual;
287+
resumeImplied();
288288
STATE.compareAndSet(this, 0, 1);
289289
}
290290
}

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -74,30 +74,33 @@ public boolean connect(DuplexConnection nextConnection) {
7474

7575
activeConnection.dispose();
7676

77-
final FrameReceivingSubscriber frameReceivingSubscriber =
78-
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
79-
this.activeReceivingSubscriber = frameReceivingSubscriber;
80-
final Disposable disposable =
81-
resumableFramesStore
82-
.resumeStream()
83-
.subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f));
84-
resumableFramesStore.resumeImplied();
85-
nextConnection.receive().subscribe(frameReceivingSubscriber);
86-
nextConnection
87-
.onClose()
88-
.doFinally(
89-
__ -> {
90-
frameReceivingSubscriber.dispose();
91-
disposable.dispose();
92-
resumableFramesStore.pauseImplied();
93-
})
94-
.subscribe();
77+
initConnection(nextConnection);
78+
9579
return true;
9680
} else {
9781
return false;
9882
}
9983
}
10084

85+
void initConnection(DuplexConnection nextConnection) {
86+
final FrameReceivingSubscriber frameReceivingSubscriber =
87+
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
88+
this.activeReceivingSubscriber = frameReceivingSubscriber;
89+
final Disposable disposable =
90+
resumableFramesStore
91+
.resumeStream()
92+
.subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f));
93+
nextConnection.receive().subscribe(frameReceivingSubscriber);
94+
nextConnection
95+
.onClose()
96+
.doFinally(
97+
__ -> {
98+
frameReceivingSubscriber.dispose();
99+
disposable.dispose();
100+
})
101+
.subscribe();
102+
}
103+
101104
public void disconnect() {
102105
final DuplexConnection activeConnection = this.activeConnection;
103106
if (activeConnection != DisposedConnection.INSTANCE) {
@@ -172,6 +175,7 @@ public void dispose() {
172175
}
173176

174177
framesSaverDisposable.dispose();
178+
activeReceivingSubscriber.dispose();
175179
savableFramesSender.dispose();
176180
onClose.onComplete();
177181
}
@@ -189,27 +193,7 @@ public SocketAddress remoteAddress() {
189193
@Override
190194
public void request(long n) {
191195
if (state == 1 && STATE.compareAndSet(this, 1, 2)) {
192-
final DuplexConnection connection = this.activeConnection;
193-
if (connection != null) {
194-
final FrameReceivingSubscriber frameReceivingSubscriber =
195-
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
196-
this.activeReceivingSubscriber = frameReceivingSubscriber;
197-
final Disposable disposable =
198-
resumableFramesStore
199-
.resumeStream()
200-
.subscribe(f -> connection.sendFrame(FrameHeaderCodec.streamId(f), f));
201-
resumableFramesStore.resumeImplied();
202-
connection.receive().subscribe(activeReceivingSubscriber);
203-
connection
204-
.onClose()
205-
.doFinally(
206-
__ -> {
207-
frameReceivingSubscriber.dispose();
208-
disposable.dispose();
209-
resumableFramesStore.pauseImplied();
210-
})
211-
.subscribe();
212-
}
196+
initConnection(this.activeConnection);
213197
}
214198
}
215199

rsocket-core/src/main/java/io/rsocket/resume/ResumableFramesStore.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,11 @@ public interface ResumableFramesStore extends Closeable {
4747
/** @return Implied frame position as defined by RSocket protocol */
4848
long frameImpliedPosition();
4949

50-
void pauseImplied();
51-
52-
void resumeImplied();
53-
5450
/**
5551
* Received resumable frame as defined by RSocket protocol. Implementation must increment frame
5652
* implied position
53+
*
54+
* @return {@code true} if information about the frame has been stored
5755
*/
5856
boolean resumableFrameReceived(ByteBuf frame);
5957
}

rsocket-test/src/main/java/io/rsocket/test/LeaksTrackingByteBufAllocator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
6060

6161
final Duration awaitZeroRefCntDuration = this.awaitZeroRefCntDuration;
6262
if (!unreleased.isEmpty() && !awaitZeroRefCntDuration.isZero()) {
63-
long end = System.nanoTime() + awaitZeroRefCntDuration.toNanos();
63+
long endTimeInMillis = System.currentTimeMillis() + awaitZeroRefCntDuration.toMillis();
6464
boolean hasUnreleased;
65-
while (System.nanoTime() < end) {
65+
while (System.currentTimeMillis() <= endTimeInMillis) {
6666
hasUnreleased = false;
6767
for (ByteBuf bb : unreleased) {
6868
if (bb.refCnt() != 0) {

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ class TransportPair<T, S extends Closeable> implements Disposable {
446446
private static final String metadata = "metadata";
447447

448448
private final LeaksTrackingByteBufAllocator byteBufAllocator =
449-
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofSeconds(10));
449+
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1));
450450

451451
private final TestRSocket responder;
452452

0 commit comments

Comments
 (0)