@@ -105,37 +105,28 @@ void clusterRestart() {
105
105
106
106
publisherStates .forEach (PublisherState ::start );
107
107
108
- consumerStates .stream ()
109
- .map (s -> s .waitForNewMessages (10 ))
110
- .parallel ()
111
- .forEach (s -> assertThat (s ).completes ());
108
+ List <Sync > syncs =
109
+ consumerStates .stream ().map (s -> s .waitForNewMessages (10 )).collect (toList ());
110
+ syncs .forEach (s -> assertThat (s ).completes ());
112
111
113
112
nodes .forEach (Cli ::restartNode );
114
113
Cli .rebalance ();
115
114
116
115
waitAtMost (() -> connection .state () == Resource .State .OPEN );
117
116
118
- qqNames .stream ()
119
- .parallel ()
120
- .forEach (
121
- n -> waitAtMostNoException (TIMEOUT .multipliedBy (2 ), () -> management .queueInfo (n )));
122
- qqNames .stream ()
123
- .parallel ()
124
- .forEach (
125
- n ->
126
- assertThat (management .queueInfo (n ).replicas ())
127
- .hasSameSizeAs (nodes )
128
- .containsExactlyInAnyOrderElementsOf (nodes ));
129
-
130
- publisherStates .stream ()
131
- .map (s -> s .waitForNewMessages (10 ))
132
- .parallel ()
133
- .forEach (s -> assertThat (s ).completes ());
134
-
135
- consumerStates .stream ()
136
- .map (s -> s .waitForNewMessages (10 ))
137
- .parallel ()
138
- .forEach (s -> assertThat (s ).completes ());
117
+ qqNames .forEach (
118
+ n -> waitAtMostNoException (TIMEOUT .multipliedBy (2 ), () -> management .queueInfo (n )));
119
+ qqNames .forEach (
120
+ n ->
121
+ assertThat (management .queueInfo (n ).replicas ())
122
+ .hasSameSizeAs (nodes )
123
+ .containsExactlyInAnyOrderElementsOf (nodes ));
124
+
125
+ syncs = publisherStates .stream ().map (s -> s .waitForNewMessages (10 )).collect (toList ());
126
+ syncs .forEach (s -> assertThat (s ).completes ());
127
+
128
+ syncs = consumerStates .stream ().map (s -> s .waitForNewMessages (10 )).collect (toList ());
129
+ syncs .forEach (s -> assertThat (s ).completes ());
139
130
140
131
assertThat (publisherStates ).allMatch (s -> s .state () == Resource .State .OPEN );
141
132
assertThat (consumerStates ).allMatch (s -> s .state () == Resource .State .OPEN );
0 commit comments