@@ -2,14 +2,15 @@ package io.iohk.ethereum.blockchain.sync
2
2
3
3
import akka .actor ._
4
4
import akka .util .ByteString
5
- import io .iohk .ethereum .domain .BlockHeader
5
+ import io .iohk .ethereum .domain .{ Block , BlockHeader }
6
6
import io .iohk .ethereum .network .EtcPeerManagerActor .PeerInfo
7
7
import io .iohk .ethereum .network .PeerEventBusActor .PeerEvent .MessageFromPeer
8
8
import io .iohk .ethereum .network .PeerEventBusActor .SubscriptionClassifier .MessageClassifier
9
9
import io .iohk .ethereum .network .PeerEventBusActor .{PeerSelector , Subscribe , Unsubscribe }
10
10
import io .iohk .ethereum .network .p2p .messages .PV62 .{BlockBody , BlockHeaders , GetBlockHeaders }
11
11
import io .iohk .ethereum .network .{EtcPeerManagerActor , Peer }
12
12
import io .iohk .ethereum .utils .Config .Sync ._
13
+ import io .iohk .ethereum .validators .BlockValidator
13
14
14
15
import scala .concurrent .ExecutionContext .Implicits .global
15
16
import scala .concurrent .duration .FiniteDuration
@@ -197,12 +198,7 @@ trait FastSync {
197
198
downloadedNodesCount += num
198
199
199
200
case BlockBodiesReceived (peer, requestedHashes, blockBodies) =>
200
- if (validateBlocks(requestedHashes, blockBodies)) {
201
- insertBlocks(requestedHashes, blockBodies)
202
- } else {
203
- blacklist(peer.id, blacklistDuration, s " responded with block bodies not matching block headers, blacklisting for $blacklistDuration" )
204
- self ! FastSync .EnqueueBlockBodies (requestedHashes)
205
- }
201
+ handleBlockBodies(peer, requestedHashes, blockBodies)
206
202
207
203
case BlockHeadersReceived (_, headers) =>
208
204
insertHeaders(headers)
@@ -226,6 +222,23 @@ trait FastSync {
226
222
persistSyncState()
227
223
}
228
224
225
+ private def handleBlockBodies (peer : Peer , requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]) = {
226
+ validateBlocks(requestedHashes, blockBodies) match {
227
+ case Valid =>
228
+ insertBlocks(requestedHashes, blockBodies)
229
+ case Invalid =>
230
+ blacklist(peer.id, blacklistDuration, s " responded with block bodies not matching block headers, blacklisting for $blacklistDuration" )
231
+ self ! FastSync .EnqueueBlockBodies (requestedHashes)
232
+ case DbError =>
233
+ blockBodiesQueue = Seq .empty
234
+ receiptsQueue = Seq .empty
235
+ // todo adjust the formula to minimize redownloaded block headers
236
+ bestBlockHeaderNumber = bestBlockHeaderNumber - 2 * blockHeadersPerRequest
237
+ log.debug(" missing block header for known hash" )
238
+ self ! ProcessSyncing
239
+ }
240
+ }
241
+
229
242
private def handleActorTerminate (ref : ActorRef ) = {
230
243
context unwatch ref
231
244
assignedHandlers -= ref
@@ -276,14 +289,21 @@ trait FastSync {
276
289
requestedReceipts = requestedReceipts - handler
277
290
}
278
291
279
- private def validateBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): Boolean = (requestedHashes zip blockBodies)
280
- .map { case (hash, body) => (blockchain.getBlockHeaderByHash(hash), body) }
281
- .forall {
282
- case (Some (header), body) =>
283
- val result = validators.blockValidator.validateHeaderAndBody(header, body)
284
- result.isRight
285
- case _ => false
286
- }
292
+ private def validateBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): BlockBodyValidationResult = {
293
+ var result : BlockBodyValidationResult = Valid
294
+ (requestedHashes zip blockBodies)
295
+ .map { case (hash, body) => (blockchain.getBlockHeaderByHash(hash), body) }
296
+ .forall {
297
+ case (Some (header), body) =>
298
+ val validationResult : Either [BlockValidator .BlockError , Block ] = validators.blockValidator.validateHeaderAndBody(header, body)
299
+ result = validationResult.fold(_ => Invalid , _ => Valid )
300
+ validationResult.isRight
301
+ case _ =>
302
+ result = DbError
303
+ false
304
+ }
305
+ result
306
+ }
287
307
288
308
private def insertBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): Unit = {
289
309
(requestedHashes zip blockBodies).foreach { case (hash, body) =>
@@ -473,6 +493,11 @@ object FastSync {
473
493
private case object PersistSyncState
474
494
case class MarkPeerBlockchainOnly (peer : Peer )
475
495
496
+ private sealed trait BlockBodyValidationResult
497
+ private case object Valid extends BlockBodyValidationResult
498
+ private case object Invalid extends BlockBodyValidationResult
499
+ private case object DbError extends BlockBodyValidationResult
500
+
476
501
case class SyncState (
477
502
targetBlock : BlockHeader ,
478
503
mptNodesQueue : Seq [HashType ] = Nil ,
0 commit comments