Skip to content

Commit 9649845

Browse files
Merge pull request #59 from ReactiveSocket/reorg
Rx Concat and Package ReOrg
2 parents 8fa38c5 + b3d27e0 commit 9649845

File tree

68 files changed

+2088
-66
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2088
-66
lines changed

src/main/java/io/reactivesocket/ConnectionSetupPayload.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
*/
1616
package io.reactivesocket;
1717

18-
import io.reactivesocket.internal.SetupFrameFlyweight;
19-
2018
import java.nio.ByteBuffer;
2119

20+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
21+
2222
/**
2323
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
2424
*/

src/main/java/io/reactivesocket/DuplexConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
import org.reactivestreams.Publisher;
2121

22-
import io.reactivesocket.observable.Observable;
22+
import io.reactivesocket.rx.Completable;
23+
import io.reactivesocket.rx.Observable;
2324

2425
/**
2526
* Represents a connection with input/output that the protocol uses.

src/main/java/io/reactivesocket/Frame.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
package io.reactivesocket;
1717

1818
import io.reactivesocket.internal.*;
19+
import io.reactivesocket.internal.frame.ErrorFrameFlyweight;
20+
import io.reactivesocket.internal.frame.FrameHeaderFlyweight;
21+
import io.reactivesocket.internal.frame.FramePool;
22+
import io.reactivesocket.internal.frame.LeaseFrameFlyweight;
23+
import io.reactivesocket.internal.frame.RequestFrameFlyweight;
24+
import io.reactivesocket.internal.frame.RequestNFrameFlyweight;
25+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
26+
import io.reactivesocket.internal.frame.UnpooledFrame;
1927
import uk.co.real_logic.agrona.DirectBuffer;
2028
import uk.co.real_logic.agrona.MutableDirectBuffer;
2129

src/main/java/io/reactivesocket/LeaseGovernor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.reactivesocket;
22

33
import io.reactivesocket.internal.Responder;
4+
import io.reactivesocket.lease.NullLeaseGovernor;
5+
import io.reactivesocket.lease.UnlimitedLeaseGovernor;
46

57
public interface LeaseGovernor {
68
public static final LeaseGovernor NULL_LEASE_GOVERNOR = new NullLeaseGovernor();

src/main/java/io/reactivesocket/ReactiveSocket.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@
2626
import org.reactivestreams.Subscriber;
2727
import org.reactivestreams.Subscription;
2828

29-
import io.reactivesocket.internal.CompositeCompletable;
30-
import io.reactivesocket.internal.CompositeDisposable;
3129
import io.reactivesocket.internal.Requester;
3230
import io.reactivesocket.internal.Responder;
33-
import io.reactivesocket.observable.Disposable;
34-
import io.reactivesocket.observable.Observable;
35-
import io.reactivesocket.observable.Observer;
31+
import io.reactivesocket.internal.rx.CompositeCompletable;
32+
import io.reactivesocket.internal.rx.CompositeDisposable;
33+
import io.reactivesocket.rx.Completable;
34+
import io.reactivesocket.rx.Disposable;
35+
import io.reactivesocket.rx.Observable;
36+
import io.reactivesocket.rx.Observer;
3637
import uk.co.real_logic.agrona.BitUtil;
3738

3839
/**

src/main/java/io/reactivesocket/exceptions/Exceptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.nio.ByteBuffer;
2121

22-
import static io.reactivesocket.internal.ErrorFrameFlyweight.*;
22+
import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*;
2323
import static java.nio.charset.StandardCharsets.UTF_8;
2424

2525
public class Exceptions {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.internal;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
22+
import io.reactivesocket.Frame;
23+
import io.reactivesocket.FrameType;
24+
import io.reactivesocket.Payload;
25+
import io.reactivesocket.internal.frame.PayloadFragmenter;
26+
27+
public class FragmentedPublisher implements Publisher<Frame> {
28+
29+
private final PayloadFragmenter fragmenter = new PayloadFragmenter(Frame.METADATA_MTU, Frame.DATA_MTU);
30+
private final Publisher<Payload> responsePublisher;
31+
private final int streamId;
32+
private final FrameType type;
33+
34+
public FragmentedPublisher(FrameType type, int streamId, Publisher<Payload> responsePublisher) {
35+
this.type = type;
36+
this.streamId = streamId;
37+
this.responsePublisher = responsePublisher;
38+
}
39+
40+
@Override
41+
public void subscribe(Subscriber<? super Frame> child) {
42+
child.onSubscribe(new Subscription() {
43+
44+
@Override
45+
public void request(long n) {
46+
// TODO Auto-generated method stub
47+
48+
}
49+
50+
@Override
51+
public void cancel() {
52+
// TODO Auto-generated method stub
53+
54+
}});
55+
}
56+
57+
}

src/main/java/io/reactivesocket/internal/PublisherUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727

2828
import io.reactivesocket.Frame;
2929
import io.reactivesocket.Payload;
30+
import io.reactivesocket.internal.rx.BackpressureHelper;
31+
import io.reactivesocket.internal.rx.BackpressureUtils;
32+
import io.reactivesocket.internal.rx.EmptySubscription;
33+
import io.reactivesocket.internal.rx.SubscriptionHelper;
3034

3135
public class PublisherUtils {
3236

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.reactivestreams.Subscriber;
3030
import org.reactivestreams.Subscription;
3131

32-
import io.reactivesocket.Completable;
3332
import io.reactivesocket.ConnectionSetupPayload;
3433
import io.reactivesocket.DuplexConnection;
3534
import io.reactivesocket.Frame;
@@ -38,8 +37,13 @@
3837
import io.reactivesocket.exceptions.CancelException;
3938
import io.reactivesocket.exceptions.Exceptions;
4039
import io.reactivesocket.exceptions.Retryable;
41-
import io.reactivesocket.observable.Disposable;
42-
import io.reactivesocket.observable.Observer;
40+
import io.reactivesocket.internal.frame.RequestFrameFlyweight;
41+
import io.reactivesocket.internal.rx.BackpressureUtils;
42+
import io.reactivesocket.internal.rx.EmptyDisposable;
43+
import io.reactivesocket.internal.rx.EmptySubscription;
44+
import io.reactivesocket.rx.Completable;
45+
import io.reactivesocket.rx.Disposable;
46+
import io.reactivesocket.rx.Observer;
4347
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
4448

4549
/**

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@
2828
import org.reactivestreams.Subscription;
2929

3030
import io.reactivesocket.exceptions.SetupException;
31-
import io.reactivesocket.observable.Disposable;
32-
import io.reactivesocket.observable.Observer;
31+
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
32+
import io.reactivesocket.internal.rx.EmptyDisposable;
33+
import io.reactivesocket.internal.rx.EmptySubscription;
34+
import io.reactivesocket.rx.Completable;
35+
import io.reactivesocket.rx.Disposable;
36+
import io.reactivesocket.rx.Observer;
3337
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
3438

3539
/**
@@ -241,6 +245,7 @@ public void onNext(Frame requestFrame) {
241245
final RejectedException exception = new RejectedException("No associated lease");
242246
responsePublisher = PublisherUtils.errorFrame(streamId, exception);
243247
}
248+
244249
connection.addOutput(responsePublisher, new Completable() {
245250

246251
@Override

src/main/java/io/reactivesocket/internal/ByteBufferUtil.java renamed to src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import java.nio.ByteBuffer;
1919

src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import io.reactivesocket.exceptions.*;

src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;
2020
import uk.co.real_logic.agrona.DirectBuffer;
2121
import uk.co.real_logic.agrona.MutableDirectBuffer;
2222

23+
import static io.reactivesocket.internal.frame.ByteBufferUtil.*;
24+
2325
import java.nio.ByteBuffer;
2426
import java.nio.ByteOrder;
2527

26-
import static io.reactivesocket.internal.ByteBufferUtil.preservingSlice;
27-
2828
/**
2929
* Per connection frame flyweight.
3030
*

src/main/java/io/reactivesocket/internal/FramePool.java renamed to src/main/java/io/reactivesocket/internal/frame/FramePool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.DirectBuffer;

src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/PayloadBuilder.java renamed to src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import io.reactivesocket.Payload;

src/main/java/io/reactivesocket/internal/PayloadFragmenter.java renamed to src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import io.reactivesocket.FrameType;

src/main/java/io/reactivesocket/internal/PayloadReassembler.java renamed to src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import io.reactivesocket.Payload;
20+
2021
import org.reactivestreams.Subscriber;
2122
import org.reactivestreams.Subscription;
2223
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;

src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java renamed to src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.FrameType;
1919
import uk.co.real_logic.agrona.BitUtil;

src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java renamed to src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java renamed to src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

src/main/java/io/reactivesocket/internal/UnpooledFrame.java renamed to src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.frame;
1717

1818
import io.reactivesocket.Frame;
1919
import uk.co.real_logic.agrona.MutableDirectBuffer;

0 commit comments

Comments
 (0)