Skip to content

Commit 2814dd8

Browse files
committed
fixes concurrent queue consumption
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent a87f227 commit 2814dd8

File tree

1 file changed

+5
-0
lines changed

1 file changed

+5
-0
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,10 @@ public void cancel() {
352352
}
353353
cancelled = true;
354354

355+
if (outputFused) {
356+
return;
357+
}
358+
355359
if (WIP.getAndIncrement(this) == 0) {
356360
this.clear();
357361
hasDownstream = false;
@@ -418,6 +422,7 @@ public int requestFusion(int requestedMode) {
418422

419423
@Override
420424
public void dispose() {
425+
super.dispose();
421426
cancel();
422427
}
423428

0 commit comments

Comments
 (0)