@@ -13,8 +13,9 @@ import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, FastSync, TestSync
13
13
import io .iohk .ethereum .db .components .{SharedEphemDataSources , Storages }
14
14
import io .iohk .ethereum .db .storage .AppStateStorage
15
15
import io .iohk .ethereum .db .storage .pruning .{ArchivePruning , PruningMode }
16
- import io .iohk .ethereum .domain .{Block , Blockchain , BlockchainImpl }
17
- import io .iohk .ethereum .mpt .MerklePatriciaTrie
16
+ import io .iohk .ethereum .domain .{Account , Address , Block , Blockchain , BlockchainImpl }
17
+ import io .iohk .ethereum .ledger .InMemoryWorldStateProxy
18
+ import io .iohk .ethereum .mpt .{HashNode , MerklePatriciaTrie , MptNode , MptTraversals }
18
19
import io .iohk .ethereum .network .EtcPeerManagerActor .PeerInfo
19
20
import io .iohk .ethereum .network .PeerManagerActor .{FastSyncHostConfiguration , PeerConfiguration }
20
21
import io .iohk .ethereum .network .discovery .Node
@@ -25,7 +26,7 @@ import io.iohk.ethereum.network.rlpx.AuthHandshaker
25
26
import io .iohk .ethereum .network .rlpx .RLPxConnectionHandler .RLPxConfiguration
26
27
import io .iohk .ethereum .network .{EtcPeerManagerActor , ForkResolver , KnownNodesManager , PeerEventBusActor , PeerManagerActor , ServerActor }
27
28
import io .iohk .ethereum .nodebuilder .{PruningConfigBuilder , SecureRandomBuilder }
28
- import io .iohk .ethereum .sync .FastSyncItSpec .{FakePeer , customTestCaseResourceM }
29
+ import io .iohk .ethereum .sync .FastSyncItSpec .{FakePeer , IdentityUpdate , customTestCaseResourceM , updateStateAtBlock }
29
30
import io .iohk .ethereum .utils .ServerStatus .Listening
30
31
import io .iohk .ethereum .utils .{Config , NodeStatus , ServerStatus , VmConfig }
31
32
import io .iohk .ethereum .vm .EvmConfig
@@ -36,16 +37,16 @@ import org.scalatest.{Assertion, AsyncFlatSpec, BeforeAndAfter, Matchers}
36
37
37
38
import scala .concurrent .Future
38
39
import scala .concurrent .duration ._
40
+ import scala .util .Try
39
41
40
42
class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
41
43
implicit val testScheduler = Scheduler .fixedPool(" test" , 16 )
42
44
43
45
" FastSync" should " should sync blockchain without state nodes" in customTestCaseResourceM(FakePeer .start3FakePeersRes()) {
44
46
case (peer1, peer2, peer3) =>
45
47
for {
46
- _ <- Task .parZip3(peer1.startPeer(), peer2.startPeer(), peer3.startPeer())
47
- _ <- peer2.saveNBlocks(1000 )
48
- _ <- peer3.saveNBlocks(1000 )
48
+ _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000 )(IdentityUpdate )
49
+ _ <- peer3.importNBlocksToTheTopForm(peer3.getCurrentState(), 1000 )(IdentityUpdate )
49
50
_ <- peer1.connectToPeers(Set (peer2.node, peer3.node))
50
51
_ <- peer1.startFastSync()
51
52
_ <- peer1.waitForFastSyncFinish()
@@ -55,6 +56,21 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
55
56
}
56
57
}
57
58
59
+ it should " should sync blockchain with state nodes" in customTestCaseResourceM(FakePeer .start3FakePeersRes()) {
60
+ case (peer1, peer2, peer3) =>
61
+ for {
62
+ _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000 )(updateStateAtBlock(500 ))
63
+ _ <- peer3.importNBlocksToTheTopForm(peer3.getCurrentState(), 1000 )(updateStateAtBlock(500 ))
64
+ _ <- peer1.connectToPeers(Set (peer2.node, peer3.node))
65
+ _ <- peer1.startFastSync()
66
+ _ <- peer1.waitForFastSyncFinish()
67
+ } yield {
68
+ val trie = peer1.getBestBlockTrie()
69
+ assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset)
70
+ assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset)
71
+ assert(trie.isDefined)
72
+ }
73
+ }
58
74
}
59
75
60
76
object FastSyncItSpec {
@@ -88,16 +104,31 @@ object FastSyncItSpec {
88
104
fixture.use(theTest).runToFuture
89
105
}
90
106
91
- def generateBlockChain (startBlock : Block , number : Int ): Seq [Block ] = {
92
- def recur (last : Block , blocksLeft : Int , blocksCreated : List [Block ]): List [Block ] = {
93
- if (blocksLeft <= 0 ) {
94
- blocksCreated.reverse
95
- } else {
96
- val newBlock = last.copy(header = last.header.copy(parentHash = last.header.hash, number = last.header.number + 1 ))
97
- recur(newBlock, blocksLeft - 1 , newBlock :: blocksCreated)
98
- }
107
+ final case class BlockchainState (bestBlock : Block , currentWorldState : InMemoryWorldStateProxy , currentTd : BigInt )
108
+
109
+ val IdentityUpdate : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy = (_, world) => world
110
+
111
+ def updateWorldWithNRandomAcounts (n: Int , world : InMemoryWorldStateProxy ): InMemoryWorldStateProxy = {
112
+ val resultWorld = (0 until n).foldLeft(world) { (world, num) =>
113
+ val randomBalance = num
114
+ val randomAddress = Address (num)
115
+ val codeBytes = BigInt (num).toByteArray
116
+ val storage = world.getStorage(randomAddress)
117
+ val changedStorage = (num until num + 20 ).foldLeft(storage)((storage, value) => storage.store(value, value))
118
+ world
119
+ .saveAccount(randomAddress, Account .empty().copy(balance = randomBalance))
120
+ .saveCode(randomAddress, ByteString (codeBytes))
121
+ .saveStorage(randomAddress, changedStorage)
122
+ }
123
+ InMemoryWorldStateProxy .persistState(resultWorld)
124
+ }
125
+
126
+ def updateStateAtBlock (blockWithUpdate : BigInt ): (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy = { (blockNr : BigInt , world : InMemoryWorldStateProxy ) =>
127
+ if (blockNr == blockWithUpdate) {
128
+ updateWorldWithNRandomAcounts(1000 , world)
129
+ } else {
130
+ IdentityUpdate (blockNr, world)
99
131
}
100
- recur(startBlock, number, List .empty)
101
132
}
102
133
103
134
class FakePeer (peerName : String ) extends SecureRandomBuilder with TestSyncConfig {
@@ -227,6 +258,7 @@ object FastSyncItSpec {
227
258
receiptsPerRequest = 50 ,
228
259
fastSyncThrottle = 10 .milliseconds,
229
260
startRetryInterval = 50 .milliseconds,
261
+ nodesPerRequest = 200
230
262
)
231
263
232
264
lazy val fastSync = system.actorOf(FastSync .props(
@@ -240,16 +272,23 @@ object FastSyncItSpec {
240
272
system.scheduler
241
273
))
242
274
243
- def getMptForBlock (blockHeaderNumber : BigInt ) = {
275
+ private def getMptForBlock (block : Block ) = {
244
276
bl.getWorldStateProxy(
245
- blockNumber = blockHeaderNumber ,
277
+ blockNumber = block.number ,
246
278
accountStartNonce = blockchainConfig.accountStartNonce,
247
- stateRootHash = bl.getBlockByNumber(blockHeaderNumber).map(_ .header.stateRoot),
248
- noEmptyAccounts = EvmConfig .forBlock(blockHeaderNumber , blockchainConfig).noEmptyAccounts,
279
+ stateRootHash = Some (block .header.stateRoot),
280
+ noEmptyAccounts = EvmConfig .forBlock(block.number , blockchainConfig).noEmptyAccounts,
249
281
ethCompatibleStorage = blockchainConfig.ethCompatibleStorage
250
282
)
251
283
}
252
284
285
+ def getCurrentState (): BlockchainState = {
286
+ val bestBlock = bl.getBestBlock()
287
+ val currentWorldState = getMptForBlock(bestBlock)
288
+ val currentTd = bl.getTotalDifficultyByHash(bestBlock.hash).get
289
+ BlockchainState (bestBlock, currentWorldState, currentTd)
290
+ }
291
+
253
292
def startPeer (): Task [Unit ] = {
254
293
for {
255
294
_ <- Task {
@@ -279,18 +318,29 @@ object FastSyncItSpec {
279
318
} yield ()
280
319
}
281
320
282
- import akka .pattern .ask
283
- def getHandshakedPeers : Task [PeerManagerActor .Peers ] = {
284
- Task .deferFutureAction{s =>
285
- implicit val ec = s
286
- (peerManager ? PeerManagerActor .GetPeers ).mapTo[PeerManagerActor .Peers ]
287
- }
321
+ private def createChildBlock (parent : Block , parentTd : BigInt , parentWorld : InMemoryWorldStateProxy )
322
+ (updateWorldForBlock : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy ): (Block , BigInt , InMemoryWorldStateProxy ) = {
323
+ val newBlockNumber = parent.header.number + 1
324
+ val newWorld = updateWorldForBlock(newBlockNumber, parentWorld)
325
+ val newBlock = parent.copy(header = parent.header.copy(parentHash = parent.header.hash, number = newBlockNumber, stateRoot = newWorld.stateRootHash))
326
+ val newTd = newBlock.header.difficulty + parentTd
327
+ (newBlock, newTd, parentWorld)
288
328
}
289
329
290
- def saveNBlocks (n : Int ) = Task {
291
- val lastBlock = bl.getBestBlock()
292
- val chain = generateBlockChain(lastBlock, n)
293
- chain.foreach(block => bl.save(block, Seq (), block.header.difficulty, true ))
330
+ def importNBlocksToTheTopForm (startState : BlockchainState , n : Int )
331
+ (updateWorldForBlock : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy ): Task [Unit ] = {
332
+ def go (parent : Block , parentTd : BigInt , parentWorld : InMemoryWorldStateProxy , blocksLeft : Int ): Task [Unit ] = {
333
+ if (blocksLeft <= 0 ) {
334
+ Task .now(())
335
+ } else {
336
+ val (newBlock, newTd, newWorld) = createChildBlock(parent, parentTd, parentWorld)(updateWorldForBlock)
337
+ bl.save(newBlock, Seq (), newTd, saveAsBestBlock = true )
338
+ bl.persistCachedNodes()
339
+ go(newBlock, newTd, newWorld, blocksLeft - 1 )
340
+ }
341
+ }
342
+
343
+ go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n)
294
344
}
295
345
296
346
def startFastSync (): Task [Unit ] = Task {
@@ -302,6 +352,15 @@ object FastSyncItSpec {
302
352
isDone
303
353
}
304
354
}
355
+
356
+ // Reads whole trie into memory, if the trie lacks nodes in storage it will be None
357
+ def getBestBlockTrie (): Option [MptNode ] = {
358
+ Try {
359
+ val bestBlock = bl.getBestBlock()
360
+ val bestStateRoot = bestBlock.header.stateRoot
361
+ MptTraversals .parseTrieIntoMemory(HashNode (bestStateRoot.toArray), storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number))
362
+ }.toOption
363
+ }
305
364
}
306
365
307
366
object FakePeer {
0 commit comments