Skip to content

Commit 2ea0bbf

Browse files
Request/Response Fragmentation & Assembly
Here is an example request/response of a single payload fragmented across multiple frames: ``` SERVER ==> Writes from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklm... CLIENT <== Input from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmn... SERVER ==> Writes from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstu... CLIENT <== Input from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv... SERVER ==> Writes from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabc... SERVER ==> Writes from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijk... CLIENT <== Input from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcd... CLIENT <== Input from server->client: Frame[0] => Stream ID: 2 Type: NEXT_COMPLETE Payload: data: "yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijkl... ``` Note how only the first starts with "abc..." while the rest are chopped in the middle.
1 parent 4032fbd commit 2ea0bbf

File tree

9 files changed

+762
-257
lines changed

9 files changed

+762
-257
lines changed

src/main/java/io/reactivesocket/internal/FragmentedPublisher.java

Lines changed: 289 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,43 +15,315 @@
1515
*/
1616
package io.reactivesocket.internal;
1717

18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
1827
import org.reactivestreams.Publisher;
1928
import org.reactivestreams.Subscriber;
2029
import org.reactivestreams.Subscription;
2130

2231
import io.reactivesocket.Frame;
2332
import io.reactivesocket.FrameType;
24-
import io.reactivesocket.Payload;
2533
import io.reactivesocket.internal.frame.PayloadFragmenter;
34+
import io.reactivesocket.internal.rx.QueueDrainHelper;
35+
import io.reactivesocket.internal.rx.SerializedSubscriber;
36+
import io.reactivesocket.internal.rx.SubscriptionArbiter;
37+
import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue;
2638

