Skip to content

Commit 2d6c698

Browse files
committed
provides dedicated an operator per request
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 2dbf61d commit 2d6c698

File tree

60 files changed

+9938
-3436
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+9938
-3436
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import io.netty.util.collection.IntObjectHashMap;
19+
import io.netty.util.collection.IntObjectMap;
20+
21+
abstract class AbstractStreamManager implements StreamManager {
22+
23+
final StreamIdSupplier streamIdSupplier;
24+
final IntObjectMap<FrameHandler> activeStreams;
25+
26+
protected AbstractStreamManager(StreamIdSupplier streamIdSupplier) {
27+
this.streamIdSupplier = streamIdSupplier;
28+
this.activeStreams = new IntObjectHashMap<>();
29+
}
30+
31+
@Override
32+
public synchronized int getNextId() {
33+
return this.streamIdSupplier.nextStreamId(this.activeStreams);
34+
}
35+
36+
@Override
37+
public synchronized int addAndGetNextId(FrameHandler frameHandler) {
38+
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
39+
final int streamId = this.streamIdSupplier.nextStreamId(activeStreams);
40+
41+
activeStreams.put(streamId, frameHandler);
42+
43+
return streamId;
44+
}
45+
46+
@Override
47+
public synchronized FrameHandler get(int streamId) {
48+
return this.activeStreams.get(streamId);
49+
}
50+
51+
@Override
52+
public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
53+
return this.activeStreams.remove(streamId, frameHandler);
54+
}
55+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import static io.rsocket.core.OperatorsSend.sendReleasingPayload;
19+
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
20+
import static io.rsocket.core.PayloadValidationUtils.isValid;
21+
import static io.rsocket.core.StateMachineSupport.isSubscribed;
22+
import static io.rsocket.core.StateMachineSupport.isTerminated;
23+
import static io.rsocket.core.StateMachineSupport.lazyTerminate;
24+
import static io.rsocket.core.StateMachineSupport.markSubscribed;
25+
import static io.rsocket.core.StateMachineSupport.markTerminated;
26+
27+
import io.netty.buffer.ByteBuf;
28+
import io.netty.buffer.ByteBufAllocator;
29+
import io.netty.util.IllegalReferenceCountException;
30+
import io.rsocket.Payload;
31+
import io.rsocket.frame.FrameType;
32+
import io.rsocket.internal.UnboundedProcessor;
33+
import java.time.Duration;
34+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
35+
import org.reactivestreams.Subscription;
36+
import reactor.core.CoreSubscriber;
37+
import reactor.core.Exceptions;
38+
import reactor.core.Scannable;
39+
import reactor.core.publisher.Mono;
40+
import reactor.core.publisher.Operators;
41+
import reactor.util.annotation.NonNull;
42+
import reactor.util.annotation.Nullable;
43+
44+
final class FireAndForgetMono extends Mono<Void> implements Subscription, Scannable {
45+
46+
volatile long state;
47+
48+
static final AtomicLongFieldUpdater<FireAndForgetMono> STATE =
49+
AtomicLongFieldUpdater.newUpdater(FireAndForgetMono.class, "state");
50+
51+
final ByteBufAllocator allocator;
52+
final Payload payload;
53+
final int mtu;
54+
final int maxFrameLength;
55+
final StreamManager streamManager;
56+
final UnboundedProcessor<ByteBuf> sendProcessor;
57+
58+
FireAndForgetMono(
59+
ByteBufAllocator allocator,
60+
Payload payload,
61+
int mtu,
62+
int maxFrameLength,
63+
StreamManager streamManager,
64+
UnboundedProcessor<ByteBuf> sendProcessor) {
65+
this.allocator = allocator;
66+
this.payload = payload;
67+
this.mtu = mtu;
68+
this.maxFrameLength = maxFrameLength;
69+
this.streamManager = streamManager;
70+
this.sendProcessor = sendProcessor;
71+
}
72+
73+
@Override
74+
public void subscribe(CoreSubscriber<? super Void> actual) {
75+
long previousState = markSubscribed(STATE, this);
76+
if (isSubscribed(previousState) || isTerminated(previousState)) {
77+
Operators.error(
78+
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
79+
return;
80+
}
81+
82+
actual.onSubscribe(this);
83+
84+
final Payload p = this.payload;
85+
int mtu = this.mtu;
86+
try {
87+
if (!isValid(mtu, this.maxFrameLength, p, false)) {
88+
lazyTerminate(STATE, this);
89+
p.release();
90+
actual.onError(
91+
new IllegalArgumentException(
92+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
93+
return;
94+
}
95+
} catch (IllegalReferenceCountException e) {
96+
lazyTerminate(STATE, this);
97+
actual.onError(e);
98+
return;
99+
}
100+
101+
final int streamId;
102+
try {
103+
streamId = this.streamManager.getNextId();
104+
} catch (Throwable t) {
105+
lazyTerminate(STATE, this);
106+
p.release();
107+
actual.onError(Exceptions.unwrap(t));
108+
return;
109+
}
110+
111+
try {
112+
if (isTerminated(this.state)) {
113+
p.release();
114+
return;
115+
}
116+
117+
sendReleasingPayload(
118+
streamId, FrameType.REQUEST_FNF, mtu, p, this.sendProcessor, this.allocator, true);
119+
} catch (Throwable e) {
120+
lazyTerminate(STATE, this);
121+
actual.onError(e);
122+
return;
123+
}
124+
125+
lazyTerminate(STATE, this);
126+
actual.onComplete();
127+
}
128+
129+
@Override
130+
public void request(long n) {
131+
// no ops
132+
}
133+
134+
@Override
135+
public void cancel() {
136+
markTerminated(STATE, this);
137+
}
138+
139+
@Override
140+
@Nullable
141+
public Void block(Duration m) {
142+
return block();
143+
}
144+
145+
@Override
146+
@Nullable
147+
public Void block() {
148+
long previousState = markSubscribed(STATE, this);
149+
if (isSubscribed(previousState) || isTerminated(previousState)) {
150+
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
151+
}
152+
153+
final Payload p = this.payload;
154+
try {
155+
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
156+
lazyTerminate(STATE, this);
157+
p.release();
158+
throw new IllegalArgumentException(
159+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
160+
}
161+
} catch (IllegalReferenceCountException e) {
162+
lazyTerminate(STATE, this);
163+
throw Exceptions.propagate(e);
164+
}
165+
166+
final int streamId;
167+
try {
168+
streamId = this.streamManager.getNextId();
169+
} catch (Throwable t) {
170+
lazyTerminate(STATE, this);
171+
p.release();
172+
throw Exceptions.propagate(t);
173+
}
174+
175+
try {
176+
sendReleasingPayload(
177+
streamId,
178+
FrameType.REQUEST_FNF,
179+
this.mtu,
180+
this.payload,
181+
this.sendProcessor,
182+
this.allocator,
183+
true);
184+
} catch (Throwable e) {
185+
lazyTerminate(STATE, this);
186+
throw Exceptions.propagate(e);
187+
}
188+
189+
lazyTerminate(STATE, this);
190+
return null;
191+
}
192+
193+
@Override
194+
public Object scanUnsafe(Scannable.Attr key) {
195+
return null; // no particular key to be represented, still useful in hooks
196+
}
197+
198+
@Override
199+
@NonNull
200+
public String stepName() {
201+
return "source(FireAndForgetMono)";
202+
}
203+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import io.netty.buffer.ByteBuf;
19+
import io.netty.buffer.ByteBufAllocator;
20+
import io.netty.buffer.CompositeByteBuf;
21+
import io.netty.util.ReferenceCountUtil;
22+
import io.rsocket.Payload;
23+
import io.rsocket.RSocket;
24+
import io.rsocket.frame.decoder.PayloadDecoder;
25+
import org.reactivestreams.Subscription;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import reactor.core.CoreSubscriber;
29+
import reactor.core.publisher.Mono;
30+
31+
final class FireAndForgetSubscriber implements CoreSubscriber<Void>, ResponderFrameHandler {
32+
33+
static final Logger logger = LoggerFactory.getLogger(FireAndForgetSubscriber.class);
34+
35+
static final FireAndForgetSubscriber INSTANCE = new FireAndForgetSubscriber();
36+
37+
final int streamId;
38+
final ByteBufAllocator allocator;
39+
final PayloadDecoder payloadDecoder;
40+
final StreamManager streamManager;
41+
final RSocket handler;
42+
final int maxInboundPayloadSize;
43+
44+
CompositeByteBuf frames;
45+
46+
private FireAndForgetSubscriber() {
47+
this.streamId = 0;
48+
this.allocator = null;
49+
this.payloadDecoder = null;
50+
this.maxInboundPayloadSize = 0;
51+
this.streamManager = null;
52+
this.handler = null;
53+
this.frames = null;
54+
}
55+
56+
FireAndForgetSubscriber(
57+
int streamId,
58+
ByteBuf firstFrame,
59+
ByteBufAllocator allocator,
60+
PayloadDecoder payloadDecoder,
61+
int maxInboundPayloadSize,
62+
StreamManager streamManager,
63+
RSocket handler) {
64+
this.streamId = streamId;
65+
this.allocator = allocator;
66+
this.payloadDecoder = payloadDecoder;
67+
this.maxInboundPayloadSize = maxInboundPayloadSize;
68+
this.streamManager = streamManager;
69+
this.handler = handler;
70+
71+
this.frames =
72+
ReassemblyUtils.addFollowingFrame(
73+
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
74+
}
75+
76+
@Override
77+
public void onSubscribe(Subscription s) {
78+
s.request(Long.MAX_VALUE);
79+
}
80+
81+
@Override
82+
public void onNext(Void voidVal) {}
83+
84+
@Override
85+
public void onError(Throwable t) {
86+
logger.debug("Dropped Outbound error", t);
87+
}
88+
89+
@Override
90+
public void onComplete() {}
91+
92+
@Override
93+
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
94+
final CompositeByteBuf frames =
95+
ReassemblyUtils.addFollowingFrame(this.frames, followingFrame, this.maxInboundPayloadSize);
96+
97+
if (!hasFollows) {
98+
this.streamManager.remove(this.streamId, this);
99+
this.frames = null;
100+
101+
Payload payload;
102+
try {
103+
payload = this.payloadDecoder.apply(frames);
104+
frames.release();
105+
} catch (Throwable t) {
106+
ReferenceCountUtil.safeRelease(frames);
107+
logger.debug("Reassembly has failed", t);
108+
return;
109+
}
110+
111+
Mono<Void> source = this.handler.fireAndForget(payload);
112+
source.subscribe(this);
113+
}
114+
}
115+
116+
@Override
117+
public final void handleCancel() {
118+
final CompositeByteBuf frames = this.frames;
119+
if (frames != null) {
120+
this.streamManager.remove(this.streamId, this);
121+
frames.release();
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)