Skip to content

Commit 72da7a3

Browse files
committed
[ETCM-102] Fix bugs found during integration testing
1 parent 1238df4 commit 72da7a3

21 files changed

+189
-136
lines changed

src/it/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
</encoder>
2626
</appender>
2727

28-
<root level="DEBUG">
28+
<root level="INFO">
2929
<appender-ref ref="STDOUT" />
3030
<appender-ref ref="FILE" />
3131
</root>

src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.iohk.ethereum.sync
22

33
import java.net.{InetSocketAddress, ServerSocket}
4+
import java.nio.file.Files
45
import java.util.concurrent.TimeoutException
56
import java.util.concurrent.atomic.AtomicReference
67

@@ -9,9 +10,12 @@ import akka.testkit.TestProbe
910
import akka.util.{ByteString, Timeout}
1011
import cats.effect.Resource
1112
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
12-
import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, FastSync, TestSyncConfig}
13-
import io.iohk.ethereum.db.components.{SharedEphemDataSources, Storages}
14-
import io.iohk.ethereum.db.storage.AppStateStorage
13+
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
14+
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
15+
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig}
16+
import io.iohk.ethereum.db.components.{SharedRocksDbDataSources, Storages}
17+
import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource}
18+
import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces}
1519
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
1620
import io.iohk.ethereum.domain.{Account, Address, Block, Blockchain, BlockchainImpl}
1721
import io.iohk.ethereum.ledger.InMemoryWorldStateProxy
@@ -22,6 +26,7 @@ import io.iohk.ethereum.network.discovery.Node
2226
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
2327
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
2428
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
29+
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
2530
import io.iohk.ethereum.network.rlpx.AuthHandshaker
2631
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
2732
import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor}
@@ -51,8 +56,8 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
5156
_ <- peer1.startFastSync()
5257
_ <- peer1.waitForFastSyncFinish()
5358
} yield {
54-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset)
55-
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset)
59+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
60+
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
5661
}
5762
}
5863

@@ -66,11 +71,25 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
6671
_ <- peer1.waitForFastSyncFinish()
6772
} yield {
6873
val trie = peer1.getBestBlockTrie()
69-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset)
70-
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset)
74+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
75+
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
7176
assert(trie.isDefined)
7277
}
7378
}
79+
80+
81+
it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
82+
case (peer1, peer2) =>
83+
for {
84+
_ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate)
85+
_ <- peer1.connectToPeers(Set(peer2.node))
86+
_ <- peer2.syncUntil(2000)(IdentityUpdate).startAndForget
87+
_ <- peer1.startFastSync()
88+
_ <- peer1.waitForFastSyncFinish()
89+
} yield {
90+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
91+
}
92+
}
7493
}
7594