2739
public class FragmentedPublisher implements Publisher<Frame> {
2840

29-
private final PayloadFragmenter fragmenter = new PayloadFragmenter(Frame.METADATA_MTU, Frame.DATA_MTU);
30-
private final Publisher<Payload> responsePublisher;
31-
private final int streamId;
32-
private final FrameType type;
41+
private volatile Subscriber<Frame> downstream;
42+
private volatile SubscriptionArbiter sa;
43+
private final AtomicInteger wip = new AtomicInteger();
44+
45+
// TODO use better data structures
46+
private final List<InnerSubscriber> subscribers = Collections.synchronizedList(new ArrayList<InnerSubscriber>(16));
47+
private final Queue<InnerSubscriber> toRemove = new ConcurrentLinkedQueue<InnerSubscriber>();
48+
49+
private int index = 0;
50+
private static final Object COMPLETED = new Object();
3351

34-
public FragmentedPublisher(FrameType type, int streamId, Publisher<Payload> responsePublisher) {
35-
this.type = type;
36-
this.streamId = streamId;
37-
this.responsePublisher = responsePublisher;
38-
}
39-
4052
@Override
4153
public void subscribe(Subscriber<? super Frame> child) {
42-
child.onSubscribe(new Subscription() {
54+
SerializedSubscriber<Frame> ssub = new SerializedSubscriber<>(child);
55+
sa = new SubscriptionArbiter();
56+
sa.setSubscription(new Subscription() {
4357

4458
@Override
4559
public void request(long n) {
46-
// TODO Auto-generated method stub
47-
60+
tryEmit();
4861
}
4962

5063
@Override
5164
public void cancel() {
52-
// TODO Auto-generated method stub
53-
54-
}});
65+
sa.cancel();
66+
for (InnerSubscriber is : subscribers) {
67+
is._s.cancel();
68+
}
69+
}
70+
71+
});
72+
ssub.onSubscribe(sa);
73+
downstream = ssub;
74+
}
75+
76+
public void tryEmit() {
77+
QueueDrainHelper.queueDrainLoop(wip,
78+
this::drainInnerSubscribers, /* drain if we get the emission lock */
79+
() -> {
80+
/* nothing to do if another thread has the emission lock */} ,
81+
this::drainInnerSubscribers /* drain if others tried while we were emitting */);
82+
}
83+
84+
private void drainInnerSubscribers() {
85+
long r = sa.getRequested();
86+
int numItems = subscribers.size();
87+
int emitted = 0;
88+
int startIndex = index;
89+
if (subscribers.size() == 0) {
90+
return;
91+
}
92+
while (emitted < r) {
93+
if (index >= subscribers.size()) {
94+
index = 0;
95+
}
96+
InnerSubscriber is = subscribers.get(index++);
97+
numItems--;
98+
if (is == null) {
99+
break;
100+
}
101+
emitted += is.drain(r - emitted);
102+
if (numItems == 0) {
103+
break;
104+
}
105+
if (index == startIndex) {
106+
// looped around, break out so this thread isn't starved
107+
break;
108+
}
109+
}
110+
sa.produced(emitted);
111+
for (InnerSubscriber is : toRemove) {
112+
subscribers.remove(is);
113+
toRemove.remove(is);
114+
}
115+
}
116+
117+
/**
118+
* Horizontally Unbounded submission of Publisher. This means as many "concurrent" outputs as wanted.
119+
* <p>
120+
* This is ultimately controlled by lease semantics which controls how many streams (requests) are in-flight.
121+
*
122+
* @param t
123+
*/
124+
public void submit(Publisher<Frame> t) {
125+
if (downstream == null) {
126+
throw new IllegalStateException("Downstream has not yet subscribed. Please await() subscription");
127+
}
128+
129+
// horizontally no backpressure, we subscribe to all incoming Publishers, and then backpressure their individual streams
130+
InnerSubscriber is = new InnerSubscriber(this);
131+
subscribers.add(is);
132+
t.subscribe(is);
133+
}
134+
135+
/**
136+
* Asynchronously subscribe to Frames, buffer them internally if needed.
137+
*/
138+
private static final class InnerSubscriber implements Subscriber<Frame> {
139+
private final AtomicLong outstanding = new AtomicLong();
140+
// TODO replace this as this is very inefficient
141+
private volatile OneToOneConcurrentArrayQueue<Object> q;
142+
final FragmentedPublisher parent;
143+
private Subscription _s;
144+
static final int BATCH = 128;
145+
146+
public InnerSubscriber(FragmentedPublisher parent) {
147+
this.parent = parent;
148+
}
149+
150+
@Override
151+
public void onSubscribe(Subscription s) {
152+
_s = s;
153+
// we manage our own rate since the transport should go as fast as it can, no application level flow control here
154+
s.request(BATCH);
155+
outstanding.set(BATCH);
156+
}
157+
158+
@Override
159+
public void onNext(Frame frame) {
160+
FrameType type = frame.getType();
161+
int streamId = frame.getStreamId();
162+
if (PayloadFragmenter.requiresFragmenting(Frame.METADATA_MTU, Frame.DATA_MTU, frame)) {
163+
nextFragmented(frame, type, streamId);
164+
} else {
165+
nextUnfragmented(frame);
166+
}
167+
}
168+
169+
private void nextUnfragmented(Frame frame) {
170+
QueueDrainHelper.queueDrainLoop(parent.wip,
171+
/* fast-path if no contention when trying to emit */
172+
() -> {
173+
if (parent.sa.getRequested() == 0) {
174+
createQueueIfNecessary();
175+
q.add(frame);
176+
return;
177+
}
178+
if (q == null || q.size() == 0) {
179+
parent.downstream.onNext(frame);
180+
parent.sa.produced(1);
181+
outstanding.decrementAndGet();
182+
InnerSubscriber.this.requestMoreIfNeeded();
183+
} else {
184+
// enqueue, then drain if there are already things in the queue
185+
q.add(frame);
186+
drainRequestedAndRequestMoreIfNeeded();
187+
}
188+
} ,
189+
/* if contended, then just enqueue */
190+
() -> {
191+
createQueueIfNecessary();
192+
q.add(frame);
193+
} ,
194+
/* if another thread enqueued while emitting above, then this will have a chance to drain after */
195+
() -> {
196+
drainRequestedAndRequestMoreIfNeeded();
197+
});
198+
}
199+
200+
private void nextFragmented(Frame frame, FrameType type, int streamId) {
201+
// not reusing each time since I need the Iterator state stored through request(n) and can have several in a queue
202+
PayloadFragmenter fragmenter = new PayloadFragmenter(Frame.METADATA_MTU, Frame.DATA_MTU);
203+
if (FrameType.NEXT_COMPLETE.equals(type)) {
204+
fragmenter.resetForResponseComplete(streamId, frame);
205+
} else {
206+
fragmenter.resetForResponse(streamId, frame);
207+
}
208+
209+
QueueDrainHelper.queueDrainLoop(parent.wip,
210+
/* fast-path if no contention when trying to emit */
211+
() -> {
212+
if (parent.sa.getRequested() == 0) {
213+
createQueueIfNecessary();
214+
q.add(frame);
215+
return;
216+
}
217+
if (q == null || q.size() == 0) {
218+
long r = parent.sa.getRequested();
219+
long emitted = 0;
220+
// emit as much of iterable as requested allows
221+
for (int i = 0; i < r; i++) { // TODO limit so we don't head-of-line block
222+
if (fragmenter.hasNext()) {
223+
parent.downstream.onNext(fragmenter.next());
224+
emitted++;
225+
} else {
226+
break;
227+
}
228+
parent.sa.produced(emitted);
229+
outstanding.addAndGet(-emitted);
230+
InnerSubscriber.this.requestMoreIfNeeded();
231+
}
232+
if (fragmenter.hasNext()) {
233+
// not finished so enqueue
234+
createQueueIfNecessary();
235+
q.add(fragmenter);
236+
}
237+
} else {
238+
// enqueue, then drain if there are already things in the queue
239+
q.add(fragmenter);
240+
drainRequestedAndRequestMoreIfNeeded();
241+
}
242+
} ,
243+
/* if contended, then just enqueue */
244+
() -> {
245+
createQueueIfNecessary();
246+
q.add(fragmenter);
247+
} ,
248+
/* if another thread enqueued while emitting above, then this will have a chance to drain after */
249+
() -> {
250+
drainRequestedAndRequestMoreIfNeeded();
251+
});
252+
}
253+
254+
private void createQueueIfNecessary() {
255+
if(q == null) {
256+
q = new OneToOneConcurrentArrayQueue<Object>(BATCH);
257+
}
258+
}
259+
260+
private void drainRequestedAndRequestMoreIfNeeded() {
261+
long emitted = drain(parent.sa.getRequested());
262+
parent.sa.produced(emitted);
263+
outstanding.addAndGet(-emitted);
264+
InnerSubscriber.this.requestMoreIfNeeded();
265+
}
266+
267+
public long drain(long maxToDrain) {
268+
if(q == null) {
269+
return 0;
270+
}
271+
long emitted = 0;
272+
while (emitted < maxToDrain) {
273+
Object o = q.peek();
274+
if (o == null) {
275+
break;
276+
}
277+
if (o instanceof Frame) {
278+
parent.downstream.onNext((Frame) o);
279+
emitted++;
280+
q.poll(); // pop it off the queue
281+
} else if (o == COMPLETED) {
282+
parent.toRemove.add(InnerSubscriber.this);
283+
break;
284+
} else {
285+
@SuppressWarnings("unchecked")
286+
Iterator<Frame> ifs = (Iterator<Frame>) o;
287+
while (ifs.hasNext()) {
288+
parent.downstream.onNext(ifs.next());
289+
emitted++;
290+
if (emitted == maxToDrain) {
291+
break;
292+
}
293+
}
294+
if (!ifs.hasNext()) {
295+
// finished, so remove it
296+
q.poll();
297+
}
298+
}
299+
}
300+
return emitted;
301+
}
302+
303+
private void requestMoreIfNeeded() {
304+
long current = outstanding.get();
305+
if (current < 20) {
306+
long d = BATCH - current;
307+
_s.request(d);
308+
outstanding.addAndGet(d);
309+
}
310+
}
311+
312+
@Override
313+
public void onError(Throwable t) {
314+
parent.sa.cancel();
315+
parent.downstream.onError(t);
316+
parent.subscribers.remove(this);
317+
}
318+
319+
@Override
320+
public void onComplete() {
321+
if (q == null || q.size() == 0) {
322+
parent.subscribers.remove(this);
323+
} else {
324+
q.add(COMPLETED);
325+
}
326+
}
55327
}
56328

