Skip to content

Commit 5ed0d2d

Browse files
committed
DATACMNS-836 - Consolidate Reactiver Wrapper type utilities.
We now expose utilities to determine which reactive wrapper types are available and methods to query whether a specific type supports single or multi value emission.
1 parent 20647db commit 5ed0d2d

File tree

3 files changed

+147
-41
lines changed

3 files changed

+147
-41
lines changed

src/main/java/org/springframework/data/repository/query/ReactiveWrapperConverters.java

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.repository.query;
1717

18+
import static org.springframework.data.repository.query.ReactiveWrappers.*;
19+
1820
import java.util.ArrayList;
1921
import java.util.List;
2022
import java.util.Optional;
@@ -40,13 +42,6 @@
4042
*/
4143
public abstract class ReactiveWrapperConverters {
4244

43-
private static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux",
44-
QueryExecutionConverters.class.getClassLoader());
45-
private static final boolean RXJAVA_SINGLE_PRESENT = ClassUtils.isPresent("rx.Single",
46-
QueryExecutionConverters.class.getClassLoader());
47-
private static final boolean RXJAVA_OBSERVABLE_PRESENT = ClassUtils.isPresent("rx.Observable",
48-
QueryExecutionConverters.class.getClassLoader());
49-
5045
private static final List<AbstractReactiveWrapper<?>> REACTIVE_WRAPPERS = new ArrayList<>();
5146
private static final GenericConversionService GENERIC_CONVERSION_SERVICE = new GenericConversionService();
5247

@@ -58,12 +53,9 @@ public abstract class ReactiveWrapperConverters {
5853
REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
5954
}
6055

61-
if (RXJAVA_SINGLE_PRESENT) {
62-
REACTIVE_WRAPPERS.add(SingleWrapper.INSTANCE);
63-
}
64-
65-
if (RXJAVA_OBSERVABLE_PRESENT) {
66-
REACTIVE_WRAPPERS.add(ObservableWrapper.INSTANCE);
56+
if (RXJAVA1_PRESENT) {
57+
REACTIVE_WRAPPERS.add(RxJava1SingleWrapper.INSTANCE);
58+
REACTIVE_WRAPPERS.add(RxJava1ObservableWrapper.INSTANCE);
6759
}
6860

6961
QueryExecutionConverters.registerConvertersIn(GENERIC_CONVERSION_SERVICE);
@@ -150,15 +142,15 @@ public static <T> T map(Object stream, Converter<Object, Object> converter) {
150142
}
151143

152144
private static Optional<AbstractReactiveWrapper<?>> assignableStream(Class<?> type) {
153-
145+
154146
Assert.notNull(type, "Type must not be null!");
155-
147+
156148
return findWrapper(wrapper -> ClassUtils.isAssignable(wrapper.getWrapperClass(), type));
157149
}
158150

159151
private static Optional<AbstractReactiveWrapper<?>> findWrapper(
160152
Predicate<? super AbstractReactiveWrapper<?>> predicate) {
161-
153+
162154
return REACTIVE_WRAPPERS.stream().filter(predicate).findFirst();
163155
}
164156

@@ -232,11 +224,11 @@ public Publisher<?> map(Object wrapper, Converter<Object, Object> converter) {
232224
}
233225
}
234226

