Skip to content

Commit 61503fa

Browse files
committed
[ETCM-739] Refactor BlockFetcher
Change BlockFetcher to typed actor Split fetcher message handling among child actors Abstract the fetch trait Scalafmt Refactor blockFetcherState Fix it tests Fix scalastyle, comments Fix comments Add logging
1 parent d40732a commit 61503fa

File tree

15 files changed

+599
-775
lines changed

15 files changed

+599
-775
lines changed

nix/overlay.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ rev: final: prev: {
33

44
mantis = final.callPackage ./mantis.nix {
55
src = ../.;
6-
depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs=";
6+
depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY=";
77
};
88

99
mantis-hash = final.mantis.override {

project/Dependencies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ object Dependencies {
1414
Seq(
1515
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
1616
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
17+
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
1718
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
19+
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion,
1820
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
1921
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test"
2022
)

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ class BlockImporterItSpec
267267

268268
val msg = fetcherProbe
269269
.fishForMessage(Timeouts.longTimeout) {
270-
case BlockFetcher.FetchStateNode(_) => true
270+
case BlockFetcher.FetchStateNode(_, _) => true
271271
case _ => false
272272
}
273273
.asInstanceOf[BlockFetcher.FetchStateNode]

src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,6 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
2828
testScheduler.awaitTermination(120.second)
2929
}
3030

31-
"a peer should reorganise when receives a checkpoint older than the current best from a peer" in customTestCaseResourceM(
32-
FakePeer.start2FakePeersRes()
33-
) { case (peer1, peer2) =>
34-
for {
35-
_ <- peer1.importBlocksUntil(20)(IdentityUpdate)
36-
_ <- peer2.importBlocksUntil(30)(IdentityUpdate)
37-
_ <- peer1.startRegularSync()
38-
_ <- peer2.startRegularSync()
39-
_ <- peer1.addCheckpointedBlock(peer1.bl.getBestBlock().get)
40-
_ <- peer1.waitForRegularSyncLoadLastBlock(21)
41-
_ <- peer2.getCheckpointFromPeer(peer1.bl.getBestBlock().get, PeerId("Peer1"))
42-
_ <- peer2.waitForRegularSyncLoadLastBlock(21)
43-
} yield {
44-
assert(peer1.bl.getBestBlock().get.hash == peer2.bl.getBestBlock().get.hash)
45-
assert(peer1.bl.getLatestCheckpointBlockNumber() == peer2.bl.getLatestCheckpointBlockNumber())
46-
}
47-
}
48-
4931
"peer 2 should sync to the top of peer1 blockchain" - {
5032
"given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
5133
case (peer1, peer2) =>

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
package io.iohk.ethereum.sync.util
22

3-
import akka.actor.ActorRef
3+
import akka.actor.{ActorRef, typed}
44
import akka.util.ByteString
55
import cats.effect.Resource
66
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
77
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
88
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
99
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.Start
10-
import io.iohk.ethereum.blockchain.sync.regular.{
11-
BlockBroadcast,
12-
BlockBroadcasterActor,
13-
BlockFetcher,
14-
BlockImporter,
15-
RegularSync
16-
}
10+
import io.iohk.ethereum.blockchain.sync.regular.{BlockBroadcast, BlockBroadcasterActor, BlockFetcher, BlockImporter, RegularSync}
1711
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
1812
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
1913
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
@@ -36,6 +30,8 @@ import io.iohk.ethereum.utils._
3630
import io.iohk.ethereum.vm.EvmConfig
3731
import monix.eval.Task
3832
import monix.execution.Scheduler
33+
import akka.actor.typed.scaladsl.adapter._
34+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.AdaptedMessageFromEventBus
3935

4036
import scala.concurrent.duration._
4137
object RegularSyncItSpecUtils {
@@ -86,15 +82,15 @@ object RegularSyncItSpecUtils {
8682
"block-broadcaster"
8783
)
8884

89-
val fetcher: ActorRef =
90-
system.actorOf(
91-
BlockFetcher.props(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
85+
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
86+
system.spawn(
87+
BlockFetcher(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
9288
"block-fetcher"
9389
)
9490

9591
lazy val blockImporter = system.actorOf(
9692
BlockImporter.props(
97-
fetcher,
93+
fetcher.toClassic,
9894
ledger,
9995
bl,
10096
syncConfig,
@@ -181,7 +177,7 @@ object RegularSyncItSpecUtils {
181177

182178
def getCheckpointFromPeer(checkpoint: Block, peerId: PeerId): Task[Unit] = Task {
183179
blockImporter ! Start
184-
fetcher ! MessageFromPeer(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
180+
fetcher ! AdaptedMessageFromEventBus(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
185181
}
186182

187183
private def getMptForBlock(block: Block) = {

0 commit comments

Comments
 (0)