Skip to content

Commit 56a9ea9

Browse files
KonradStanieckapke
authored andcommitted
[ETCM-275] Properly handle unrequested failures
1 parent 0554a62 commit 56a9ea9

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,16 @@ class SyncStateSchedulerActor(
148148
log.info(s"Waiting for target block to start the state sync")
149149
}
150150

151-
private def finalizeSync(state: SchedulerState, targetBlock: BigInt, syncInitiator: ActorRef): Unit = {
151+
private def finalizeSync(
152+
state: SchedulerState,
153+
currentStats: ProcessingStatistics,
154+
targetBlock: BigInt,
155+
syncInitiator: ActorRef
156+
): Unit = {
152157
if (state.memBatch.nonEmpty) {
153158
log.debug(s"Persisting ${state.memBatch.size} elements to blockchain and finalizing the state sync")
154-
sync.persistBatch(state, targetBlock)
159+
val finalState = sync.persistBatch(state, targetBlock)
160+
reportStats(syncInitiator, currentStats, finalState)
155161
syncInitiator ! StateSyncFinished
156162
context.become(idle(ProcessingStatistics()))
157163
} else {
@@ -191,6 +197,7 @@ class SyncStateSchedulerActor(
191197
}
192198
}
193199
case RequestFailed(from, reason) =>
200+
log.debug("Processing failed request from {}. Failure reason {}", from, reason)
194201
val newDownloaderState = currentDownloaderState.handleRequestFailure(from)
195202
ProcessingResult(Left(DownloaderError(newDownloaderState, from, reason, critical = true)))
196203
}
@@ -326,7 +333,7 @@ class SyncStateSchedulerActor(
326333
handleRestart(currentState, currentStats, targetBlock, restartRequested.get)
327334

328335
case Sync if currentState.numberOfPendingRequests == 0 =>
329-
finalizeSync(currentState, targetBlock, syncInitiator)
336+
finalizeSync(currentState, currentStats, targetBlock, syncInitiator)
330337

331338
case result: RequestResult =>
332339
if (processing) {
@@ -414,13 +421,15 @@ class SyncStateSchedulerActor(
414421
self ! Sync
415422

416423
case ProcessingResult(Left(err)) =>
424+
log.debug("Received error result")
417425
err match {
418426
case Critical(er) =>
419427
log.error(s"Critical error while state syncing ${er}, stopping state sync")
420428
// TODO we should probably start sync again from new target block, as current trie is malformed or declare
421429
// fast sync as failure and start normal sync from scratch
422430
context.stop(self)
423431
case DownloaderError(newDownloaderState, peer, description, critical) =>
432+
log.debug("Downloader error by peer {}", peer)
424433
if (critical && handshakedPeers.contains(peer)) {
425434
blacklist(peer.id, syncConfig.blacklistDuration, description)
426435
}
@@ -563,12 +572,16 @@ object SyncStateSchedulerActor {
563572
}
564573

565574
def handleRequestFailure(from: Peer): DownloaderState = {
566-
val requestedNodes = activeRequests(from.id)
567-
val newNodesToGet = requestedNodes.foldLeft(nodesToGet) { case (map, node) =>
568-
map + (node -> None)
569-
}
575+
activeRequests
576+
.get(from.id)
577+
.map { requestedNodes =>
578+
val newNodesToGet = requestedNodes.foldLeft(nodesToGet) { case (map, node) =>
579+
map + (node -> None)
580+
}
570581

571-
copy(activeRequests = activeRequests - from.id, nodesToGet = newNodesToGet)
582+
copy(activeRequests = activeRequests - from.id, nodesToGet = newNodesToGet)
583+
}
584+
.getOrElse(this)
572585
}
573586

574587
/**

src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import io.iohk.ethereum.network.p2p.messages.Versions
2525
import io.iohk.ethereum.network.{Peer, PeerId}
2626
import io.iohk.ethereum.utils.Config
2727
import io.iohk.ethereum.{Fixtures, ObjectGenerators, WithActorSystemShutDown}
28-
import monix.eval.Task
2928
import monix.execution.Scheduler
3029
import monix.reactive.Observable
3130
import org.scalactic.anyvals.PosInt
@@ -121,7 +120,7 @@ class StateSyncSpec
121120
stateStorage = storages.stateStorage
122121
) {
123122
override def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = {
124-
Observable.repeatEvalF(Task(Right(ByteString(1)))).takeWhile(_ => !loadingFinished)
123+
Observable.repeat(Right(ByteString(1))).takeWhile(_ => !loadingFinished)
125124
}
126125
}
127126

0 commit comments

Comments
 (0)