Skip to content

Commit 91d4172

Browse files
christophstroblmp911de
authored andcommitted
DATAMONGO-1444 - Update bulk insert operations.
Accept Mono<Collection<T>> instead of Publisher<T> to get rid of hidden buffer call inside of MongoTemplate.
1 parent 60f7628 commit 91d4172

File tree

3 files changed

+35
-45
lines changed

3 files changed

+35
-45
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public interface ReactiveMongoOperations {
168168
/**
169169
* A set of collection names.
170170
*
171-
* @return list of collection names
171+
* @return Flux of collection names
172172
*/
173173
Flux<String> getCollectionNames();
174174

@@ -221,29 +221,29 @@ public interface ReactiveMongoOperations {
221221
Mono<Void> dropCollection(String collectionName);
222222

223223
/**
224-
* Query for a list of objects of type T from the collection used by the entity class.
224+
* Query for a {@link Flux} of objects of type T from the collection used by the entity class.
225225
* <p/>
226226
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
227227
* configured otherwise, an instance of {@link MappingMongoConverter} will be used.
228228
* <p/>
229229
* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way
230230
* to map objects since the test for class type is done in the client and not on the server.
231+
* @param entityClass the parametrized type of the returned {@link Flux}.
231232
*
232-
* @param entityClass the parametrized type of the returned list
233233
* @return the converted collection
234234
*/
235235
<T> Flux<T> findAll(Class<T> entityClass);
236236

237237
/**
238-
* Query for a list of objects of type T from the specified collection.
238+
* Query for a {@link Flux} of objects of type T from the specified collection.
239239
* <p/>
240240
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
241241
* configured otherwise, an instance of {@link MappingMongoConverter} will be used.
242242
* <p/>
243243
* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way
244244
* to map objects since the test for class type is done in the client and not on the server.
245245
*
246-
* @param entityClass the parametrized type of the returned list.
246+
* @param entityClass the parametrized type of the returned {@link Flux}.
247247
* @param collectionName name of the collection to retrieve the objects from
248248
* @return the converted collection
249249
*/
@@ -261,7 +261,7 @@ public interface ReactiveMongoOperations {
261261
*
262262
* @param query the query class that specifies the criteria used to find a record and also an optional fields
263263
* specification
264-
* @param entityClass the parametrized type of the returned list.
264+
* @param entityClass the parametrized type of the returned {@link Mono}.
265265
* @return the converted object
266266
*/
267267
<T> Mono<T> findOne(Query query, Class<T> entityClass);
@@ -278,7 +278,7 @@ public interface ReactiveMongoOperations {
278278
*
279279
* @param query the query class that specifies the criteria used to find a record and also an optional fields
280280
* specification
281-
* @param entityClass the parametrized type of the returned list.
281+
* @param entityClass the parametrized type of the returned {@link Mono}.
282282
* @param collectionName name of the collection to retrieve the objects from
283283
* @return the converted object
284284
*/
@@ -313,7 +313,7 @@ public interface ReactiveMongoOperations {
313313
Mono<Boolean> exists(Query query, Class<?> entityClass, String collectionName);
314314

315315
/**
316-
* Map the results of an ad-hoc query on the collection for the entity class to a List of the specified type.
316+
* Map the results of an ad-hoc query on the collection for the entity class to a {@link Flux} of the specified type.
317317
* <p/>
318318
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
319319
* configured otherwise, an instance of {@link MappingMongoConverter} will be used.
@@ -323,13 +323,13 @@ public interface ReactiveMongoOperations {
323323
*
324324
* @param query the query class that specifies the criteria used to find a record and also an optional fields
325325
* specification
326-
* @param entityClass the parametrized type of the returned list.
327-
* @return the List of converted objects
326+
* @param entityClass the parametrized type of the returned {@link Flux}.
327+
* @return the {@link Flux} of converted objects
328328
*/
329329
<T> Flux<T> find(Query query, Class<T> entityClass);
330330

331331
/**
332-
* Map the results of an ad-hoc query on the specified collection to a List of the specified type.
332+
* Map the results of an ad-hoc query on the specified collection to a {@link Flux} of the specified type.
333333
* <p/>
334334
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
335335
* configured otherwise, an instance of {@link MappingMongoConverter} will be used.
@@ -339,9 +339,9 @@ public interface ReactiveMongoOperations {
339339
*
340340
* @param query the query class that specifies the criteria used to find a record and also an optional fields
341341
* specification
342-
* @param entityClass the parametrized type of the returned list.
342+
* @param entityClass the parametrized type of the returned {@link Flux}.
343343
* @param collectionName name of the collection to retrieve the objects from
344-
* @return the List of converted objects
344+
* @return the {@link Flux} of converted objects
345345
*/
346346
<T> Flux<T> find(Query query, Class<T> entityClass, String collectionName);
347347

@@ -459,7 +459,7 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
459459
*
460460
* @param query the query class that specifies the criteria used to find a record and also an optional fields
461461
* specification
462-
* @param entityClass the parametrized type of the returned list.
462+
* @param entityClass the parametrized type of the returned {@link Mono}.
463463
* @return the converted object
464464
*/
465465
<T> Mono<T> findAndRemove(Query query, Class<T> entityClass);
@@ -476,8 +476,8 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
476476
*
477477
* @param query the query class that specifies the criteria used to find a record and also an optional fields
478478
* specification
479-
* @param entityClass the parametrized type of the returned list.
480-
* @param collectionName name of the collection to retrieve the objects from
479+
* @param entityClass the parametrized type of the returned {@link Mono}.
480+
* @param collectionName name of the collection to retrieve the objects from.
481481
* @return the converted object
482482
*/
483483
<T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName);
@@ -550,14 +550,14 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
550550
/**
551551
* Insert a Collection of objects into a collection in a single batch write to the database.
552552
*
553-
* @param batchToSave the list of objects to save.
553+
* @param batchToSave the batch of objects to save.
554554
* @param entityClass class that determines the collection to use
555555
* @return
556556
*/
557557
<T> Flux<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass);
558558

559559
/**
560-
* Insert a list of objects into the specified collection in a single batch write to the database.
560+
* Insert a batch of objects into the specified collection in a single batch write to the database.
561561
*
562562
* @param batchToSave the list of objects to save.
563563
* @param collectionName name of the collection to store the object in
@@ -600,16 +600,16 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
600600
* @param entityClass class that determines the collection to use
601601
* @return
602602
*/
603-
<T> Flux<T> insert(Publisher<? extends T> batchToSave, Class<?> entityClass);
603+
<T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, Class<?> entityClass);
604604

605605
/**
606-
* Insert a list of objects into the specified collection in a single batch write to the database.
606+
* Insert objects into the specified collection in a single batch write to the database.
607607
*
608608
* @param batchToSave the publisher which provides objects to save.
609609
* @param collectionName name of the collection to store the object in
610610
* @return
611611
*/
612-
<T> Flux<T> insert(Publisher<? extends T> batchToSave, String collectionName);
612+
<T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, String collectionName);
613613

614614
/**
615615
* Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
@@ -618,7 +618,7 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
618618
* @param objectsToSave the publisher which provides objects to save.
619619
* @return
620620
*/
621-
<T> Flux<T> insertAll(Publisher<? extends T> objectsToSave);
621+
<T> Flux<T> insertAll(Mono<Collection<? extends T>> objectsToSave);
622622

623623
/**
624624
* Save the object to the collection for the entity type of the object to save. This will perform an insert if the
@@ -910,8 +910,8 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
910910
*
911911
* @param query the query class that specifies the criteria used to find a record and also an optional fields
912912
* specification
913-
* @param entityClass the parametrized type of the returned list.
914-
* @return the List of converted objects
913+
* @param entityClass the parametrized type of the returned {@link Flux}.
914+
* @return the {@link Flux} of converted objects
915915
*/
916916
<T> Flux<T> tail(Query query, Class<T> entityClass);
917917

@@ -929,9 +929,9 @@ <T> Mono<T> findAndModify(Query query, Update update, FindAndModifyOptions optio
929929
*
930930
* @param query the query class that specifies the criteria used to find a record and also an optional fields
931931
* specification
932-
* @param entityClass the parametrized type of the returned list.
932+
* @param entityClass the parametrized type of the returned {@link Flux}.
933933
* @param collectionName name of the collection to retrieve the objects from
934-
* @return the List of converted objects
934+
* @return the {@link Flux} of converted objects
935935
*/
936936
<T> Flux<T> tail(Query query, Class<T> entityClass, String collectionName);
937937

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
160160
private final QueryMapper queryMapper;
161161
private final UpdateMapper updateMapper;
162162

163-
private int publisherBatchSize = 10;
164163
private WriteConcern writeConcern;
165164
private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
166165
private WriteResultChecking writeResultChecking = WriteResultChecking.NONE;
@@ -257,15 +256,6 @@ public void setReadPreference(ReadPreference readPreference) {
257256
this.readPreference = readPreference;
258257
}
259258

260-
/**
261-
* Used to set a batch size when working with batches of {@link Publisher} emitting items to insert.
262-
*
263-
* @param publisherBatchSize batch size
264-
*/
265-
public void setPublisherBatchSize(int publisherBatchSize) {
266-
this.publisherBatchSize = publisherBatchSize;
267-
}
268-
269259
/*
270260
* (non-Javadoc)
271261
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
@@ -779,16 +769,16 @@ public <T> Mono<T> insert(Mono<? extends T> objectToSave) {
779769
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.Class)
780770
*/
781771
@Override
782-
public <T> Flux<T> insert(Publisher<? extends T> batchToSave, Class<?> entityClass) {
783-
return insert(batchToSave, determineCollectionName(entityClass));
772+
public <T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, Class<?> entityClass) {
773+
return insertAll(batchToSave, determineCollectionName(entityClass));
784774
}
785775

786776
/* (non-Javadoc)
787777
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.String)
788778
*/
789779
@Override
790-
public <T> Flux<T> insert(Publisher<? extends T> batchToSave, String collectionName) {
791-
return Flux.from(batchToSave).buffer(publisherBatchSize).flatMap(collection -> insert(collection, collectionName));
780+
public <T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, String collectionName) {
781+
return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
792782
}
793783

794784
/* (non-Javadoc)
@@ -857,8 +847,8 @@ public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {
857847
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insertAll(org.reactivestreams.Publisher)
858848
*/
859849
@Override
860-
public <T> Flux<T> insertAll(Publisher<? extends T> objectsToSave) {
861-
return Flux.from(objectsToSave).buffer(publisherBatchSize).flatMap(this::insertAll);
850+
public <T> Flux<T> insertAll(Mono<Collection<? extends T>> objectsToSave) {
851+
return Flux.from(objectsToSave).flatMap(this::insertAll);
862852
}
863853

864854
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import lombok.Data;
6969
import reactor.core.Cancellation;
7070
import reactor.core.publisher.Flux;
71+
import reactor.core.publisher.Mono;
7172
import reactor.test.TestSubscriber;
7273

7374
/**
@@ -404,8 +405,7 @@ public void updateMultiByEntityTypeShouldUpdateObjects() throws Exception {
404405
Query query = new Query(
405406
new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter")));
406407

407-
template.insertAll(Flux.just(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16))) //
408-
.collectList() //
408+
template.insertAll(Mono.just(Arrays.asList(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16)))) //
409409
.flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class)) //
410410
.flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class)) //
411411
.subscribeWith(TestSubscriber.create()) //
@@ -422,7 +422,7 @@ public void updateMultiByCollectionNameShouldUpdateObject() throws Exception {
422422
new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter")));
423423

424424
template
425-
.insert(Flux.just(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16)), "people") //
425+
.insertAll(Mono.just(Arrays.asList(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16))), "people") //
426426
.collectList() //
427427
.flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class, "people")) //
428428
.flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class, "people")) //

0 commit comments

Comments
 (0)