Skip to content

Commit 4fd9edf

Browse files
committed
DATAMONGO-1559 - Polishing.
Migrate off deprecated Cancellation API to Disposable.
1 parent 955597b commit 4fd9edf

File tree

6 files changed

+24
-20
lines changed

6 files changed

+24
-20
lines changed

spring-data-mongodb/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@
111111
<optional>true</optional>
112112
</dependency>
113113

114+
<dependency>
115+
<groupId>io.projectreactor.addons</groupId>
116+
<artifactId>reactor-test</artifactId>
117+
<version>${reactor}</version>
118+
<optional>true</optional>
119+
</dependency>
120+
114121
<dependency>
115122
<groupId>io.reactivex</groupId>
116123
<artifactId>rxjava</artifactId>

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/InfiniteStream.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
package org.springframework.data.mongodb.repository;
1817

1918
import java.lang.annotation.Documented;
@@ -24,8 +23,6 @@
2423

2524
import org.springframework.data.annotation.QueryAnnotation;
2625

27-
import reactor.core.Cancellation;
28-
2926
/**
3027
* Annotation to declare an infinite stream using repository query methods. An infinite stream uses MongoDB's
3128
* {@link com.mongodb.CursorType#TailableAwait tailable} cursors to retrieve data from a capped collection and stream
@@ -35,8 +32,8 @@
3532
* The stream may become dead, or invalid, if either the query returns no match or the cursor returns the document at
3633
* the "end" of the collection and then the application deletes that document.
3734
* <p>
38-
* A stream that is no longer in use must be {@link Cancellation#dispose()} disposed} otherwise the streams will linger
39-
* and exhaust resources.
35+
* A stream that is no longer in use must be {@link reactor.core.Disposable#dispose()} disposed} otherwise the streams
36+
* will linger and exhaust resources.
4037
*
4138
* @author Mark Paluch
4239
* @see <a href="https://docs.mongodb.com/manual/core/tailable-cursors/">Tailable Cursors</a>

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import static org.springframework.data.mongodb.core.query.Query.*;
2222

2323
import lombok.Data;
24-
import reactor.core.Cancellation;
24+
import reactor.core.Disposable;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
2727
import reactor.test.StepVerifier;
@@ -812,12 +812,12 @@ public void tailStreamsData() throws InterruptedException {
812812

813813
Flux<Document> capped = template.tail(null, Document.class, "capped");
814814

815-
Cancellation cancellation = capped.doOnNext(documents::add).subscribe();
815+
Disposable disposable = capped.doOnNext(documents::add).subscribe();
816816

817817
assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue()));
818818
assertThat(documents.isEmpty(), is(true));
819819

820-
cancellation.dispose();
820+
disposable.dispose();
821821
}
822822

823823
@Test // DATAMONGO-1444
@@ -834,7 +834,7 @@ public void tailStreamsDataUntilCancellation() throws InterruptedException {
834834

835835
Flux<Document> capped = template.tail(null, Document.class, "capped");
836836

837-
Cancellation cancellation = capped.doOnNext(documents::add).subscribe();
837+
Disposable disposable = capped.doOnNext(documents::add).subscribe();
838838

839839
assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue()));
840840
assertThat(documents.isEmpty(), is(true));
@@ -845,7 +845,7 @@ public void tailStreamsDataUntilCancellation() throws InterruptedException {
845845

846846
assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue()));
847847

848-
cancellation.dispose();
848+
disposable.dispose();
849849

850850
StepVerifier.create(template.insert(new Document("random", Math.random()).append("key", "value"), "capped")) //
851851
.expectNextCount(1) //

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import static org.springframework.data.domain.Sort.Direction.*;
2121

2222
import lombok.NoArgsConstructor;
23-
import reactor.core.Cancellation;
23+
import reactor.core.Disposable;
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.Mono;
2626
import reactor.test.StepVerifier;
@@ -183,15 +183,15 @@ public void shouldUseInfiniteStream() throws Exception {
183183

184184
BlockingQueue<Capped> documents = new LinkedBlockingDeque<>(100);
185185

186-
Cancellation cancellation = cappedRepository.findByKey("value").doOnNext(documents::add).subscribe();
186+
Disposable disposable = cappedRepository.findByKey("value").doOnNext(documents::add).subscribe();
187187

188188
assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue()));
189189

190190
StepVerifier.create(template.insert(new Capped("value", Math.random()))).expectNextCount(1).verifyComplete();
191191
assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue()));
192192
assertThat(documents.isEmpty(), is(true));
193193

194-
cancellation.dispose();
194+
disposable.dispose();
195195
}
196196

197197
@Test // DATAMONGO-1444
@@ -208,7 +208,7 @@ public void shouldUseInfiniteStreamWithProjection() throws Exception {
208208

209209
BlockingQueue<CappedProjection> documents = new LinkedBlockingDeque<>(100);
210210

211-
Cancellation cancellation = cappedRepository.findProjectionByKey("value").doOnNext(documents::add).subscribe();
211+
Disposable disposable = cappedRepository.findProjectionByKey("value").doOnNext(documents::add).subscribe();
212212

213213
CappedProjection projection1 = documents.poll(5, TimeUnit.SECONDS);
214214
assertThat(projection1, is(notNullValue()));
@@ -222,7 +222,7 @@ public void shouldUseInfiniteStreamWithProjection() throws Exception {
222222

223223
assertThat(documents.isEmpty(), is(true));
224224

225-
cancellation.dispose();
225+
disposable.dispose();
226226
}
227227

228228
@Test // DATAMONGO-1444

src/main/asciidoc/reference/reactive-mongo-repositories.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,10 @@ public interface PersonRepository extends ReactiveMongoRepository<Person, String
206206
207207
Flux<Person> stream = repository.findByFirstname("Joe");
208208
209-
Cancellation cancellation = stream.doOnNext(person -> System.out.println(person)).subscribe();
209+
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
210210
211211
// …
212212
213213
// Later: Dispose the stream
214-
cancellation.dispose();
214+
subscription.dispose();
215215
----

src/main/asciidoc/reference/reactive-mongodb.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,12 +466,12 @@ By default, MongoDB will automatically close a cursor when the client has exhaus
466466
----
467467
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
468468
469-
Cancellation cancellation = stream.doOnNext(person -> System.out.println(person)).subscribe();
469+
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
470470
471471
// …
472472
473473
// Later: Dispose the stream
474-
cancellation.dispose();
474+
subscription.dispose();
475475
----
476476

477477

0 commit comments

Comments
 (0)