Skip to content

Commit 1c006fb

Browse files
committed
DATACMNS-836 - Cleanups.
- Rename RxJava...Repository to RxJava1...Repository - Use Completable and Observable instead of Single for results without values/optional values - Remove reactive paging for now as it does not really fit reactive data streaming - Expose ReactiveWrappers.isAvailable(ReactiveLibrary) method to query library availability
1 parent 33750a6 commit 1c006fb

File tree

9 files changed

+119
-65
lines changed

9 files changed

+119
-65
lines changed
Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,24 @@
1818
import java.io.Serializable;
1919

2020
import org.reactivestreams.Publisher;
21-
import org.springframework.data.domain.Page;
22-
import org.springframework.data.domain.Pageable;
2321
import org.springframework.data.domain.Sort;
2422
import org.springframework.data.repository.NoRepositoryBean;
2523

2624
import reactor.core.publisher.Flux;
2725
import reactor.core.publisher.Mono;
2826

2927
/**
30-
* Extension of {@link ReactiveCrudRepository} to provide additional methods to retrieve entities using the pagination
31-
* and sorting abstraction.
28+
* Extension of {@link ReactiveCrudRepository} to provide additional methods to retrieve entities using the sorting
29+
* abstraction.
3230
*
3331
* @author Mark Paluch
3432
* @since 2.0
3533
* @see Sort
36-
* @see Pageable
3734
* @see Mono
3835
* @see Flux
3936
*/
4037
@NoRepositoryBean
41-
public interface ReactivePagingAndSortingRepository<T, ID extends Serializable> extends ReactiveCrudRepository<T, ID> {
38+
public interface ReactiveSortingRepository<T, ID extends Serializable> extends ReactiveCrudRepository<T, ID> {
4239

4340
/*
4441
* (non-Javadoc)
@@ -68,12 +65,4 @@ public interface ReactivePagingAndSortingRepository<T, ID extends Serializable>
6865
* @return all entities sorted by the given options
6966
*/
7067
Flux<T> findAll(Sort sort);
71-
72-
/**
73-
* Returns a {@link Page} of entities meeting the paging restriction provided in the {@code Pageable} object.
74-
*
75-
* @param pageable
76-
* @return a page of entities
77-
*/
78-
Mono<Page<T>> findAll(Pageable pageable);
7968
}

src/main/java/org/springframework/data/repository/reactive/RxJavaCrudRepository.java renamed to src/main/java/org/springframework/data/repository/reactive/RxJava1CrudRepository.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.data.repository.NoRepositoryBean;
2222
import org.springframework.data.repository.Repository;
2323

24+
import rx.Completable;
2425
import rx.Observable;
2526
import rx.Single;
2627

@@ -34,7 +35,7 @@
3435
* @see Observable
3536
*/
3637
@NoRepositoryBean
37-
public interface RxJavaCrudRepository<T, ID extends Serializable> extends Repository<T, ID> {
38+
public interface RxJava1CrudRepository<T, ID extends Serializable> extends Repository<T, ID> {
3839

3940
/**
4041
* Saves a given entity. Use the returned instance for further operations as the save operation might have changed the
@@ -70,7 +71,7 @@ public interface RxJavaCrudRepository<T, ID extends Serializable> extends Reposi
7071
* @return the entity with the given id or {@literal null} if none found
7172
* @throws IllegalArgumentException if {@code id} is {@literal null}
7273
*/
73-
Single<T> findOne(ID id);
74+
Observable<T> findOne(ID id);
7475

7576
/**
7677
* Retrieves an entity by its id supplied by a {@link Single}.
@@ -79,7 +80,7 @@ public interface RxJavaCrudRepository<T, ID extends Serializable> extends Reposi
7980
* @return the entity with the given id or {@literal null} if none found
8081
* @throws IllegalArgumentException if {@code id} is {@literal null}
8182
*/
82-
Single<T> findOne(Single<ID> id);
83+
Observable<T> findOne(Single<ID> id);
8384

8485
/**
8586
* Returns whether an entity with the given id exists.
@@ -135,34 +136,34 @@ public interface RxJavaCrudRepository<T, ID extends Serializable> extends Reposi
135136
* @param id must not be {@literal null}.
136137
* @throws IllegalArgumentException in case the given {@code id} is {@literal null}
137138
*/
138-
Single<Void> delete(ID id);
139+
Completable delete(ID id);
139140

140141
/**
141142
* Deletes a given entity.
142143
*
143144
* @param entity
144145
* @throws IllegalArgumentException in case the given entity is {@literal null}.
145146
*/
146-
Single<Void> delete(T entity);
147+
Completable delete(T entity);
147148

148149
/**
149150
* Deletes the given entities.
150151
*
151152
* @param entities
152153
* @throws IllegalArgumentException in case the given {@link Iterable} is {@literal null}.
153154
*/
154-
Single<Void> delete(Iterable<? extends T> entities);
155+
Completable delete(Iterable<? extends T> entities);
155156

156157
/**
157158
* Deletes the given entities.
158159
*
159160
* @param entityStream
160161
* @throws IllegalArgumentException in case the given {@link Publisher} is {@literal null}.
161162
*/
162-
Single<Void> delete(Observable<? extends T> entityStream);
163+
Completable delete(Observable<? extends T> entityStream);
163164

164165
/**
165166
* Deletes all entities managed by the repository.
166167
*/
167-
Single<Void> deleteAll();
168+
Completable deleteAll();
168169
}
Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,25 @@
1717

1818
import java.io.Serializable;
1919

20-
import org.reactivestreams.Publisher;
21-
import org.springframework.data.domain.Page;
22-
import org.springframework.data.domain.Pageable;
2320
import org.springframework.data.domain.Sort;
2421
import org.springframework.data.repository.NoRepositoryBean;
2522

2623
import rx.Observable;
2724
import rx.Single;
2825

2926
/**
30-
* Extension of {@link RxJavaCrudRepository} to provide additional methods to retrieve entities using the pagination and sorting
27+
* Extension of {@link RxJava1CrudRepository} to provide additional methods to retrieve entities using the sorting
3128
* abstraction.
3229
*
3330
* @author Mark Paluch
3431
* @since 2.0
3532
* @see Sort
36-
* @see Pageable
3733
* @see Single
3834
* @see Observable
39-
* @see RxJavaCrudRepository
35+
* @see RxJava1CrudRepository
4036
*/
4137
@NoRepositoryBean
42-
public interface RxJavaPagingAndSortingRepository<T, ID extends Serializable> extends RxJavaCrudRepository<T, ID> {
38+
public interface RxJava1SortingRepository<T, ID extends Serializable> extends RxJava1CrudRepository<T, ID> {
4339

4440
/*
4541
* (non-Javadoc)
@@ -69,12 +65,4 @@ public interface RxJavaPagingAndSortingRepository<T, ID extends Serializable> ex
6965
* @return all entities sorted by the given options
7066
*/
7167
Observable<T> findAll(Sort sort);
72-
73-
/**
74-
* Returns a {@link Page} of entities meeting the paging restriction provided in the {@code Pageable} object.
75-
*
76-
* @param pageable
77-
* @return a page of entities
78-
*/
79-
Single<Page<T>> findAll(Pageable pageable);
8068
}

src/main/java/org/springframework/data/repository/util/QueryExecutionConverters.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.core.convert.converter.Converter;
2929
import org.springframework.core.convert.converter.GenericConverter;
3030
import org.springframework.core.convert.support.ConfigurableConversionService;
31+
import org.springframework.data.repository.util.ReactiveWrappers.ReactiveLibrary;
3132
import org.springframework.scheduling.annotation.AsyncResult;
3233
import org.springframework.util.Assert;
3334
import org.springframework.util.ClassUtils;
@@ -189,7 +190,7 @@ public static void registerConvertersIn(ConfigurableConversionService conversion
189190

190191
if (ReactiveWrappers.isAvailable()) {
191192

192-
if (ReactiveWrappers.RXJAVA1_PRESENT) {
193+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA1)) {
193194

194195
conversionService.addConverter(PublisherToRxJava1CompletableConverter.INSTANCE);
195196
conversionService.addConverter(RxJava1CompletableToPublisherConverter.INSTANCE);
@@ -206,7 +207,7 @@ public static void registerConvertersIn(ConfigurableConversionService conversion
206207
conversionService.addConverter(RxJava1ObservableToFluxConverter.INSTANCE);
207208
}
208209

209-
if (ReactiveWrappers.RXJAVA2_PRESENT) {
210+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA2)) {
210211

211212
conversionService.addConverter(PublisherToRxJava2CompletableConverter.INSTANCE);
212213
conversionService.addConverter(RxJava2CompletableToPublisherConverter.INSTANCE);
@@ -231,19 +232,20 @@ public static void registerConvertersIn(ConfigurableConversionService conversion
231232
conversionService.addConverter(RxJava2MaybeToFluxConverter.INSTANCE);
232233
}
233234

234-
if (ReactiveWrappers.PROJECT_REACTOR_PRESENT) {
235+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
235236
conversionService.addConverter(PublisherToMonoConverter.INSTANCE);
236237
conversionService.addConverter(PublisherToFluxConverter.INSTANCE);
237238
}
238239

239-
if (ReactiveWrappers.RXJAVA1_PRESENT) {
240+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA1)) {
240241
conversionService.addConverter(RxJava1SingleToObservableConverter.INSTANCE);
241242
conversionService.addConverter(RxJava1ObservableToSingleConverter.INSTANCE);
242243
}
243244

244-
if (ReactiveWrappers.RXJAVA2_PRESENT) {
245+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA2)) {
245246
conversionService.addConverter(RxJava2SingleToObservableConverter.INSTANCE);
246247
conversionService.addConverter(RxJava2ObservableToSingleConverter.INSTANCE);
248+
conversionService.addConverter(RxJava2ObservableToMaybeConverter.INSTANCE);
247249
}
248250
}
249251
}
@@ -1084,7 +1086,24 @@ public enum RxJava2ObservableToSingleConverter
10841086

10851087
@Override
10861088
public io.reactivex.Single<?> convert(io.reactivex.Observable<?> source) {
1087-
return source.singleElement().toSingle();
1089+
return source.singleOrError();
1090+
}
1091+
}
1092+
1093+
/**
1094+
* A {@link Converter} to convert a {@link Observable} to {@link Maybe}.
1095+
*
1096+
* @author Mark Paluch
1097+
* @author 2.0
1098+
*/
1099+
public enum RxJava2ObservableToMaybeConverter
1100+
implements Converter<io.reactivex.Observable<?>, io.reactivex.Maybe<?>> {
1101+
1102+
INSTANCE;
1103+
1104+
@Override
1105+
public io.reactivex.Maybe<?> convert(io.reactivex.Observable<?> source) {
1106+
return source.singleElement();
10881107
}
10891108
}
10901109

