Skip to content

Commit 168450e

Browse files
committed
provides tests, fixes bugs and polish API
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8c7dfa7 commit 168450e

35 files changed

+2122
-632
lines changed

rsocket-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929

3030
implementation 'org.slf4j:slf4j-api'
3131

32+
testImplementation (project(":rsocket-transport-local"))
3233
testImplementation 'io.projectreactor:reactor-test'
3334
testImplementation 'org.assertj:assertj-core'
3435
testImplementation 'org.junit.jupiter:junit-jupiter-api'

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import reactor.core.Scannable;
3535
import reactor.core.publisher.Mono;
3636
import reactor.core.publisher.Operators;
37-
import reactor.core.publisher.SignalType;
3837
import reactor.util.annotation.NonNull;
3938
import reactor.util.annotation.Nullable;
4039

@@ -69,8 +68,15 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
6968
public void subscribe(CoreSubscriber<? super Void> actual) {
7069
long previousState = markSubscribed(STATE, this);
7170
if (isSubscribedOrTerminated(previousState)) {
72-
Operators.error(
73-
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
71+
final IllegalStateException e =
72+
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
73+
74+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
75+
if (requestInterceptor != null) {
76+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
77+
}
78+
79+
Operators.error(actual, e);
7480
return;
7581
}
7682

@@ -81,14 +87,28 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
8187
try {
8288
if (!isValid(mtu, this.maxFrameLength, p, false)) {
8389
lazyTerminate(STATE, this);
84-
p.release();
85-
actual.onError(
90+
91+
final IllegalArgumentException e =
8692
new IllegalArgumentException(
87-
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
93+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
94+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
95+
if (requestInterceptor != null) {
96+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
97+
}
98+
99+
p.release();
100+
101+
actual.onError(e);
88102
return;
89103
}
90104
} catch (IllegalReferenceCountException e) {
91105
lazyTerminate(STATE, this);
106+
107+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
108+
if (requestInterceptor != null) {
109+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
110+
}
111+
92112
actual.onError(e);
93113
return;
94114
}
@@ -98,22 +118,30 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
98118
streamId = this.requesterResponderSupport.getNextStreamId();
99119
} catch (Throwable t) {
100120
lazyTerminate(STATE, this);
121+
122+
final Throwable ut = Exceptions.unwrap(t);
123+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
124+
if (requestInterceptor != null) {
125+
requestInterceptor.onReject(ut, FrameType.REQUEST_FNF, p.metadata());
126+
}
127+
101128
p.release();
102-
actual.onError(Exceptions.unwrap(t));
129+
130+
actual.onError(ut);
103131
return;
104132
}
105133

106134
final RequestInterceptor interceptor = this.requestInterceptor;
107135
if (interceptor != null) {
108-
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.sliceMetadata());
136+
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
109137
}
110138

111139
try {
112140
if (isTerminated(this.state)) {
113141
p.release();
114142

115143
if (interceptor != null) {
116-
interceptor.onEnd(streamId, SignalType.CANCEL);
144+
interceptor.onCancel(streamId);
117145
}
118146

119147
return;
@@ -125,7 +153,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
125153
lazyTerminate(STATE, this);
126154

127155
if (interceptor != null) {
128-
interceptor.onEnd(streamId, SignalType.ON_ERROR);
156+
interceptor.onTerminate(streamId, e);
129157
}
130158

131159
actual.onError(e);
@@ -135,7 +163,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
135163
lazyTerminate(STATE, this);
136164

137165
if (interceptor != null) {
138-
interceptor.onEnd(streamId, SignalType.ON_COMPLETE);
166+
interceptor.onTerminate(streamId, null);
139167
}
140168

141169
actual.onComplete();
@@ -162,19 +190,41 @@ public Void block(Duration m) {
162190
public Void block() {
163191
long previousState = markSubscribed(STATE, this);
164192
if (isSubscribedOrTerminated(previousState)) {
165-
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
193+
final IllegalStateException e =
194+
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
195+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
196+
if (requestInterceptor != null) {
197+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
198+
}
199+
throw e;
166200
}
167201

168202
final Payload p = this.payload;
169203
try {
170204
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
171205
lazyTerminate(STATE, this);
206+
207+
final IllegalArgumentException e =
208+
new IllegalArgumentException(
209+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
210+
211+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
212+
if (requestInterceptor != null) {
213+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
214+
}
215+
172216
p.release();
173-
throw new IllegalArgumentException(
174-
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
217+
218+
throw e;
175219
}
176220
} catch (IllegalReferenceCountException e) {
177221
lazyTerminate(STATE, this);
222+
223+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
224+
if (requestInterceptor != null) {
225+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
226+
}
227+
178228
throw Exceptions.propagate(e);
179229
}
180230

@@ -183,13 +233,20 @@ public Void block() {
183233
streamId = this.requesterResponderSupport.getNextStreamId();
184234
} catch (Throwable t) {
185235
lazyTerminate(STATE, this);
236+
237+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
238+
if (requestInterceptor != null) {
239+
requestInterceptor.onReject(Exceptions.unwrap(t), FrameType.REQUEST_FNF, p.metadata());
240+
}
241+
186242
p.release();
243+
187244
throw Exceptions.propagate(t);
188245
}
189246

190247
final RequestInterceptor interceptor = this.requestInterceptor;
191248
if (interceptor != null) {
192-
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.sliceMetadata());
249+
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
193250
}
194251

195252
try {
@@ -205,7 +262,7 @@ public Void block() {
205262
lazyTerminate(STATE, this);
206263

207264
if (interceptor != null) {
208-
interceptor.onEnd(streamId, SignalType.ON_ERROR);
265+
interceptor.onTerminate(streamId, e);
209266
}
210267

211268
throw Exceptions.propagate(e);
@@ -214,7 +271,7 @@ public Void block() {
214271
lazyTerminate(STATE, this);
215272

216273
if (interceptor != null) {
217-
interceptor.onEnd(streamId, SignalType.ON_COMPLETE);
274+
interceptor.onTerminate(streamId, null);
218275
}
219276

220277
return null;

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetResponderSubscriber.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.slf4j.LoggerFactory;
2929
import reactor.core.CoreSubscriber;
3030
import reactor.core.publisher.Mono;
31-
import reactor.core.publisher.SignalType;
3231
import reactor.util.annotation.Nullable;
3332

3433
final class FireAndForgetResponderSubscriber
@@ -102,7 +101,7 @@ public void onNext(Void voidVal) {}
102101
public void onError(Throwable t) {
103102
final RequestInterceptor requestInterceptor = this.requestInterceptor;
104103
if (requestInterceptor != null) {
105-
requestInterceptor.onEnd(this.streamId, SignalType.ON_ERROR);
104+
requestInterceptor.onTerminate(this.streamId, t);
106105
}
107106

108107
logger.debug("Dropped Outbound error", t);
@@ -112,7 +111,7 @@ public void onError(Throwable t) {
112111
public void onComplete() {
113112
final RequestInterceptor requestInterceptor = this.requestInterceptor;
114113
if (requestInterceptor != null) {
115-
requestInterceptor.onEnd(this.streamId, SignalType.ON_COMPLETE);
114+
requestInterceptor.onTerminate(this.streamId, null);
116115
}
117116
}
118117

@@ -132,7 +131,7 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
132131

133132
final RequestInterceptor requestInterceptor = this.requestInterceptor;
134133
if (requestInterceptor != null) {
135-
requestInterceptor.onEnd(streamId, SignalType.ON_ERROR);
134+
requestInterceptor.onTerminate(streamId, t);
136135
}
137136

138137
logger.debug("Reassembly has failed", t);
@@ -152,7 +151,7 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
152151

153152
final RequestInterceptor requestInterceptor = this.requestInterceptor;
154153
if (requestInterceptor != null) {
155-
requestInterceptor.onEnd(this.streamId, SignalType.ON_ERROR);
154+
requestInterceptor.onTerminate(this.streamId, t);
156155
}
157156

158157
logger.debug("Reassembly has failed", t);
@@ -176,7 +175,7 @@ public final void handleCancel() {
176175

177176
final RequestInterceptor requestInterceptor = this.requestInterceptor;
178177
if (requestInterceptor != null) {
179-
requestInterceptor.onEnd(streamId, SignalType.CANCEL);
178+
requestInterceptor.onCancel(streamId);
180179
}
181180
}
182181
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
631631
(int) keepAliveInterval.toMillis(),
632632
(int) keepAliveMaxLifeTime.toMillis(),
633633
keepAliveHandler,
634-
interceptors.initRequesterRequestInterceptor(),
634+
interceptors::initRequesterRequestInterceptor,
635635
requesterLeaseHandler);
636636

637637
RSocket wrappedRSocketRequester =
@@ -671,7 +671,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
671671
mtu,
672672
maxFrameLength,
673673
maxInboundPayloadSize,
674-
interceptors.initResponderRequestInterceptor());
674+
interceptors::initResponderRequestInterceptor);
675675

676676
return wrappedRSocketRequester;
677677
})

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.rsocket.plugins.RequestInterceptor;
3737
import java.nio.channels.ClosedChannelException;
3838
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
39+
import java.util.function.Function;
3940
import java.util.function.Supplier;
4041
import org.reactivestreams.Publisher;
4142
import org.slf4j.Logger;
@@ -76,7 +77,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
7677
int keepAliveTickPeriod,
7778
int keepAliveAckTimeout,
7879
@Nullable KeepAliveHandler keepAliveHandler,
79-
@Nullable RequestInterceptor requestInterceptor,
80+
Function<RSocket, RequestInterceptor> requestInterceptorFunction,
8081
RequesterLeaseHandler leaseHandler) {
8182
super(
8283
mtu,
@@ -85,7 +86,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
8586
payloadDecoder,
8687
connection,
8788
streamIdSupplier,
88-
requestInterceptor);
89+
requestInterceptorFunction);
8990

9091
this.leaseHandler = leaseHandler;
9192
this.onClose = MonoProcessor.create();

0 commit comments

Comments
 (0)