Skip to content

Commit 404ba7b

Browse files
authored
[ETCM-716] Readd block headers to work queue in case of errors (#943)
1 parent a744dd8 commit 404ba7b

File tree

1 file changed

+52
-37
lines changed

1 file changed

+52
-37
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ class FastSync(
131131
private var currentSkeletonState: Option[HeaderSkeleton] = None
132132
private var skeletonHandler: Option[ActorRef] = None
133133
private var batchFailuresCount = 0
134-
private var blockHeadersQueue: Seq[HeaderRange] = Nil
135134

136135
private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty
137136
private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty
@@ -260,9 +259,13 @@ class FastSync(
260259
updatedSkeleton.batchStartingHeaderNumbers.mkString(", ")
261260
)
262261
currentSkeletonState = Some(updatedSkeleton)
263-
blockHeadersQueue ++= updatedSkeleton.batchStartingHeaderNumbers.map(from =>
264-
HeaderRange(from, updatedSkeleton.batchSize)
265-
)
262+
263+
val blockHeadersToRequest =
264+
updatedSkeleton.batchStartingHeaderNumbers.map { from =>
265+
HeaderRange(from, updatedSkeleton.batchSize)
266+
}
267+
268+
syncState = syncState.enqueueHeaderRanges(blockHeadersToRequest)
266269
}
267270
}
268271
}
@@ -333,7 +336,7 @@ class FastSync(
333336
def handleMasterPeerFailure(header: BlockHeader): Unit = {
334337
batchFailuresCount += 1
335338
if (batchFailuresCount > fastSyncMaxBatchRetries) {
336-
log.info("Max skeleton batch failures reached. Master peer must be wrong.")
339+
log.info("Max number of allowed failures reached. Switching branch and master peer.")
337340
handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)
338341

339342
// Start branch resolution and wait for response from the FastSyncBranchResolver actor.
@@ -343,14 +346,14 @@ class FastSync(
343346
}
344347
}
345348

346-
blockHeadersQueue :+= request
349+
syncState = syncState.enqueueHeaderRange(request)
347350
error match {
348351
// These are the reasons that make the master peer suspicious
349352
case InvalidPenultimateHeader(_, header) => handleMasterPeerFailure(header)
350353
case InvalidBatchHash(_, header) => handleMasterPeerFailure(header)
351354
// Otherwise probably it's just this peer's fault
352355
case _ =>
353-
log.info(error.msg)
356+
log.warning(error.msg)
354357
blockHeadersError(peer, reason)
355358
}
356359
}
@@ -677,17 +680,16 @@ class FastSync(
677680
private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = {
678681
removeRequestHandler(handler)
679682

683+
requestedHeaders.get(peer).foreach(requested => syncState = syncState.enqueueHeaderRange(requested))
680684
syncState = syncState
681685
.enqueueBlockBodies(requestedBlockBodies.getOrElse(handler, Nil))
682686
.enqueueReceipts(requestedReceipts.getOrElse(handler, Nil))
683687

688+
requestedHeaders -= peer
684689
requestedBlockBodies = requestedBlockBodies - handler
685690
requestedReceipts = requestedReceipts - handler
686691

687-
requestedHeaders -= peer
688-
if (handshakedPeers.contains(peer.id)) {
689-
blacklist.add(peer.id, blacklistDuration, reason)
690-
}
692+
blacklistIfHandshaked(peer.id, blacklistDuration, reason)
691693
}
692694

693695
/**
@@ -872,7 +874,7 @@ class FastSync(
872874
requestReceipts(peer)
873875
} else if (syncState.blockBodiesQueue.nonEmpty) {
874876
requestBlockBodies(peer)
875-
} else if (blockHeadersQueue.nonEmpty) {
877+
} else if (syncState.blockHeadersQueue.nonEmpty) {
876878
requestBlockHeaders(peer)
877879
} else if (shouldRequestNewSkeleton(peerInfo)) {
878880
requestSkeletonHeaders(peer)
@@ -934,31 +936,36 @@ class FastSync(
934936
}
935937

936938
def requestBlockHeaders(peer: Peer): Unit = {
937-
val (request, remaining) = (blockHeadersQueue.head, blockHeadersQueue.tail)
939+
val (toRequest, remaining) = (syncState.blockHeadersQueue.headOption, syncState.blockHeadersQueue.tail)
940+
941+
toRequest match {
942+
case Some(request) =>
943+
log.debug(
944+
"Requesting [{}] block headers starting at block header [{}] from peer [{}]",
945+
request.limit,
946+
request.from,
947+
peer.id.value
948+
)
938949

939-
log.debug(
940-
"Requesting [{}] block headers starting at block header [{}] from peer [{}]",
941-
request.limit,
942-
request.from,
943-
peer.id.value
944-
)
950+
val handler = context.actorOf(
951+
PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
952+
peer,
953+
peerResponseTimeout,
954+
etcPeerManager,
955+
peerEventBus,
956+
requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
957+
responseMsgCode = Codes.BlockHeadersCode
958+
)
959+
)
945960

946-
val handler = context.actorOf(
947-
PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
948-
peer,
949-
peerResponseTimeout,
950-
etcPeerManager,
951-
peerEventBus,
952-
requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
953-
responseMsgCode = Codes.BlockHeadersCode
954-
)
955-
)
961+
context watch handler
962+
assignedHandlers += (handler -> peer)
963+
requestedHeaders += (peer -> request)
964+
syncState = syncState.copy(blockHeadersQueue = remaining)
965+
peerRequestsTime += (peer -> Instant.now())
966+
case None => log.warning("Tried to request more block headers but work queue was empty.")
967+
}
956968

957-
context watch handler
958-
assignedHandlers += (handler -> peer)
959-
requestedHeaders += (peer -> request)
960-
blockHeadersQueue = remaining
961-
peerRequestsTime += (peer -> Instant.now())
962969
}
963970

964971
def requestSkeletonHeaders(peer: Peer): Unit = {
@@ -1071,10 +1078,11 @@ object FastSync {
10711078
private case object PersistSyncState
10721079
private case object PrintStatus
10731080

1074-
case class SyncState(
1081+
final case class SyncState(
10751082
pivotBlock: BlockHeader,
10761083
lastFullBlockNumber: BigInt = 0,
10771084
safeDownloadTarget: BigInt = 0,
1085+
blockHeadersQueue: Seq[HeaderRange] = Nil,
10781086
blockBodiesQueue: Seq[ByteString] = Nil,
10791087
receiptsQueue: Seq[ByteString] = Nil,
10801088
downloadedNodesCount: Long = 0,
@@ -1086,13 +1094,20 @@ object FastSync {
10861094
stateSyncFinished: Boolean = false
10871095
) {
10881096

1097+
def enqueueHeaderRange(headerRange: HeaderRange): SyncState =
1098+
copy(blockHeadersQueue = blockHeadersQueue :+ headerRange)
1099+
1100+
def enqueueHeaderRanges(headerRanges: Seq[HeaderRange]): SyncState =
1101+
copy(blockHeadersQueue = blockHeadersQueue ++ headerRanges)
1102+
10891103
def enqueueBlockBodies(blockBodies: Seq[ByteString]): SyncState =
10901104
copy(blockBodiesQueue = blockBodiesQueue ++ blockBodies)
10911105

10921106
def enqueueReceipts(receipts: Seq[ByteString]): SyncState =
10931107
copy(receiptsQueue = receiptsQueue ++ receipts)
10941108

1095-
def blockChainWorkQueued: Boolean = blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty
1109+
def blockChainWorkQueued: Boolean =
1110+
blockHeadersQueue.nonEmpty || blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty
10961111

10971112
def updateNextBlockToValidate(header: BlockHeader, K: Int, X: Int): SyncState = copy(
10981113
nextBlockToFullyValidate =
@@ -1149,5 +1164,5 @@ object FastSync {
11491164
case object LastBlockValidationFailed extends PivotBlockUpdateReason
11501165
case object SyncRestart extends PivotBlockUpdateReason
11511166

1152-
private final case class HeaderRange(from: BigInt, limit: BigInt)
1167+
private[fast] final case class HeaderRange(from: BigInt, limit: BigInt)
11531168
}

0 commit comments

Comments
 (0)