Skip to content

Commit d50385e

Browse files
committed
provides mechanism for terminates queue on calling clear
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 60d2b07 commit d50385e

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

rsocket-core/src/main/java/io/rsocket/core/CleanOnClearQueueDecorator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
import java.util.Collection;
66
import java.util.Iterator;
77
import java.util.Queue;
8+
import java.util.concurrent.atomic.AtomicBoolean;
89

9-
final class CleanOnClearQueueDecorator implements Queue<Payload> {
10+
final class CleanOnClearQueueDecorator extends AtomicBoolean implements Queue<Payload> {
1011
final Queue<Payload> delegate;
1112

1213
CleanOnClearQueueDecorator(Queue<Payload> delegate) {
@@ -15,6 +16,7 @@ final class CleanOnClearQueueDecorator implements Queue<Payload> {
1516

1617
@Override
1718
public void clear() {
19+
set(true);
1820
Payload p;
1921
while ((p = delegate.poll()) != null) {
2022
ReferenceCountUtil.safeRelease(p);
@@ -83,6 +85,10 @@ public boolean retainAll(Collection<?> c) {
8385

8486
@Override
8587
public boolean offer(Payload payload) {
88+
if (get()) {
89+
ReferenceCountUtil.safeRelease(payload);
90+
return true;
91+
}
8692
return delegate.offer(payload);
8793
}
8894

0 commit comments

Comments
 (0)