7695
object FastSyncItSpec {
@@ -108,7 +127,7 @@ object FastSyncItSpec {
108127

109128
val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world
110129

111-
def updateWorldWithNRandomAcounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
130+
def updateWorldWithNRandomAccounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
112131
val resultWorld = (0 until n).foldLeft(world) { (world, num) =>
113132
val randomBalance = num
114133
val randomAddress = Address(num)
@@ -125,7 +144,7 @@ object FastSyncItSpec {
125144

126145
def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { (blockNr: BigInt, world: InMemoryWorldStateProxy) =>
127146
if (blockNr == blockWithUpdate) {
128-
updateWorldWithNRandomAcounts(1000, world)
147+
updateWorldWithNRandomAccounts(1000, world)
129148
} else {
130149
IdentityUpdate(blockNr, world)
131150
}
@@ -151,12 +170,30 @@ object FastSyncItSpec {
151170
discoveryStatus = ServerStatus.NotListening
152171
)
153172

173+
lazy val tempDir = Files.createTempDirectory("temp-fast-sync")
174+
175+
def getRockDbTestConfig(dbPath: String) = {
176+
new RocksDbConfig {
177+
override val createIfMissing: Boolean = true
178+
override val paranoidChecks: Boolean = false
179+
override val path: String = dbPath
180+
override val maxThreads: Int = 1
181+
override val maxOpenFiles: Int = 32
182+
override val verifyChecksums: Boolean = false
183+
override val levelCompaction: Boolean = true
184+
override val blockSize: Long = 16384
185+
override val blockCacheSize: Long = 33554432
186+
}
187+
}
188+
154189
sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder {
155190
override lazy val pruningMode: PruningMode = ArchivePruning
156191
}
157192

158193
lazy val nodeStatusHolder = new AtomicReference(nodeStatus)
159-
lazy val storagesInstance = new SharedEphemDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages
194+
lazy val storagesInstance = new SharedRocksDbDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages {
195+
override lazy val dataSource: RocksDbDataSource = RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq)
196+
}
160197
lazy val blockchainConfig = Config.blockchains.blockchainConfig
161198
/**
162199
* Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when
@@ -252,15 +289,21 @@ object FastSyncItSpec {
252289

253290
val testSyncConfig = syncConfig.copy(
254291
minPeersToChooseTargetBlock = 1,
255-
peersScanInterval = 1.second,
292+
peersScanInterval = 5.milliseconds,
256293
blockHeadersPerRequest = 200,
257294
blockBodiesPerRequest = 50,
258295
receiptsPerRequest = 50,
259296
fastSyncThrottle = 10.milliseconds,
260297
startRetryInterval = 50.milliseconds,
261-
nodesPerRequest = 200
298+
nodesPerRequest = 200,
299+
maxTargetDifference = 1,
300+
syncRetryInterval = 50.milliseconds
262301
)
263302

303+
lazy val broadcaster = new BlockBroadcast(etcPeerManager, testSyncConfig)
304+
305+
lazy val broadcasterActor = system.actorOf(BlockBroadcasterActor.props(broadcaster, peerEventBus, etcPeerManager, testSyncConfig, system.scheduler))
306+
264307
lazy val fastSync = system.actorOf(FastSync.props(
265308
storagesInstance.storages.fastSyncStateStorage,
266309
storagesInstance.storages.appStateStorage,
@@ -282,6 +325,10 @@ object FastSyncItSpec {
282325
)
283326
}
284327

328+
private def broadcastBlock(block: Block, td: BigInt) = {
329+
broadcasterActor ! BroadcastBlock(NewBlock(block, td))
330+
}
331+
285332
def getCurrentState(): BlockchainState = {
286333
val bestBlock = bl.getBestBlock()
287334
val currentWorldState = getMptForBlock(bestBlock)
@@ -302,7 +349,10 @@ object FastSyncItSpec {
302349
}
303350

304351
def shutdown(): Task[Unit] = {
305-
Task.deferFuture(system.terminate()).map(_ => ())
352+
for {
353+
_ <- Task.deferFuture(system.terminate())
354+
_ <- Task(storagesInstance.dataSource.destroy())
355+
} yield ()
306356
}
307357

308358
def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = {
@@ -343,12 +393,29 @@ object FastSyncItSpec {
343393
go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n)
344394
}
345395

396+
def syncUntil(n: BigInt)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = {
397+
Task(bl.getBestBlock()).flatMap { block =>
398+
if (block.number >= n) {
399+
Task(())
400+
} else {
401+
Task {
402+
val currentTd = bl.getTotalDifficultyByHash(block.hash).get
403+
val currentWolrd = getMptForBlock(block)
404+
val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock)
405+
bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true)
406+
bl.persistCachedNodes()
407+
broadcastBlock(newBlock, newTd)
408+
}.flatMap(_ => syncUntil(n)(updateWorldForBlock))
409+
}
410+
}
411+
}
412+
346413
def startFastSync(): Task[Unit] = Task {
347414
fastSync ! FastSync.Start
348415
}
349416

350417
def waitForFastSyncFinish(): Task[Boolean] = {
351-
retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 30){ isDone =>
418+
retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 90){ isDone =>
352419
isDone
353420
}
354421
}
@@ -357,7 +424,7 @@ object FastSyncItSpec {
357424
def getBestBlockTrie(): Option[MptNode] = {
358425
Try {
359426
val bestBlock = bl.getBestBlock()
360-
val bestStateRoot =bestBlock.header.stateRoot
427+
val bestStateRoot = bestBlock.header.stateRoot
361428
MptTraversals.parseTrieIntoMemory(HashNode(bestStateRoot.toArray), storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number))
362429
}.toOption
363430
}
@@ -391,4 +458,4 @@ object FastSyncItSpec {
391458
} { case (peer, peer1, peer2) => Task.parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_ ,_, _)=> ())}
392459
}
393460
}
394-
}
461+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ class FastSyncTargetBlockSelector(
3434
val peersUsedToChooseTarget = peersToDownloadFrom.filter(_._2.forkAccepted)
3535

3636
if (peersUsedToChooseTarget.size >= minPeersToChooseTargetBlock) {
37-
peersUsedToChooseTarget.foreach { case (peer, PeerInfo(status, _, _, _)) =>
37+
peersUsedToChooseTarget.foreach { case (peer, PeerInfo(_, _, _, _, bestBlockHash)) =>
3838
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
39-
etcPeerManager ! EtcPeerManagerActor.SendMessage(GetBlockHeaders(Right(status.bestHash), 1, 0, reverse = false), peer.id)
39+
etcPeerManager ! EtcPeerManagerActor.SendMessage(GetBlockHeaders(Right(bestBlockHash), 1, 0, reverse = false), peer.id)
4040
}
4141
log.debug("Asking {} peers for block headers", peersUsedToChooseTarget.size)
4242
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, BlockHeadersTimeout)

src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ object PeersClient {
134134
def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
135135
val peersToUse = peersToDownloadFrom
136136
.collect {
137-
case (ref, PeerInfo(_, totalDifficulty, true, _)) =>
137+
case (ref, PeerInfo(_, totalDifficulty, true, _, _)) =>
138138
(ref, totalDifficulty)
139139
}
140140

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSync.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ class OldRegularSync(
559559

560560
private def bestPeer: Option[Peer] = {
561561
val peersToUse = peersToDownloadFrom
562-
.collect{ case (ref, PeerInfo(_, totalDifficulty, true, _)) => (ref, totalDifficulty) }
562+
.collect{ case (ref, PeerInfo(_, totalDifficulty, true, _, _)) => (ref, totalDifficulty) }
563563

564564
if (peersToUse.nonEmpty) {
565565
val (peer, _) = peersToUse.maxBy{ case (_, td) => td }

src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,23 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou
99
* key.drop to remove namespace prefix from the key
1010
* @return key values paris from this storage
1111
*/
12-
def getAll(namespace: Namespace): Seq[(IndexedSeq[Byte], IndexedSeq[Byte])] =
12+
def getAll(namespace: Namespace): Seq[(IndexedSeq[Byte], IndexedSeq[Byte])] = synchronized {
1313
storage.toSeq.map{case (key, value) => (key.array().drop(namespace.length).toIndexedSeq, value.toIndexedSeq)}
14+
}
1415

15-
override def get(namespace: Namespace, key: Key): Option[Value] = storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq)
16+
override def get(namespace: Namespace, key: Key): Option[Value] = synchronized {
17+
storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq)
18+
}
1619

17-
override def update(namespace: Namespace, toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): DataSource = {
20+
override def update(namespace: Namespace, toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): DataSource = synchronized {
1821
val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap((namespace ++ key).toArray))
1922
val afterUpdate = toUpsert.foldLeft(afterRemoval)((storage, toUpdate) =>
2023
storage + (ByteBuffer.wrap((namespace ++ toUpdate._1).toArray) -> toUpdate._2.toArray))
2124
storage = afterUpdate
2225
this
2326
}
2427

25-
override def clear: DataSource = {
28+
override def clear: DataSource = synchronized {
2629
storage = Map()
2730
this
2831
}
@@ -31,7 +34,7 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou
3134

3235
override def destroy(): Unit = ()
3336

34-
override def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): DataSource = {
37+
override def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): DataSource = synchronized {
3538
val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap(key))
3639
val afterUpdate = toUpsert.foldLeft(afterRemoval)((storage, toUpdate) =>
3740
storage + (ByteBuffer.wrap(toUpdate._1) -> toUpdate._2))

0 commit comments

Comments
 (0)