Skip to content

Commit 51536c4

Browse files
authored
[ETCM-739] Refactor BlockFetcher (#976)
Change BlockFetcher to typed actor Split fetcher message handling among child actors Abstract the fetch trait Scalafmt Refactor blockFetcherState Fix it tests Fix scalastyle, comments Fix comments Add logging
1 parent d40732a commit 51536c4

File tree

15 files changed

+599
-775
lines changed

15 files changed

+599
-775
lines changed

nix/overlay.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ rev: final: prev: {
33

44
mantis = final.callPackage ./mantis.nix {
55
src = ../.;
6-
depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs=";
6+
depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY=";
77
};
88

99
mantis-hash = final.mantis.override {

project/Dependencies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ object Dependencies {
1414
Seq(
1515
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
1616
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
17+
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
1718
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
19+
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion,
1820
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
1921
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test"
2022
)

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ class BlockImporterItSpec
267267

268268
val msg = fetcherProbe
269269
.fishForMessage(Timeouts.longTimeout) {
270-
case BlockFetcher.FetchStateNode(_) => true
270+
case BlockFetcher.FetchStateNode(_, _) => true
271271
case _ => false
272272
}
273273
.asInstanceOf[BlockFetcher.FetchStateNode]

src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,6 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
2828
testScheduler.awaitTermination(120.second)
2929
}
3030

31-
"a peer should reorganise when receives a checkpoint older than the current best from a peer" in customTestCaseResourceM(
32-
FakePeer.start2FakePeersRes()
33-
) { case (peer1, peer2) =>
34-
for {
35-
_ <- peer1.importBlocksUntil(20)(IdentityUpdate)
36-
_ <- peer2.importBlocksUntil(30)(IdentityUpdate)
37-
_ <- peer1.startRegularSync()
38-
_ <- peer2.startRegularSync()
39-
_ <- peer1.addCheckpointedBlock(peer1.bl.getBestBlock().get)
40-
_ <- peer1.waitForRegularSyncLoadLastBlock(21)
41-
_ <- peer2.getCheckpointFromPeer(peer1.bl.getBestBlock().get, PeerId("Peer1"))
42-
_ <- peer2.waitForRegularSyncLoadLastBlock(21)
43-
} yield {
44-
assert(peer1.bl.getBestBlock().get.hash == peer2.bl.getBestBlock().get.hash)
45-
assert(peer1.bl.getLatestCheckpointBlockNumber() == peer2.bl.getLatestCheckpointBlockNumber())
46-
}
47-
}
48-
4931
"peer 2 should sync to the top of peer1 blockchain" - {
5032
"given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
5133
case (peer1, peer2) =>

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
package io.iohk.ethereum.sync.util
22

3-
import akka.actor.ActorRef
3+
import akka.actor.{ActorRef, typed}
44
import akka.util.ByteString
55
import cats.effect.Resource
66
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
77
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
88
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
99
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.Start
10-
import io.iohk.ethereum.blockchain.sync.regular.{
11-
BlockBroadcast,
12-
BlockBroadcasterActor,
13-
BlockFetcher,
14-
BlockImporter,
15-
RegularSync
16-
}
10+
import io.iohk.ethereum.blockchain.sync.regular.{BlockBroadcast, BlockBroadcasterActor, BlockFetcher, BlockImporter, RegularSync}
1711
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
1812
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
1913
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
@@ -36,6 +30,8 @@ import io.iohk.ethereum.utils._
3630
import io.iohk.ethereum.vm.EvmConfig
3731
import monix.eval.Task
3832
import monix.execution.Scheduler
33+
import akka.actor.typed.scaladsl.adapter._
34+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.AdaptedMessageFromEventBus
3935

4036
import scala.concurrent.duration._
4137
object RegularSyncItSpecUtils {
@@ -86,15 +82,15 @@ object RegularSyncItSpecUtils {
8682
"block-broadcaster"
8783
)
8884

89-
val fetcher: ActorRef =
90-
system.actorOf(
91-
BlockFetcher.props(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
85+
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
86+
system.spawn(
87+
BlockFetcher(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
9288
"block-fetcher"
9389
)
9490

9591
lazy val blockImporter = system.actorOf(
9692
BlockImporter.props(
97-
fetcher,
93+
fetcher.toClassic,
9894
ledger,
9995
bl,
10096
syncConfig,
@@ -181,7 +177,7 @@ object RegularSyncItSpecUtils {
181177

182178
def getCheckpointFromPeer(checkpoint: Block, peerId: PeerId): Task[Unit] = Task {
183179
blockImporter ! Start
184-
fetcher ! MessageFromPeer(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
180+
fetcher ! AdaptedMessageFromEventBus(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
185181
}
186182

187183
private def getMptForBlock(block: Block) = {

0 commit comments

Comments
 (0)