@@ -2,14 +2,15 @@ package io.iohk.ethereum.blockchain.sync
2
2
3
3
import akka .actor ._
4
4
import akka .util .ByteString
5
- import io .iohk .ethereum .domain .BlockHeader
5
+ import io .iohk .ethereum .domain .{ Block , BlockHeader }
6
6
import io .iohk .ethereum .network .EtcPeerManagerActor .PeerInfo
7
7
import io .iohk .ethereum .network .PeerEventBusActor .PeerEvent .MessageFromPeer
8
8
import io .iohk .ethereum .network .PeerEventBusActor .SubscriptionClassifier .MessageClassifier
9
9
import io .iohk .ethereum .network .PeerEventBusActor .{PeerSelector , Subscribe , Unsubscribe }
10
10
import io .iohk .ethereum .network .p2p .messages .PV62 .{BlockBody , BlockHeaders , GetBlockHeaders }
11
11
import io .iohk .ethereum .network .{EtcPeerManagerActor , Peer }
12
12
import io .iohk .ethereum .utils .Config .Sync ._
13
+ import io .iohk .ethereum .validators .BlockValidator
13
14
14
15
import scala .concurrent .ExecutionContext .Implicits .global
15
16
import scala .concurrent .duration .FiniteDuration
@@ -197,11 +198,15 @@ trait FastSync {
197
198
downloadedNodesCount += num
198
199
199
200
case BlockBodiesReceived (peer, requestedHashes, blockBodies) =>
200
- if (validateBlocks(requestedHashes, blockBodies)) {
201
- insertBlocks(requestedHashes, blockBodies)
202
- } else {
203
- blacklist(peer.id, blacklistDuration, s " responded with block bodies not matching block headers, blacklisting for $blacklistDuration" )
204
- self ! FastSync .EnqueueBlockBodies (requestedHashes)
201
+ validateBlocks(requestedHashes, blockBodies) match {
202
+ case Valid =>
203
+ insertBlocks(requestedHashes, blockBodies)
204
+ case Invalid =>
205
+ blacklist(peer.id, blacklistDuration, s " responded with block bodies not matching block headers, blacklisting for $blacklistDuration" )
206
+ self ! FastSync .EnqueueBlockBodies (requestedHashes)
207
+ case DbError =>
208
+ log.debug(" missing block header for known hash" )
209
+ self ! ProcessSyncing
205
210
}
206
211
207
212
case BlockHeadersReceived (_, headers) =>
@@ -276,14 +281,24 @@ trait FastSync {
276
281
requestedReceipts = requestedReceipts - handler
277
282
}
278
283
279
- private def validateBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): Boolean = (requestedHashes zip blockBodies)
280
- .map { case (hash, body) => (blockchain.getBlockHeaderByHash(hash), body) }
281
- .forall {
282
- case (Some (header), body) =>
283
- val result = validators.blockValidator.validateHeaderAndBody(header, body)
284
- result.isRight
285
- case _ => false
286
- }
284
+ private def validateBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): ValidationResult = {
285
+ var result : ValidationResult = Valid
286
+ (requestedHashes zip blockBodies)
287
+ .map { case (hash, body) => (blockchain.getBlockHeaderByHash(hash), body) }
288
+ .forall {
289
+ case (Some (header), body) =>
290
+ val validationResult : Either [BlockValidator .BlockError , Block ] = validators.blockValidator.validateHeaderAndBody(header, body)
291
+ result = validationResult.fold(_ => Invalid , _ => Valid )
292
+ validationResult.isRight
293
+ case _ =>
294
+ blockBodiesQueue = Seq .empty
295
+ receiptsQueue = Seq .empty
296
+ bestBlockHeaderNumber = bestBlockHeaderNumber - 2 * blockHeadersPerRequest
297
+ result = DbError
298
+ false
299
+ }
300
+ result
301
+ }
287
302
288
303
private def insertBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): Unit = {
289
304
(requestedHashes zip blockBodies).foreach { case (hash, body) =>
@@ -473,6 +488,11 @@ object FastSync {
473
488
private case object PersistSyncState
474
489
case class MarkPeerBlockchainOnly (peer : Peer )
475
490
491
+ private sealed trait ValidationResult
492
+ private case object Valid extends ValidationResult
493
+ private case object Invalid extends ValidationResult
494
+ private case object DbError extends ValidationResult
495
+
476
496
case class SyncState (
477
497
targetBlock : BlockHeader ,
478
498
mptNodesQueue : Seq [HashType ] = Nil ,
0 commit comments