@@ -3,9 +3,10 @@ package io.iohk.ethereum.blockchain.sync.regular
3
3
import java .net .InetSocketAddress
4
4
5
5
import akka .actor .ActorSystem
6
- import akka .actor .testkit .typed .scaladsl .ScalaTestWithActorTestKit
6
+ import akka .actor .testkit .typed .scaladsl .ActorTestKit
7
7
import akka .actor .typed .ActorRef
8
8
import akka .actor .typed .scaladsl .adapter ._
9
+ import akka .testkit .TestKit
9
10
import akka .testkit .TestProbe
10
11
11
12
import scala .concurrent .ExecutionContext .Implicits .global
@@ -41,7 +42,7 @@ import io.iohk.ethereum.network.p2p.messages.ETH62._
41
42
import io .iohk .ethereum .security .SecureRandomBuilder
42
43
import io .iohk .ethereum .utils .Config
43
44
44
- class BlockFetcherSpec extends ScalaTestWithActorTestKit () with AnyFreeSpecLike with Matchers with SecureRandomBuilder {
45
+ class BlockFetcherSpec extends AnyFreeSpecLike with Matchers with SecureRandomBuilder {
45
46
46
47
" BlockFetcher" - {
47
48
@@ -76,6 +77,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
76
77
peersClient.send(refExpectingReply, PeersClient .Response (fakePeer, secondGetBlockHeadersResponse))
77
78
78
79
peersClient.expectMsgPF() { case PeersClient .Request (msg, _, _) if msg == firstGetBlockHeadersRequest => () }
80
+ shutdownActorSystem()
79
81
}
80
82
81
83
" should not requests headers upon invalidation while a request is already in progress, should resume after failure in response" in new TestSetup {
@@ -111,6 +113,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
111
113
112
114
peersClient.expectMsgClass(classOf [BlacklistPeer ])
113
115
peersClient.expectMsgPF() { case PeersClient .Request (msg, _, _) if msg == firstGetBlockHeadersRequest => () }
116
+ shutdownActorSystem()
114
117
}
115
118
116
119
" should not enqueue requested blocks if the received bodies do not match" in new TestSetup {
@@ -129,6 +132,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
129
132
// Fetcher should not enqueue any new block
130
133
importer.send(blockFetcher.toClassic, PickBlocks (syncConfig.blocksBatchSize, importer.ref))
131
134
importer.expectNoMessage(100 .millis)
135
+ shutdownActorSystem()
132
136
}
133
137
134
138
" should be able to handle block bodies received in several parts" in new TestSetup {
@@ -161,6 +165,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
161
165
importer.expectMsgPF() { case BlockFetcher .PickedBlocks (blocks) =>
162
166
blocks.map(_.hash).toList shouldEqual firstBlocksBatch.map(_.hash)
163
167
}
168
+ shutdownActorSystem()
164
169
}
165
170
166
171
" should stop requesting, without blacklist the peer, in case empty bodies are received" in new TestSetup {
@@ -190,6 +195,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
190
195
importer.expectMsgPF() { case BlockFetcher .PickedBlocks (blocks) =>
191
196
blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash)
192
197
}
198
+ shutdownActorSystem()
193
199
}
194
200
195
201
" should ensure blocks passed to importer are always forming chain" in new TestSetup {
@@ -210,14 +216,20 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
210
216
skip = 0 ,
211
217
reverse = false
212
218
)
213
- // Save the reference to respond to the ask pattern on fetcher
214
- val refForAnswerSecondHeaderReq = peersClient.expectMsgPF() {
215
- case PeersClient .Request (msg, _, _) if msg == secondGetBlockHeadersRequest => peersClient.lastSender
219
+
220
+ val msgs = peersClient.receiveWhile() {
221
+ // Save the reference to respond to the ask pattern on fetcher
222
+ case PeersClient .Request (`secondGetBlockHeadersRequest`, _, _) =>
223
+ (secondGetBlockHeadersRequest, peersClient.lastSender)
224
+ // First bodies request
225
+ case PeersClient .Request (`firstGetBlockBodiesRequest`, _, _) =>
226
+ (firstGetBlockBodiesRequest, peersClient.lastSender)
216
227
}
217
228
218
- // First bodies request
219
- val refForAnswerFirstBodiesReq = peersClient.expectMsgPF() {
220
- case PeersClient .Request (msg, _, _) if msg == firstGetBlockBodiesRequest => peersClient.lastSender
229
+ val (refForAnswerSecondHeaderReq, refForAnswerFirstBodiesReq) = msgs match {
230
+ case Seq ((`secondGetBlockHeadersRequest`, s1), (`firstGetBlockBodiesRequest`, s2)) => (s1, s2)
231
+ case Seq ((`firstGetBlockBodiesRequest`, s2), (`secondGetBlockHeadersRequest`, s1)) => (s1, s2)
232
+ case _ => fail(" missing body or header request" )
221
233
}
222
234
223
235
// Block 16 is mined (we could have reached this stage due to invalidation messages sent to the fetcher)
@@ -249,9 +261,9 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
249
261
importer.send(blockFetcher.toClassic, PickBlocks (syncConfig.blocksBatchSize, importer.ref))
250
262
importer.expectMsgPF() { case BlockFetcher .PickedBlocks (blocks) =>
251
263
val headers = blocks.map(_.header).toList
252
-
253
264
assert(HeadersSeq .areChain(headers))
254
265
}
266
+ shutdownActorSystem()
255
267
}
256
268
257
269
" should properly handle a request timeout" in new TestSetup {
@@ -268,11 +280,13 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
268
280
Thread .sleep((syncConfig.peerResponseTimeout + 2 .seconds).toMillis)
269
281
270
282
peersClient.expectMsgPF() { case PeersClient .Request (`firstGetBlockHeadersRequest`, _, _) => () }
283
+ shutdownActorSystem()
271
284
}
272
285
}
273
286
274
287
trait TestSetup extends TestSyncConfig {
275
288
val as : ActorSystem = ActorSystem (" BlockFetcherSpec_System" )
289
+ val atks : ActorTestKit = ActorTestKit (as.toTyped)
276
290
277
291
val time = new VirtualTime
278
292
@@ -295,7 +309,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
295
309
val fakePeerActor : TestProbe = TestProbe ()(as)
296
310
val fakePeer : Peer = Peer (PeerId (" fakePeer" ), new InetSocketAddress (" 127.0.0.1" , 9000 ), fakePeerActor.ref, false )
297
311
298
- lazy val blockFetcher : ActorRef [BlockFetcher .FetchCommand ] = spawn(
312
+ lazy val blockFetcher : ActorRef [BlockFetcher .FetchCommand ] = atks. spawn(
299
313
BlockFetcher (
300
314
peersClient.ref,
301
315
peerEventBus.ref,
@@ -318,6 +332,11 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
318
332
)
319
333
}
320
334
335
+ def shutdownActorSystem (): Unit = {
336
+ atks.shutdownTestKit()
337
+ TestKit .shutdownActorSystem(as, verifySystemShutdown = true )
338
+ }
339
+
321
340
// Sending a far away block as a NewBlock message
322
341
// Currently BlockFetcher only downloads first block-headers-per-request blocks without this
323
342
def triggerFetching (startingNumber : BigInt = 1000 ): Unit = {
0 commit comments