Skip to content

provides request intercepting api #944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {

implementation 'org.slf4j:slf4j-api'

testImplementation (project(":rsocket-transport-local"))
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.frame.FrameType;
import io.rsocket.plugins.RequestInterceptor;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
Expand All @@ -51,21 +52,31 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
final RequesterResponderSupport requesterResponderSupport;
final DuplexConnection connection;

@Nullable final RequestInterceptor requestInterceptor;

FireAndForgetRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
this.allocator = requesterResponderSupport.getAllocator();
this.payload = payload;
this.mtu = requesterResponderSupport.getMtu();
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
this.requesterResponderSupport = requesterResponderSupport;
this.connection = requesterResponderSupport.getDuplexConnection();
this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
}

@Override
public void subscribe(CoreSubscriber<? super Void> actual) {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
Operators.error(
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
final IllegalStateException e =
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
}

Operators.error(actual, e);
return;
}

Expand All @@ -76,14 +87,28 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
try {
if (!isValid(mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);
p.release();
actual.onError(

final IllegalArgumentException e =
new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
}

p.release();

actual.onError(e);
return;
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
}

actual.onError(e);
return;
}
Expand All @@ -93,26 +118,54 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);

final Throwable ut = Exceptions.unwrap(t);
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(ut, FrameType.REQUEST_FNF, p.metadata());
}

p.release();
actual.onError(Exceptions.unwrap(t));

actual.onError(ut);
return;
}

final RequestInterceptor interceptor = this.requestInterceptor;
if (interceptor != null) {
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
}

try {
if (isTerminated(this.state)) {
p.release();

if (interceptor != null) {
interceptor.onCancel(streamId);
}

return;
}

sendReleasingPayload(
streamId, FrameType.REQUEST_FNF, mtu, p, this.connection, this.allocator, true);
} catch (Throwable e) {
lazyTerminate(STATE, this);

if (interceptor != null) {
interceptor.onTerminate(streamId, e);
}

actual.onError(e);
return;
}

lazyTerminate(STATE, this);

if (interceptor != null) {
interceptor.onTerminate(streamId, null);
}

actual.onComplete();
}

Expand All @@ -137,19 +190,41 @@ public Void block(Duration m) {
public Void block() {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
final IllegalStateException e =
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
}
throw e;
}

final Payload p = this.payload;
try {
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);

final IllegalArgumentException e =
new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
}

p.release();
throw new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));

throw e;
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
}

throw Exceptions.propagate(e);
}

Expand All @@ -158,10 +233,22 @@ public Void block() {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(Exceptions.unwrap(t), FrameType.REQUEST_FNF, p.metadata());
}

p.release();

throw Exceptions.propagate(t);
}

final RequestInterceptor interceptor = this.requestInterceptor;
if (interceptor != null) {
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
}

try {
sendReleasingPayload(
streamId,
Expand All @@ -173,10 +260,20 @@ public Void block() {
true);
} catch (Throwable e) {
lazyTerminate(STATE, this);

if (interceptor != null) {
interceptor.onTerminate(streamId, e);
}

throw Exceptions.propagate(e);
}

lazyTerminate(STATE, this);

if (interceptor != null) {
interceptor.onTerminate(streamId, null);
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

final class FireAndForgetResponderSubscriber
implements CoreSubscriber<Void>, ResponderFrameHandler {
Expand All @@ -42,6 +44,8 @@ final class FireAndForgetResponderSubscriber
final RSocket handler;
final int maxInboundPayloadSize;

@Nullable final RequestInterceptor requestInterceptor;

CompositeByteBuf frames;

private FireAndForgetResponderSubscriber() {
Expand All @@ -51,6 +55,19 @@ private FireAndForgetResponderSubscriber() {
this.maxInboundPayloadSize = 0;
this.requesterResponderSupport = null;
this.handler = null;
this.requestInterceptor = null;
this.frames = null;
}

FireAndForgetResponderSubscriber(
int streamId, RequesterResponderSupport requesterResponderSupport) {
this.streamId = streamId;
this.allocator = null;
this.payloadDecoder = null;
this.maxInboundPayloadSize = 0;
this.requesterResponderSupport = null;
this.handler = null;
this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
this.frames = null;
}

Expand All @@ -65,6 +82,7 @@ private FireAndForgetResponderSubscriber() {
this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
this.requesterResponderSupport = requesterResponderSupport;
this.handler = handler;
this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();

this.frames =
ReassemblyUtils.addFollowingFrame(
Expand All @@ -81,11 +99,21 @@ public void onNext(Void voidVal) {}

@Override
public void onError(Throwable t) {
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onTerminate(this.streamId, t);
}

logger.debug("Dropped Outbound error", t);
}

@Override
public void onComplete() {}
public void onComplete() {
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onTerminate(this.streamId, null);
}
}

@Override
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
Expand All @@ -95,11 +123,17 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
ReassemblyUtils.addFollowingFrame(
frames, followingFrame, hasFollows, this.maxInboundPayloadSize);
} catch (IllegalStateException t) {
this.requesterResponderSupport.remove(this.streamId, this);
final int streamId = this.streamId;
this.requesterResponderSupport.remove(streamId, this);

this.frames = null;
frames.release();

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onTerminate(streamId, t);
}

logger.debug("Reassembly has failed", t);
return;
}
Expand All @@ -114,6 +148,12 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
frames.release();
} catch (Throwable t) {
ReferenceCountUtil.safeRelease(frames);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onTerminate(this.streamId, t);
}

logger.debug("Reassembly has failed", t);
return;
}
Expand All @@ -127,9 +167,16 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
public final void handleCancel() {
final CompositeByteBuf frames = this.frames;
if (frames != null) {
this.requesterResponderSupport.remove(this.streamId, this);
final int streamId = this.streamId;
this.requesterResponderSupport.remove(streamId, this);

this.frames = null;
frames.release();

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onCancel(streamId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveInterval.toMillis(),
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseHandler);

RSocket wrappedRSocketRequester =
Expand Down Expand Up @@ -669,7 +670,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
responderLeaseHandler,
mtu,
maxFrameLength,
maxInboundPayloadSize);
maxInboundPayloadSize,
interceptors::initResponderRequestInterceptor);

return wrappedRSocketRequester;
})
Expand Down
Loading