@@ -10,10 +10,13 @@ import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
10
10
import io .iohk .ethereum .db .storage ._
11
11
import io .iohk .ethereum .db .storage .pruning .PruningMode
12
12
import io .iohk .ethereum .domain
13
+ import io .iohk .ethereum .domain .BlockchainImpl .BestBlockLatestCheckpointNumbers
13
14
import io .iohk .ethereum .ledger .{InMemoryWorldStateProxy , InMemoryWorldStateProxyStorage }
14
15
import io .iohk .ethereum .mpt .{MerklePatriciaTrie , MptNode }
15
16
import io .iohk .ethereum .vm .{Storage , WorldStateProxy }
16
17
18
+ import scala .annotation .tailrec
19
+
17
20
/**
18
21
* Entity to be used to persist and query Blockchain related objects (blocks, transactions, ommers)
19
22
*/
@@ -123,6 +126,7 @@ trait Blockchain {
123
126
124
127
def getBestBlock (): Block
125
128
129
+ def getLatestCheckpointBlockNumber (): BigInt
126
130
127
131
/**
128
132
* Persists full block along with receipts and total difficulty
@@ -158,7 +162,7 @@ trait Blockchain {
158
162
159
163
def storeTotalDifficulty (blockhash : ByteString , totalDifficulty : BigInt ): DataSourceBatchUpdate
160
164
161
- def saveBestKnownBlock ( number : BigInt ): Unit
165
+ def saveBestKnownBlocks ( bestBlockNumber : BigInt , latestCheckpointNumber : Option [ BigInt ] = None ): Unit
162
166
163
167
def saveNode (nodeHash : NodeHash , nodeEncoded : NodeEncoded , blockNumber : BigInt ): Unit
164
168
@@ -209,7 +213,8 @@ class BlockchainImpl(
209
213
210
214
// There is always only one writer thread (ensured by actor), but can by many readers (api calls)
211
215
// to ensure visibility of writes, needs to be volatile or atomic ref
212
- private val bestKnownBlock : AtomicReference [BigInt ] = new AtomicReference (BigInt (0 ))
216
+ private val bestKnownBlockAndLatestCheckpoint : AtomicReference [BestBlockLatestCheckpointNumbers ] =
217
+ new AtomicReference (BestBlockLatestCheckpointNumbers (BigInt (0 ), BigInt (0 )))
213
218
214
219
override def getBlockHeaderByHash (hash : ByteString ): Option [BlockHeader ] =
215
220
blockHeadersStorage.get(hash)
@@ -225,12 +230,22 @@ class BlockchainImpl(
225
230
226
231
override def getBestBlockNumber (): BigInt = {
227
232
val bestBlockNum = appStateStorage.getBestBlockNumber()
228
- if (bestKnownBlock .get() > bestBlockNum)
229
- bestKnownBlock .get()
233
+ if (bestKnownBlockAndLatestCheckpoint .get().bestBlockNumber > bestBlockNum)
234
+ bestKnownBlockAndLatestCheckpoint .get().bestBlockNumber
230
235
else
231
236
bestBlockNum
232
237
}
233
238
239
+ override def getLatestCheckpointBlockNumber (): BigInt = {
240
+ val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
241
+ // The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
242
+ // The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
243
+ if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
244
+ bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
245
+ else
246
+ latestCheckpointNumberInStorage
247
+ }
248
+
234
249
override def getBestBlock (): Block =
235
250
getBlockByNumber(getBestBlockNumber()).get
236
251
@@ -252,8 +267,10 @@ class BlockchainImpl(
252
267
ByteString (mpt.get(position).getOrElse(BigInt (0 )).toByteArray)
253
268
}
254
269
255
- def saveBestBlock (bestBlock : Option [BigInt ]): Unit = {
256
- bestBlock.fold(appStateStorage.putBestBlockNumber(getBestBlockNumber()).commit())(best => appStateStorage.putBestBlockNumber(best).commit())
270
+ private def persistBestBlocksData (): Unit = {
271
+ appStateStorage.putBestBlockNumber(getBestBlockNumber())
272
+ .and(appStateStorage.putLatestCheckpointBlockNumber(getLatestCheckpointBlockNumber()))
273
+ .commit()
257
274
}
258
275
259
276
def save (block : Block , receipts : Seq [Receipt ], totalDifficulty : BigInt , saveAsBestBlock : Boolean ): Unit = {
@@ -263,8 +280,12 @@ class BlockchainImpl(
263
280
.commit()
264
281
265
282
// not transactional part
266
- stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(saveBestBlock)
267
- if (saveAsBestBlock) {
283
+ // the best blocks data will be persisted only when the cache will be persisted
284
+ stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
285
+
286
+ if (saveAsBestBlock && block.hasCheckpoint) {
287
+ saveBestKnownBlockAndLatestCheckpointNumber(block.header.number, block.header.number)
288
+ } else if (saveAsBestBlock) {
268
289
saveBestKnownBlock(block.header.number)
269
290
}
270
291
}
@@ -289,8 +310,21 @@ class BlockchainImpl(
289
310
override def storeEvmCode (hash : ByteString , evmCode : ByteString ): DataSourceBatchUpdate =
290
311
evmCodeStorage.put(hash, evmCode)
291
312
292
- override def saveBestKnownBlock (number : BigInt ): Unit = {
293
- bestKnownBlock.set(number)
313
+ override def saveBestKnownBlocks (bestBlockNumber : BigInt , latestCheckpointNumber : Option [BigInt ] = None ): Unit = {
314
+ latestCheckpointNumber match {
315
+ case Some (number) =>
316
+ saveBestKnownBlockAndLatestCheckpointNumber(bestBlockNumber, number)
317
+ case None =>
318
+ saveBestKnownBlock(bestBlockNumber)
319
+ }
320
+ }
321
+
322
+ private def saveBestKnownBlock (bestBlockNumber : BigInt ): Unit = {
323
+ bestKnownBlockAndLatestCheckpoint.updateAndGet(_.copy(bestBlockNumber = bestBlockNumber))
324
+ }
325
+
326
+ private def saveBestKnownBlockAndLatestCheckpointNumber (number : BigInt , latestCheckpointNumber : BigInt ): Unit = {
327
+ bestKnownBlockAndLatestCheckpoint.set(BestBlockLatestCheckpointNumbers (number, latestCheckpointNumber))
294
328
}
295
329
296
330
def storeTotalDifficulty (blockhash : ByteString , td : BigInt ): DataSourceBatchUpdate =
@@ -310,10 +344,18 @@ class BlockchainImpl(
310
344
blockNumberMappingStorage.remove(number)
311
345
}
312
346
347
+ // scalastyle:off method.length
313
348
override def removeBlock (blockHash : ByteString , withState : Boolean ): Unit = {
314
349
val maybeBlockHeader = getBlockHeaderByHash(blockHash)
315
350
val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
316
- val bestSavedBlock = getBestBlockNumber()
351
+ val bestBlocks = bestKnownBlockAndLatestCheckpoint.get()
352
+ // as we are decreasing block numbers in memory more often than in storage,
353
+ // we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber
354
+ val bestBlockNumber = if (bestBlocks.bestBlockNumber != 0 ) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber()
355
+ val latestCheckpointNumber = {
356
+ if (bestBlocks.latestCheckpointNumber != 0 ) bestBlocks.latestCheckpointNumber
357
+ else appStateStorage.getLatestCheckpointBlockNumber()
358
+ }
317
359
318
360
val blockNumberMappingUpdates = {
319
361
maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)( h =>
@@ -323,6 +365,22 @@ class BlockchainImpl(
323
365
)
324
366
}
325
367
368
+ val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate , Option [BigInt ]) = maybeBlockHeader match {
369
+ case Some (header) =>
370
+ if (header.hasCheckpoint && header.number == latestCheckpointNumber) {
371
+ val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
372
+ prev.map { num =>
373
+ (appStateStorage.putLatestCheckpointBlockNumber(num), Some (num))
374
+ }.getOrElse {
375
+ (appStateStorage.removeLatestCheckpointBlockNumber(), Some (0 ))
376
+ }
377
+ } else (appStateStorage.emptyBatchUpdate, None )
378
+ case None =>
379
+ (appStateStorage.emptyBatchUpdate, None )
380
+ }
381
+
382
+ val newBestBlockNumber : BigInt = if (bestBlockNumber >= 1 ) bestBlockNumber - 1 else 0
383
+
326
384
blockHeadersStorage.remove(blockHash)
327
385
.and(blockBodiesStorage.remove(blockHash))
328
386
.and(totalDifficultyStorage.remove(blockHash))
@@ -332,11 +390,40 @@ class BlockchainImpl(
332
390
.commit()
333
391
334
392
// not transactional part
393
+ saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
394
+
335
395
maybeBlockHeader.foreach { h =>
336
- if (withState)
337
- stateStorage.onBlockRollback(h.number, bestSavedBlock)(saveBestBlock)
396
+ if (withState) {
397
+ val bestBlocksUpdates = appStateStorage.putBestBlockNumber(newBestBlockNumber)
398
+ .and(checkpointUpdates)
399
+ stateStorage.onBlockRollback(h.number, bestBlockNumber)(() => bestBlocksUpdates.commit())
400
+ }
338
401
}
339
402
}
403
+ // scalastyle:on method.length
404
+
405
+ /**
406
+ * Recursive function which try to find the previous checkpoint by traversing blocks from top to the bottom.
407
+ * In case of finding the checkpoint block number, the function will finish the job and return result
408
+ */
409
+ @ tailrec
410
+ private def findPreviousCheckpointBlockNumber (
411
+ blockNumberToCheck : BigInt ,
412
+ latestCheckpointBlockNumber : BigInt
413
+ ): Option [BigInt ] = {
414
+ if (blockNumberToCheck > 0 ) {
415
+ val maybePreviousCheckpointBlockNumber = for {
416
+ currentBlock <- getBlockByNumber(blockNumberToCheck)
417
+ if currentBlock.hasCheckpoint &&
418
+ currentBlock.number < latestCheckpointBlockNumber
419
+ } yield currentBlock.number
420
+
421
+ maybePreviousCheckpointBlockNumber match {
422
+ case Some (_) => maybePreviousCheckpointBlockNumber
423
+ case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1 , latestCheckpointBlockNumber)
424
+ }
425
+ } else None
426
+ }
340
427
341
428
private def saveTxsLocations (blockHash : ByteString , blockBody : BlockBody ): DataSourceBatchUpdate =
342
429
blockBody.transactionList.zipWithIndex.foldLeft(transactionMappingStorage.emptyBatchUpdate) {
@@ -386,8 +473,8 @@ class BlockchainImpl(
386
473
387
474
// FIXME EC-495 this method should not be need when best block is handled properly during rollback
388
475
def persistCachedNodes (): Unit = {
389
- if (stateStorage.forcePersist(RollBackFlush )){
390
- appStateStorage.putBestBlockNumber(getBestBlockNumber()).commit ()
476
+ if (stateStorage.forcePersist(RollBackFlush )) {
477
+ persistBestBlocksData ()
391
478
}
392
479
}
393
480
}
@@ -423,4 +510,6 @@ object BlockchainImpl {
423
510
appStateStorage = storages.appStateStorage,
424
511
stateStorage = storages.stateStorage
425
512
)
513
+
514
+ private case class BestBlockLatestCheckpointNumbers (bestBlockNumber : BigInt , latestCheckpointNumber : BigInt )
426
515
}
0 commit comments