@@ -10,10 +10,13 @@ import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
10
10
import io .iohk .ethereum .db .storage ._
11
11
import io .iohk .ethereum .db .storage .pruning .PruningMode
12
12
import io .iohk .ethereum .domain
13
+ import io .iohk .ethereum .domain .BlockchainImpl .BestBlockLatestCheckpointNumbers
13
14
import io .iohk .ethereum .ledger .{InMemoryWorldStateProxy , InMemoryWorldStateProxyStorage }
14
15
import io .iohk .ethereum .mpt .{MerklePatriciaTrie , MptNode }
15
16
import io .iohk .ethereum .vm .{Storage , WorldStateProxy }
16
17
18
+ import scala .annotation .tailrec
19
+
17
20
/**
18
21
* Entity to be used to persist and query Blockchain related objects (blocks, transactions, ommers)
19
22
*/
@@ -123,6 +126,7 @@ trait Blockchain {
123
126
124
127
def getBestBlock (): Block
125
128
129
+ def getLatestCheckpointBlockNumber (): BigInt
126
130
127
131
/**
128
132
* Persists full block along with receipts and total difficulty
@@ -158,7 +162,7 @@ trait Blockchain {
158
162
159
163
def storeTotalDifficulty (blockhash : ByteString , totalDifficulty : BigInt ): DataSourceBatchUpdate
160
164
161
- def saveBestKnownBlock ( number : BigInt ): Unit
165
+ def saveBestKnownBlocks ( bestBlockNumber : BigInt , latestCheckpointNumber : Option [ BigInt ] = None ): Unit
162
166
163
167
def saveNode (nodeHash : NodeHash , nodeEncoded : NodeEncoded , blockNumber : BigInt ): Unit
164
168
@@ -209,7 +213,8 @@ class BlockchainImpl(
209
213
210
214
// There is always only one writer thread (ensured by actor), but can by many readers (api calls)
211
215
// to ensure visibility of writes, needs to be volatile or atomic ref
212
- private val bestKnownBlock : AtomicReference [BigInt ] = new AtomicReference (BigInt (0 ))
216
+ private val bestKnownBlockAndLatestCheckpoint : AtomicReference [BestBlockLatestCheckpointNumbers ] =
217
+ new AtomicReference (BestBlockLatestCheckpointNumbers (BigInt (0 ), BigInt (0 )))
213
218
214
219
override def getBlockHeaderByHash (hash : ByteString ): Option [BlockHeader ] =
215
220
blockHeadersStorage.get(hash)
@@ -225,12 +230,22 @@ class BlockchainImpl(
225
230
226
231
override def getBestBlockNumber (): BigInt = {
227
232
val bestBlockNum = appStateStorage.getBestBlockNumber()
228
- if (bestKnownBlock .get() > bestBlockNum)
229
- bestKnownBlock .get()
233
+ if (bestKnownBlockAndLatestCheckpoint .get().bestBlockNumber > bestBlockNum)
234
+ bestKnownBlockAndLatestCheckpoint .get().bestBlockNumber
230
235
else
231
236
bestBlockNum
232
237
}
233
238
239
+ override def getLatestCheckpointBlockNumber (): BigInt = {
240
+ val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
241
+ // The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
242
+ // The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
243
+ if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
244
+ bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
245
+ else
246
+ latestCheckpointNumberInStorage
247
+ }
248
+
234
249
override def getBestBlock (): Block =
235
250
getBlockByNumber(getBestBlockNumber()).get
236
251
@@ -252,8 +267,10 @@ class BlockchainImpl(
252
267
ByteString (mpt.get(position).getOrElse(BigInt (0 )).toByteArray)
253
268
}
254
269
255
- def saveBestBlock (bestBlock : Option [BigInt ]): Unit = {
256
- bestBlock.fold(appStateStorage.putBestBlockNumber(getBestBlockNumber()).commit())(best => appStateStorage.putBestBlockNumber(best).commit())
270
+ private def persistBestBlocksData (): Unit = {
271
+ appStateStorage.putBestBlockNumber(getBestBlockNumber())
272
+ .and(appStateStorage.putLatestCheckpointBlockNumber(getLatestCheckpointBlockNumber()))
273
+ .commit()
257
274
}
258
275
259
276
def save (block : Block , receipts : Seq [Receipt ], totalDifficulty : BigInt , saveAsBestBlock : Boolean ): Unit = {
@@ -263,8 +280,12 @@ class BlockchainImpl(
263
280
.commit()
264
281
265
282
// not transactional part
266
- stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(saveBestBlock)
267
- if (saveAsBestBlock) {
283
+ // the best blocks data will be persisted only when the cache will be persisted
284
+ stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
285
+
286
+ if (saveAsBestBlock && block.hasCheckpoint) {
287
+ saveBestKnownBlockAndLatestCheckpointNumber(block.header.number, block.header.number)
288
+ } else if (saveAsBestBlock) {
268
289
saveBestKnownBlock(block.header.number)
269
290
}
270
291
}
@@ -289,8 +310,21 @@ class BlockchainImpl(
289
310
override def storeEvmCode (hash : ByteString , evmCode : ByteString ): DataSourceBatchUpdate =
290
311
evmCodeStorage.put(hash, evmCode)
291
312
292
- override def saveBestKnownBlock (number : BigInt ): Unit = {
293
- bestKnownBlock.set(number)
313
+ override def saveBestKnownBlocks (bestBlockNumber : BigInt , latestCheckpointNumber : Option [BigInt ] = None ): Unit = {
314
+ latestCheckpointNumber match {
315
+ case Some (number) =>
316
+ saveBestKnownBlockAndLatestCheckpointNumber(bestBlockNumber, number)
317
+ case None =>
318
+ saveBestKnownBlock(bestBlockNumber)
319
+ }
320
+ }
321
+
322
+ private def saveBestKnownBlock (bestBlockNumber : BigInt ): Unit = {
323
+ bestKnownBlockAndLatestCheckpoint.updateAndGet(_.copy(bestBlockNumber = bestBlockNumber))
324
+ }
325
+
326
+ private def saveBestKnownBlockAndLatestCheckpointNumber (number : BigInt , latestCheckpointNumber : BigInt ): Unit = {
327
+ bestKnownBlockAndLatestCheckpoint.set(BestBlockLatestCheckpointNumbers (number, latestCheckpointNumber))
294
328
}
295
329
296
330
def storeTotalDifficulty (blockhash : ByteString , td : BigInt ): DataSourceBatchUpdate =
@@ -310,6 +344,7 @@ class BlockchainImpl(
310
344
blockNumberMappingStorage.remove(number)
311
345
}
312
346
347
+ // scalastyle:off method.length
313
348
override def removeBlock (blockHash : ByteString , withState : Boolean ): Unit = {
314
349
val maybeBlockHeader = getBlockHeaderByHash(blockHash)
315
350
val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
@@ -323,20 +358,66 @@ class BlockchainImpl(
323
358
)
324
359
}
325
360
361
+ val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate , Option [BigInt ]) = maybeBlockHeader match {
362
+ case Some (header) =>
363
+ if (header.hasCheckpoint && header.number == getLatestCheckpointBlockNumber()) {
364
+ val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
365
+ prev.map { num =>
366
+ (appStateStorage.putLatestCheckpointBlockNumber(num), Some (num))
367
+ }.getOrElse {
368
+ (appStateStorage.removeLatestCheckpointBlockNumber(), Some (0 ))
369
+ }
370
+ } else (appStateStorage.emptyBatchUpdate, None )
371
+ case None =>
372
+ (appStateStorage.emptyBatchUpdate, None )
373
+ }
374
+
375
+ val newBestBlockNumber : BigInt = if (bestSavedBlock >= 1 ) bestSavedBlock - 1 else 0
376
+
326
377
blockHeadersStorage.remove(blockHash)
327
378
.and(blockBodiesStorage.remove(blockHash))
328
379
.and(totalDifficultyStorage.remove(blockHash))
329
380
.and(receiptStorage.remove(blockHash))
330
381
.and(maybeTxList.fold(transactionMappingStorage.emptyBatchUpdate)(removeTxsLocations))
331
382
.and(blockNumberMappingUpdates)
383
+ .and(appStateStorage.putBestBlockNumber(newBestBlockNumber))
384
+ .and(checkpointUpdates)
332
385
.commit()
333
386
334
387
// not transactional part
388
+ saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
389
+
335
390
maybeBlockHeader.foreach { h =>
336
- if (withState)
337
- stateStorage.onBlockRollback(h.number, bestSavedBlock)(saveBestBlock)
391
+ if (withState) {
392
+ // do nothing as we already saved best blocks data
393
+ stateStorage.onBlockRollback(h.number, bestSavedBlock)(() => ())
394
+ }
338
395
}
339
396
}
397
+ // scalastyle:on method.length
398
+
399
+ /**
400
+ * Recursive function which try to find the previous checkpoint by traversing blocks from top to the bottom.
401
+ * In case of finding the checkpoint block number, the function will finish the job and return result
402
+ */
403
+ @ tailrec
404
+ private def findPreviousCheckpointBlockNumber (
405
+ blockNumberToCheck : BigInt ,
406
+ latestCheckpointBlockNumber : BigInt
407
+ ): Option [BigInt ] = {
408
+ if (blockNumberToCheck > 0 ) {
409
+ val maybePreviousCheckpointBlockNumber = for {
410
+ currentBlock <- getBlockByNumber(blockNumberToCheck)
411
+ if currentBlock.hasCheckpoint &&
412
+ currentBlock.number < latestCheckpointBlockNumber
413
+ } yield currentBlock.number
414
+
415
+ maybePreviousCheckpointBlockNumber match {
416
+ case Some (_) => maybePreviousCheckpointBlockNumber
417
+ case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1 , latestCheckpointBlockNumber)
418
+ }
419
+ } else None
420
+ }
340
421
341
422
private def saveTxsLocations (blockHash : ByteString , blockBody : BlockBody ): DataSourceBatchUpdate =
342
423
blockBody.transactionList.zipWithIndex.foldLeft(transactionMappingStorage.emptyBatchUpdate) {
@@ -386,8 +467,8 @@ class BlockchainImpl(
386
467
387
468
// FIXME EC-495 this method should not be need when best block is handled properly during rollback
388
469
def persistCachedNodes (): Unit = {
389
- if (stateStorage.forcePersist(RollBackFlush )){
390
- appStateStorage.putBestBlockNumber(getBestBlockNumber()).commit ()
470
+ if (stateStorage.forcePersist(RollBackFlush )) {
471
+ persistBestBlocksData ()
391
472
}
392
473
}
393
474
}
@@ -423,4 +504,6 @@ object BlockchainImpl {
423
504
appStateStorage = storages.appStateStorage,
424
505
stateStorage = storages.stateStorage
425
506
)
507
+
508
+ private case class BestBlockLatestCheckpointNumbers (bestBlockNumber : BigInt , latestCheckpointNumber : BigInt )
426
509
}
0 commit comments