Skip to content

Commit 3295972

Browse files
Frame Related Classes in Separate Package
1 parent 6e26b76 commit 3295972

25 files changed

+100
-105
lines changed

src/main/java/io/reactivesocket/ConnectionSetupPayload.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
*/
1616
package io.reactivesocket;
1717

18-
import io.reactivesocket.internal.SetupFrameFlyweight;
19-
2018
import java.nio.ByteBuffer;
2119

20+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
21+
2222
/**
2323
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
2424
*/

src/main/java/io/reactivesocket/Frame.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
package io.reactivesocket;
1717

1818
import io.reactivesocket.internal.*;
19+
import io.reactivesocket.internal.frame.ErrorFrameFlyweight;
20+
import io.reactivesocket.internal.frame.FrameHeaderFlyweight;
21+
import io.reactivesocket.internal.frame.FramePool;
22+
import io.reactivesocket.internal.frame.LeaseFrameFlyweight;
23+
import io.reactivesocket.internal.frame.RequestFrameFlyweight;
24+
import io.reactivesocket.internal.frame.RequestNFrameFlyweight;
25+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
26+
import io.reactivesocket.internal.frame.UnpooledFrame;
1927
import uk.co.real_logic.agrona.DirectBuffer;
2028
import uk.co.real_logic.agrona.MutableDirectBuffer;
2129

src/main/java/io/reactivesocket/exceptions/Exceptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.nio.ByteBuffer;
2121

22-
import static io.reactivesocket.internal.ErrorFrameFlyweight.*;
22+
import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*;
2323
import static java.nio.charset.StandardCharsets.UTF_8;
2424

2525
public class Exceptions {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.internal;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
22+
import io.reactivesocket.Frame;
23+
import io.reactivesocket.FrameType;
24+
import io.reactivesocket.Payload;
25+
import io.reactivesocket.internal.frame.PayloadFragmenter;
26+
27+
public class FragmentedPublisher implements Publisher<Frame> {
28+
29+
private final PayloadFragmenter fragmenter = new PayloadFragmenter(Frame.METADATA_MTU, Frame.DATA_MTU);
30+
private final Publisher<Payload> responsePublisher;
31+
private final int streamId;
32+
private final FrameType type;
33+
34+
public FragmentedPublisher(FrameType type, int streamId, Publisher<Payload> responsePublisher) {
35+
this.type = type;
36+
this.streamId = streamId;
37+
this.responsePublisher = responsePublisher;
38+
}
39+
40+
@Override
41+
public void subscribe(Subscriber<? super Frame> child) {
42+
child.onSubscribe(new Subscription() {
43+
44+
@Override
45+
public void request(long n) {
46+
// TODO Auto-generated method stub
47+
48+
}
49+
50+
@Override
51+
public void cancel() {
52+
// TODO Auto-generated method stub
53+
54+
}});
55+
}
56+
57+
}

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.reactivesocket.exceptions.CancelException;
3939
import io.reactivesocket.exceptions.Exceptions;
4040
import io.reactivesocket.exceptions.Retryable;
41+
import io.reactivesocket.internal.frame.RequestFrameFlyweight;
4142
import io.reactivesocket.internal.rx.BackpressureUtils;
4243
import io.reactivesocket.internal.rx.EmptyDisposable;
4344
import io.reactivesocket.internal.rx.EmptySubscription;

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.reactivestreams.Subscription;
2929

3030
import io.reactivesocket.exceptions.SetupException;
31+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
3132
import io.reactivesocket.internal.rx.EmptyDisposable;
3233
import io.reactivesocket.internal.rx.EmptySubscription;
3334
import io.reactivesocket.observable.Disposable;
@@ -243,6 +244,7 @@ public void onNext(Frame requestFrame) {
243244
final RejectedException exception = new RejectedException("No associated lease");
244245
responsePublisher = PublisherUtils.errorFrame(streamId, exception);
245246
}
247+
246248
connection.addOutput(responsePublisher, new Completable() {
247249

248250
@Override
@@ -333,7 +335,7 @@ public void request(long n) {
333335
if (n > 0 && started.compareAndSet(false, true)) {
334336
final int streamId = requestFrame.getStreamId();
335337

336-
requestHandler.handleRequestResponse(requestFrame).subscribe(new Subscriber<Payload>() {
338+
new FragmentedPublisher(FrameType.NEXT_COMPLETE, streamId, requestHandler.handleRequestResponse(requestFrame)).subscribe(new Subscriber<Frame>() {
337339

338340
// event emission is serialized so this doesn't need to be atomic
339341
int count = 0;
@@ -349,12 +351,11 @@ public void onSubscribe(Subscription s) {
349351
}
350352

351353
@Override
352-
public void onNext(Payload v) {
354+
public void onNext(Frame v) {
353355
if (++count > 1) {
354356
onError(new IllegalStateException("RequestResponse expects a single onNext"));
355357
} else {
356-
357-
child.onNext(Frame.Response.from(streamId, FrameType.NEXT_COMPLETE, v));
358+
child.onNext(v);
358359
}
359360
}
360361

src/main/java/io/reactivesocket/internal/SubscriptionHelper.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

src/main/java/io/reactivesocket/internal/ByteBufferUtil.java renamed to src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import java.nio.ByteBuffer;
1919

src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import io.reactivesocket.exceptions.*;

src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;
2020
import uk.co.real_logic.agrona.DirectBuffer;
2121
import uk.co.real_logic.agrona.MutableDirectBuffer;
2222

23+
import static io.reactivesocket.internal.frame.ByteBufferUtil.*;
24+
2325
import java.nio.ByteBuffer;
2426
import java.nio.ByteOrder;
2527

26-
import static io.reactivesocket.internal.ByteBufferUtil.preservingSlice;
27-
2828
/**
2929
* Per connection frame flyweight.
3030
*

src/main/java/io/reactivesocket/internal/FramePool.java renamed to src/main/java/io/reactivesocket/internal/frame/FramePool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.DirectBuffer;

src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/PayloadBuilder.java renamed to src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import io.reactivesocket.Payload;

src/main/java/io/reactivesocket/internal/PayloadFragmenter.java renamed to src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import io.reactivesocket.FrameType;

src/main/java/io/reactivesocket/internal/PayloadReassembler.java renamed to src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import io.reactivesocket.Payload;
20+
2021
import org.reactivestreams.Subscriber;
2122
import org.reactivestreams.Subscription;
2223
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;

src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java renamed to src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java renamed to src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/main/java/io/reactivesocket/internal/UnpooledFrame.java renamed to src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/test/java/io/reactivesocket/FrameTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@
2121
import java.util.concurrent.TimeUnit;
2222

2323
import io.reactivesocket.exceptions.RejectedException;
24-
import io.reactivesocket.internal.SetupFrameFlyweight;
24+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
25+
2526
import org.junit.Test;
2627
import org.junit.experimental.theories.DataPoint;
2728
import org.junit.experimental.theories.Theories;
2829
import org.junit.experimental.theories.Theory;
2930
import org.junit.runner.RunWith;
3031
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
3132

32-
import static io.reactivesocket.internal.ErrorFrameFlyweight.*;
33+
import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*;
3334
import static java.nio.charset.StandardCharsets.UTF_8;
3435

3536
@RunWith(Theories.class)

0 commit comments

Comments
 (0)