Skip to content

Commit a0e15b7

Browse files
committed
DATACMNS-836 - Add ReactiveWrapperConverters abstraction.
1 parent 89fef84 commit a0e15b7

File tree

5 files changed

+444
-49
lines changed

5 files changed

+444
-49
lines changed
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.repository.query;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Optional;
21+
import java.util.function.Predicate;
22+
23+
import org.reactivestreams.Publisher;
24+
import org.springframework.core.convert.converter.Converter;
25+
import org.springframework.core.convert.support.GenericConversionService;
26+
import org.springframework.data.repository.util.QueryExecutionConverters;
27+
import org.springframework.util.Assert;
28+
import org.springframework.util.ClassUtils;
29+
30+
import reactor.core.publisher.Flux;
31+
import reactor.core.publisher.Mono;
32+
import rx.Observable;
33+
import rx.Single;
34+
35+
/**
36+
* Conversion support for reactive wrapper types.
37+
*
38+
* @author Mark Paluch
39+
* @since 2.0
40+
*/
41+
public abstract class ReactiveWrapperConverters {
42+
43+
private static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.converter.DependencyUtils",
44+
QueryExecutionConverters.class.getClassLoader());
45+
private static final boolean RXJAVA_SINGLE_PRESENT = ClassUtils.isPresent("rx.Single",
46+
QueryExecutionConverters.class.getClassLoader());
47+
private static final boolean RXJAVA_OBSERVABLE_PRESENT = ClassUtils.isPresent("rx.Observable",
48+
QueryExecutionConverters.class.getClassLoader());
49+
50+
private static final List<AbstractReactiveWrapper<?>> REACTIVE_WRAPPERS = new ArrayList<>();
51+
private static final GenericConversionService GENERIC_CONVERSION_SERVICE = new GenericConversionService();
52+
53+
static {
54+
55+
if (PROJECT_REACTOR_PRESENT) {
56+
REACTIVE_WRAPPERS.add(FluxWrapper.INSTANCE);
57+
REACTIVE_WRAPPERS.add(MonoWrapper.INSTANCE);
58+
REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
59+
}
60+
61+
if (RXJAVA_SINGLE_PRESENT) {
62+
REACTIVE_WRAPPERS.add(SingleWrapper.INSTANCE);
63+
}
64+
65+
if (RXJAVA_OBSERVABLE_PRESENT) {
66+
REACTIVE_WRAPPERS.add(ObservableWrapper.INSTANCE);
67+
}
68+
69+
QueryExecutionConverters.registerConvertersIn(GENERIC_CONVERSION_SERVICE);
70+
}
71+
72+
private ReactiveWrapperConverters() {
73+
74+
}
75+
76+
/**
77+
* Returns whether the given type is a supported wrapper type.
78+
*
79+
* @param type must not be {@literal null}.
80+
* @return
81+
*/
82+
public static boolean supports(Class<?> type) {
83+
return assignableStream(type).isPresent();
84+
}
85+
86+
/**
87+
* Returns whether the type is a single-like wrapper.
88+
*
89+
* @param type must not be {@literal null}.
90+
* @return
91+
* @see Single
92+
* @see Mono
93+
*/
94+
public static boolean isSingleLike(Class<?> type) {
95+
return assignableStream(type).map(wrapper -> wrapper.getMultiplicity() == Multiplicity.ONE).orElse(false);
96+
}
97+
98+
/**
99+
* Returns whether the type is a collection/multi-element-like wrapper.
100+
*
101+
* @param type must not be {@literal null}.
102+
* @return
103+
* @see Observable
104+
* @see Flux
105+
* @see Publisher
106+
*/
107+
public static boolean isCollectionLike(Class<?> type) {
108+
return assignableStream(type).map(wrapper -> wrapper.getMultiplicity() == Multiplicity.MANY).orElse(false);
109+
}
110+
111+
/**
112+
* Casts or converts the given wrapper type into a different wrapper type.
113+
*
114+
* @param stream the stream, must not be {@literal null}.
115+
* @param expectedWrapperType must not be {@literal null}.
116+
* @return
117+
*/
118+
public static <T> T toWrapper(Object stream, Class<? extends T> expectedWrapperType) {
119+
120+
Assert.notNull(stream, "Stream must not be null!");
121+
Assert.notNull(expectedWrapperType, "Converter must not be null!");
122+
123+
if (expectedWrapperType.isAssignableFrom(stream.getClass())) {
124+
return (T) stream;
125+
}
126+
127+
return GENERIC_CONVERSION_SERVICE.convert(stream, expectedWrapperType);
128+
}
129+
130+
/**
131+
* Maps elements of a reactive element stream to other elements.
132+
*
133+
* @param stream must not be {@literal null}.
134+
* @param converter must not be {@literal null}.
135+
* @return
136+
*/
137+
public static <T> T map(Object stream, Converter<Object, Object> converter) {
138+
139+
Assert.notNull(stream, "Stream must not be null!");
140+
Assert.notNull(converter, "Converter must not be null!");
141+
142+
for (AbstractReactiveWrapper<?> reactiveWrapper : REACTIVE_WRAPPERS) {
143+
144+
if (ClassUtils.isAssignable(reactiveWrapper.getWrapperClass(), stream.getClass())) {
145+
return (T) reactiveWrapper.map(stream, converter);
146+
}
147+
}
148+
149+
throw new IllegalStateException(String.format("Cannot apply converter to %s", stream));
150+
}
151+
152+
private static Optional<AbstractReactiveWrapper<?>> assignableStream(Class<?> type) {
153+
154+
Assert.notNull(type, "Type must not be null!");
155+
156+
return findWrapper(wrapper -> ClassUtils.isAssignable(wrapper.getWrapperClass(), type));
157+
}
158+
159+
private static Optional<AbstractReactiveWrapper<?>> findWrapper(
160+
Predicate<? super AbstractReactiveWrapper<?>> predicate) {
161+
162+
return REACTIVE_WRAPPERS.stream().filter(predicate).findFirst();
163+
}
164+
165+
private abstract static class AbstractReactiveWrapper<T> {
166+
167+
private final Class<? super T> wrapperClass;
168+
private final Multiplicity multiplicity;
169+
170+
public AbstractReactiveWrapper(Class<? super T> wrapperClass, Multiplicity multiplicity) {
171+
this.wrapperClass = wrapperClass;
172+
this.multiplicity = multiplicity;
173+
}
174+
175+
public Class<? super T> getWrapperClass() {
176+
return wrapperClass;
177+
}
178+
179+
public Multiplicity getMultiplicity() {
180+
return multiplicity;
181+
}
182+
183+
public abstract Object map(Object wrapper, Converter<Object, Object> converter);
184+
}
185+
186+
private static class MonoWrapper extends AbstractReactiveWrapper<Mono<?>> {
187+
188+
static final MonoWrapper INSTANCE = new MonoWrapper();
189+
190+
private MonoWrapper() {
191+
super(Mono.class, Multiplicity.ONE);
192+
}
193+
194+
public Mono<?> map(Object wrapper, Converter<Object, Object> converter) {
195+
return ((Mono<?>) wrapper).map(converter::convert);
196+
}
197+
}
198+
199+
private static class FluxWrapper extends AbstractReactiveWrapper<Flux<?>> {
200+
201+
static final FluxWrapper INSTANCE = new FluxWrapper();
202+
203+
private FluxWrapper() {
204+
super(Flux.class, Multiplicity.MANY);
205+
}
206+
207+
public Flux<?> map(Object wrapper, Converter<Object, Object> converter) {
208+
return ((Flux<?>) wrapper).map(converter::convert);
209+
}
210+
}
211+
212+
private static class PublisherWrapper extends AbstractReactiveWrapper<Publisher<?>> {
213+
214+
static final PublisherWrapper INSTANCE = new PublisherWrapper();
215+
216+
public PublisherWrapper() {
217+
super(Publisher.class, Multiplicity.MANY);
218+
}
219+
220+
@Override
221+
public Publisher<?> map(Object wrapper, Converter<Object, Object> converter) {
222+
223+
if (wrapper instanceof Mono) {
224+
return MonoWrapper.INSTANCE.map((Mono<?>) wrapper, converter);
225+
}
226+
227+
if (wrapper instanceof Flux) {
228+
return FluxWrapper.INSTANCE.map((Flux<?>) wrapper, converter);
229+
}
230+
231+
return FluxWrapper.INSTANCE.map(Flux.from((Publisher<?>) wrapper), converter);
232+
}
233+
}
234+
235+
private static class SingleWrapper extends AbstractReactiveWrapper<Single<?>> {
236+
237+
static final SingleWrapper INSTANCE = new SingleWrapper();
238+
239+
private SingleWrapper() {
240+
super(Single.class, Multiplicity.ONE);
241+
}
242+
243+
@Override
244+
public Single<?> map(Object wrapper, Converter<Object, Object> converter) {
245+
return ((Single<?>) wrapper).map(converter::convert);
246+
}
247+
}
248+
249+
private static class ObservableWrapper extends AbstractReactiveWrapper<Observable<?>> {
250+
251+
static final ObservableWrapper INSTANCE = new ObservableWrapper();
252+
253+
private ObservableWrapper() {
254+
super(Observable.class, Multiplicity.MANY);
255+
}
256+
257+
@Override
258+
public Observable<?> map(Object wrapper, Converter<Object, Object> converter) {
259+
return ((Observable<?>) wrapper).map(converter::convert);
260+
}
261+
}
262+
263+
private enum Multiplicity {
264+
ONE, MANY,
265+
}
266+
267+
}

