Skip to content

Commit e1a63f5

Browse files
committed
DATACMNS-836 - Add support for RxJava2.
1 parent ac45348 commit e1a63f5

File tree

6 files changed

+685
-41
lines changed

6 files changed

+685
-41
lines changed

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
<properties>
1919
<scala>2.11.7</scala>
2020
<xmlbeam>1.4.8</xmlbeam>
21+
<rxjava>1.1.9</rxjava>
2122
</properties>
2223

2324
<dependencies>
@@ -113,6 +114,13 @@
113114
<optional>true</optional>
114115
</dependency>
115116

117+
<dependency>
118+
<groupId>io.reactivex.rxjava2</groupId>
119+
<artifactId>rxjava</artifactId>
120+
<version>${rxjava2}</version>
121+
<optional>true</optional>
122+
</dependency>
123+
116124
<!-- Querydsl -->
117125

118126
<dependency>

src/main/java/org/springframework/data/repository/query/ReactiveWrapperConverters.java

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,24 @@ public abstract class ReactiveWrapperConverters {
4747

4848
static {
4949

50+
if (RXJAVA1_PRESENT) {
51+
REACTIVE_WRAPPERS.add(RxJava1SingleWrapper.INSTANCE);
52+
REACTIVE_WRAPPERS.add(RxJava1ObservableWrapper.INSTANCE);
53+
}
54+
55+
if (RXJAVA2_PRESENT) {
56+
REACTIVE_WRAPPERS.add(RxJava2SingleWrapper.INSTANCE);
57+
REACTIVE_WRAPPERS.add(RxJava2MaybeWrapper.INSTANCE);
58+
REACTIVE_WRAPPERS.add(RxJava2ObservableWrapper.INSTANCE);
59+
REACTIVE_WRAPPERS.add(RxJava2FlowableWrapper.INSTANCE);
60+
}
61+
5062
if (PROJECT_REACTOR_PRESENT) {
5163
REACTIVE_WRAPPERS.add(FluxWrapper.INSTANCE);
5264
REACTIVE_WRAPPERS.add(MonoWrapper.INSTANCE);
5365
REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
5466
}
5567

56-
if (RXJAVA1_PRESENT) {
57-
REACTIVE_WRAPPERS.add(RxJava1SingleWrapper.INSTANCE);
58-
REACTIVE_WRAPPERS.add(RxJava1ObservableWrapper.INSTANCE);
59-
}
60-
6168
QueryExecutionConverters.registerConvertersIn(GENERIC_CONVERSION_SERVICE);
6269
}
6370

@@ -126,6 +133,7 @@ public static <T> T toWrapper(Object stream, Class<? extends T> expectedWrapperT
126133
* @param converter must not be {@literal null}.
127134
* @return
128135
*/
136+
@SuppressWarnings("unchecked")
129137
public static <T> T map(Object stream, Converter<Object, Object> converter) {
130138

131139
Assert.notNull(stream, "Stream must not be null!");
@@ -252,6 +260,62 @@ public Observable<?> map(Object wrapper, Converter<Object, Object> converter) {
252260
}
253261
}
254262

263+
private static class RxJava2SingleWrapper extends AbstractReactiveWrapper<io.reactivex.Single<?>> {
264+
265+
static final RxJava2SingleWrapper INSTANCE = new RxJava2SingleWrapper();
266+
267+
private RxJava2SingleWrapper() {
268+
super(io.reactivex.Single.class, Multiplicity.ONE);
269+
}
270+
271+
@Override
272+
public io.reactivex.Single<?> map(Object wrapper, Converter<Object, Object> converter) {
273+
return ((io.reactivex.Single<?>) wrapper).map(converter::convert);
274+
}
275+
}
276+
277+
private static class RxJava2MaybeWrapper extends AbstractReactiveWrapper<io.reactivex.Maybe<?>> {
278+
279+
static final RxJava2MaybeWrapper INSTANCE = new RxJava2MaybeWrapper();
280+
281+
private RxJava2MaybeWrapper() {
282+
super(io.reactivex.Maybe.class, Multiplicity.MANY);
283+
}
284+
285+
@Override
286+
public io.reactivex.Maybe<?> map(Object wrapper, Converter<Object, Object> converter) {
287+
return ((io.reactivex.Maybe<?>) wrapper).map(converter::convert);
288+
}
289+
}
290+
291+
private static class RxJava2ObservableWrapper extends AbstractReactiveWrapper<io.reactivex.Observable<?>> {
292+
293+
static final RxJava2ObservableWrapper INSTANCE = new RxJava2ObservableWrapper();
294+
295+
private RxJava2ObservableWrapper() {
296+
super(io.reactivex.Observable.class, Multiplicity.MANY);
297+
}
298+
299+
@Override
300+
public io.reactivex.Observable<?> map(Object wrapper, Converter<Object, Object> converter) {
301+
return ((io.reactivex.Observable<?>) wrapper).map(converter::convert);
302+
}
303+
}
304+
305+
private static class RxJava2FlowableWrapper extends AbstractReactiveWrapper<io.reactivex.Flowable<?>> {
306+
307+
static final RxJava2FlowableWrapper INSTANCE = new RxJava2FlowableWrapper();
308+
309+
private RxJava2FlowableWrapper() {
310+
super(io.reactivex.Flowable.class, Multiplicity.MANY);
311+
}
312+
313+
@Override
314+
public io.reactivex.Flowable<?> map(Object wrapper, Converter<Object, Object> converter) {
315+
return ((io.reactivex.Flowable<?>) wrapper).map(converter::convert);
316+
}
317+
}
318+
255319
private enum Multiplicity {
256320
ONE, MANY,
257321
}

src/main/java/org/springframework/data/repository/query/ReactiveWrappers.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class ReactiveWrappers {
4949
public static final boolean RXJAVA1_PRESENT = ClassUtils.isPresent("rx.Single",
5050
ReactiveWrappers.class.getClassLoader());
5151

52+
public static final boolean RXJAVA2_PRESENT = ClassUtils.isPresent("io.reactivex.Single",
53+
ReactiveWrappers.class.getClassLoader());
54+
5255
private static final Set<Class<?>> SINGLE_TYPES;
5356
private static final Set<Class<?>> MULTI_TYPES;
5457

@@ -62,6 +65,13 @@ public class ReactiveWrappers {
6265
multiTypes.add(getRxJava1ObservableClass());
6366
}
6467

68+
if (RXJAVA2_PRESENT) {
69+
singleTypes.add(getRxJava2SingleClass());
70+
multiTypes.add(getRxJava2ObservableClass());
71+
multiTypes.add(getRxJava2FlowableClass());
72+
multiTypes.add(getRxJava2MaybeClass());
73+
}
74+
6575
if (PROJECT_REACTOR_PRESENT) {
6676
singleTypes.add(getReactorMonoClass());
6777
multiTypes.add(getReactorFluxClass());
@@ -118,6 +128,22 @@ private static Class<?> getRxJava1ObservableClass() {
118128
return Observable.class;
119129
}
120130

131+
private static Class<?> getRxJava2SingleClass() {
132+
return io.reactivex.Single.class;
133+
}
134+
135+
private static Class<?> getRxJava2ObservableClass() {
136+
return io.reactivex.Observable.class;
137+
}
138+
139+
private static Class<?> getRxJava2FlowableClass() {
140+
return io.reactivex.Flowable.class;
141+
}
142+
143+
private static Class<?> getRxJava2MaybeClass() {
144+
return io.reactivex.Maybe.class;
145+
}
146+
121147
private static Class<?> getReactorMonoClass() {
122148
return Mono.class;
123149
}

0 commit comments

Comments
 (0)