Skip to content

Commit 12a65a8

Browse files
committed
DATACMNS-836 - Add reactive repository interfaces.
Add ReactiveCrudRepository and ReactivePagingAndSortingRepository.
1 parent 2d3cbb5 commit 12a65a8

File tree

4 files changed

+276
-11
lines changed

4 files changed

+276
-11
lines changed

pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,15 @@
9595
<optional>true</optional>
9696
</dependency>
9797

98+
<!-- Project Reactor -->
99+
100+
<dependency>
101+
<groupId>io.projectreactor</groupId>
102+
<artifactId>reactor-core</artifactId>
103+
<version>${reactor}</version>
104+
<optional>true</optional>
105+
</dependency>
106+
98107
<!-- Querydsl -->
99108

100109
<dependency>

src/main/java/org/springframework/data/repository/query/ResultProcessor.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import lombok.NonNull;
1919
import lombok.RequiredArgsConstructor;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.Mono;
2022

2123
import java.util.Arrays;
2224
import java.util.Collection;
@@ -26,6 +28,7 @@
2628
import java.util.function.Function;
2729
import java.util.stream.Stream;
2830

31+
import org.reactivestreams.Publisher;
2932
import org.springframework.core.CollectionFactory;
3033
import org.springframework.core.convert.ConversionService;
3134
import org.springframework.core.convert.converter.Converter;
@@ -38,7 +41,7 @@
3841
/**
3942
* A {@link ResultProcessor} to expose metadata about query result element projection and eventually post prcessing raw
4043
* query results into projections and data transfer objects.
41-
*
44+
*
4245
* @author Oliver Gierke
4346
* @author John Blum
4447
* @since 1.12
@@ -53,7 +56,7 @@ public class ResultProcessor {
5356

5457
/**
5558
* Creates a new {@link ResultProcessor} from the given {@link QueryMethod} and {@link ProjectionFactory}.
56-
*
59+
*
5760
* @param method must not be {@literal null}.
5861
* @param factory must not be {@literal null}.
5962
*/
@@ -63,7 +66,7 @@ public class ResultProcessor {
6366

6467
/**
6568
* Creates a new {@link ResultProcessor} for the given {@link QueryMethod}, {@link ProjectionFactory} and type.
66-
*
69+
*
6770
* @param method must not be {@literal null}.
6871
* @param factory must not be {@literal null}.
6972
* @param type must not be {@literal null}.
@@ -82,7 +85,7 @@ private ResultProcessor(QueryMethod method, ProjectionFactory factory, Class<?>
8285

8386
/**
8487
* Returns a new {@link ResultProcessor} with a new projection type obtained from the given {@link ParameterAccessor}.
85-
*
88+
*
8689
* @param accessor can be {@literal null}.
8790
* @return
8891
*/
@@ -99,7 +102,7 @@ public ResultProcessor withDynamicProjection(ParameterAccessor accessor) {
99102

100103
/**
101104
* Returns the {@link ReturnedType}.
102-
*
105+
*
103106
* @return
104107
*/
105108
public ReturnedType getReturnedType() {
@@ -108,7 +111,7 @@ public ReturnedType getReturnedType() {
108111

109112
/**
110113
* Post-processes the given query result.
111-
*
114+
*
112115
* @param source can be {@literal null}.
113116
* @return
114117
*/
@@ -119,7 +122,7 @@ public <T> T processResult(Object source) {
119122
/**
120123
* Post-processes the given query result using the given preparing {@link Converter} to potentially prepare collection
121124
* elements.
122-
*
125+
*
123126
* @param source can be {@literal null}.
124127
* @param preparingConverter must not be {@literal null}.
125128
* @return
@@ -156,6 +159,18 @@ public <T> T processResult(Object source, Converter<Object, Object> preparingCon
156159
return (T) new StreamQueryResultHandler(type, converter).handle(source);
157160
}
158161

162+
if (source instanceof Mono) {
163+
164+
Mono<?> mono = (Mono<?>) source;
165+
return (T) mono.map(o -> type.isInstance(o) ? o : converter.convert(o));
166+
}
167+
168+
if (source instanceof Flux) {
169+
170+
Flux<?> flux = (Flux<?>) source;
171+
return (T) flux.map(o -> type.isInstance(o) ? o : converter.convert(o));
172+
}
173+
159174
return (T) converter.convert(source);
160175
}
161176

@@ -168,7 +183,7 @@ private static class ChainingConverter implements Converter<Object, Object> {
168183
/**
169184
* Returns a new {@link ChainingConverter} that hands the elements resulting from the current conversion to the
170185
* given {@link Converter}.
171-
*
186+
*
172187
* @param converter must not be {@literal null}.
173188
* @return
174189
*/
@@ -187,7 +202,7 @@ public Object convert(Object source) {
187202
});
188203
}
189204

190-
/*
205+
/*
191206
* (non-Javadoc)
192207
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
193208
*/
@@ -207,7 +222,7 @@ private static enum NoOpConverter implements Converter<Object, Object> {
207222

208223
INSTANCE;
209224

210-
/*
225+
/*
211226
* (non-Javadoc)
212227
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
213228
*/
@@ -224,7 +239,7 @@ private static class ProjectingConverter implements Converter<Object, Object> {
224239
private final @NonNull ProjectionFactory factory;
225240
private final ConversionService conversionService = new DefaultConversionService();
226241

227-
/*
242+
/*
228243
* (non-Javadoc)
229244
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
230245
*/
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.repository.reactive;
17+
18+
import java.io.Serializable;
19+
20+
import org.reactivestreams.Publisher;
21+
import org.springframework.data.repository.NoRepositoryBean;
22+
import org.springframework.data.repository.Repository;
23+
24+
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Mono;
26+
27+
/**
28+
* Interface for generic CRUD operations on a repository for a specific type.
29+
*
30+
* @author Mark Paluch
31+
*/
32+
@NoRepositoryBean
33+
public interface ReactiveCrudRepository<T, ID extends Serializable> extends Repository<T, ID> {
34+
35+
/**
36+
* Saves a given entity. Use the returned instance for further operations as the save operation might have changed the
37+
* entity instance completely.
38+
*
39+
* @param entity
40+
* @return the saved entity
41+
*/
42+
<S extends T> Mono<S> save(S entity);
43+
44+
/**
45+
* Saves all given entities.
46+
*
47+
* @param entities must not be {@literal null}.
48+
* @return the saved entities
49+
* @throws IllegalArgumentException in case the given entity is {@literal null}.
50+
*/
51+
<S extends T> Flux<S> save(Iterable<S> entities);
52+
53+
/**
54+
* Saves all given entities.
55+
*
56+
* @param entityStream must not be {@literal null}.
57+
* @return the saved entities
58+
* @throws IllegalArgumentException in case the given {@code Publisher} is {@literal null}.
59+
*/
60+
<S extends T> Flux<S> save(Publisher<S> entityStream);
61+
62+
/**
63+
* Retrieves an entity by its id.
64+
*
65+
* @param id must not be {@literal null}.
66+
* @return the entity with the given id or {@literal null} if none found
67+
* @throws IllegalArgumentException if {@code id} is {@literal null}
68+
*/
69+
Mono<T> findOne(ID id);
70+
71+
/**
72+
* Retrieves an entity by its id supplied by a {@link Mono}.
73+
*
74+
* @param id must not be {@literal null}.
75+
* @return the entity with the given id or {@literal null} if none found
76+
* @throws IllegalArgumentException if {@code id} is {@literal null}
77+
*/
78+
Mono<T> findOne(Mono<ID> id);
79+
80+
/**
81+
* Returns whether an entity with the given id exists.
82+
*
83+
* @param id must not be {@literal null}.
84+
* @return true if an entity with the given id exists, {@literal false} otherwise
85+
* @throws IllegalArgumentException if {@code id} is {@literal null}
86+
*/
87+
Mono<Boolean> exists(ID id);
88+
89+
/**
90+
* Returns whether an entity with the given id exists supplied by a {@link Mono}.
91+
*
92+
* @param id must not be {@literal null}.
93+
* @return true if an entity with the given id exists, {@literal false} otherwise
94+
* @throws IllegalArgumentException if {@code id} is {@literal null}
95+
*/
96+
Mono<Boolean> exists(Mono<ID> id);
97+
98+
/**
99+
* Returns all instances of the type.
100+
*
101+
* @return all entities
102+
*/
103+
Flux<T> findAll();
104+
105+
/**
106+
* Returns all instances of the type with the given IDs.
107+
*
108+
* @param ids
109+
* @return
110+
*/
111+
Flux<T> findAll(Iterable<ID> ids);
112+
113+
/**
114+
* Returns all instances of the type with the given IDs.
115+
*
116+
* @param idStream
117+
* @return
118+
*/
119+
Flux<T> findAll(Publisher<ID> idStream);
120+
121+
/**
122+
* Returns the number of entities available.
123+
*
124+
* @return the number of entities
125+
*/
126+
Mono<Long> count();
127+
128+
/**
129+
* Deletes the entity with the given id.
130+
*
131+
* @param id must not be {@literal null}.
132+
* @throws IllegalArgumentException in case the given {@code id} is {@literal null}
133+
*/
134+
Mono<Void> delete(ID id);
135+
136+
/**
137+
* Deletes a given entity.
138+
*
139+
* @param entity
140+
* @throws IllegalArgumentException in case the given entity is {@literal null}.
141+
*/
142+
Mono<Void> delete(T entity);
143+
144+
/**
145+
* Deletes the given entities.
146+
*
147+
* @param entities
148+
* @throws IllegalArgumentException in case the given {@link Iterable} is {@literal null}.
149+
*/
150+
Mono<Void> delete(Iterable<? extends T> entities);
151+
152+
/**
153+
* Deletes the given entities.
154+
*
155+
* @param entityStream
156+
* @throws IllegalArgumentException in case the given {@link Publisher} is {@literal null}.
157+
*/
158+
Mono<Void> delete(Publisher<? extends T> entityStream);
159+
160+
/**
161+
* Deletes all entities managed by the repository.
162+
*/
163+
Mono<Void> deleteAll();
164+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.repository.reactive;
17+
18+
import java.io.Serializable;
19+
20+
import org.reactivestreams.Publisher;
21+
import org.springframework.data.domain.Page;
22+
import org.springframework.data.domain.Pageable;
23+
import org.springframework.data.domain.Sort;
24+
import org.springframework.data.repository.NoRepositoryBean;
25+
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
28+
29+
/**
30+
* Extension of {@link ReactiveCrudRepository} to provide additional methods to retrieve entities using the pagination
31+
* and sorting abstraction.
32+
*
33+
* @author Mark Paluch
34+
* @see Sort
35+
* @see Pageable
36+
* @see org.springframework.data.domain.reactive.ReactivePage
37+
*/
38+
@NoRepositoryBean
39+
public interface ReactivePagingAndSortingRepository<T, ID extends Serializable> extends ReactiveCrudRepository<T, ID> {
40+
41+
/*
42+
* (non-Javadoc)
43+
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll()
44+
*/
45+
@Override
46+
Flux<T> findAll();
47+
48+
/*
49+
* (non-Javadoc)
50+
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll(java.lang.Iterable)
51+
*/
52+
@Override
53+
Flux<T> findAll(Iterable<ID> ids);
54+
55+
/*
56+
* (non-Javadoc)
57+
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll(org.reactivestreams.Publisher)
58+
*/
59+
@Override
60+
Flux<T> findAll(Publisher<ID> idStream);
61+
62+
/**
63+
* Returns all entities sorted by the given options.
64+
*
65+
* @param sort
66+
* @return all entities sorted by the given options
67+
*/
68+
Flux<T> findAll(Sort sort);
69+
70+
/**
71+
* Returns a {@link Page} of entities meeting the paging restriction provided in the {@code Pageable} object.
72+
*
73+
* @param pageable
74+
* @return a page of entities
75+
*/
76+
Mono<Page<T>> findAll(Pageable pageable);
77+
}

0 commit comments

Comments
 (0)