src/main/java/org/springframework/data/repository/util/ReactiveWrapperConverters.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.core.ReactiveAdapterRegistry;
2323
import org.springframework.core.convert.converter.Converter;
2424
import org.springframework.core.convert.support.GenericConversionService;
25+
import org.springframework.data.repository.util.ReactiveWrappers.ReactiveLibrary;
2526
import org.springframework.util.Assert;
2627
import org.springframework.util.ClassUtils;
2728

@@ -52,21 +53,21 @@ public class ReactiveWrapperConverters {
5253

5354
static {
5455

55-
if (ReactiveWrappers.RXJAVA1_PRESENT) {
56+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA1)) {
5657

5758
REACTIVE_WRAPPERS.add(RxJava1SingleWrapper.INSTANCE);
5859
REACTIVE_WRAPPERS.add(RxJava1ObservableWrapper.INSTANCE);
5960
}
6061

61-
if (ReactiveWrappers.RXJAVA2_PRESENT) {
62+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA2)) {
6263

6364
REACTIVE_WRAPPERS.add(RxJava2SingleWrapper.INSTANCE);
6465
REACTIVE_WRAPPERS.add(RxJava2MaybeWrapper.INSTANCE);
6566
REACTIVE_WRAPPERS.add(RxJava2ObservableWrapper.INSTANCE);
6667
REACTIVE_WRAPPERS.add(RxJava2FlowableWrapper.INSTANCE);
6768
}
6869

