Skip to content

Commit 524a463

Browse files
committed
[ETCM-213] Extract bloom loading to sparate class. More tests.
1 parent 116782f commit 524a463

File tree

7 files changed

+184
-40
lines changed

7 files changed

+184
-40
lines changed

src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,21 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
129129
}
130130
}
131131

132+
it should "sync state to peer from partially synced state" in customTestCaseResourceM(
133+
FakePeer.start2FakePeersRes()
134+
) { case (peer1, peer2) =>
135+
for {
136+
_ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500))
137+
_ <- peer2.importBlocksUntil(3000)(updateStateAtBlock(2500, 1000, 2000))
138+
_ <- peer1.importBlocksUntil(2000)(updateStateAtBlock(1500))
139+
_ <- peer1.startWithState()
140+
_ <- peer1.connectToPeers(Set(peer2.node))
141+
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
142+
_ <- peer1.waitForFastSyncFinish()
143+
} yield {
144+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
145+
}
146+
}
132147
}
133148

134149
object FastSyncItSpec {
@@ -145,8 +160,12 @@ object FastSyncItSpec {
145160

146161
val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world
147162

148-
def updateWorldWithNAccounts(n: Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
149-
val resultWorld = (0 until n).foldLeft(world) { (world, num) =>
163+
def updateWorldWithAccounts(
164+
startAccount: Int,
165+
endAccount: Int,
166+
world: InMemoryWorldStateProxy
167+
): InMemoryWorldStateProxy = {
168+
val resultWorld = (startAccount until endAccount).foldLeft(world) { (world, num) =>
150169
val randomBalance = num
151170
val randomAddress = Address(num)
152171
val codeBytes = BigInt(num).toByteArray
@@ -160,10 +179,14 @@ object FastSyncItSpec {
160179
InMemoryWorldStateProxy.persistState(resultWorld)
161180
}
162181

163-
def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = {
182+
def updateStateAtBlock(
183+
blockWithUpdate: BigInt,
184+
startAccount: Int = 0,
185+
endAccount: Int = 1000
186+
): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = {
164187
(blockNr: BigInt, world: InMemoryWorldStateProxy) =>
165188
if (blockNr == blockWithUpdate) {
166-
updateWorldWithNAccounts(1000, world)
189+
updateWorldWithAccounts(startAccount, endAccount, world)
167190
} else {
168191
IdentityUpdate(blockNr, world)
169192
}

src/it/scala/io/iohk/ethereum/sync/FastSyncItSpecUtils.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import akka.testkit.TestProbe
99
import akka.util.{ByteString, Timeout}
1010
import cats.effect.Resource
1111
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
12+
import io.iohk.ethereum.blockchain.sync.FastSync.SyncState
1213
import io.iohk.ethereum.{Fixtures, Timeouts}
1314
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig}
1415
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
@@ -363,6 +364,14 @@ object FastSyncItSpecUtils {
363364
}
364365
}
365366

367+
def startWithState(): Task[Unit] = {
368+
val currentBest = bl.getBestBlock().header
369+
val safeTarget = currentBest.number + syncConfig.fastSyncBlockValidationX
370+
val nextToValidate = currentBest.number + 1
371+
val syncState = SyncState(currentBest, safeTarget, Seq(), Seq(), 0, 0, currentBest.number, nextToValidate)
372+
Task(storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState)).map(_ => ())
373+
}
374+
366375
def startFastSync(): Task[Unit] = Task {
367376
fastSync ! FastSync.Start
368377
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import com.google.common.hash.{BloomFilter, Funnel}
4+
import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult
5+
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
6+
import monix.eval.Task
7+
import monix.reactive.{Consumer, Observable}
8+
9+
class LoadableBloomFilter[A](bloomFilter: BloomFilter[A], source: Observable[Either[IterationError, A]]) {
10+
val loadFromSource: Task[BloomFilterLoadingResult] = {
11+
source
12+
.consumeWith(Consumer.foldLeftTask(BloomFilterLoadingResult()) { (s, e) =>
13+
e match {
14+
case Left(value) => Task.now(s.copy(error = Some(value)))
15+
case Right(value) => Task(bloomFilter.put(value)).map(_ => s.copy(writtenElements = s.writtenElements + 1))
16+
}
17+
})
18+
.memoizeOnSuccess
19+
}
20+
21+
def put(elem: A): Boolean = bloomFilter.put(elem)
22+
23+
def mightContain(elem: A): Boolean = bloomFilter.mightContain(elem)
24+
25+
def approximateElementCount: Long = bloomFilter.approximateElementCount()
26+
}
27+
28+
object LoadableBloomFilter {
29+
def apply[A](expectedSize: Int, loadingSource: Observable[Either[IterationError, A]])(implicit
30+
f: Funnel[A]
31+
): LoadableBloomFilter[A] = {
32+
new LoadableBloomFilter[A](BloomFilter.create[A](f, expectedSize), loadingSource)
33+
}
34+
35+
case class BloomFilterLoadingResult(writtenElements: Long, error: Option[IterationError])
36+
object BloomFilterLoadingResult {
37+
def apply(): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, None)
38+
39+
def apply(ex: Throwable): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, Some(IterationError(ex)))
40+
}
41+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import java.util.Comparator
44

55
import akka.util.ByteString
66
import com.google.common.hash.{BloomFilter, Funnel, PrimitiveSink}
7+
import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult
78
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler._
8-
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
99
import io.iohk.ethereum.domain.{Account, Blockchain}
1010
import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MerklePatriciaTrie, MptNode}
1111
import io.vavr.collection.PriorityQueue
1212
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
1313
import monix.eval.Task
14-
import monix.reactive.Consumer
1514

1615
import scala.annotation.tailrec
1716
import scala.util.Try
@@ -43,18 +42,9 @@ import scala.util.Try
4342
*
4443
* Important part is that nodes retrieved by getMissingNodes, must eventually be provided for scheduler to make progress
4544
*/
46-
class SyncStateScheduler(blockchain: Blockchain, bloomFilter: BloomFilter[ByteString]) {
47-
48-
def loadFilterFromBlockchain: Task[BloomFilterLoadingResult] = {
49-
blockchain
50-
.mptStateSavedKeys()
51-
.consumeWith(Consumer.foldLeftTask(BloomFilterLoadingResult()) { (s, e) =>
52-
e match {
53-
case Left(value) => Task.now(s.copy(error = Some(value)))
54-
case Right(value) => Task(bloomFilter.put(value)).map(_ => s.copy(writtenElements = s.writtenElements + 1))
55-
}
56-
})
57-
}
45+
class SyncStateScheduler(blockchain: Blockchain, bloomFilter: LoadableBloomFilter[ByteString]) {
46+
47+
val loadFilterFromBlockchain: Task[BloomFilterLoadingResult] = bloomFilter.loadFromSource
5848

5949
def initState(targetRootHash: ByteString): Option[SchedulerState] = {
6050
if (targetRootHash == emptyStateRootHash) {
@@ -282,7 +272,7 @@ object SyncStateScheduler {
282272

283273
case object StorageNode extends NodeRequest
284274

285-
object ByteStringFunnel extends Funnel[ByteString] {
275+
implicit object ByteStringFunnel extends Funnel[ByteString] {
286276
override def funnel(from: ByteString, into: PrimitiveSink): Unit = {
287277
into.putBytes(from.toArray)
288278
}
@@ -291,10 +281,14 @@ object SyncStateScheduler {
291281
def getEmptyFilter(expectedFilterSize: Int): BloomFilter[ByteString] = {
292282
BloomFilter.create[ByteString](ByteStringFunnel, expectedFilterSize)
293283
}
294-
// TODO [ETCM-213] add method to load bloom filter after node restart. Perfect way to do it would be to expose Observable
295-
// in RocksDBDataSource which underneath would use RockDbIterator which would traverse whole namespace.
284+
296285
def apply(blockchain: Blockchain, expectedBloomFilterSize: Int): SyncStateScheduler = {
297-
new SyncStateScheduler(blockchain, getEmptyFilter(expectedBloomFilterSize))
286+
// provided source i.e mptStateSavedKeys() is guaranteed to finish on first `Left` element which means that returned
287+
// error is the reason why loading has stopped
288+
new SyncStateScheduler(
289+
blockchain,
290+
LoadableBloomFilter[ByteString](expectedBloomFilterSize, blockchain.mptStateSavedKeys())
291+
)
298292
}
299293

300294
final case class StateNodeRequest(
@@ -477,12 +471,4 @@ object SyncStateScheduler {
477471
object ProcessingStatistics {
478472
def apply(): ProcessingStatistics = new ProcessingStatistics(0, 0, 0)
479473
}
480-
481-
case class BloomFilterLoadingResult(writtenElements: Long, error: Option[IterationError])
482-
object BloomFilterLoadingResult {
483-
def apply(): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, None)
484-
485-
def apply(ex: Throwable): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, Some(IterationError(ex)))
486-
}
487-
488474
}

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@ package io.iohk.ethereum.blockchain.sync
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
44
import akka.util.ByteString
5+
import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult
56
import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor.{CancelDownload, RegisterScheduler}
6-
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{
7-
BloomFilterLoadingResult,
8-
ProcessingStatistics,
9-
SchedulerState,
10-
SyncResponse
11-
}
7+
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{ProcessingStatistics, SchedulerState, SyncResponse}
128
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
139
BloomFilterResult,
1410
GetMissingNodes,
@@ -48,6 +44,10 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy
4844

4945
def waitingForBloomFilterToLoad(lastReceivedCommand: Option[(SyncStateSchedulerActorCommand, ActorRef)]): Receive = {
5046
case BloomFilterResult(result) =>
47+
log.debug(
48+
s"Loaded ${result.writtenElements} already known elements from storage to bloom filter the error while loading " +
49+
s"was ${result.error}"
50+
)
5151
lastReceivedCommand match {
5252
case Some((startSignal: StartSyncingTo, sender)) =>
5353
val initStats = ProcessingStatistics().addSaved(result.writtenElements)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import com.google.common.hash.{Funnel, Funnels, PrimitiveSink}
4+
import io.iohk.ethereum.FlatSpecBase
5+
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
6+
import monix.eval.Task
7+
import monix.reactive.Observable
8+
9+
class LoadableBloomFilterSpec extends FlatSpecBase {
10+
implicit object LongFun extends Funnel[Long] {
11+
override def funnel(from: Long, into: PrimitiveSink): Unit = {
12+
Funnels.longFunnel().funnel(from, into)
13+
}
14+
}
15+
16+
"LoadableBloomFilter" should "load all correct elements " in testCaseM[Task] {
17+
for {
18+
source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L))))
19+
filter <- Task.now(LoadableBloomFilter[Long](1000, source))
20+
result <- filter.loadFromSource
21+
} yield {
22+
assert(result.writtenElements == 3)
23+
assert(result.error.isEmpty)
24+
assert(filter.approximateElementCount == 3)
25+
}
26+
}
27+
28+
it should "load filter only once" in testCaseM[Task] {
29+
for {
30+
source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L))))
31+
filter <- Task.now(LoadableBloomFilter[Long](1000, source))
32+
result <- filter.loadFromSource
33+
result1 <- filter.loadFromSource
34+
} yield {
35+
assert(result.writtenElements == 3)
36+
assert(result.error.isEmpty)
37+
assert(filter.approximateElementCount == 3)
38+
assert(result1 == result)
39+
}
40+
}
41+
42+
it should "report last error if encountered" in testCaseM[Task] {
43+
for {
44+
error <- Task(IterationError(new RuntimeException("test")))
45+
source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L), Left(error))))
46+
filter <- Task.now(LoadableBloomFilter[Long](1000, source))
47+
result <- filter.loadFromSource
48+
} yield {
49+
assert(result.writtenElements == 3)
50+
assert(result.error.contains(error))
51+
assert(filter.approximateElementCount == 3)
52+
}
53+
}
54+
55+
}

src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import java.util.concurrent.ThreadLocalRandom
66
import akka.actor.{ActorRef, ActorSystem}
77
import akka.testkit.TestActor.AutoPilot
88
import akka.testkit.{TestKit, TestProbe}
9-
import io.iohk.ethereum.blockchain.sync.StateSyncUtils.TrieProvider
9+
import akka.util.ByteString
10+
import io.iohk.ethereum.blockchain.sync.StateSyncUtils.{MptNodeData, TrieProvider}
1011
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
1112
RestartRequested,
1213
StartSyncingTo,
1314
StateSyncFinished,
1415
WaitingForNewTargetBlock
1516
}
16-
import io.iohk.ethereum.domain.BlockchainImpl
17+
import io.iohk.ethereum.domain.{Address, BlockchainImpl}
1718
import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo, SendMessage}
1819
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
1920
import io.iohk.ethereum.network.p2p.messages.CommonMessages.Status
@@ -30,6 +31,7 @@ import org.scalatest.matchers.should.Matchers
3031
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
3132

