Skip to content

[Kaizen] Unflake BlockFetcherSpec #1050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package io.iohk.ethereum.blockchain.sync.regular
import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.adapter._
import akka.testkit.TestKit
import akka.testkit.TestProbe

import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -41,7 +42,7 @@ import io.iohk.ethereum.network.p2p.messages.ETH62._
import io.iohk.ethereum.security.SecureRandomBuilder
import io.iohk.ethereum.utils.Config

class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike with Matchers with SecureRandomBuilder {
class BlockFetcherSpec extends AnyFreeSpecLike with Matchers with SecureRandomBuilder {

"BlockFetcher" - {

Expand Down Expand Up @@ -76,6 +77,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
peersClient.send(refExpectingReply, PeersClient.Response(fakePeer, secondGetBlockHeadersResponse))

peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
shutdownActorSystem()
}

"should not requests headers upon invalidation while a request is already in progress, should resume after failure in response" in new TestSetup {
Expand Down Expand Up @@ -111,6 +113,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike

peersClient.expectMsgClass(classOf[BlacklistPeer])
peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
shutdownActorSystem()
}

"should not enqueue requested blocks if the received bodies do not match" in new TestSetup {
Expand All @@ -129,6 +132,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
// Fetcher should not enqueue any new block
importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref))
importer.expectNoMessage(100.millis)
shutdownActorSystem()
}

"should be able to handle block bodies received in several parts" in new TestSetup {
Expand Down Expand Up @@ -161,6 +165,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
blocks.map(_.hash).toList shouldEqual firstBlocksBatch.map(_.hash)
}
shutdownActorSystem()
}

"should stop requesting, without blacklist the peer, in case empty bodies are received" in new TestSetup {
Expand Down Expand Up @@ -190,6 +195,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash)
}
shutdownActorSystem()
}

"should ensure blocks passed to importer are always forming chain" in new TestSetup {
Expand All @@ -210,14 +216,20 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
skip = 0,
reverse = false
)
// Save the reference to respond to the ask pattern on fetcher
val refForAnswerSecondHeaderReq = peersClient.expectMsgPF() {
case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => peersClient.lastSender

val msgs = peersClient.receiveWhile() {
// Save the reference to respond to the ask pattern on fetcher
case PeersClient.Request(`secondGetBlockHeadersRequest`, _, _) =>
(secondGetBlockHeadersRequest, peersClient.lastSender)
// First bodies request
case PeersClient.Request(`firstGetBlockBodiesRequest`, _, _) =>
(firstGetBlockBodiesRequest, peersClient.lastSender)
}

// First bodies request
val refForAnswerFirstBodiesReq = peersClient.expectMsgPF() {
case PeersClient.Request(msg, _, _) if msg == firstGetBlockBodiesRequest => peersClient.lastSender
val (refForAnswerSecondHeaderReq, refForAnswerFirstBodiesReq) = msgs match {
case Seq((`secondGetBlockHeadersRequest`, s1), (`firstGetBlockBodiesRequest`, s2)) => (s1, s2)
case Seq((`firstGetBlockBodiesRequest`, s2), (`secondGetBlockHeadersRequest`, s1)) => (s1, s2)
case _ => fail("missing body or header request")
}

// Block 16 is mined (we could have reached this stage due to invalidation messages sent to the fetcher)
Expand Down Expand Up @@ -249,9 +261,9 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref))
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
val headers = blocks.map(_.header).toList

assert(HeadersSeq.areChain(headers))
}
shutdownActorSystem()
}

"should properly handle a request timeout" in new TestSetup {
Expand All @@ -268,11 +280,13 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
Thread.sleep((syncConfig.peerResponseTimeout + 2.seconds).toMillis)

peersClient.expectMsgPF() { case PeersClient.Request(`firstGetBlockHeadersRequest`, _, _) => () }
shutdownActorSystem()
}
}

trait TestSetup extends TestSyncConfig {
val as: ActorSystem = ActorSystem("BlockFetcherSpec_System")
val atks: ActorTestKit = ActorTestKit(as.toTyped)

val time = new VirtualTime

Expand All @@ -295,7 +309,7 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
val fakePeerActor: TestProbe = TestProbe()(as)
val fakePeer: Peer = Peer(PeerId("fakePeer"), new InetSocketAddress("127.0.0.1", 9000), fakePeerActor.ref, false)

lazy val blockFetcher: ActorRef[BlockFetcher.FetchCommand] = spawn(
lazy val blockFetcher: ActorRef[BlockFetcher.FetchCommand] = atks.spawn(
BlockFetcher(
peersClient.ref,
peerEventBus.ref,
Expand All @@ -318,6 +332,11 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
)
}

def shutdownActorSystem(): Unit = {
atks.shutdownTestKit()
TestKit.shutdownActorSystem(as, verifySystemShutdown = true)
}

// Sending a far away block as a NewBlock message
// Currently BlockFetcher only downloads first block-headers-per-request blocks without this
def triggerFetching(startingNumber: BigInt = 1000): Unit = {
Expand Down