Skip to content

Commit 9876ed2

Browse files
committed
[ETCM-739] Refactor BlockFetcher
Change BlockFetcher to typed actor Split fetcher message handling among child actors Abstract the fetch trait Scalafmt Refactor blockFetcherState
1 parent c38c2c2 commit 9876ed2

File tree

12 files changed

+590
-748
lines changed

12 files changed

+590
-748
lines changed

nix/overlay.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ rev: final: prev: {
33

44
mantis = final.callPackage ./mantis.nix {
55
src = ../.;
6-
depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs=";
6+
depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY=";
77
};
88

99
mantis-hash = final.mantis.override {

project/Dependencies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ object Dependencies {
1414
Seq(
1515
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
1616
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
17+
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
1718
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
19+
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion,
1820
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
1921
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test"
2022
)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 214 additions & 354 deletions
Large diffs are not rendered by default.

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import scala.collection.immutable.Queue
2929
* - haven't fetched any yet
3030
* - are awaiting a response
3131
* - are awaiting a response but it should be ignored due to blocks being invalidated
32-
* @param stateNodeFetcher
3332
* @param lastBlock
3433
* @param knownTop
3534
* @param blockProviders
@@ -41,17 +40,13 @@ case class BlockFetcherState(
4140
waitingHeaders: Queue[BlockHeader],
4241
fetchingHeadersState: FetchingHeadersState,
4342
fetchingBodiesState: FetchingBodiesState,
44-
pausedFetching: Boolean = false,
45-
stateNodeFetcher: Option[StateNodeFetcher],
4643
lastBlock: BigInt,
4744
knownTop: BigInt,
4845
blockProviders: Map[BigInt, PeerId]
4946
) {
5047

5148
def isFetching: Boolean = isFetchingHeaders || isFetchingBodies
5249

53-
def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined
54-
5550
private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
5651

5752
def hasFetchedTopHeader: Boolean = nextBlockToFetch == knownTop + 1
@@ -88,31 +83,6 @@ case class BlockFetcherState(
8883
)
8984
})
9085

91-
def tryInsertBlock(block: Block, peerId: PeerId): Either[String, BlockFetcherState] = {
92-
val blockHash = block.hash
93-
if (isExist(blockHash)) {
94-
Right(this)
95-
} else if (isExistInReadyBlocks(block.header.parentHash)) {
96-
val newState = clearQueues()
97-
.copy(
98-
readyBlocks = readyBlocks.takeWhile(_.number < block.number).enqueue(block)
99-
)
100-
.withPeerForBlocks(peerId, Seq(block.number))
101-
.withKnownTopAt(block.number)
102-
Right(newState)
103-
} else if (isExistInWaitingHeaders(block.header.parentHash)) {
104-
// ignore already requested bodies
105-
val newFetchingBodiesState =
106-
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState
107-
val newState = copy(
108-
waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header),
109-
fetchingBodiesState = newFetchingBodiesState
110-
)
111-
.withKnownTopAt(block.number)
112-
Right(newState)
113-
} else Left(s"Cannot insert block [${ByteStringUtils.hash2string(blockHash)}] into the queues")
114-
}
115-
11686
/**
11787
* Validates received headers consistency and their compatibility with the state
11888
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
@@ -235,7 +205,7 @@ case class BlockFetcherState(
235205
.filter(_.headOption.exists(block => block.number <= lower))
236206
.filter(_.lastOption.exists(block => block.number >= upper))
237207
.filter(_.nonEmpty)
238-
.map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue())))
208+
.map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue(), lastBlock = blocks.last.number)))
239209
}
240210

241211
def clearQueues(): BlockFetcherState = {
@@ -267,11 +237,11 @@ case class BlockFetcherState(
267237
)
268238
}
269239

270-
def isExist(hash: ByteString): Boolean = isExistInReadyBlocks(hash) || isExistInWaitingHeaders(hash)
240+
def exists(hash: ByteString): Boolean = existsInReadyBlocks(hash) || existsInWaitingHeaders(hash)
271241

272-
def isExistInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash)
242+
def existsInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash)
273243

274-
def isExistInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash)
244+
def existsInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash)
275245

276246
def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)
277247

@@ -296,14 +266,6 @@ case class BlockFetcherState(
296266
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
297267
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)
298268

299-
def withPausedFetching: BlockFetcherState = copy(pausedFetching = true)
300-
def withResumedFetching: BlockFetcherState = copy(pausedFetching = false)
301-
302-
def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
303-
copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))
304-
305-
def notFetchingStateNode(): BlockFetcherState = copy(stateNodeFetcher = None)
306-
307269
def status: Map[String, Any] = Map(
308270
"ready blocks" -> readyBlocks.size,
309271
"known top" -> knownTop,
@@ -314,7 +276,6 @@ case class BlockFetcherState(
314276
"fetched headers" -> waitingHeaders.size,
315277
"fetching headers" -> isFetchingHeaders,
316278
"fetching bodies" -> isFetchingBodies,
317-
"fetching state node" -> isFetchingStateNode,
318279
"fetched top header" -> hasFetchedTopHeader,
319280
"first header" -> waitingHeaders.headOption.map(_.number),
320281
"first block" -> readyBlocks.headOption.map(_.number),
@@ -333,7 +294,6 @@ object BlockFetcherState {
333294
waitingHeaders = Queue(),
334295
fetchingHeadersState = NotFetchingHeaders,
335296
fetchingBodiesState = NotFetchingBodies,
336-
stateNodeFetcher = None,
337297
lastBlock = lastBlock,
338298
knownTop = lastBlock + 1,
339299
blockProviders = Map()

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ class BlockImporter(
129129

130130
private def pickBlocks(state: ImporterState): Unit = {
131131
val msg =
132-
state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(from =>
133-
BlockFetcher.StrictPickBlocks(from, startingBlockNumber)
134-
)
132+
state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand](
133+
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self)
134+
)(from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber, self))
135135

136136
fetcher ! msg
137137
}
@@ -172,7 +172,7 @@ class BlockImporter(
172172

173173
err match {
174174
case e: MissingNodeException =>
175-
fetcher ! BlockFetcher.FetchStateNode(e.hash)
175+
fetcher ! BlockFetcher.FetchStateNode(e.hash, self)
176176
ResolvingMissingNode(NonEmptyList(notImportedBlocks.head, notImportedBlocks.tail))
177177
case _ =>
178178
val invalidBlockNr = notImportedBlocks.head.number
@@ -189,7 +189,7 @@ class BlockImporter(
189189
if (blocks.isEmpty) {
190190
importedBlocks.headOption match {
191191
case Some(block) =>
192-
supervisor ! ProgressProtocol.ImportedBlock(block.number, block.hasCheckpoint, internally = false)
192+
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
193193
case None => ()
194194
}
195195

@@ -243,7 +243,7 @@ class BlockImporter(
243243
val (blocks, weights) = importedBlocksData.map(data => (data.block, data.weight)).unzip
244244
broadcastBlocks(blocks, weights)
245245
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
246-
supervisor ! ProgressProtocol.ImportedBlock(block.number, block.hasCheckpoint, internally)
246+
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)
247247
case BlockEnqueued => ()
248248
case DuplicateBlock => ()
249249
case UnknownParent => () // This is normal when receiving broadcast blocks
@@ -252,7 +252,7 @@ class BlockImporter(
252252
broadcastBlocks(newBranch, weights)
253253
newBranch.lastOption match {
254254
case Some(newBlock) =>
255-
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally)
255+
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally)
256256
case None => ()
257257
}
258258
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.iohk.ethereum.blockchain.sync.regular
2+
3+
import akka.actor.typed.{ActorRef, Behavior}
4+
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
5+
import akka.actor.{ActorRef => ClassicActorRef}
6+
import akka.util.ByteString
7+
import io.iohk.ethereum.blockchain.sync.PeersClient.{BestPeer, Request}
8+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand
9+
import io.iohk.ethereum.blockchain.sync.regular.BodiesFetcher.BodiesFetcherCommand
10+
import io.iohk.ethereum.network.Peer
11+
import io.iohk.ethereum.network.p2p.Message
12+
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBodies, GetBlockBodies}
13+
import io.iohk.ethereum.utils.Config.SyncConfig
14+
import monix.execution.Scheduler
15+
16+
import scala.util.{Failure, Success}
17+
18+
class BodiesFetcher(
19+
val peersClient: ClassicActorRef,
20+
val syncConfig: SyncConfig,
21+
val supervisor: ActorRef[FetchCommand],
22+
context: ActorContext[BodiesFetcher.BodiesFetcherCommand]
23+
) extends AbstractBehavior[BodiesFetcher.BodiesFetcherCommand](context)
24+
with FetchRequest[BodiesFetcherCommand] {
25+
26+
val log = context.log
27+
implicit val ec: Scheduler = Scheduler(context.executionContext)
28+
29+
import BodiesFetcher._
30+
31+
override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): BodiesFetcherCommand = AdaptedMessage(peer, msg)
32+
33+
override def onMessage(message: BodiesFetcherCommand): Behavior[BodiesFetcherCommand] = {
34+
message match {
35+
case FetchBodies(hashes) =>
36+
log.debug("Start fetching bodies")
37+
requestBodies(hashes)
38+
Behaviors.same
39+
case AdaptedMessage(peer, BlockBodies(bodies)) =>
40+
log.debug(s"Received ${bodies.size} block bodies")
41+
supervisor ! BlockFetcher.ReceivedBodies(peer, bodies)
42+
Behaviors.same
43+
case BodiesFetcher.RetryBodiesRequest =>
44+
supervisor ! BlockFetcher.RetryBodiesRequest
45+
Behaviors.same
46+
case _ => Behaviors.unhandled
47+
}
48+
}
49+
50+
private def requestBodies(hashes: Seq[ByteString]): Unit = {
51+
val resp = makeRequest(Request.create(GetBlockBodies(hashes), BestPeer), BodiesFetcher.RetryBodiesRequest)
52+
context.pipeToSelf(resp.runToFuture) {
53+
case Success(res) => res
54+
case Failure(_) => BodiesFetcher.RetryBodiesRequest
55+
}
56+
}
57+
}
58+
59+
object BodiesFetcher {
60+
61+
def apply(
62+
peersClient: ClassicActorRef,
63+
syncConfig: SyncConfig,
64+
supervisor: ActorRef[FetchCommand]
65+
): Behavior[BodiesFetcherCommand] =
66+
Behaviors.setup(context => new BodiesFetcher(peersClient, syncConfig, supervisor, context))
67+
68+
sealed trait BodiesFetcherCommand
69+
final case class FetchBodies(hashes: Seq[ByteString]) extends BodiesFetcherCommand
70+
final case object RetryBodiesRequest extends BodiesFetcherCommand
71+
private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends BodiesFetcherCommand
72+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.iohk.ethereum.blockchain.sync.regular
2+
3+
import akka.actor.ActorRef
4+
import io.iohk.ethereum.blockchain.sync.PeersClient
5+
import io.iohk.ethereum.blockchain.sync.PeersClient.{BlacklistPeer, NoSuitablePeer, Request, RequestFailed}
6+
import io.iohk.ethereum.network.Peer
7+
import io.iohk.ethereum.network.p2p.Message
8+
import io.iohk.ethereum.utils.Config.SyncConfig
9+
import monix.eval.Task
10+
import org.slf4j.Logger
11+
import akka.pattern.ask
12+
import akka.util.Timeout
13+
import io.iohk.ethereum.utils.FunctorOps._
14+
15+
import scala.concurrent.duration._
16+
import scala.util.Failure
17+
18+
trait FetchRequest[A] {
19+
val peersClient: ActorRef
20+
val syncConfig: SyncConfig
21+
val log: Logger
22+
23+
def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): A
24+
25+
implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication
26+
27+
def makeRequest(request: Request[_], responseFallback: A): Task[A] =
28+
Task
29+
.deferFuture(peersClient ? request)
30+
.tap(blacklistPeerOnFailedRequest)
31+
.flatMap(handleRequestResult(responseFallback))
32+
.onErrorHandle { error =>
33+
log.error("Unexpected error while doing a request", error)
34+
responseFallback
35+
}
36+
37+
def blacklistPeerOnFailedRequest(msg: Any): Unit = msg match {
38+
case RequestFailed(peer, reason) => peersClient ! BlacklistPeer(peer.id, reason)
39+
case _ => ()
40+
}
41+
42+
def handleRequestResult(fallback: A)(msg: Any): Task[A] = {
43+
msg match {
44+
case failed: RequestFailed =>
45+
log.debug("Request failed due to {}", failed)
46+
Task.now(fallback)
47+
case NoSuitablePeer =>
48+
Task.now(fallback).delayExecution(syncConfig.syncRetryInterval)
49+
case Failure(cause) =>
50+
log.error("Unexpected error on the request result", cause)
51+
Task.now(fallback)
52+
case PeersClient.Response(peer, msg) =>
53+
Task.now(makeAdaptedMessage(peer, msg))
54+
}
55+
}
56+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.iohk.ethereum.blockchain.sync.regular
2+
import akka.actor.typed.{ActorRef, Behavior}
3+
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
4+
import akka.actor.{ActorRef => ClassicActorRef}
5+
import io.iohk.ethereum.blockchain.sync.PeersClient.{BestPeer, Request}
6+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand
7+
import io.iohk.ethereum.blockchain.sync.regular.HeadersFetcher.HeadersFetcherCommand
8+
import io.iohk.ethereum.network.Peer
9+
import io.iohk.ethereum.network.p2p.Message
10+
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
11+
import io.iohk.ethereum.utils.Config.SyncConfig
12+
import monix.eval.Task
13+
import monix.execution.Scheduler
14+
import org.slf4j.Logger
15+
16+
import scala.util.{Failure, Success}
17+
18+
class HeadersFetcher(
19+
val peersClient: ClassicActorRef,
20+
val syncConfig: SyncConfig,
21+
val supervisor: ActorRef[FetchCommand],
22+
context: ActorContext[HeadersFetcher.HeadersFetcherCommand]
23+
) extends AbstractBehavior[HeadersFetcher.HeadersFetcherCommand](context)
24+
with FetchRequest[HeadersFetcherCommand] {
25+
26+
override val log: Logger = context.log
27+
implicit val ec: Scheduler = Scheduler(context.executionContext)
28+
29+
import HeadersFetcher._
30+
31+
override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): HeadersFetcherCommand = AdaptedMessage(peer, msg)
32+
33+
override def onMessage(message: HeadersFetcherCommand): Behavior[HeadersFetcherCommand] =
34+
message match {
35+
case FetchHeaders(blockNumber: BigInt, amount: BigInt) =>
36+
log.debug("Start fetching headers from block {}", blockNumber)
37+
requestHeaders(blockNumber, amount)
38+
Behaviors.same
39+
case AdaptedMessage(_, BlockHeaders(headers)) =>
40+
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
41+
supervisor ! BlockFetcher.ReceivedHeaders(headers)
42+
Behaviors.same
43+
case HeadersFetcher.RetryHeadersRequest =>
44+
supervisor ! BlockFetcher.RetryHeadersRequest
45+
Behaviors.same
46+
case _ => Behaviors.unhandled
47+
}
48+
49+
private def requestHeaders(blockNr: BigInt, amount: BigInt): Unit = {
50+
log.debug("Fetching headers from block {}", blockNr)
51+
val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false)
52+
53+
val resp = makeRequest(Request.create(msg, BestPeer), HeadersFetcher.RetryHeadersRequest)
54+
.flatMap {
55+
case AdaptedMessage(_, BlockHeaders(headers)) if headers.isEmpty =>
56+
log.debug("Empty BlockHeaders response. Retry in {}", syncConfig.syncRetryInterval)
57+
Task.now(HeadersFetcher.RetryHeadersRequest).delayResult(syncConfig.syncRetryInterval)
58+
case res => Task.now(res)
59+
}
60+
61+
context.pipeToSelf(resp.runToFuture) {
62+
case Success(res) => res
63+
case Failure(_) => HeadersFetcher.RetryHeadersRequest
64+
}
65+
}
66+
}
67+
68+
object HeadersFetcher {
69+
70+
def apply(
71+
peersClient: ClassicActorRef,
72+
syncConfig: SyncConfig,
73+
supervisor: ActorRef[FetchCommand]
74+
): Behavior[HeadersFetcherCommand] =
75+
Behaviors.setup(context => new HeadersFetcher(peersClient, syncConfig, supervisor, context))
76+
77+
sealed trait HeadersFetcherCommand
78+
final case class FetchHeaders(blockNumber: BigInt, amount: BigInt) extends HeadersFetcherCommand
79+
final case object RetryHeadersRequest extends HeadersFetcherCommand
80+
private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends HeadersFetcherCommand
81+
}

0 commit comments

Comments
 (0)