Skip to content

Commit 1936e8c

Browse files
author
Oleh Dokuka
committed
fixes removal ordering
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent 17f5d74 commit 1936e8c

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,11 @@ void sendFirstPayload(Payload payload, long initialRequestN) {
238238
return;
239239
}
240240

241-
sm.remove(streamId, this);
242-
243241
final ByteBuf cancelFrame = CancelFrameCodec.encode(allocator, streamId);
244242
connection.sendFrame(streamId, cancelFrame);
245243

244+
sm.remove(streamId, this);
245+
246246
if (requestInterceptor != null) {
247247
requestInterceptor.onCancel(streamId, FrameType.REQUEST_STREAM);
248248
}
@@ -276,12 +276,13 @@ public final void cancel() {
276276

277277
if (isFirstFrameSent(previousState)) {
278278
final int streamId = this.streamId;
279-
this.requesterResponderSupport.remove(streamId, this);
280279

281280
ReassemblyUtils.synchronizedRelease(this, previousState);
282281

283282
this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
284283

284+
this.requesterResponderSupport.remove(streamId, this);
285+
285286
final RequestInterceptor requestInterceptor = this.requestInterceptor;
286287
if (requestInterceptor != null) {
287288
requestInterceptor.onCancel(streamId, FrameType.REQUEST_STREAM);
@@ -309,13 +310,14 @@ public final void handlePayload(Payload p) {
309310
}
310311

311312
final int streamId = this.streamId;
312-
this.requesterResponderSupport.remove(streamId, this);
313313

314314
final IllegalStateException cause =
315315
Exceptions.failWithOverflow(
316316
"The number of messages received exceeds the number requested");
317317
this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
318318

319+
this.requesterResponderSupport.remove(streamId, this);
320+
319321
final RequestInterceptor requestInterceptor = this.requestInterceptor;
320322
if (requestInterceptor != null) {
321323
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, cause);

rsocket-core/src/main/java/io/rsocket/core/RequestStreamResponderSubscriber.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ public void onNext(Payload p) {
144144
final ByteBuf errorFrame = ErrorFrameCodec.encode(allocator, streamId, e);
145145
sender.sendFrame(streamId, errorFrame);
146146

147+
this.requesterResponderSupport.remove(streamId, this);
148+
147149
final RequestInterceptor requestInterceptor = this.requestInterceptor;
148150
if (requestInterceptor != null) {
149151
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, e);
@@ -162,6 +164,8 @@ public void onNext(Payload p) {
162164
new CanceledException("Failed to validate payload. Cause" + e.getMessage()));
163165
sender.sendFrame(streamId, errorFrame);
164166

167+
this.requesterResponderSupport.remove(streamId, this);
168+
165169
final RequestInterceptor requestInterceptor = this.requestInterceptor;
166170
if (requestInterceptor != null) {
167171
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, e);
@@ -176,6 +180,8 @@ public void onNext(Payload p) {
176180
return;
177181
}
178182

183+
this.requesterResponderSupport.remove(streamId, this);
184+
179185
final RequestInterceptor requestInterceptor = this.requestInterceptor;
180186
if (requestInterceptor != null) {
181187
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, t);
@@ -195,8 +201,6 @@ boolean tryTerminateOnError() {
195201
return false;
196202
}
197203

198-
this.requesterResponderSupport.remove(this.streamId, this);
199-
200204
currentSubscription.cancel();
201205

202206
return true;
@@ -222,11 +226,12 @@ public void onError(Throwable t) {
222226
}
223227

224228
final int streamId = this.streamId;
225-
this.requesterResponderSupport.remove(streamId, this);
226229

227230
final ByteBuf errorFrame = ErrorFrameCodec.encode(this.allocator, streamId, t);
228231
this.connection.sendFrame(streamId, errorFrame);
229232

233+
this.requesterResponderSupport.remove(streamId, this);
234+
230235
final RequestInterceptor requestInterceptor = this.requestInterceptor;
231236
if (requestInterceptor != null) {
232237
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, t);
@@ -246,11 +251,12 @@ public void onComplete() {
246251
}
247252

248253
final int streamId = this.streamId;
249-
this.requesterResponderSupport.remove(streamId, this);
250254

251255
final ByteBuf completeFrame = PayloadFrameCodec.encodeComplete(this.allocator, streamId);
252256
this.connection.sendFrame(streamId, completeFrame);
253257

258+
this.requesterResponderSupport.remove(streamId, this);
259+
254260
final RequestInterceptor requestInterceptor = this.requestInterceptor;
255261
if (requestInterceptor != null) {
256262
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, null);
@@ -321,7 +327,6 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
321327
S.lazySet(this, Operators.cancelledSubscription());
322328

323329
final int streamId = this.streamId;
324-
this.requesterResponderSupport.remove(streamId, this);
325330

326331
this.frames = null;
327332
frames.release();
@@ -334,6 +339,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
334339
new CanceledException("Failed to reassemble payload. Cause: " + e.getMessage()));
335340
this.connection.sendFrame(streamId, errorFrame);
336341

342+
this.requesterResponderSupport.remove(streamId, this);
343+
337344
final RequestInterceptor requestInterceptor = this.requestInterceptor;
338345
if (requestInterceptor != null) {
339346
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, e);
@@ -354,7 +361,6 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
354361
this.done = true;
355362

356363
final int streamId = this.streamId;
357-
this.requesterResponderSupport.remove(streamId, this);
358364

359365
ReferenceCountUtil.safeRelease(frames);
360366

@@ -366,6 +372,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
366372
new CanceledException("Failed to reassemble payload. Cause: " + t.getMessage()));
367373
this.connection.sendFrame(streamId, errorFrame);
368374

375+
this.requesterResponderSupport.remove(streamId, this);
376+
369377
final RequestInterceptor requestInterceptor = this.requestInterceptor;
370378
if (requestInterceptor != null) {
371379
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, t);

0 commit comments

Comments
 (0)