235-
private static class SingleWrapper extends AbstractReactiveWrapper<Single<?>> {
227+
private static class RxJava1SingleWrapper extends AbstractReactiveWrapper<Single<?>> {
236228

237-
static final SingleWrapper INSTANCE = new SingleWrapper();
229+
static final RxJava1SingleWrapper INSTANCE = new RxJava1SingleWrapper();
238230

239-
private SingleWrapper() {
231+
private RxJava1SingleWrapper() {
240232
super(Single.class, Multiplicity.ONE);
241233
}
242234

@@ -246,11 +238,11 @@ public Single<?> map(Object wrapper, Converter<Object, Object> converter) {
246238
}
247239
}
248240

249-
private static class ObservableWrapper extends AbstractReactiveWrapper<Observable<?>> {
241+
private static class RxJava1ObservableWrapper extends AbstractReactiveWrapper<Observable<?>> {
250242

251-
static final ObservableWrapper INSTANCE = new ObservableWrapper();
243+
static final RxJava1ObservableWrapper INSTANCE = new RxJava1ObservableWrapper();
252244

253-
private ObservableWrapper() {
245+
private RxJava1ObservableWrapper() {
254246
super(Observable.class, Multiplicity.MANY);
255247
}
256248

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.repository.query;
18+
19+
import java.util.Collections;
20+
import java.util.HashSet;
21+
import java.util.Set;
22+
23+
import org.springframework.util.Assert;
24+
import org.springframework.util.ClassUtils;
25+
26+
import lombok.experimental.UtilityClass;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.Mono;
29+
import rx.Observable;
30+
import rx.Single;
31+
32+
/**
33+
* Utility class to expose availability of reactive wrapper types and checking which type supports single/multi-element
34+
* emission.
35+
*
36+
* @author Mark Paluch
37+
* @since 2.0
38+
* @see Single
39+
* @see Observable
40+
* @see Mono
41+
* @see Flux
42+
*/
43+
@UtilityClass
44+
public class ReactiveWrappers {
45+
46+
public static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux",
47+
ReactiveWrappers.class.getClassLoader());
48+
49+
public static final boolean RXJAVA1_PRESENT = ClassUtils.isPresent("rx.Single",
50+
ReactiveWrappers.class.getClassLoader());
51+
52+
private static final Set<Class<?>> SINGLE_TYPES;
53+
private static final Set<Class<?>> MULTI_TYPES;
54+
55+
static {
56+
57+
Set<Class<?>> singleTypes = new HashSet<>();
58+
Set<Class<?>> multiTypes = new HashSet<>();
59+
60+
if (RXJAVA1_PRESENT) {
61+
singleTypes.add(getRxJava1SingleClass());
62+
multiTypes.add(getRxJava1ObservableClass());
63+
}
64+
65+
if (PROJECT_REACTOR_PRESENT) {
66+
singleTypes.add(getReactorMonoClass());
67+
multiTypes.add(getReactorFluxClass());
68+
}
69+
70+
SINGLE_TYPES = Collections.unmodifiableSet(singleTypes);
71+
MULTI_TYPES = Collections.unmodifiableSet(multiTypes);
72+
}
73+
74+
/**
75+
* Returns {@literal true} if {@code theClass} is a reactive wrapper type for single element emission.
76+
*
77+
* @param theClass must not be {@literal null}.
78+
* @return {@literal true} if {@code theClass} is a reactive wrapper type for single element emission
79+
*/
80+
public static boolean isSingleType(Class<?> theClass) {
81+
82+
Assert.notNull(theClass, "Class type must not be null!");
83+
84+
return isAssignable(SINGLE_TYPES, theClass);
85+
}
86+
87+
/**
88+
* Returns {@literal true} if {@code theClass} is a reactive wrapper type supporting emission of {@code 0..N}
89+
* elements.
90+
*
91+
* @param theClass must not be {@literal null}.
92+
* @return {@literal true} if {@code theClass} is a reactive wrapper type supporting emission of {@code 0..N}
93+
* elements.
94+
*/
95+
public static boolean isMultiType(Class<?> theClass) {
96+
97+
Assert.notNull(theClass, "Class type must not be null!");
98+
99+
return isAssignable(MULTI_TYPES, theClass);
100+
}
101+
102+
private static boolean isAssignable(Iterable<Class<?>> lhsTypes, Class<?> rhsType) {
103+
104+
for (Class<?> type : lhsTypes) {
105+
if (org.springframework.util.ClassUtils.isAssignable(type, rhsType)) {
106+
return true;
107+
}
108+
}
109+
110+
return false;
111+
}
112+
113+
private static Class<?> getRxJava1SingleClass() {
114+
return Single.class;
115+
}
116+
117+
private static Class<?> getRxJava1ObservableClass() {
118+
return Observable.class;
119+
}
120+
121+
private static Class<?> getReactorMonoClass() {
122+
return Mono.class;
123+
}
124+
125+
private static Class<?> getReactorFluxClass() {
126+
return Flux.class;
127+
}
128+
}

src/main/java/org/springframework/data/repository/util/QueryExecutionConverters.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import rx.Completable;
4545
import rx.Observable;
4646
import rx.Single;
47-
import scala.Option;
4847

4948
/**
5049
* Converters to potentially wrap the execution of a repository method into a variety of wrapper types potentially being
@@ -83,11 +82,7 @@ public abstract class QueryExecutionConverters {
8382

8483
private static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.adapter.RxJava1Adapter",
8584
QueryExecutionConverters.class.getClassLoader());
86-
private static final boolean RXJAVA_SINGLE_PRESENT = ClassUtils.isPresent("rx.Single",
87-
QueryExecutionConverters.class.getClassLoader());
88-
private static final boolean RXJAVA_OBSERVABLE_PRESENT = ClassUtils.isPresent("rx.Observable",
89-
QueryExecutionConverters.class.getClassLoader());
90-
private static final boolean RXJAVA_COMPLETABLE_PRESENT = ClassUtils.isPresent("rx.Completable",
85+
private static final boolean RXJAVA1_PRESENT = ClassUtils.isPresent("rx.Completable",
9186
QueryExecutionConverters.class.getClassLoader());
9287

9388
private static final Set<Class<?>> WRAPPER_TYPES = new HashSet<Class<?>>();
@@ -130,15 +125,9 @@ public abstract class QueryExecutionConverters {
130125
WRAPPER_TYPES.add(Flux.class);
131126
}
132127

133-
if (RXJAVA_SINGLE_PRESENT) {
128+
if (RXJAVA1_PRESENT) {
134129
WRAPPER_TYPES.add(Single.class);
135-
}
136-
137-
if (RXJAVA_COMPLETABLE_PRESENT) {
138130
WRAPPER_TYPES.add(Completable.class);
139-
}
140-
141-
if (RXJAVA_OBSERVABLE_PRESENT) {
142131
WRAPPER_TYPES.add(Observable.class);
143132
}
144133
}
@@ -209,20 +198,17 @@ public static void registerConvertersIn(ConfigurableConversionService conversion
209198

210199
if (PROJECT_REACTOR_PRESENT) {
211200

212-
if (RXJAVA_COMPLETABLE_PRESENT) {
201+
if (RXJAVA1_PRESENT) {
202+
213203
conversionService.addConverter(PublisherToCompletableConverter.INSTANCE);
214204
conversionService.addConverter(CompletableToPublisherConverter.INSTANCE);
215205
conversionService.addConverter(CompletableToMonoConverter.INSTANCE);
216-
}
217206

218-
if (RXJAVA_SINGLE_PRESENT) {
219207
conversionService.addConverter(PublisherToSingleConverter.INSTANCE);
220208
conversionService.addConverter(SingleToPublisherConverter.INSTANCE);
221209
conversionService.addConverter(SingleToMonoConverter.INSTANCE);
222210
conversionService.addConverter(SingleToFluxConverter.INSTANCE);
223-
}
224211

225-
if (RXJAVA_OBSERVABLE_PRESENT) {
226212
conversionService.addConverter(PublisherToObservableConverter.INSTANCE);
227213
conversionService.addConverter(ObservableToPublisherConverter.INSTANCE);
228214
conversionService.addConverter(ObservableToMonoConverter.INSTANCE);
@@ -233,7 +219,7 @@ public static void registerConvertersIn(ConfigurableConversionService conversion
233219
conversionService.addConverter(PublisherToFluxConverter.INSTANCE);
234220
}
235221

236-
if (RXJAVA_SINGLE_PRESENT && RXJAVA_OBSERVABLE_PRESENT) {
222+
if (RXJAVA1_PRESENT) {
237223
conversionService.addConverter(SingleToObservableConverter.INSTANCE);
238224
conversionService.addConverter(ObservableToSingleConverter.INSTANCE);
239225
}

0 commit comments

Comments
 (0)