Skip to content

Commit f21ce2e

Browse files
committed
provides more tests
part of the tests are on racing (ignored for now) another few on verification that elements are discarded properly Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 7bc4ed9 commit f21ce2e

File tree

11 files changed

+640
-235
lines changed

11 files changed

+640
-235
lines changed

rsocket-core/build.gradle

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,23 @@ dependencies {
4646
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
4747
}
4848

49+
if (JavaVersion.current().java9Compatible) {
50+
sourceSets {
51+
java9impl.java.srcDirs = ['src/test/java9impl']
52+
}
53+
54+
dependencies {
55+
testCompile sourceSets.java9impl.output
56+
}
57+
}
58+
else {
59+
sourceSets {
60+
java8impl.java.srcDirs = ['src/test/java8impl']
61+
}
62+
63+
dependencies {
64+
testCompile sourceSets.java8impl.output
65+
}
66+
}
67+
4968
description = "Core functionality for the RSocket library"

rsocket-core/src/main/java/io/rsocket/core/CleanOnClearQueueDecorator.java

Lines changed: 0 additions & 128 deletions
This file was deleted.

rsocket-core/src/main/java/io/rsocket/core/LimitRateOperator.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.rsocket.core;
1818

19-
import static io.rsocket.core.LimitRateOperator.applyLimitRateOperator;
2019
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
2120
import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport;
2221
import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
@@ -281,8 +280,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
281280
int streamId = streamIdSupplier.nextStreamId(receivers);
282281

283282
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
284-
final UnicastProcessor<Payload> receiver =
285-
UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
283+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
286284
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
287285

288286
receivers.put(streamId, receiver);
@@ -362,8 +360,7 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo
362360
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
363361
final int streamId = streamIdSupplier.nextStreamId(receivers);
364362

365-
final UnicastProcessor<Payload> receiver =
366-
UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
363+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
367364
final BaseSubscriber<Payload> upstreamSubscriber =
368365
new BaseSubscriber<Payload>() {
369366

@@ -432,7 +429,7 @@ public void accept(long n) {
432429
receivers.put(streamId, receiver);
433430

434431
inboundFlux
435-
.transform(f -> applyLimitRateOperator(f, Queues.SMALL_BUFFER_SIZE))
432+
.limitRate(Queues.SMALL_BUFFER_SIZE)
436433
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
437434
.subscribe(upstreamSubscriber);
438435
if (!payloadReleasedFlag.getAndSet(true)) {

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.rsocket.core;
1818

19-
import static io.rsocket.core.LimitRateOperator.applyLimitRateOperator;
2019
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
2120

2221
import io.netty.buffer.ByteBuf;
@@ -476,14 +475,13 @@ protected void hookFinally(SignalType type) {
476475

477476
sendingSubscriptions.put(streamId, subscriber);
478477
response
479-
.transform(f -> applyLimitRateOperator(f, Queues.SMALL_BUFFER_SIZE))
478+
.limitRate(Queues.SMALL_BUFFER_SIZE)
480479
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
481480
.subscribe(subscriber);
482481
}
483482

484483
private void handleChannel(int streamId, Payload payload, int initialRequestN) {
485-
UnicastProcessor<Payload> frames =
486-
UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
484+
UnicastProcessor<Payload> frames = UnicastProcessor.create();
487485
channelProcessors.put(streamId, frames);
488486

489487
Flux<Payload> payloads =

rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,35 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.Unpooled;
5+
import io.netty.util.IllegalReferenceCountException;
6+
import io.netty.util.ReferenceCounted;
57
import io.rsocket.Payload;
68
import io.rsocket.frame.*;
79
import io.rsocket.util.ByteBufPayload;
10+
import reactor.core.publisher.Hooks;
811

912
/**
1013
* Frame decoder that decodes a frame to a payload without copying. The caller is responsible for
1114
* for releasing the payload to free memory when they no long need it.
1215
*/
1316
public class ZeroCopyPayloadDecoder implements PayloadDecoder {
17+
18+
static {
19+
Hooks.onNextDropped(
20+
o -> {
21+
if (o instanceof ReferenceCounted) {
22+
ReferenceCounted referenceCounted = (ReferenceCounted) o;
23+
if (referenceCounted.refCnt() > 0) {
24+
try {
25+
referenceCounted.release();
26+
} catch (IllegalReferenceCountException e) {
27+
// ignored
28+
}
29+
}
30+
}
31+
});
32+
}
33+
1434
@Override
1535
public Payload apply(ByteBuf byteBuf) {
1636
ByteBuf m;

0 commit comments

Comments
 (0)