69-
if (ReactiveWrappers.PROJECT_REACTOR_PRESENT) {
70+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
7071

7172
REACTIVE_WRAPPERS.add(FluxWrapper.INSTANCE);
7273
REACTIVE_WRAPPERS.add(MonoWrapper.INSTANCE);

src/main/java/org/springframework/data/repository/util/ReactiveWrappers.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@
5959
@UtilityClass
6060
public class ReactiveWrappers {
6161

62-
static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Mono",
62+
private static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Mono",
6363
ReactiveWrappers.class.getClassLoader());
6464

65-
static final boolean RXJAVA1_PRESENT = ClassUtils.isPresent("rx.Completable",
65+
private static final boolean RXJAVA1_PRESENT = ClassUtils.isPresent("rx.Completable",
6666
ReactiveWrappers.class.getClassLoader());
6767

68-
static final boolean RXJAVA2_PRESENT = ClassUtils.isPresent("io.reactivex.Flowable",
68+
private static final boolean RXJAVA2_PRESENT = ClassUtils.isPresent("io.reactivex.Flowable",
6969
ReactiveWrappers.class.getClassLoader());
7070

7171
private static final Map<Class<?>, Descriptor> REACTIVE_WRAPPERS;
@@ -107,7 +107,30 @@ public class ReactiveWrappers {
107107
* @return {@literal true} if reactive support is available.
108108
*/
109109
public static boolean isAvailable() {
110-
return RXJAVA1_PRESENT || RXJAVA2_PRESENT || PROJECT_REACTOR_PRESENT;
110+
return isAvailable(ReactiveLibrary.PROJECT_REACTOR) || isAvailable(ReactiveLibrary.RXJAVA1)
111+
|| isAvailable(ReactiveLibrary.RXJAVA2);
112+
}
113+
114+
/**
115+
* Returns {@literal true} if the {@link ReactiveLibrary} is available.
116+
*
117+
* @param reactiveLibrary must not be {@literal null}.
118+
* @return {@literal true} if the {@link ReactiveLibrary} is available.
119+
*/
120+
public static boolean isAvailable(ReactiveLibrary reactiveLibrary) {
121+
122+
Assert.notNull(reactiveLibrary, "ReactiveLibrary must not be null!");
123+
124+
switch (reactiveLibrary) {
125+
case PROJECT_REACTOR:
126+
return PROJECT_REACTOR_PRESENT;
127+
case RXJAVA1:
128+
return RXJAVA1_PRESENT;
129+
case RXJAVA2:
130+
return RXJAVA2_PRESENT;
131+
}
132+
133+
throw new IllegalArgumentException(String.format("ReactiveLibrary %s not supported", reactiveLibrary));
111134
}
112135

113136
/**
@@ -207,4 +230,13 @@ private static Optional<Descriptor> findDescriptor(Class<?> rhsType) {
207230
}
208231
return Optional.empty();
209232
}
233+
234+
/**
235+
* Enumeration of supported reactive libraries.
236+
*
237+
* @author Mark Paluch
238+
*/
239+
enum ReactiveLibrary {
240+
PROJECT_REACTOR, RXJAVA1, RXJAVA2;
241+
}
210242
}

src/test/java/org/springframework/data/repository/core/support/ReactiveRepositoryInformationUnitTests.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
import org.springframework.core.convert.support.DefaultConversionService;
3131
import org.springframework.data.repository.core.RepositoryMetadata;
3232
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
33-
import org.springframework.data.repository.reactive.ReactivePagingAndSortingRepository;
34-
import org.springframework.data.repository.reactive.RxJavaCrudRepository;
33+
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
34+
import org.springframework.data.repository.reactive.RxJava1CrudRepository;
3535
import org.springframework.data.repository.util.QueryExecutionConverters;
3636

3737
/**
38-
* Unit tests for {@link ConvertingMethodParameterRepositoryInformation}.
38+
* Unit tests for {@link ReactiveRepositoryInformation}.
3939
*
4040
* @author Mark Paluch
4141
*/
@@ -47,8 +47,8 @@ public class ReactiveRepositoryInformationUnitTests {
4747
@Test
4848
public void discoversMethodWithoutComparingReturnType() throws Exception {
4949

50-
Method method = RxJavaInterfaceWithGenerics.class.getMethod("deleteAll");
51-
RepositoryMetadata metadata = new DefaultRepositoryMetadata(RxJavaInterfaceWithGenerics.class);
50+
Method method = RxJava1InterfaceWithGenerics.class.getMethod("deleteAll");
51+
RepositoryMetadata metadata = new DefaultRepositoryMetadata(RxJava1InterfaceWithGenerics.class);
5252
DefaultRepositoryInformation information = new DefaultRepositoryInformation(metadata, REPOSITORY, null);
5353

5454
Method reference = information.getTargetClassMethod(method);
@@ -62,8 +62,8 @@ public void discoversMethodWithConvertibleArguments() throws Exception {
6262
DefaultConversionService conversionService = new DefaultConversionService();
6363
QueryExecutionConverters.registerConvertersIn(conversionService);
6464

65-
Method method = RxJavaInterfaceWithGenerics.class.getMethod("save", Observable.class);
66-
RepositoryMetadata metadata = new DefaultRepositoryMetadata(RxJavaInterfaceWithGenerics.class);
65+
Method method = RxJava1InterfaceWithGenerics.class.getMethod("save", Observable.class);
66+
RepositoryMetadata metadata = new DefaultRepositoryMetadata(RxJava1InterfaceWithGenerics.class);
6767
DefaultRepositoryInformation information = new ReactiveRepositoryInformation(metadata, REPOSITORY, null,
6868
conversionService);
6969

@@ -79,7 +79,7 @@ public void discoversMethodAssignableArguments() throws Exception {
7979
DefaultConversionService conversionService = new DefaultConversionService();
8080
QueryExecutionConverters.registerConvertersIn(conversionService);
8181

82-
Method method = ReactivePagingAndSortingRepository.class.getMethod("save", Publisher.class);
82+
Method method = ReactiveSortingRepository.class.getMethod("save", Publisher.class);
8383
RepositoryMetadata metadata = new DefaultRepositoryMetadata(ReactiveJavaInterfaceWithGenerics.class);
8484
DefaultRepositoryInformation information = new ReactiveRepositoryInformation(metadata, REPOSITORY, null,
8585
conversionService);
@@ -124,7 +124,8 @@ public void discoversMethodExactObjectArguments() throws Exception {
124124
assertThat(reference.getParameterTypes()[0], is(equalTo(Object.class)));
125125
}
126126

127-
interface RxJavaInterfaceWithGenerics extends RxJavaCrudRepository<User, String> {}
127+
interface RxJava1InterfaceWithGenerics extends RxJava1CrudRepository<User, String>
128+
{}
128129

129130
interface ReactiveJavaInterfaceWithGenerics extends ReactiveCrudRepository<User, String> {}
130131

0 commit comments

Comments
 (0)