Skip to content

Commit cc4595b

Browse files
committed
provides tests, fixes bugs and polish API
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8c7dfa7 commit cc4595b

22 files changed

+1605
-338
lines changed

rsocket-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929

3030
implementation 'org.slf4j:slf4j-api'
3131

32+
testImplementation (project(":rsocket-transport-local"))
3233
testImplementation 'io.projectreactor:reactor-test'
3334
testImplementation 'org.assertj:assertj-core'
3435
testImplementation 'org.junit.jupiter:junit-jupiter-api'

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,15 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
6969
public void subscribe(CoreSubscriber<? super Void> actual) {
7070
long previousState = markSubscribed(STATE, this);
7171
if (isSubscribedOrTerminated(previousState)) {
72-
Operators.error(
73-
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
72+
final IllegalStateException e =
73+
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
74+
75+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
76+
if (requestInterceptor != null) {
77+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
78+
}
79+
80+
Operators.error(actual, e);
7481
return;
7582
}
7683

@@ -81,14 +88,28 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
8188
try {
8289
if (!isValid(mtu, this.maxFrameLength, p, false)) {
8390
lazyTerminate(STATE, this);
91+
8492
p.release();
85-
actual.onError(
93+
94+
final IllegalArgumentException e =
8695
new IllegalArgumentException(
87-
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
96+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
97+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
98+
if (requestInterceptor != null) {
99+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
100+
}
101+
102+
actual.onError(e);
88103
return;
89104
}
90105
} catch (IllegalReferenceCountException e) {
91106
lazyTerminate(STATE, this);
107+
108+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
109+
if (requestInterceptor != null) {
110+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
111+
}
112+
92113
actual.onError(e);
93114
return;
94115
}
@@ -98,14 +119,22 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
98119
streamId = this.requesterResponderSupport.getNextStreamId();
99120
} catch (Throwable t) {
100121
lazyTerminate(STATE, this);
122+
101123
p.release();
102-
actual.onError(Exceptions.unwrap(t));
124+
125+
final Throwable ut = Exceptions.unwrap(t);
126+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
127+
if (requestInterceptor != null) {
128+
requestInterceptor.onReject(ut, FrameType.REQUEST_FNF);
129+
}
130+
131+
actual.onError(ut);
103132
return;
104133
}
105134

106135
final RequestInterceptor interceptor = this.requestInterceptor;
107136
if (interceptor != null) {
108-
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.sliceMetadata());
137+
interceptor.onStart(streamId, FrameType.REQUEST_FNF);
109138
}
110139

111140
try {
@@ -162,19 +191,40 @@ public Void block(Duration m) {
162191
public Void block() {
163192
long previousState = markSubscribed(STATE, this);
164193
if (isSubscribedOrTerminated(previousState)) {
165-
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
194+
final IllegalStateException e =
195+
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
196+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
197+
if (requestInterceptor != null) {
198+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
199+
}
200+
throw e;
166201
}
167202

168203
final Payload p = this.payload;
169204
try {
170205
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
171206
lazyTerminate(STATE, this);
207+
172208
p.release();
173-
throw new IllegalArgumentException(
174-
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
209+
210+
final IllegalArgumentException e =
211+
new IllegalArgumentException(
212+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
213+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
214+
if (requestInterceptor != null) {
215+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
216+
}
217+
218+
throw e;
175219
}
176220
} catch (IllegalReferenceCountException e) {
177221
lazyTerminate(STATE, this);
222+
223+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
224+
if (requestInterceptor != null) {
225+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
226+
}
227+
178228
throw Exceptions.propagate(e);
179229
}
180230

@@ -183,13 +233,20 @@ public Void block() {
183233
streamId = this.requesterResponderSupport.getNextStreamId();
184234
} catch (Throwable t) {
185235
lazyTerminate(STATE, this);
236+
186237
p.release();
238+
239+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
240+
if (requestInterceptor != null) {
241+
requestInterceptor.onReject(t, FrameType.REQUEST_FNF);
242+
}
243+
187244
throw Exceptions.propagate(t);
188245
}
189246

190247
final RequestInterceptor interceptor = this.requestInterceptor;
191248
if (interceptor != null) {
192-
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.sliceMetadata());
249+
interceptor.onStart(streamId, FrameType.REQUEST_FNF);
193250
}
194251

195252
try {

0 commit comments

Comments
 (0)