13
13
* See the License for the specific language governing permissions and
14
14
* limitations under the License.
15
15
*/
16
-
17
16
package org .springframework .data .mongodb .core ;
18
17
19
18
import static com .sun .prism .impl .Disposer .*;
20
19
import static org .hamcrest .Matchers .*;
21
20
import static org .junit .Assert .*;
22
21
import static org .junit .Assume .*;
23
22
24
- import java .util .List ;
23
+ import reactor .core .publisher .Flux ;
24
+ import reactor .test .StepVerifier ;
25
25
26
26
import org .bson .Document ;
27
27
import org .junit .After ;
39
39
40
40
import com .mongodb .MongoException ;
41
41
import com .mongodb .ReadPreference ;
42
+ import com .mongodb .reactivestreams .client .MongoCollection ;
42
43
import com .mongodb .reactivestreams .client .MongoDatabase ;
43
44
44
- import reactor .core .publisher .Flux ;
45
- import reactor .test .TestSubscriber ;
46
-
47
45
/**
48
46
* Integration test for {@link ReactiveMongoTemplate} execute methods.
49
47
*
@@ -55,111 +53,116 @@ public class ReactiveMongoTemplateExecuteTests {
55
53
56
54
private static final Version THREE = Version .parse ("3.0" );
57
55
56
+ @ Rule public ExpectedException thrown = ExpectedException .none ();
57
+
58
58
@ Autowired SimpleReactiveMongoDatabaseFactory factory ;
59
59
@ Autowired ReactiveMongoOperations operations ;
60
60
61
- @ Rule public ExpectedException thrown = ExpectedException .none ();
62
-
63
61
Version mongoVersion ;
64
62
65
63
@ Before
66
64
public void setUp () {
67
65
cleanUp ();
68
66
69
67
if (mongoVersion == null ) {
70
- Document result = operations .executeCommand ("{ buildInfo: 1 }" ).block ();
71
- mongoVersion = Version .parse (result .get ("version" ).toString ());
68
+ mongoVersion = operations .executeCommand ("{ buildInfo: 1 }" ) //
69
+ .map (it -> it .get ("version" ).toString ())//
70
+ .map (Version ::parse ) //
71
+ .block ();
72
72
}
73
73
}
74
74
75
75
@ After
76
76
public void tearDown () {
77
77
78
- operations .dropCollection ("person" ).block ();
79
- operations .dropCollection (Person .class ).block ();
80
- operations .dropCollection ("execute_test" ).block ();
81
- operations .dropCollection ("execute_test1" ).block ();
82
- operations .dropCollection ("execute_test2" ).block ();
83
- operations .dropCollection ("execute_index_test" ).block ();
78
+ Flux <Void > cleanup = operations .dropCollection ("person" ) //
79
+ .mergeWith (operations .dropCollection (Person .class )) //
80
+ .mergeWith (operations .dropCollection ("execute_test" )) //
81
+ .mergeWith (operations .dropCollection ("execute_test1" )) //
82
+ .mergeWith (operations .dropCollection ("execute_test2" )) //
83
+ .mergeWith (operations .dropCollection ("execute_index_test" ));
84
+
85
+ StepVerifier .create (cleanup ).verifyComplete ();
84
86
}
85
87
86
88
@ Test // DATAMONGO-1444
87
- public void executeCommandJsonCommandShouldReturnSingleResponse () throws Exception {
89
+ public void executeCommandJsonCommandShouldReturnSingleResponse () {
88
90
89
- Document document = operations .executeCommand ("{ buildInfo: 1 }" ). block ();
91
+ StepVerifier . create ( operations .executeCommand ("{ buildInfo: 1 }" )). consumeNextWith ( actual -> {
90
92
91
- assertThat (document , hasKey ("version" ));
93
+ assertThat (actual , hasKey ("version" ));
94
+ }).verifyComplete ();
92
95
}
93
96
94
97
@ Test // DATAMONGO-1444
95
- public void executeCommandDocumentCommandShouldReturnSingleResponse () throws Exception {
98
+ public void executeCommandDocumentCommandShouldReturnSingleResponse () {
96
99
97
- Document document = operations .executeCommand (new Document ("buildInfo" , 1 )). block ();
100
+ StepVerifier . create ( operations .executeCommand (new Document ("buildInfo" , 1 ))). consumeNextWith ( actual -> {
98
101
99
- assertThat (document , hasKey ("version" ));
102
+ assertThat (actual , hasKey ("version" ));
103
+ }).verifyComplete ();
100
104
}
101
105
102
106
@ Test // DATAMONGO-1444
103
- public void executeCommandJsonCommandShouldReturnMultipleResponses () throws Exception {
107
+ public void executeCommandJsonCommandShouldReturnMultipleResponses () {
104
108
105
109
assumeTrue (mongoVersion .isGreaterThan (THREE ));
106
110
107
- operations .executeCommand ("{ insert: 'execute_test', documents: [{},{},{}]}" ).block ();
111
+ StepVerifier .create (operations .executeCommand ("{ insert: 'execute_test', documents: [{},{},{}]}" ))
112
+ .expectNextCount (1 ).verifyComplete ();
108
113
109
- TestSubscriber < Document > subscriber = TestSubscriber .create ();
110
- operations . executeCommand ( "{ find: 'execute_test'}" ). subscribe ( subscriber );
114
+ StepVerifier .create (operations . executeCommand ( "{ find: 'execute_test'}" )) //
115
+ . consumeNextWith ( actual -> {
111
116
112
- subscriber .awaitAndAssertNextValueCount (1 );
113
- subscriber .assertValuesWith (document -> {
114
-
115
- assertThat (document .get ("ok" , Double .class ), is (closeTo (1D , 0D )));
116
- assertThat (document , hasKey ("cursor" ));
117
- });
117
+ assertThat (actual .get ("ok" , Double .class ), is (closeTo (1D , 0D )));
118
+ assertThat (actual , hasKey ("cursor" ));
119
+ }) //
120
+ .verifyComplete ();
118
121
}
119
122
120
123
@ Test // DATAMONGO-1444
121
- public void executeCommandJsonCommandShouldTranslateExceptions () throws Exception {
124
+ public void executeCommandJsonCommandShouldTranslateExceptions () {
122
125
123
- TestSubscriber < Document > testSubscriber = TestSubscriber . subscribe (operations .executeCommand ("{ unknown: 1 }" ));
124
-
125
- testSubscriber . await (). assertError ( InvalidDataAccessApiUsageException . class );
126
+ StepVerifier . create (operations .executeCommand ("{ unknown: 1 }" )) //
127
+ . expectError ( InvalidDataAccessApiUsageException . class ) //
128
+ . verify ( );
126
129
}
127
130
128
131
@ Test // DATAMONGO-1444
129
- public void executeCommandDocumentCommandShouldTranslateExceptions () throws Exception {
132
+ public void executeCommandDocumentCommandShouldTranslateExceptions () {
130
133
131
- TestSubscriber <Document > testSubscriber = TestSubscriber
132
- .subscribe (operations .executeCommand (new Document ("unknown" , 1 )));
134
+ StepVerifier .create (operations .executeCommand (new Document ("unknown" , 1 ))) //
135
+ .expectError (InvalidDataAccessApiUsageException .class ) //
136
+ .verify ();
133
137
134
- testSubscriber .await ().assertError (InvalidDataAccessApiUsageException .class );
135
138
}
136
139
137
140
@ Test // DATAMONGO-1444
138
- public void executeCommandWithReadPreferenceCommandShouldTranslateExceptions () throws Exception {
139
-
140
- TestSubscriber <Document > testSubscriber = TestSubscriber
141
- .subscribe (operations .executeCommand (new Document ("unknown" , 1 ), ReadPreference .nearest ()));
141
+ public void executeCommandWithReadPreferenceCommandShouldTranslateExceptions () {
142
142
143
- testSubscriber .await ().assertError (InvalidDataAccessApiUsageException .class );
143
+ StepVerifier .create (operations .executeCommand (new Document ("unknown" , 1 ), ReadPreference .nearest ())) //
144
+ .expectError (InvalidDataAccessApiUsageException .class ) //
145
+ .verify ();
144
146
}
145
147
146
148
@ Test // DATAMONGO-1444
147
- public void executeOnDatabaseShouldExecuteCommand () throws Exception {
149
+ public void executeOnDatabaseShouldExecuteCommand () {
148
150
149
- operations .executeCommand ("{ insert: 'execute_test', documents: [{},{},{}]}" ). block ();
150
- operations .executeCommand ("{ insert: 'execute_test1', documents: [{},{},{}]}" ). block ();
151
- operations .executeCommand ("{ insert: 'execute_test2', documents: [{},{},{}]}" ). block ( );
151
+ Flux < Document > documentFlux = operations .executeCommand ("{ insert: 'execute_test', documents: [{},{},{}]}" )
152
+ . mergeWith ( operations .executeCommand ("{ insert: 'execute_test1', documents: [{},{},{}]}" ))
153
+ . mergeWith ( operations .executeCommand ("{ insert: 'execute_test2', documents: [{},{},{}]}" ));
152
154
153
- Flux < Document > execute = operations . execute ( MongoDatabase :: listCollections );
155
+ StepVerifier . create ( documentFlux ). expectNextCount ( 3 ). verifyComplete ( );
154
156
155
- List <Document > documents = execute .filter (document -> document .getString ("name" ).startsWith ("execute_test" ))
156
- .collectList ().block ();
157
+ Flux <Document > execute = operations .execute (MongoDatabase ::listCollections );
157
158
158
- assertThat (documents , hasSize (3 ));
159
+ StepVerifier .create (execute .filter (document -> document .getString ("name" ).startsWith ("execute_test" ))) //
160
+ .expectNextCount (3 ) //
161
+ .verifyComplete ();
159
162
}
160
163
161
164
@ Test // DATAMONGO-1444
162
- public void executeOnDatabaseShouldDeferExecution () throws Exception {
165
+ public void executeOnDatabaseShouldDeferExecution () {
163
166
164
167
operations .execute (db -> {
165
168
throw new MongoException (50 , "hi there" );
@@ -169,42 +172,34 @@ public void executeOnDatabaseShouldDeferExecution() throws Exception {
169
172
}
170
173
171
174
@ Test // DATAMONGO-1444
172
- public void executeOnDatabaseShouldShouldTranslateExceptions () throws Exception {
173
-
174
- TestSubscriber <Document > testSubscriber = TestSubscriber .create ();
175
+ public void executeOnDatabaseShouldShouldTranslateExceptions () {
175
176
176
177
Flux <Document > execute = operations .execute (db -> {
177
178
throw new MongoException (50 , "hi there" );
178
179
});
179
180
180
- execute .subscribe (testSubscriber );
181
-
182
- testSubscriber .await ().assertError (UncategorizedMongoDbException .class );
181
+ StepVerifier .create (execute ).expectError (UncategorizedMongoDbException .class ).verify ();
183
182
}
184
183
185
184
@ Test // DATAMONGO-1444
186
- public void executeOnCollectionWithTypeShouldReturnFindResults () throws Exception {
187
-
188
- operations .executeCommand ("{ insert: 'person', documents: [{},{},{}]}" ).block ();
189
-
190
- TestSubscriber <Document > testSubscriber = TestSubscriber .create ();
185
+ public void executeOnCollectionWithTypeShouldReturnFindResults () {
191
186
192
- Flux <Document > execute = operations .execute (Person .class , collection -> collection .find ());
193
- execute .subscribe (testSubscriber );
187
+ StepVerifier .create (operations .executeCommand ("{ insert: 'person', documents: [{},{},{}]}" )) //
188
+ .expectNextCount (1 ) //
189
+ .verifyComplete ();
194
190
195
- testSubscriber . awaitAndAssertNextValueCount ( 3 ).assertComplete ();
191
+ StepVerifier . create ( operations . execute ( Person . class , MongoCollection :: find )). expectNextCount ( 3 ).verifyComplete ();
196
192
}
197
193
198
194
@ Test // DATAMONGO-1444
199
- public void executeOnCollectionWithNameShouldReturnFindResults () throws Exception {
200
-
201
- operations .executeCommand ("{ insert: 'execute_test', documents: [{},{},{}]}" ).block ();
202
-
203
- TestSubscriber <Document > testSubscriber = TestSubscriber .create ();
195
+ public void executeOnCollectionWithNameShouldReturnFindResults () {
204
196
205
- Flux <Document > execute = operations .execute ("execute_test" , collection -> collection .find ());
206
- execute .subscribe (testSubscriber );
197
+ StepVerifier .create (operations .executeCommand ("{ insert: 'execute_test', documents: [{},{},{}]}" )) //
198
+ .expectNextCount (1 ) //
199
+ .verifyComplete ();
207
200
208
- testSubscriber .awaitAndAssertNextValueCount (3 ).assertComplete ();
201
+ StepVerifier .create (operations .execute ("execute_test" , MongoCollection ::find )) //
202
+ .expectNextCount (3 ) //
203
+ .verifyComplete ();
209
204
}
210
205
}
0 commit comments