1
1
package io .iohk .ethereum .blockchain .sync .regular
2
2
3
3
import akka .actor .Status .Failure
4
- import akka .actor .{Actor , ActorLogging , ActorRef , Props }
4
+ import akka .actor .{Actor , ActorLogging , ActorRef , ActorSystem , Props }
5
5
import akka .pattern .{ask , pipe }
6
+ import akka .stream .scaladsl .{Keep , Sink , Source , SourceQueue }
7
+ import akka .stream .{Attributes , DelayOverflowStrategy , OverflowStrategy }
6
8
import akka .util .{ByteString , Timeout }
7
9
import cats .data .NonEmptyList
8
10
import cats .instances .option ._
@@ -45,8 +47,24 @@ class BlockFetcher(
45
47
import BlockFetcher ._
46
48
47
49
implicit val ec : MonixScheduler = MonixScheduler (context.dispatcher)
50
+ implicit val sys : ActorSystem = context.system
48
51
implicit val timeout : Timeout = syncConfig.peerResponseTimeout + 2 .second // some margin for actor communication
49
52
53
+ private val queue : SourceQueue [BlockFetcherState ] = {
54
+ val cap = 1000
55
+ val numberOfElements = 1
56
+ Source
57
+ .queue[BlockFetcherState ](cap, OverflowStrategy .backpressure)
58
+ .delay(5 .seconds, DelayOverflowStrategy .dropHead)
59
+ .addAttributes(Attributes .inputBuffer(numberOfElements, numberOfElements))
60
+ .map { s =>
61
+ log.debug(" Resuming fetching with the latest state" )
62
+ fetchBlocks(s.withResumedFetching)
63
+ }
64
+ .toMat(Sink .ignore)(Keep .left)
65
+ .run()
66
+ }
67
+
50
68
override def receive : Receive = idle()
51
69
52
70
override def postStop (): Unit = {
@@ -103,7 +121,7 @@ class BlockFetcher(
103
121
}
104
122
105
123
private def handleHeadersMessages (state : BlockFetcherState ): Receive = {
106
- case Response (peer , BlockHeaders (headers)) if state.isFetchingHeaders =>
124
+ case Response (_ , BlockHeaders (headers)) if state.isFetchingHeaders =>
107
125
val newState =
108
126
if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored ) {
109
127
log.debug(
@@ -153,7 +171,7 @@ class BlockFetcher(
153
171
state.withBodiesFetchReceived.handleRequestedBlocks(newBlocks, peer.id)
154
172
}
155
173
val waitingHeadersDequeued = state.waitingHeaders.size - newState.waitingHeaders.size
156
- log.debug(s " Processed ${ waitingHeadersDequeued} new blocks from received block bodies " )
174
+ log.debug(s " Processed $waitingHeadersDequeued new blocks from received block bodies " )
157
175
fetchBlocks(newState)
158
176
}
159
177
case RetryBodiesRequest if state.isFetchingBodies =>
@@ -267,20 +285,32 @@ class BlockFetcher(
267
285
}
268
286
269
287
private def handlePossibleTopUpdate (state : BlockFetcherState ): Receive = {
270
- // by handling these type of messages, fetcher can received from network, fresh info about blocks on top
288
+ // by handling these type of messages, fetcher can receive from network fresh info about blocks on top
271
289
// ex. After a successful handshake, fetcher will receive the info about the header of the peer best block
272
290
case MessageFromPeer (BlockHeaders (headers), _) =>
273
291
headers.lastOption.map { bh =>
274
292
log.debug(s " Candidate for new top at block ${bh.number}, current known top ${state.knownTop}" )
275
293
val newState = state.withPossibleNewTopAt(bh.number)
276
294
fetchBlocks(newState)
277
295
}
278
- // keep fetcher state updated in case new checkpoint block or mined block was imported
296
+ // keep fetcher state updated in case new mined block was imported
279
297
case InternalLastBlockImport (blockNr) =>
280
- log.debug(s " New last block $blockNr imported from the inside " )
298
+ log.debug(s " New mined block $blockNr imported from the inside " )
281
299
val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr)
282
300
283
301
fetchBlocks(newState)
302
+
303
+ // keep fetcher state updated in case new checkpoint block was imported
304
+ case InternalCheckpointImport (blockNr) =>
305
+ log.debug(s " New checkpoint block $blockNr imported from the inside " )
306
+
307
+ val newState = state
308
+ .clearQueues()
309
+ .withLastBlock(blockNr)
310
+ .withPossibleNewTopAt(blockNr)
311
+ .withPausedFetching
312
+
313
+ fetchBlocks(newState)
284
314
}
285
315
286
316
private def handlePickedBlocks (
@@ -296,9 +326,14 @@ class BlockFetcher(
296
326
private def fetchBlocks (state : BlockFetcherState ): Unit = {
297
327
// Remember that tryFetchHeaders and tryFetchBodies can issue a request
298
328
// Nice and clean way to express that would be to use SyncIO from cats-effect
299
- val newState = state |> tryFetchHeaders |> tryFetchBodies
300
329
301
- context become started(newState)
330
+ if (state.pausedFetching) {
331
+ queue.offer(state)
332
+ context become started(state)
333
+ } else {
334
+ val newState = state |> tryFetchHeaders |> tryFetchBodies
335
+ context become started(newState)
336
+ }
302
337
}
303
338
304
339
private def tryFetchHeaders (fetcherState : BlockFetcherState ): BlockFetcherState =
@@ -329,7 +364,7 @@ class BlockFetcher(
329
364
.filter(! _.isFetchingBodies)
330
365
.filter(_.waitingHeaders.nonEmpty)
331
366
.tap(fetchBodies)
332
- .map(state => state .withNewBodiesFetch)
367
+ .map(_ .withNewBodiesFetch)
333
368
.getOrElse(fetcherState)
334
369
335
370
private def fetchBodies (state : BlockFetcherState ): Unit = {
@@ -403,13 +438,13 @@ object BlockFetcher {
403
438
Props (new BlockFetcher (peersClient, peerEventBus, supervisor, syncConfig, blockValidator))
404
439
405
440
sealed trait FetchMsg
406
- case class Start (importer : ActorRef , fromBlock : BigInt ) extends FetchMsg
407
- case class FetchStateNode (hash : ByteString ) extends FetchMsg
408
- case object RetryFetchStateNode extends FetchMsg
409
- case class PickBlocks (amount : Int ) extends FetchMsg
410
- case class StrictPickBlocks (from : BigInt , atLEastWith : BigInt ) extends FetchMsg
411
- case object PrintStatus extends FetchMsg
412
- case class InvalidateBlocksFrom (fromBlock : BigInt , reason : String , toBlacklist : Option [BigInt ]) extends FetchMsg
441
+ final case class Start (importer : ActorRef , fromBlock : BigInt ) extends FetchMsg
442
+ final case class FetchStateNode (hash : ByteString ) extends FetchMsg
443
+ final case object RetryFetchStateNode extends FetchMsg
444
+ final case class PickBlocks (amount : Int ) extends FetchMsg
445
+ final case class StrictPickBlocks (from : BigInt , atLEastWith : BigInt ) extends FetchMsg
446
+ final case object PrintStatus extends FetchMsg
447
+ final case class InvalidateBlocksFrom (fromBlock : BigInt , reason : String , toBlacklist : Option [BigInt ]) extends FetchMsg
413
448
414
449
object InvalidateBlocksFrom {
415
450
@@ -419,12 +454,13 @@ object BlockFetcher {
419
454
def apply (from : BigInt , reason : String , toBlacklist : Option [BigInt ]): InvalidateBlocksFrom =
420
455
new InvalidateBlocksFrom (from, reason, toBlacklist)
421
456
}
422
- case class BlockImportFailed (blockNr : BigInt , reason : String ) extends FetchMsg
423
- case class InternalLastBlockImport (blockNr : BigInt ) extends FetchMsg
424
- case object RetryBodiesRequest extends FetchMsg
425
- case object RetryHeadersRequest extends FetchMsg
457
+ final case class BlockImportFailed (blockNr : BigInt , reason : String ) extends FetchMsg
458
+ final case class InternalLastBlockImport (blockNr : BigInt ) extends FetchMsg
459
+ final case class InternalCheckpointImport (blockNr : BigInt ) extends FetchMsg
460
+ final case object RetryBodiesRequest extends FetchMsg
461
+ final case object RetryHeadersRequest extends FetchMsg
426
462
427
463
sealed trait FetchResponse
428
- case class PickedBlocks (blocks : NonEmptyList [Block ]) extends FetchResponse
429
- case class FetchedStateNode (stateNode : NodeData ) extends FetchResponse
464
+ final case class PickedBlocks (blocks : NonEmptyList [Block ]) extends FetchResponse
465
+ final case class FetchedStateNode (stateNode : NodeData ) extends FetchResponse
430
466
}
0 commit comments