@@ -140,6 +140,12 @@ class FastSync(
140
140
private var requestedBlockBodies : Map [ActorRef , Seq [ByteString ]] = Map .empty
141
141
private var requestedReceipts : Map [ActorRef , Seq [ByteString ]] = Map .empty
142
142
143
+ private var peerRequestHandlerCount = 0
144
+ private def countPeerRequestHandler : Int = {
145
+ peerRequestHandlerCount += 1
146
+ peerRequestHandlerCount
147
+ }
148
+
143
149
private val syncStateStorageActor = context.actorOf(Props [StateStorageActor ](), " state-storage" )
144
150
syncStateStorageActor ! fastSyncStateStorage
145
151
@@ -180,7 +186,7 @@ class FastSync(
180
186
syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = saved + missing)
181
187
}
182
188
183
- def receive : Receive = handlePeerListMessages orElse handleStatus orElse {
189
+ def receive : Receive = handlePeerListMessages orElse handleStatus orElse handleRequestFailure orElse {
184
190
case UpdatePivotBlock (reason) => updatePivotBlock(reason)
185
191
case WaitingForNewTargetBlock =>
186
192
log.info(" State sync stopped until receiving new pivot block" )
@@ -192,10 +198,15 @@ class FastSync(
192
198
case StateSyncFinished =>
193
199
syncState = syncState.copy(stateSyncFinished = true )
194
200
processSyncing()
201
+ }
202
+
203
+ def handleRequestFailure : Receive = {
195
204
case PeerRequestHandler .RequestFailed (peer, reason) =>
196
205
handleRequestFailure(peer, sender(), RequestFailed (reason))
197
- case Terminated (ref) if assignedHandlers.contains(ref) =>
198
- handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated )
206
+ case Terminated (ref) =>
207
+ assignedHandlers.get(ref).foreach {
208
+ handleRequestFailure(_, ref, PeerActorTerminated )
209
+ }
199
210
}
200
211
201
212
// TODO ETCM-701 will be moved to separate actor and refactored
@@ -342,12 +353,12 @@ class FastSync(
342
353
batchFailuresCount += 1
343
354
if (batchFailuresCount > fastSyncMaxBatchRetries) {
344
355
log.info(" Max number of allowed failures reached. Switching branch and master peer." )
356
+ currentSkeletonState = None
357
+ blockHeadersQueue.dequeueAll(_ => true )
345
358
handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)
346
359
347
360
// Start branch resolution and wait for response from the FastSyncBranchResolver actor.
348
361
context become waitingForBranchResolution
349
- currentSkeletonState = None
350
- blockHeadersQueue.dequeueAll(_ => true )
351
362
branchResolver ! FastSyncBranchResolverActor .StartBranchResolver
352
363
}
353
364
}
@@ -364,8 +375,11 @@ class FastSync(
364
375
}
365
376
}
366
377
367
- private def waitingForBranchResolution : Receive = handleStatus orElse {
378
+ private def waitingForBranchResolution : Receive = handleStatus orElse handleRequestFailure orElse {
368
379
case FastSyncBranchResolverActor .BranchResolvedSuccessful (firstCommonBlockNumber, newMasterPeer) =>
380
+ log.debug(
381
+ s " resolved branch with first common block number $firstCommonBlockNumber for new master peer $newMasterPeer"
382
+ )
369
383
// Reset the batch failures count
370
384
batchFailuresCount = 0
371
385
@@ -392,7 +406,8 @@ class FastSync(
392
406
log.info(" Asking for new pivot block" )
393
407
val pivotBlockSelector = {
394
408
context.actorOf(
395
- PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist)
409
+ PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist),
410
+ " pivot-block-selector-update"
396
411
)
397
412
}
398
413
pivotBlockSelector ! PivotBlockSelector .SelectPivotBlock
@@ -410,7 +425,7 @@ class FastSync(
410
425
}
411
426
412
427
def waitingForPivotBlockUpdate (updateReason : PivotBlockUpdateReason ): Receive =
413
- handlePeerListMessages orElse handleStatus orElse {
428
+ handlePeerListMessages orElse handleStatus orElse handleRequestFailure orElse {
414
429
case PivotBlockSelector .Result (pivotBlockHeader)
415
430
if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
416
431
log.info(" New pivot block with number {} received" , pivotBlockHeader.number)
@@ -509,7 +524,9 @@ class FastSync(
509
524
}
510
525
511
526
private def removeRequestHandler (handler : ActorRef ): Unit = {
527
+ log.debug(s " removing request handler ${handler.path}" )
512
528
context unwatch handler
529
+ skeletonHandler = skeletonHandler.filter(_ != handler)
513
530
assignedHandlers -= handler
514
531
}
515
532
@@ -687,6 +704,9 @@ class FastSync(
687
704
}
688
705
689
706
private def handleRequestFailure (peer : Peer , handler : ActorRef , reason : BlacklistReason ): Unit = {
707
+ if (skeletonHandler == Some (handler))
708
+ currentSkeletonState = None
709
+
690
710
removeRequestHandler(handler)
691
711
692
712
requestedHeaders.get(peer).foreach(blockHeadersQueue.enqueue)
@@ -818,6 +838,16 @@ class FastSync(
818
838
819
839
def processSyncing (): Unit = {
820
840
FastSyncMetrics .measure(syncState)
841
+ log.debug(
842
+ " processSyncing: [{}]" ,
843
+ Map (
844
+ " fullySynced" -> fullySynced,
845
+ " blockchainDataToDownload" -> blockchainDataToDownload,
846
+ " noBlockchainWorkRemaining" -> noBlockchainWorkRemaining,
847
+ " stateSyncFinished" -> syncState.stateSyncFinished,
848
+ " notInTheMiddleOfUpdate" -> notInTheMiddleOfUpdate
849
+ )
850
+ )
821
851
if (fullySynced) {
822
852
finish()
823
853
} else {
@@ -889,8 +919,9 @@ class FastSync(
889
919
requestSkeletonHeaders(peer)
890
920
} else {
891
921
log.debug(
892
- " Nothing to request. Waiting for responses for [{}] sent requests." ,
893
- assignedHandlers.size + skeletonHandler.size
922
+ " Nothing to request. Waiting for responses from: {} and/or {}" ,
923
+ assignedHandlers.keys,
924
+ skeletonHandler
894
925
)
895
926
}
896
927
}
@@ -913,7 +944,8 @@ class FastSync(
913
944
peerEventBus,
914
945
requestMsg = GetReceipts (receiptsToGet),
915
946
responseMsgCode = Codes .ReceiptsCode
916
- )
947
+ ),
948
+ s " peer-request-handler-receipts- $countPeerRequestHandler"
917
949
)
918
950
919
951
context watch handler
@@ -936,7 +968,8 @@ class FastSync(
936
968
peerEventBus,
937
969
requestMsg = GetBlockBodies (blockBodiesToGet),
938
970
responseMsgCode = Codes .BlockBodiesCode
939
- )
971
+ ),
972
+ s " peer-request-handler-block-bodies- $countPeerRequestHandler"
940
973
)
941
974
942
975
context watch handler
@@ -964,7 +997,8 @@ class FastSync(
964
997
peerEventBus,
965
998
requestMsg = GetBlockHeaders (Left (toRequest.from), toRequest.limit, skip = 0 , reverse = false ),
966
999
responseMsgCode = Codes .BlockHeadersCode
967
- )
1000
+ ),
1001
+ s " peer-request-handler-block-headers- $countPeerRequestHandler"
968
1002
)
969
1003
970
1004
context watch handler
@@ -1028,7 +1062,8 @@ class FastSync(
1028
1062
peerEventBus,
1029
1063
requestMsg = msg,
1030
1064
responseMsgCode = Codes .BlockHeadersCode
1031
- )
1065
+ ),
1066
+ s " peer-request-handler-block-headers-skeleton- $countPeerRequestHandler"
1032
1067
)
1033
1068
1034
1069
context watch handler
0 commit comments