Skip to content

Commit cb678f3

Browse files
committed
unify fast and sync - change it regular sync assert
1 parent 87a4aa4 commit cb678f3

File tree

4 files changed

+330
-611
lines changed

4 files changed

+330
-611
lines changed

src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class RegularSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
2222
_ <- peer2.broadcastBlock()(IdentityUpdate).delayExecution(500.milliseconds)
2323
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer)
2424
} yield {
25-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber())
25+
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
2626
}
2727
}
2828

@@ -34,10 +34,10 @@ class RegularSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
3434
_ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate)
3535
_ <- peer1.connectToPeers(Set(peer2.node))
3636
_ <- peer1.startRegularSync().delayExecution(500.milliseconds)
37-
_ <- peer2.mineNewBlock()(IdentityUpdate).delayExecution(50.milliseconds)
38-
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 1)
37+
_ <- peer2.mineNewBlocks(100.milliseconds, 10)(IdentityUpdate)
38+
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 10)
3939
} yield {
40-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber())
40+
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
4141
}
4242
}
4343

@@ -59,9 +59,10 @@ class RegularSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
5959
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 3)
6060
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 3)
6161
} yield {
62-
assert(peer1.bl.getBestBlock().number == peer2.bl.getBestBlock().number)
63-
(peer1.bl.getBlockByNumber(blockNumer + 1), peer1.bl.getBlockByNumber(blockNumer + 1)) match {
64-
case (Some(blockP1), Some(blockP2)) => assert(blockP1.header.difficulty == blockP2.header.difficulty)
62+
assert(peer1.bl.getTotalDifficultyByHash(peer1.bl.getBestBlock().hash) == peer2.bl.getTotalDifficultyByHash(peer2.bl.getBestBlock().hash))
63+
(peer1.bl.getBlockByNumber(blockNumer + 1), peer2.bl.getBlockByNumber(blockNumer + 1)) match {
64+
case (Some(blockP1), Some(blockP2)) =>
65+
assert(peer1.bl.getTotalDifficultyByHash(blockP1.hash) == peer2.bl.getTotalDifficultyByHash(blockP2.hash))
6566
case (_ , _) => fail("invalid difficulty validation")
6667
}
6768
}
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
package io.iohk.ethereum.sync.util
2+
3+
import java.nio.file.Files
4+
import java.util.concurrent.atomic.AtomicReference
5+
6+
import akka.actor.{ActorRef, ActorSystem}
7+
import akka.testkit.TestProbe
8+
import akka.util.{ByteString, Timeout}
9+
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
10+
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
11+
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, TestSyncConfig}
12+
import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages}
13+
import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource}
14+
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
15+
import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces}
16+
import io.iohk.ethereum.domain.{Block, Blockchain, BlockchainImpl}
17+
import io.iohk.ethereum.ledger.InMemoryWorldStateProxy
18+
import io.iohk.ethereum.mpt.MerklePatriciaTrie
19+
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
20+
import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration}
21+
import io.iohk.ethereum.network.discovery.Node
22+
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
23+
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
24+
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
25+
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
26+
import io.iohk.ethereum.network.rlpx.AuthHandshaker
27+
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
28+
import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor}
29+
import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder}
30+
import io.iohk.ethereum.sync.util.SyncCommonItSpec._
31+
import io.iohk.ethereum.sync.util.SyncCommonItSpecUtils._
32+
import io.iohk.ethereum.utils.ServerStatus.Listening
33+
import io.iohk.ethereum.utils._
34+
import io.iohk.ethereum.vm.EvmConfig
35+
import io.iohk.ethereum.{Fixtures, Timeouts}
36+
import monix.eval.Task
37+
38+
import scala.concurrent.duration.{FiniteDuration, _}
39+
40+
abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig)
41+
extends SecureRandomBuilder
42+
with TestSyncConfig {
43+
implicit val akkaTimeout: Timeout = Timeout(5.second)
44+
45+
val config = Config.config
46+
47+
import scala.language.postfixOps
48+
49+
implicit val system = ActorSystem(peerName)
50+
51+
val peerDiscoveryManager = TestProbe().ref
52+
53+
val nodeKey = io.iohk.ethereum.crypto.generateKeyPair(secureRandom)
54+
55+
private val nodeStatus =
56+
NodeStatus(
57+
key = nodeKey,
58+
serverStatus = ServerStatus.NotListening,
59+
discoveryStatus = ServerStatus.NotListening
60+
)
61+
62+
lazy val tempDir = Files.createTempDirectory("temp-fast-sync")
63+
64+
def getRockDbTestConfig(dbPath: String) = {
65+
new RocksDbConfig {
66+
override val createIfMissing: Boolean = true
67+
override val paranoidChecks: Boolean = false
68+
override val path: String = dbPath
69+
override val maxThreads: Int = 1
70+
override val maxOpenFiles: Int = 32
71+
override val verifyChecksums: Boolean = false
72+
override val levelCompaction: Boolean = true
73+
override val blockSize: Long = 16384
74+
override val blockCacheSize: Long = 33554432
75+
}
76+
}
77+
78+
sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder {
79+
override lazy val pruningMode: PruningMode = ArchivePruning
80+
}
81+
82+
lazy val nodeStatusHolder = new AtomicReference(nodeStatus)
83+
lazy val storagesInstance = new RocksDbDataSourceComponent
84+
with LocalPruningConfigBuilder
85+
with Storages.DefaultStorages {
86+
override lazy val dataSource: RocksDbDataSource =
87+
RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq)
88+
}
89+
lazy val blockchainConfig = Config.blockchains.blockchainConfig
90+
91+
/**
92+
* Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when
93+
* it is persisted in storage.
94+
*/
95+
lazy val knownNodesManagerConfig =
96+
KnownNodesManager.KnownNodesManagerConfig(config).copy(persistInterval = 1.seconds)
97+
98+
lazy val knownNodesManager = system.actorOf(
99+
KnownNodesManager.props(
100+
knownNodesManagerConfig,
101+
storagesInstance.storages.knownNodesStorage
102+
)
103+
)
104+
105+
val bl = BlockchainImpl(storagesInstance.storages)
106+
107+
val genesis = Block(
108+
Fixtures.Blocks.Genesis.header.copy(stateRoot = ByteString(MerklePatriciaTrie.EmptyRootHash)),
109+
Fixtures.Blocks.Genesis.body
110+
)
111+
112+
bl.save(genesis, Seq(), genesis.header.difficulty, saveAsBestBlock = true)
113+
114+
lazy val nh = nodeStatusHolder
115+
116+
val peerConf = new PeerConfiguration {
117+
override val fastSyncHostConfiguration: FastSyncHostConfiguration = new FastSyncHostConfiguration {
118+
val maxBlocksHeadersPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksHeadersPerMessage
119+
val maxBlocksBodiesPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksBodiesPerMessage
120+
val maxReceiptsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxReceiptsPerMessage
121+
val maxMptComponentsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxMptComponentsPerMessage
122+
}
123+
override val rlpxConfiguration: RLPxConfiguration = new RLPxConfiguration {
124+
override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout
125+
override val waitForHandshakeTimeout: FiniteDuration = Timeouts.normalTimeout
126+
}
127+
override val waitForHelloTimeout: FiniteDuration = 3 seconds
128+
override val waitForStatusTimeout: FiniteDuration = 30 seconds
129+
override val waitForChainCheckTimeout: FiniteDuration = 15 seconds
130+
override val connectMaxRetries: Int = 3
131+
override val connectRetryDelay: FiniteDuration = 1 second
132+
override val disconnectPoisonPillTimeout: FiniteDuration = 3 seconds
133+
override val maxOutgoingPeers = 10
134+
override val maxIncomingPeers = 5
135+
override val maxPendingPeers = 5
136+
override val networkId: Int = 1
137+
138+
override val updateNodesInitialDelay: FiniteDuration = 5.seconds
139+
override val updateNodesInterval: FiniteDuration = 20.seconds
140+
override val shortBlacklistDuration: FiniteDuration = 1.minute
141+
override val longBlacklistDuration: FiniteDuration = 3.minutes
142+
}
143+
144+
lazy val peerEventBus = system.actorOf(PeerEventBusActor.props, "peer-event-bus")
145+
146+
private val handshakerConfiguration: EtcHandshakerConfiguration =
147+
new EtcHandshakerConfiguration {
148+
override val forkResolverOpt: Option[ForkResolver] = None
149+
override val nodeStatusHolder: AtomicReference[NodeStatus] = nh
150+
override val peerConfiguration: PeerConfiguration = peerConf
151+
override val blockchain: Blockchain = bl
152+
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
153+
}
154+
155+
lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
156+
157+
lazy val authHandshaker: AuthHandshaker = AuthHandshaker(nodeKey, secureRandom)
158+
159+
lazy val peerManager: ActorRef = system.actorOf(
160+
PeerManagerActor.props(
161+
peerDiscoveryManager,
162+
Config.Network.peer,
163+
peerEventBus,
164+
knownNodesManager,
165+
handshaker,
166+
authHandshaker,
167+
EthereumMessageDecoder
168+
),
169+
"peer-manager"
170+
)
171+
172+
lazy val etcPeerManager: ActorRef = system.actorOf(
173+
EtcPeerManagerActor.props(peerManager, peerEventBus, storagesInstance.storages.appStateStorage, None),
174+
"etc-peer-manager"
175+
)
176+
177+
val blockchainHost: ActorRef =
178+
system.actorOf(BlockchainHostActor.props(bl, peerConf, peerEventBus, etcPeerManager), "blockchain-host")
179+
180+
lazy val server: ActorRef = system.actorOf(ServerActor.props(nodeStatusHolder, peerManager), "server")
181+
182+
val listenAddress = randomAddress()
183+
184+
lazy val node =
185+
DiscoveryNodeInfo(
186+
Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort),
187+
1
188+
)
189+
190+
lazy val vmConfig = VmConfig(Config.config)
191+
192+
val testSyncConfig = syncConfig.copy(
193+
minPeersToChoosePivotBlock = 1,
194+
peersScanInterval = 5.milliseconds,
195+
blockHeadersPerRequest = 200,
196+
blockBodiesPerRequest = 50,
197+
receiptsPerRequest = 50,
198+
fastSyncThrottle = 10.milliseconds,
199+
startRetryInterval = 50.milliseconds,
200+
nodesPerRequest = 200,
201+
maxTargetDifference = 1,
202+
syncRetryInterval = 50.milliseconds
203+
)
204+
205+
lazy val broadcaster = new BlockBroadcast(etcPeerManager, testSyncConfig)
206+
207+
lazy val broadcasterActor = system.actorOf(
208+
BlockBroadcasterActor.props(broadcaster, peerEventBus, etcPeerManager, testSyncConfig, system.scheduler)
209+
)
210+
211+
private def getMptForBlock(block: Block) = {
212+
bl.getWorldStateProxy(
213+
blockNumber = block.number,
214+
accountStartNonce = blockchainConfig.accountStartNonce,
215+
stateRootHash = Some(block.header.stateRoot),
216+
noEmptyAccounts = EvmConfig.forBlock(block.number, blockchainConfig).noEmptyAccounts,
217+
ethCompatibleStorage = blockchainConfig.ethCompatibleStorage
218+
)
219+
}
220+
221+
private def broadcastBlock(block: Block, td: BigInt) = {
222+
broadcasterActor ! BroadcastBlock(NewBlock(block, td))
223+
}
224+
225+
def getCurrentState(): BlockchainState = {
226+
val bestBlock = bl.getBestBlock()
227+
val currentWorldState = getMptForBlock(bestBlock)
228+
val currentTd = bl.getTotalDifficultyByHash(bestBlock.hash).get
229+
BlockchainState(bestBlock, currentWorldState, currentTd)
230+
}
231+
232+
def startPeer(): Task[Unit] = {
233+
for {
234+
_ <- Task {
235+
peerManager ! PeerManagerActor.StartConnecting
236+
server ! ServerActor.StartServer(listenAddress)
237+
}
238+
_ <- retryUntilWithDelay(Task(nodeStatusHolder.get()), 1.second, 5) { status =>
239+
status.serverStatus == Listening(listenAddress)
240+
}
241+
} yield ()
242+
}
243+
244+
def shutdown(): Task[Unit] = {
245+
for {
246+
_ <- Task.deferFuture(system.terminate())
247+
_ <- Task(storagesInstance.dataSource.destroy())
248+
} yield ()
249+
}
250+
251+
def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = {
252+
for {
253+
_ <- Task {
254+
peerManager ! DiscoveredNodesInfo(nodes)
255+
}
256+
_ <- retryUntilWithDelay(Task(storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1.second, 5) {
257+
knownNodes =>
258+
val requestedNodes = nodes.map(_.node.id)
259+
val currentNodes = knownNodes.map(Node.fromUri).map(_.id)
260+
requestedNodes.subsetOf(currentNodes)
261+
}
262+
} yield ()
263+
}
264+
265+
private def createChildBlock(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy)(
266+
updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy
267+
): (Block, BigInt, InMemoryWorldStateProxy) = {
268+
val newBlockNumber = parent.header.number + 1
269+
val newWorld = updateWorldForBlock(newBlockNumber, parentWorld)
270+
val newBlock = parent.copy(header =
271+
parent.header.copy(parentHash = parent.header.hash, number = newBlockNumber, stateRoot = newWorld.stateRootHash)
272+
)
273+
val newTd = newBlock.header.difficulty + parentTd
274+
(newBlock, newTd, parentWorld)
275+
}
276+
277+
def importBlocksUntil(
278+
n: BigInt
279+
)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = {
280+
Task(bl.getBestBlock()).flatMap { block =>
281+
if (block.number >= n) {
282+
Task(())
283+
} else {
284+
Task {
285+
val currentTd = bl.getTotalDifficultyByHash(block.hash).get
286+
val currentWolrd = getMptForBlock(block)
287+
val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock)
288+
bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true)
289+
bl.persistCachedNodes()
290+
broadcastBlock(newBlock, newTd)
291+
}.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock))
292+
}
293+
}
294+
}
295+
296+
}

0 commit comments

Comments
 (0)