57329
}

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.reactivesocket.exceptions.CancelException;
3838
import io.reactivesocket.exceptions.Exceptions;
3939
import io.reactivesocket.exceptions.Retryable;
40+
import io.reactivesocket.internal.frame.FrameHeaderFlyweight;
41+
import io.reactivesocket.internal.frame.PayloadBuilder;
4042
import io.reactivesocket.internal.frame.RequestFrameFlyweight;
4143
import io.reactivesocket.internal.rx.BackpressureUtils;
4244
import io.reactivesocket.internal.rx.EmptyDisposable;
@@ -644,6 +646,7 @@ private final static class StreamInputSubscriber implements Subscriber<Frame> {
644646
private final Subscriber<? super Payload> child;
645647
private final Runnable cancelAction;
646648
private final AtomicReference<Subscription> requestStreamSubscription;
649+
private PayloadBuilder payloadBuilder; // created if fragmented
647650

648651
public StreamInputSubscriber(int streamId, long threshold, AtomicLong outstanding, AtomicLong requested, UnicastSubject<Frame> writer, Subscriber<? super Payload> child, Runnable cancelAction) {
649652
this.streamId = streamId;
@@ -675,6 +678,27 @@ public void onSubscribe(Subscription s) {
675678

676679
@Override
677680
public void onNext(Frame frame) {
681+
if (FrameHeaderFlyweight.FLAGS_RESPONSE_F == (frame.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F)) {
682+
// fragment
683+
if(payloadBuilder == null) {
684+
payloadBuilder = new PayloadBuilder();
685+
}
686+
}
687+
if(payloadBuilder != null) {
688+
payloadBuilder.append(frame);
689+
if (FrameHeaderFlyweight.FLAGS_RESPONSE_F != (frame.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F)) {
690+
// no more fragments, but we have a PayloadBuilder, so this is the final Frame to be reassembled
691+
Payload payload = payloadBuilder.payload();
692+
Frame assembled = Frame.Response.from(streamId, frame.getType(), payload);
693+
// replace 'frame' with the assembled one
694+
frame = assembled;
695+
payloadBuilder = null;
696+
} else {
697+
// it was a fragment, so return without further processing
698+
return;
699+
}
700+
}
701+
678702
FrameType type = frame.getType();
679703
// convert ERROR messages into terminal events
680704
if (type == FrameType.NEXT_COMPLETE) {

0 commit comments

Comments
 (0)