@@ -149,18 +149,23 @@ trait FastSync {
149
149
150
150
private var assignedHandlers : Map [ActorRef , ActorRef ] = Map .empty
151
151
152
- private val syncStateStorageActor = context.actorOf(Props [FastSyncStateActor ], " state-storage" )
152
+ private val syncStateStorageActor = context.actorOf(Props [FastSyncStateActor ], " state-storage" )
153
+
154
+ private var requestedMptNodes : Map [ActorRef , Seq [HashType ]] = Map .empty
155
+ private var requestedNonMptNodes : Map [ActorRef , Seq [HashType ]] = Map .empty
156
+ private var requestedBlockBodies : Map [ActorRef , Seq [ByteString ]] = Map .empty
157
+ private var requestedReceipts : Map [ActorRef , Seq [ByteString ]] = Map .empty
153
158
154
159
syncStateStorageActor ! fastSyncStateStorage
155
160
156
161
private val syncStatePersistCancellable =
157
162
scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval) {
158
163
syncStateStorageActor ! SyncState (
159
164
initialSyncState.targetBlock,
160
- mptNodesQueue,
161
- nonMptNodesQueue,
162
- blockBodiesQueue,
163
- receiptsQueue,
165
+ requestedMptNodes.values.flatten.toSeq.distinct ++ mptNodesQueue,
166
+ requestedNonMptNodes.values.flatten.toSeq.distinct ++ nonMptNodesQueue,
167
+ requestedBlockBodies.values.flatten.toSeq.distinct ++ blockBodiesQueue,
168
+ requestedReceipts.values.flatten.toSeq.distinct ++ receiptsQueue,
164
169
downloadedNodesCount,
165
170
bestBlockHeaderNumber)
166
171
}
@@ -195,11 +200,17 @@ trait FastSync {
195
200
case SyncRequestHandler .Done =>
196
201
context unwatch sender()
197
202
assignedHandlers -= sender()
203
+ cleanupRequestedMaps(sender())
198
204
processSyncing()
199
205
200
206
case Terminated (ref) if assignedHandlers.contains(ref) =>
201
207
context unwatch ref
202
208
assignedHandlers -= ref
209
+ mptNodesQueue ++= requestedMptNodes.getOrElse(ref, Nil )
210
+ nonMptNodesQueue ++= requestedNonMptNodes.getOrElse(ref, Nil )
211
+ blockBodiesQueue ++= requestedBlockBodies.getOrElse(ref, Nil )
212
+ receiptsQueue ++= requestedReceipts.getOrElse(ref, Nil )
213
+ cleanupRequestedMaps(ref)
203
214
204
215
case PrintStatus =>
205
216
val totalNodesCount = downloadedNodesCount + mptNodesQueue.size + nonMptNodesQueue.size
@@ -209,6 +220,13 @@ trait FastSync {
209
220
|State: $downloadedNodesCount/ $totalNodesCount known nodes. """ .stripMargin.replace(" \n " , " " ))
210
221
}
211
222
223
+ private def cleanupRequestedMaps (handler : ActorRef ): Unit = {
224
+ requestedMptNodes = requestedMptNodes - handler
225
+ requestedNonMptNodes = requestedNonMptNodes - handler
226
+ requestedBlockBodies = requestedBlockBodies - handler
227
+ requestedReceipts = requestedReceipts - handler
228
+ }
229
+
212
230
private def insertBlocks (requestedHashes : Seq [ByteString ], blockBodies : Seq [BlockBody ]): Unit = {
213
231
// todo this is moved from FastSyncBlockBodiesRequestHandler.scala we should add block validation here
214
232
// load header from chain by hash and check consistency with BlockValidator.validateHeaderAndBody
@@ -300,6 +318,7 @@ trait FastSync {
300
318
context watch handler
301
319
assignedHandlers += (handler -> peer)
302
320
receiptsQueue = remainingReceipts
321
+ requestedReceipts += handler -> receiptsToGet
303
322
}
304
323
305
324
def requestBlockBodies (peer : ActorRef ): Unit = {
@@ -308,6 +327,7 @@ trait FastSync {
308
327
context watch handler
309
328
assignedHandlers += (handler -> peer)
310
329
blockBodiesQueue = remainingBlockBodies
330
+ requestedBlockBodies += handler -> blockBodiesToGet
311
331
}
312
332
313
333
def requestBlockHeaders (peer : ActorRef ): Unit = {
@@ -331,6 +351,8 @@ trait FastSync {
331
351
assignedHandlers += (handler -> peer)
332
352
nonMptNodesQueue = remainingNonMptNodes
333
353
mptNodesQueue = remainingMptNodes
354
+ requestedMptNodes += handler -> mptNodesToGet
355
+ requestedNonMptNodes += handler -> nonMptNodesToGet
334
356
}
335
357
336
358
def unassignedPeers : Set [ActorRef ] = peersToDownloadFrom.keySet diff assignedHandlers.values.toSet
0 commit comments