src/main/java/org/springframework/data/repository/query/ResultProcessor.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
import lombok.NonNull;
1919
import lombok.RequiredArgsConstructor;
20-
import reactor.core.publisher.Flux;
21-
import reactor.core.publisher.Mono;
22-
import rx.Observable;
23-
import rx.Single;
2420

2521
import java.util.Arrays;
2622
import java.util.Collection;
@@ -30,14 +26,12 @@
3026
import java.util.function.Function;
3127
import java.util.stream.Stream;
3228

33-
import org.reactivestreams.Publisher;
3429
import org.springframework.core.CollectionFactory;
3530
import org.springframework.core.convert.ConversionService;
3631
import org.springframework.core.convert.converter.Converter;
3732
import org.springframework.core.convert.support.DefaultConversionService;
3833
import org.springframework.data.domain.Slice;
3934
import org.springframework.data.projection.ProjectionFactory;
40-
import org.springframework.data.repository.util.QueryExecutionConverters;
4135
import org.springframework.data.util.ReflectionUtils;
4236
import org.springframework.util.Assert;
4337

@@ -163,32 +157,8 @@ public <T> T processResult(Object source, Converter<Object, Object> preparingCon
163157
return (T) new StreamQueryResultHandler(type, converter).handle(source);
164158
}
165159

