1
+ package io .iohk .ethereum .sync
2
+
3
+ import java .net .{InetSocketAddress , ServerSocket }
4
+ import java .util .concurrent .TimeoutException
5
+ import java .util .concurrent .atomic .AtomicReference
6
+
7
+ import akka .actor .{ActorRef , ActorSystem }
8
+ import akka .testkit .TestProbe
9
+ import akka .util .{ByteString , Timeout }
10
+ import cats .effect .Resource
11
+ import io .iohk .ethereum .Mocks .MockValidatorsAlwaysSucceed
12
+ import io .iohk .ethereum .blockchain .sync .{BlockchainHostActor , FastSync , TestSyncConfig }
13
+ import io .iohk .ethereum .db .components .{SharedEphemDataSources , Storages }
14
+ import io .iohk .ethereum .db .storage .AppStateStorage
15
+ import io .iohk .ethereum .db .storage .pruning .{ArchivePruning , PruningMode }
16
+ import io .iohk .ethereum .domain .{Block , Blockchain , BlockchainImpl }
17
+ import io .iohk .ethereum .mpt .MerklePatriciaTrie
18
+ import io .iohk .ethereum .network .EtcPeerManagerActor .PeerInfo
19
+ import io .iohk .ethereum .network .PeerManagerActor .{FastSyncHostConfiguration , PeerConfiguration }
20
+ import io .iohk .ethereum .network .discovery .Node
21
+ import io .iohk .ethereum .network .discovery .PeerDiscoveryManager .{DiscoveredNodesInfo , DiscoveryNodeInfo }
22
+ import io .iohk .ethereum .network .handshaker .{EtcHandshaker , EtcHandshakerConfiguration , Handshaker }
23
+ import io .iohk .ethereum .network .p2p .EthereumMessageDecoder
24
+ import io .iohk .ethereum .network .rlpx .AuthHandshaker
25
+ import io .iohk .ethereum .network .rlpx .RLPxConnectionHandler .RLPxConfiguration
26
+ import io .iohk .ethereum .network .{EtcPeerManagerActor , ForkResolver , KnownNodesManager , PeerEventBusActor , PeerManagerActor , ServerActor }
27
+ import io .iohk .ethereum .nodebuilder .{PruningConfigBuilder , SecureRandomBuilder }
28
+ import io .iohk .ethereum .sync .FastSyncItSpec .{FakePeer , customTestCaseResourceM }
29
+ import io .iohk .ethereum .utils .ServerStatus .Listening
30
+ import io .iohk .ethereum .utils .{Config , NodeStatus , ServerStatus , VmConfig }
31
+ import io .iohk .ethereum .vm .EvmConfig
32
+ import io .iohk .ethereum .{Fixtures , Timeouts }
33
+ import monix .eval .Task
34
+ import monix .execution .Scheduler
35
+ import org .scalatest .{Assertion , AsyncFlatSpec , BeforeAndAfter , Matchers }
36
+
37
+ import scala .concurrent .Future
38
+ import scala .concurrent .duration ._
39
+
40
+ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
41
+ implicit val testScheduler = Scheduler .fixedPool(" test" , 16 )
42
+
43
+ " FastSync" should " should sync blockchain without state nodes" in customTestCaseResourceM(FakePeer .start3FakePeersRes()) {
44
+ case (peer1, peer2, peer3) =>
45
+ for {
46
+ _ <- Task .parZip3(peer1.startPeer(), peer2.startPeer(), peer3.startPeer())
47
+ _ <- peer2.saveNBlocks(1000 )
48
+ _ <- peer3.saveNBlocks(1000 )
49
+ _ <- peer1.connectToPeers(Set (peer2.node, peer3.node))
50
+ _ <- peer1.startFastSync()
51
+ _ <- peer1.waitForFastSyncFinish()
52
+ } yield {
53
+ assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset)
54
+ assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset)
55
+ }
56
+ }
57
+
58
+ }
59
+
60
+ object FastSyncItSpec {
61
+ private def retryUntilWithDelay [A ](source : Task [A ], delay : FiniteDuration , maxRetries : Int )(
62
+ predicate : A => Boolean
63
+ ): Task [A ] = {
64
+ source.delayExecution(delay).flatMap { result =>
65
+ if (predicate(result)) {
66
+ Task .now(result)
67
+ } else {
68
+ if (maxRetries > 0 ) {
69
+ retryUntilWithDelay(source, delay, maxRetries - 1 )(predicate)
70
+ } else {
71
+ Task .raiseError(new TimeoutException (" Task time out after all retries" ))
72
+ }
73
+ }
74
+ }
75
+ }
76
+
77
+ def randomAddress (): InetSocketAddress = {
78
+ val s = new ServerSocket (0 )
79
+ try {
80
+ new InetSocketAddress (" localhost" , s.getLocalPort)
81
+ } finally {
82
+ s.close()
83
+ }
84
+ }
85
+
86
+ def customTestCaseResourceM [T ](fixture : Resource [Task , T ])
87
+ (theTest : T => Task [Assertion ])(implicit s : Scheduler ): Future [Assertion ] = {
88
+ fixture.use(theTest).runToFuture
89
+ }
90
+
91
+ def generateBlockChain (startBlock : Block , number : Int ): Seq [Block ] = {
92
+ def recur (last : Block , blocksLeft : Int , blocksCreated : List [Block ]): List [Block ] = {
93
+ if (blocksLeft <= 0 ) {
94
+ blocksCreated.reverse
95
+ } else {
96
+ val newBlock = last.copy(header = last.header.copy(parentHash = last.header.hash, number = last.header.number + 1 ))
97
+ recur(newBlock, blocksLeft - 1 , newBlock :: blocksCreated)
98
+ }
99
+ }
100
+ recur(startBlock, number, List .empty)
101
+ }
102
+
103
+ class FakePeer (peerName : String ) extends SecureRandomBuilder with TestSyncConfig {
104
+ implicit val akkaTimeout : Timeout = Timeout (5 .second)
105
+
106
+ val config = Config .config
107
+
108
+ import scala .language .postfixOps
109
+
110
+ implicit val system = ActorSystem (peerName)
111
+
112
+ val peerDiscoveryManager = TestProbe ().ref
113
+
114
+ val nodeKey = io.iohk.ethereum.crypto.generateKeyPair(secureRandom)
115
+
116
+ private val nodeStatus =
117
+ NodeStatus (
118
+ key = nodeKey,
119
+ serverStatus = ServerStatus .NotListening ,
120
+ discoveryStatus = ServerStatus .NotListening
121
+ )
122
+
123
+ sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder {
124
+ override lazy val pruningMode : PruningMode = ArchivePruning
125
+ }
126
+
127
+ lazy val nodeStatusHolder = new AtomicReference (nodeStatus)
128
+ lazy val storagesInstance = new SharedEphemDataSources with LocalPruningConfigBuilder with Storages .DefaultStorages
129
+ lazy val blockchainConfig = Config .blockchains.blockchainConfig
130
+ /**
131
+ * Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when
132
+ * it is persisted in storage.
133
+ */
134
+ lazy val knownNodesManagerConfig =
135
+ KnownNodesManager .KnownNodesManagerConfig (config).copy(persistInterval = 1 .seconds)
136
+
137
+ lazy val knownNodesManager = system.actorOf(
138
+ KnownNodesManager .props(
139
+ knownNodesManagerConfig,
140
+ storagesInstance.storages.knownNodesStorage
141
+ )
142
+ )
143
+
144
+ val bl = BlockchainImpl (storagesInstance.storages)
145
+
146
+ val genesis = Block (Fixtures .Blocks .Genesis .header.copy(stateRoot = ByteString (MerklePatriciaTrie .EmptyRootHash ) ), Fixtures .Blocks .Genesis .body)
147
+
148
+ bl.save(genesis, Seq (), genesis.header.difficulty, saveAsBestBlock = true )
149
+
150
+ lazy val nh = nodeStatusHolder
151
+
152
+ val peerConf = new PeerConfiguration {
153
+ override val fastSyncHostConfiguration : FastSyncHostConfiguration = new FastSyncHostConfiguration {
154
+ val maxBlocksHeadersPerMessage : Int = 200
155
+ val maxBlocksBodiesPerMessage : Int = 200
156
+ val maxReceiptsPerMessage : Int = 200
157
+ val maxMptComponentsPerMessage : Int = 200
158
+ }
159
+ override val rlpxConfiguration : RLPxConfiguration = new RLPxConfiguration {
160
+ override val waitForTcpAckTimeout : FiniteDuration = Timeouts .normalTimeout
161
+ override val waitForHandshakeTimeout : FiniteDuration = Timeouts .normalTimeout
162
+ }
163
+ override val waitForHelloTimeout : FiniteDuration = 3 seconds
164
+ override val waitForStatusTimeout : FiniteDuration = 30 seconds
165
+ override val waitForChainCheckTimeout : FiniteDuration = 15 seconds
166
+ override val connectMaxRetries : Int = 3
167
+ override val connectRetryDelay : FiniteDuration = 1 second
168
+ override val disconnectPoisonPillTimeout : FiniteDuration = 3 seconds
169
+ override val maxOutgoingPeers = 10
170
+ override val maxIncomingPeers = 5
171
+ override val maxPendingPeers = 5
172
+ override val networkId : Int = 1
173
+
174
+ override val updateNodesInitialDelay : FiniteDuration = 5 .seconds
175
+ override val updateNodesInterval : FiniteDuration = 20 .seconds
176
+ override val shortBlacklistDuration : FiniteDuration = 1 .minute
177
+ override val longBlacklistDuration : FiniteDuration = 3 .minutes
178
+ }
179
+
180
+ lazy val peerEventBus = system.actorOf(PeerEventBusActor .props, " peer-event-bus" )
181
+
182
+ private val handshakerConfiguration : EtcHandshakerConfiguration =
183
+ new EtcHandshakerConfiguration {
184
+ override val forkResolverOpt : Option [ForkResolver ] = None
185
+ override val nodeStatusHolder : AtomicReference [NodeStatus ] = nh
186
+ override val peerConfiguration : PeerConfiguration = peerConf
187
+ override val blockchain : Blockchain = bl
188
+ override val appStateStorage : AppStateStorage = storagesInstance.storages.appStateStorage
189
+ }
190
+
191
+ lazy val handshaker : Handshaker [PeerInfo ] = EtcHandshaker (handshakerConfiguration)
192
+
193
+ lazy val authHandshaker : AuthHandshaker = AuthHandshaker (nodeKey, secureRandom)
194
+
195
+ lazy val peerManager : ActorRef = system.actorOf(PeerManagerActor .props(
196
+ peerDiscoveryManager,
197
+ Config .Network .peer,
198
+ peerEventBus,
199
+ knownNodesManager,
200
+ handshaker,
201
+ authHandshaker,
202
+ EthereumMessageDecoder
203
+ ), " peer-manager" )
204
+
205
+ lazy val etcPeerManager : ActorRef = system.actorOf(EtcPeerManagerActor .props(
206
+ peerManager, peerEventBus, storagesInstance.storages.appStateStorage, None ), " etc-peer-manager" )
207
+
208
+ val blockchainHost : ActorRef = system.actorOf(BlockchainHostActor .props(
209
+ bl, peerConf, peerEventBus, etcPeerManager), " blockchain-host" )
210
+
211
+ lazy val server : ActorRef = system.actorOf(ServerActor .props(nodeStatusHolder, peerManager), " server" )
212
+
213
+ val listenAddress = randomAddress()
214
+
215
+ lazy val node =
216
+ DiscoveryNodeInfo (Node (ByteString (nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort), 1 )
217
+
218
+ lazy val vmConfig = VmConfig (Config .config)
219
+
220
+ lazy val validators = new MockValidatorsAlwaysSucceed
221
+
222
+ val testSyncConfig = syncConfig.copy(
223
+ minPeersToChooseTargetBlock = 1 ,
224
+ peersScanInterval = 1 .second,
225
+ blockHeadersPerRequest = 200 ,
226
+ blockBodiesPerRequest = 50 ,
227
+ receiptsPerRequest = 50 ,
228
+ fastSyncThrottle = 10 .milliseconds,
229
+ startRetryInterval = 50 .milliseconds,
230
+ )
231
+
232
+ lazy val fastSync = system.actorOf(FastSync .props(
233
+ storagesInstance.storages.fastSyncStateStorage,
234
+ storagesInstance.storages.appStateStorage,
235
+ bl,
236
+ validators,
237
+ peerEventBus,
238
+ etcPeerManager,
239
+ testSyncConfig,
240
+ system.scheduler
241
+ ))
242
+
243
+ def getMptForBlock (blockHeaderNumber : BigInt ) = {
244
+ bl.getWorldStateProxy(
245
+ blockNumber = blockHeaderNumber,
246
+ accountStartNonce = blockchainConfig.accountStartNonce,
247
+ stateRootHash = bl.getBlockByNumber(blockHeaderNumber).map(_.header.stateRoot),
248
+ noEmptyAccounts = EvmConfig .forBlock(blockHeaderNumber, blockchainConfig).noEmptyAccounts,
249
+ ethCompatibleStorage = blockchainConfig.ethCompatibleStorage
250
+ )
251
+ }
252
+
253
+ def startPeer (): Task [Unit ] = {
254
+ for {
255
+ _ <- Task {
256
+ peerManager ! PeerManagerActor .StartConnecting
257
+ server ! ServerActor .StartServer (listenAddress)
258
+ }
259
+ _ <- retryUntilWithDelay(Task (nodeStatusHolder.get()), 1 .second, 5 ) {status =>
260
+ status.serverStatus == Listening (listenAddress)
261
+ }
262
+ } yield ()
263
+ }
264
+
265
+ def shutdown (): Task [Unit ] = {
266
+ Task .deferFuture(system.terminate()).map(_ => ())
267
+ }
268
+
269
+ def connectToPeers (nodes : Set [DiscoveryNodeInfo ]): Task [Unit ] = {
270
+ for {
271
+ _ <- Task {
272
+ peerManager ! DiscoveredNodesInfo (nodes)
273
+ }
274
+ _ <- retryUntilWithDelay(Task (storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1 .second, 5 ){ knownNodes =>
275
+ val requestedNodes = nodes.map(_.node.id)
276
+ val currentNodes = knownNodes.map(Node .fromUri).map(_.id)
277
+ requestedNodes.subsetOf(currentNodes)
278
+ }
279
+ } yield ()
280
+ }
281
+
282
+ import akka .pattern .ask
283
+ def getHandshakedPeers : Task [PeerManagerActor .Peers ] = {
284
+ Task .deferFutureAction{s =>
285
+ implicit val ec = s
286
+ (peerManager ? PeerManagerActor .GetPeers ).mapTo[PeerManagerActor .Peers ]
287
+ }
288
+ }
289
+
290
+ def saveNBlocks (n : Int ) = Task {
291
+ val lastBlock = bl.getBestBlock()
292
+ val chain = generateBlockChain(lastBlock, n)
293
+ chain.foreach(block => bl.save(block, Seq (), block.header.difficulty, true ))
294
+ }
295
+
296
+ def startFastSync (): Task [Unit ] = Task {
297
+ fastSync ! FastSync .Start
298
+ }
299
+
300
+ def waitForFastSyncFinish (): Task [Boolean ] = {
301
+ retryUntilWithDelay(Task (storagesInstance.storages.appStateStorage.isFastSyncDone()), 1 .second, 30 ){ isDone =>
302
+ isDone
303
+ }
304
+ }
305
+ }
306
+
307
+ object FakePeer {
308
+ def startFakePeer (peerName : String ): Task [FakePeer ] = {
309
+ for {
310
+ peer <- Task (new FakePeer (peerName)).memoizeOnSuccess
311
+ _ <- peer.startPeer()
312
+ } yield peer
313
+ }
314
+
315
+ def start1FakePeerRes (): Resource [Task , FakePeer ] = {
316
+ Resource .make {
317
+ startFakePeer(" Peer1" )
318
+ } { peer =>
319
+ peer.shutdown()
320
+ }
321
+ }
322
+
323
+ def start2FakePeersRes () = {
324
+ Resource .make {
325
+ Task .parZip2(startFakePeer(" Peer1" ), startFakePeer(" Peer2" ))
326
+ } { case (peer, peer1) => Task .parMap2(peer.shutdown(), peer1.shutdown())((_ ,_)=> ())}
327
+ }
328
+
329
+ def start3FakePeersRes () = {
330
+ Resource .make {
331
+ Task .parZip3( startFakePeer(" Peer1" ), startFakePeer(" Peer2" ), startFakePeer(" Peer3" ))
332
+ } { case (peer, peer1, peer2) => Task .parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_ ,_, _)=> ())}
333
+ }
334
+ }
335
+ }
0 commit comments