Skip to content

Commit 93f9d5c

Browse files
committed
provides dedicated an operator per request
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent b3d823c commit 93f9d5c

File tree

66 files changed

+11804
-3449
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+11804
-3449
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import io.netty.util.collection.IntObjectHashMap;
19+
import io.netty.util.collection.IntObjectMap;
20+
21+
abstract class AbstractStreamManager implements StreamManager {
22+
23+
final StreamIdSupplier streamIdSupplier;
24+
final IntObjectMap<FrameHandler> activeStreams;
25+
26+
protected AbstractStreamManager(StreamIdSupplier streamIdSupplier) {
27+
this.streamIdSupplier = streamIdSupplier;
28+
this.activeStreams = new IntObjectHashMap<>();
29+
}
30+
31+
@Override
32+
public synchronized int getNextId() {
33+
return this.streamIdSupplier.nextStreamId(this.activeStreams);
34+
}
35+
36+
@Override
37+
public synchronized int addAndGetNextId(FrameHandler frameHandler) {
38+
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
39+
final int streamId = this.streamIdSupplier.nextStreamId(activeStreams);
40+
41+
activeStreams.put(streamId, frameHandler);
42+
43+
return streamId;
44+
}
45+
46+
@Override
47+
public synchronized FrameHandler get(int streamId) {
48+
return this.activeStreams.get(streamId);
49+
}
50+
51+
@Override
52+
public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
53+
return this.activeStreams.remove(streamId, frameHandler);
54+
}
55+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
19+
import static io.rsocket.core.PayloadValidationUtils.isValid;
20+
import static io.rsocket.core.SendUtils.sendReleasingPayload;
21+
import static io.rsocket.core.StateUtils.isSubscribed;
22+
import static io.rsocket.core.StateUtils.isTerminated;
23+
import static io.rsocket.core.StateUtils.lazyTerminate;
24+
import static io.rsocket.core.StateUtils.markSubscribed;
25+
import static io.rsocket.core.StateUtils.markTerminated;
26+
27+
import io.netty.buffer.ByteBuf;
28+
import io.netty.buffer.ByteBufAllocator;
29+
import io.netty.util.IllegalReferenceCountException;
30+
import io.rsocket.Payload;
31+
import io.rsocket.frame.FrameType;
32+
import io.rsocket.internal.UnboundedProcessor;
33+
import java.time.Duration;
34+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
35+
import org.reactivestreams.Subscription;
36+
import reactor.core.CoreSubscriber;
37+
import reactor.core.Exceptions;
38+
import reactor.core.Scannable;
39+
import reactor.core.publisher.Mono;
40+
import reactor.core.publisher.Operators;
41+
import reactor.util.annotation.NonNull;
42+
import reactor.util.annotation.Nullable;
43+
44+
final class FireAndForgetRequesterMono extends Mono<Void> implements Subscription, Scannable {
45+
46+
volatile long state;
47+
48+
static final AtomicLongFieldUpdater<FireAndForgetRequesterMono> STATE =
49+
AtomicLongFieldUpdater.newUpdater(FireAndForgetRequesterMono.class, "state");
50+
51+
final ByteBufAllocator allocator;
52+
final Payload payload;
53+
final int mtu;
54+
final int maxFrameLength;
55+
final StreamManager streamManager;
56+
final UnboundedProcessor<ByteBuf> sendProcessor;
57+
58+
FireAndForgetRequesterMono(
59+
ByteBufAllocator allocator,
60+
Payload payload,
61+
int mtu,
62+
int maxFrameLength,
63+
StreamManager streamManager,
64+
UnboundedProcessor<ByteBuf> sendProcessor) {
65+
this.allocator = allocator;
66+
this.payload = payload;
67+
this.mtu = mtu;
68+
this.maxFrameLength = maxFrameLength;
69+
this.streamManager = streamManager;
70+
this.sendProcessor = sendProcessor;
71+
}
72+
73+
@Override
74+
public void subscribe(CoreSubscriber<? super Void> actual) {
75+
long previousState = markSubscribed(STATE, this);
76+
if (isSubscribed(previousState) || isTerminated(previousState)) {
77+
Operators.error(
78+
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
79+
return;
80+
}
81+
82+
actual.onSubscribe(this);
83+
84+
final Payload p = this.payload;
85+
int mtu = this.mtu;
86+
try {
87+
if (!isValid(mtu, this.maxFrameLength, p, false)) {
88+
lazyTerminate(STATE, this);
89+
p.release();
90+
actual.onError(
91+
new IllegalArgumentException(
92+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
93+
return;
94+
}
95+
} catch (IllegalReferenceCountException e) {
96+
lazyTerminate(STATE, this);
97+
actual.onError(e);
98+
return;
99+
}
100+
101+
final int streamId;
102+
try {
103+
streamId = this.streamManager.getNextId();
104+
} catch (Throwable t) {
105+
lazyTerminate(STATE, this);
106+
p.release();
107+
actual.onError(Exceptions.unwrap(t));
108+
return;
109+
}
110+
111+
try {
112+
if (isTerminated(this.state)) {
113+
p.release();
114+
return;
115+
}
116+
117+
sendReleasingPayload(
118+
streamId, FrameType.REQUEST_FNF, mtu, p, this.sendProcessor, this.allocator, true);
119+
} catch (Throwable e) {
120+
lazyTerminate(STATE, this);
121+
actual.onError(e);
122+
return;
123+
}
124+
125+
lazyTerminate(STATE, this);
126+
actual.onComplete();
127+
}
128+
129+
@Override
130+
public void request(long n) {
131+
// no ops
132+
}
133+
134+
@Override
135+
public void cancel() {
136+
markTerminated(STATE, this);
137+
}
138+
139+
@Override
140+
@Nullable
141+
public Void block(Duration m) {
142+
return block();
143+
}
144+
145+
@Override
146+
@Nullable
147+
public Void block() {
148+
long previousState = markSubscribed(STATE, this);
149+
if (isSubscribed(previousState) || isTerminated(previousState)) {
150+
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
151+
}
152+
153+
final Payload p = this.payload;
154+
try {
155+
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
156+
lazyTerminate(STATE, this);
157+
p.release();
158+
throw new IllegalArgumentException(
159+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
160+
}
161+
} catch (IllegalReferenceCountException e) {
162+
lazyTerminate(STATE, this);
163+
throw Exceptions.propagate(e);
164+
}
165+
166+
final int streamId;
167+
try {
168+
streamId = this.streamManager.getNextId();
169+
} catch (Throwable t) {
170+
lazyTerminate(STATE, this);
171+
p.release();
172+
throw Exceptions.propagate(t);
173+
}
174+
175+
try {
176+
sendReleasingPayload(
177+
streamId,
178+
FrameType.REQUEST_FNF,
179+
this.mtu,
180+
this.payload,
181+
this.sendProcessor,
182+
this.allocator,
183+
true);
184+
} catch (Throwable e) {
185+
lazyTerminate(STATE, this);
186+
throw Exceptions.propagate(e);
187+
}
188+
189+
lazyTerminate(STATE, this);
190+
return null;
191+
}
192+
193+
@Override
194+
public Object scanUnsafe(Scannable.Attr key) {
195+
return null; // no particular key to be represented, still useful in hooks
196+
}
197+
198+
@Override
199+
@NonNull
200+
public String stepName() {
201+
return "source(FireAndForgetMono)";
202+
}
203+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import io.netty.buffer.ByteBuf;
19+
import io.netty.buffer.ByteBufAllocator;
20+
import io.netty.buffer.CompositeByteBuf;
21+
import io.netty.util.ReferenceCountUtil;
22+
import io.rsocket.Payload;
23+
import io.rsocket.RSocket;
24+
import io.rsocket.frame.decoder.PayloadDecoder;
25+
import org.reactivestreams.Subscription;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import reactor.core.CoreSubscriber;
29+
import reactor.core.publisher.Mono;
30+
31+
final class FireAndForgetResponderSubscriber
32+
implements CoreSubscriber<Void>, ResponderFrameHandler {
33+
34+
static final Logger logger = LoggerFactory.getLogger(FireAndForgetResponderSubscriber.class);
35+
36+
static final FireAndForgetResponderSubscriber INSTANCE = new FireAndForgetResponderSubscriber();
37+
38+
final int streamId;
39+
final ByteBufAllocator allocator;
40+
final PayloadDecoder payloadDecoder;
41+
final StreamManager streamManager;
42+
final RSocket handler;
43+
final int maxInboundPayloadSize;
44+
45+
CompositeByteBuf frames;
46+
47+
private FireAndForgetResponderSubscriber() {
48+
this.streamId = 0;
49+
this.allocator = null;
50+
this.payloadDecoder = null;
51+
this.maxInboundPayloadSize = 0;
52+
this.streamManager = null;
53+
this.handler = null;
54+
this.frames = null;
55+
}
56+
57+
FireAndForgetResponderSubscriber(
58+
int streamId,
59+
ByteBuf firstFrame,
60+
ByteBufAllocator allocator,
61+
PayloadDecoder payloadDecoder,
62+
int maxInboundPayloadSize,
63+
StreamManager streamManager,
64+
RSocket handler) {
65+
this.streamId = streamId;
66+
this.allocator = allocator;
67+
this.payloadDecoder = payloadDecoder;
68+
this.maxInboundPayloadSize = maxInboundPayloadSize;
69+
this.streamManager = streamManager;
70+
this.handler = handler;
71+
72+
this.frames =
73+
ReassemblyUtils.addFollowingFrame(
74+
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
75+
}
76+
77+
@Override
78+
public void onSubscribe(Subscription s) {
79+
s.request(Long.MAX_VALUE);
80+
}
81+
82+
@Override
83+
public void onNext(Void voidVal) {}
84+
85+
@Override
86+
public void onError(Throwable t) {
87+
logger.debug("Dropped Outbound error", t);
88+
}
89+
90+
@Override
91+
public void onComplete() {}
92+
93+
@Override
94+
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
95+
final CompositeByteBuf frames =
96+
ReassemblyUtils.addFollowingFrame(this.frames, followingFrame, this.maxInboundPayloadSize);
97+
98+
if (!hasFollows) {
99+
this.streamManager.remove(this.streamId, this);
100+
this.frames = null;
101+
102+
Payload payload;
103+
try {
104+
payload = this.payloadDecoder.apply(frames);
105+
frames.release();
106+
} catch (Throwable t) {
107+
ReferenceCountUtil.safeRelease(frames);
108+
logger.debug("Reassembly has failed", t);
109+
return;
110+
}
111+
112+
Mono<Void> source = this.handler.fireAndForget(payload);
113+
source.subscribe(this);
114+
}
115+
}
116+
117+
@Override
118+
public final void handleCancel() {
119+
final CompositeByteBuf frames = this.frames;
120+
if (frames != null) {
121+
this.frames = null;
122+
this.streamManager.remove(this.streamId, this);
123+
frames.release();
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)