Skip to content

ensures onClose awaits all underlying components to be closed #1085

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.rsocket.resume;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.internal.UnboundedProcessor;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LL_Result;
import reactor.core.Disposable;

public class InMemoryResumableFramesStoreStressTest {
boolean storeClosed;

InMemoryResumableFramesStore store =
new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 128);
boolean processorClosed;
UnboundedProcessor processor = new UnboundedProcessor(() -> processorClosed = true);

void subscribe() {
store.saveFrames(processor).subscribe();
store.onClose().subscribe(null, t -> storeClosed = true, () -> storeClosed = true);
}

@JCStressTest
@Outcome(
id = {"true, true"},
expect = ACCEPTABLE)
@State
public static class TwoSubscribesRaceStressTest extends InMemoryResumableFramesStoreStressTest {

Disposable d1;

final ByteBuf b1 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
1,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello1"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello2"));
final ByteBuf b2 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
3,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello3"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello4"));
final ByteBuf b3 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
5,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello5"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello6"));

final ByteBuf c1 =
ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new ConnectionErrorException("closed"));

{
subscribe();
d1 = store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
}

@Actor
public void producer1() {
processor.tryEmitNormal(b1);
processor.tryEmitNormal(b2);
processor.tryEmitNormal(b3);
}

@Actor
public void producer2() {
processor.tryEmitFinal(c1);
}

@Actor
public void producer3() {
d1.dispose();
store
.doOnDiscard(ByteBuf.class, ByteBuf::release)
.subscribe(ByteBuf::release, t -> {})
.dispose();
store
.doOnDiscard(ByteBuf.class, ByteBuf::release)
.subscribe(ByteBuf::release, t -> {})
.dispose();
store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
}

@Actor
public void producer4() {
store.releaseFrames(0);
store.releaseFrames(0);
store.releaseFrames(0);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = storeClosed;
r.r2 = processorClosed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
this.source = source;
this.isClient = isClient;

this.serverReceiver = new InternalDuplexConnection(this, source);
this.clientReceiver = new InternalDuplexConnection(this, source);
this.serverReceiver = new InternalDuplexConnection(Type.SERVER, this, source);
this.clientReceiver = new InternalDuplexConnection(Type.CLIENT, this, source);
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
}
Expand Down Expand Up @@ -195,8 +195,33 @@ int incrementAndGetCheckingState() {
}
}

@Override
public String toString() {
return "ClientServerInputMultiplexer{"
+ "serverReceiver="
+ serverReceiver
+ ", clientReceiver="
+ clientReceiver
+ ", serverConnection="
+ serverConnection
+ ", clientConnection="
+ clientConnection
+ ", source="
+ source
+ ", isClient="
+ isClient
+ ", s="
+ s
+ ", t="
+ t
+ ", state="
+ state
+ '}';
}

private static class InternalDuplexConnection extends Flux<ByteBuf>
implements Subscription, DuplexConnection {
private final Type type;
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
private final DuplexConnection source;

Expand All @@ -207,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
CoreSubscriber<? super ByteBuf> actual;

public InternalDuplexConnection(
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
Type type,
ClientServerInputMultiplexer clientServerInputMultiplexer,
DuplexConnection source) {
this.type = type;
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
this.source = source;
}
Expand Down Expand Up @@ -304,5 +332,17 @@ public Mono<Void> onClose() {
public double availability() {
return source.availability();
}

@Override
public String toString() {
return "InternalDuplexConnection{"
+ "type="
+ type
+ ", source="
+ source
+ ", state="
+ state
+ '}';
}
}
}
18 changes: 14 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
Expand Down Expand Up @@ -633,8 +634,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
wrappedConnection = resumableDuplexConnection;
} else {
keepAliveHandler =
new KeepAliveHandler.DefaultKeepAliveHandler(
clientServerConnection);
new KeepAliveHandler.DefaultKeepAliveHandler();
wrappedConnection = clientServerConnection;
}

Expand All @@ -655,6 +655,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnAllClosedSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink =
Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
multiplexer.asClientConnection(),
Expand All @@ -667,7 +672,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker);
requesterLeaseTracker,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnAllClosedSink.asMono(),
requesterOnAllClosedSink.asMono()));

RSocket wrappedRSocketRequester =
interceptors.initRequester(rSocketRequester);
Expand Down Expand Up @@ -715,7 +724,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(RequestInterceptor)
leases.sender)
: interceptors
::initResponderRequestInterceptor);
::initResponderRequestInterceptor,
responderOnAllClosedSink);

return wrappedRSocketRequester;
})
Expand Down
Loading