3233
import scala.concurrent.duration._
34+
import scala.util.Random
3335

3436
class StateSyncSpec
3537
extends TestKit(ActorSystem("MySpec"))
@@ -89,6 +91,24 @@ class StateSyncSpec
8991
}
9092
}
9193

94+
it should "start state sync when receiving start signal while bloom filter is loading" in new TestSetup() {
95+
override def buildBlockChain(): BlockchainImpl = {
96+
val storages = getNewStorages
97+
//iterating 1M key and values should force scheduler actor o enqueue last received command i.e StartSyncing
98+
(0 until 1000000).foreach { i =>
99+
storages.storages.nodeStorage.update(Seq(), Seq(genRandomByteString() -> genRandomArray()))
100+
}
101+
BlockchainImpl(storages.storages)
102+
}
103+
val nodeData = (0 until 1000).map(i => MptNodeData(Address(i), None, Seq(), i))
104+
val initiator = TestProbe()
105+
val trieProvider1 = TrieProvider()
106+
val target = trieProvider1.buildWorld(nodeData)
107+
setAutoPilotWithProvider(trieProvider1)
108+
initiator.send(scheduler, StartSyncingTo(target, 1))
109+
initiator.expectMsg(20.seconds, StateSyncFinished)
110+
}
111+
92112
class TestSetup extends EphemBlockchainTestSetup with TestSyncConfig {
93113
override implicit lazy val system = actorSystem
94114
type PeerConfig = Map[PeerId, PeerAction]
@@ -195,12 +215,22 @@ class StateSyncSpec
195215
BlockchainImpl(getNewStorages.storages)
196216
}
197217

218+
def genRandomArray(): Array[Byte] = {
219+
val arr = new Array[Byte](32)
220+
Random.nextBytes(arr)
221+
arr
222+
}
223+
224+
def genRandomByteString(): ByteString = {
225+
ByteString.fromArrayUnsafe(genRandomArray())
226+
}
227+
198228
lazy val scheduler = system.actorOf(
199229
SyncStateSchedulerActor.props(
200230
downloader,
201-
new SyncStateScheduler(
231+
SyncStateScheduler(
202232
buildBlockChain(),
203-
SyncStateScheduler.getEmptyFilter(syncConfig.stateSyncBloomFilterSize)
233+
syncConfig.stateSyncBloomFilterSize
204234
),
205235
syncConfig
206236
)

0 commit comments

Comments
 (0)