166-
if(QueryExecutionConverters.supports(source.getClass())){
167-
168-
// TODO: Perform mapping in a way that allows absence of wrapper types
169-
if (source instanceof Mono) {
170-
171-
Mono<?> mono = (Mono<?>) source;
172-
return (T) mono.map(o -> type.isInstance(o) ? o : converter.convert(o));
173-
}
174-
175-
if (source instanceof Flux) {
176-
177-
Flux<?> flux = (Flux<?>) source;
178-
return (T) flux.map(o -> type.isInstance(o) ? o : converter.convert(o));
179-
}
180-
181-
if (source instanceof Single) {
182-
183-
Single<?> single = (Single<?>) source;
184-
return (T) single.map(o -> type.isInstance(o) ? o : converter.convert(o));
185-
}
186-
187-
if (source instanceof Observable) {
188-
189-
Observable<?> observable = (Observable<?>) source;
190-
return (T) observable.map(o -> type.isInstance(o) ? o : converter.convert(o));
191-
}
160+
if(ReactiveWrapperConverters.supports(source.getClass())){
161+
return (T) ReactiveWrapperConverters.map(source, o -> type.isInstance(o) ? o : converter.convert(o));
192162
}
193163

194164
return (T) converter.convert(source);

0 commit comments

Comments
 (0)