Skip to content

Commit 26b2f64

Browse files
author
Michał Mrożek
authored
Merge branch 'develop' into add-encypt-key-command
2 parents c944712 + 3f1d4df commit 26b2f64

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+388
-424
lines changed

src/ets/scala/io/iohk/ethereum/ets/blockchain/BlockchainSuite.scala

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
11
package io.iohk.ethereum.ets.blockchain
22

3-
import java.util.concurrent.Executors
4-
53
import akka.actor.ActorSystem
64
import io.iohk.ethereum.domain.Block
75
import io.iohk.ethereum.ets.common.TestOptions
86
import io.iohk.ethereum.extvm.ExtVMInterface
97
import io.iohk.ethereum.ledger.Ledger.VMImpl
108
import io.iohk.ethereum.nodebuilder.VmSetup
119
import io.iohk.ethereum.utils.{Config, Logger, VmConfig}
10+
import monix.eval.Task
11+
import monix.execution.Scheduler
1212
import org.scalatest.{Args, BeforeAndAfterAll, Status}
1313
import org.scalatest.freespec.AnyFreeSpec
1414
import org.scalatest.matchers.should.Matchers
1515

16-
import scala.concurrent.duration.Duration
17-
import scala.concurrent.{Await, ExecutionContext, Future}
18-
1916
object BlockchainSuite {
2017
implicit lazy val actorSystem: ActorSystem = ActorSystem("mantis_system")
21-
implicit val testContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
18+
implicit val testContext: Scheduler = Scheduler.fixedPool("blockchain-suite-pool", 4)
2219
lazy val extvm: VMImpl = VmSetup.vm(VmConfig(Config.config), Config.blockchains.blockchainConfig, testMode = true)
2320
}
2421

@@ -94,9 +91,9 @@ class BlockchainSuite extends AnyFreeSpec with Matchers with BeforeAndAfterAll w
9491

9592
import setup._
9693

97-
def importBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil): Future[List[Block]] = {
94+
def importBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil): Task[List[Block]] = {
9895
if (blocks.isEmpty) {
99-
Future.successful(importedBlocks)
96+
Task.now(importedBlocks)
10097
} else {
10198
val blockToImport = blocks.head
10299
ledger.importBlock(blockToImport).flatMap { _ =>
@@ -109,9 +106,9 @@ class BlockchainSuite extends AnyFreeSpec with Matchers with BeforeAndAfterAll w
109106

110107
val blocksToProcess = getBlocks(scenario.blocks)
111108

112-
val invalidBlocks = getBlocks(getInvalid)
109+
getBlocks(getInvalid)
113110

114-
val ready = Await.result(importBlocks(blocksToProcess), Duration.Inf)
111+
importBlocks(blocksToProcess).runSyncUnsafe()
115112

116113
val lastBlock = getBestBlock
117114

src/ets/scala/io/iohk/ethereum/ets/blockchain/ScenarioSetup.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package io.iohk.ethereum.ets.blockchain
22

33
import akka.util.ByteString
4-
import java.util.concurrent.Executors
5-
64
import io.iohk.ethereum.consensus.Protocol.NoAdditionalEthashData
75
import io.iohk.ethereum.consensus.ethash.EthashConsensus
86
import io.iohk.ethereum.consensus.ethash.validators.ValidatorsExecutor
@@ -18,12 +16,12 @@ import io.iohk.ethereum.ledger._
1816
import io.iohk.ethereum.mpt.MerklePatriciaTrie
1917
import io.iohk.ethereum.utils.BigIntExtensionMethods._
2018
import io.iohk.ethereum.utils.{BlockchainConfig, Config}
19+
import monix.execution.Scheduler
2120
import org.bouncycastle.util.encoders.Hex
22-
import scala.concurrent.ExecutionContext
2321
import scala.util.{Failure, Success, Try}
2422

2523
object ScenarioSetup {
26-
val testContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
24+
val testContext = Scheduler.fixedPool("scenario-setup-pool", 4)
2725
val specificConfig = ethash.EthashConfig(Config.config)
2826
val fullConfig = FullConsensusConfig(ConsensusConfig(Config.config), specificConfig)
2927

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import io.iohk.ethereum.transactions.PendingTransactionsManager
2222
import io.iohk.ethereum.utils._
2323
import io.iohk.ethereum.vm.EvmConfig
2424
import monix.eval.Task
25-
26-
import scala.concurrent.ExecutionContext
25+
import monix.execution.Scheduler
2726
import scala.concurrent.duration._
2827
object RegularSyncItSpecUtils {
2928

@@ -56,7 +55,7 @@ object RegularSyncItSpecUtils {
5655
system.actorOf(PeersClient.props(etcPeerManager, peerEventBus, testSyncConfig, system.scheduler), "peers-client")
5756

5857
lazy val ledger: Ledger =
59-
new LedgerImpl(bl, blockchainConfig, syncConfig, buildEthashConsensus, ExecutionContext.global)
58+
new LedgerImpl(bl, blockchainConfig, syncConfig, buildEthashConsensus, Scheduler.global)
6059

6160
lazy val ommersPool: ActorRef = system.actorOf(OmmersPool.props(bl, 1), "ommers-pool")
6261

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/StateStorageActor.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import akka.pattern.pipe
55
import io.iohk.ethereum.blockchain.sync.fast.FastSync.SyncState
66
import io.iohk.ethereum.blockchain.sync.fast.StateStorageActor.GetStorage
77
import io.iohk.ethereum.db.storage.FastSyncStateStorage
8-
9-
import scala.concurrent.Future
8+
import monix.eval.Task
9+
import monix.execution.Scheduler
1010
import scala.util.{Failure, Success, Try}
1111

1212
/**
@@ -43,8 +43,9 @@ class StateStorageActor extends Actor with ActorLogging {
4343
}
4444

4545
private def persistState(storage: FastSyncStateStorage, syncState: SyncState): Unit = {
46-
import context.dispatcher
47-
val persistingQueues: Future[Try[FastSyncStateStorage]] = Future {
46+
implicit val scheduler: Scheduler = Scheduler(context.dispatcher)
47+
48+
val persistingQueues: Task[Try[FastSyncStateStorage]] = Task {
4849
lazy val result = Try { storage.putSyncState(syncState) }
4950
if (log.isDebugEnabled) {
5051
val now = System.currentTimeMillis()
@@ -56,7 +57,7 @@ class StateStorageActor extends Actor with ActorLogging {
5657
result
5758
}
5859
}
59-
persistingQueues pipeTo self
60+
persistingQueues.runToFuture pipeTo self
6061
context become busy(storage, None)
6162
}
6263

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncStateSchedulerActor.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ import io.iohk.ethereum.network.p2p.messages.Codes
1919
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
2020
import io.iohk.ethereum.utils.ByteStringUtils
2121
import io.iohk.ethereum.utils.Config.SyncConfig
22+
import monix.eval.Task
2223
import monix.execution.Scheduler
23-
24-
import scala.concurrent.Future
2524
import scala.concurrent.duration._
2625

2726
class SyncStateSchedulerActor(
@@ -211,7 +210,7 @@ class SyncStateSchedulerActor(
211210
)
212211
val (requests, newState1) = newState.assignTasksToPeers(peers, syncConfig.nodesPerRequest)
213212
requests.foreach(req => requestNodes(req))
214-
Future(processNodes(newState1, nodes)).pipeTo(self)
213+
Task(processNodes(newState1, nodes)).runToFuture.pipeTo(self)
215214
context.become(syncing(newState1))
216215

217216
case (Some((nodes, newState)), None) =>
@@ -220,7 +219,7 @@ class SyncStateSchedulerActor(
220219
newState.numberOfRemainingRequests
221220
)
222221
// we do not have any peers and cannot assign new tasks, but we can still process remaining requests
223-
Future(processNodes(newState, nodes)).pipeTo(self)
222+
Task(processNodes(newState, nodes)).runToFuture.pipeTo(self)
224223
context.become(syncing(newState))
225224

226225
case (None, Some(peers)) =>
@@ -264,7 +263,7 @@ class SyncStateSchedulerActor(
264263
} else {
265264
log.debug("Response received while idle. Initiating response processing")
266265
val newState = currentState.initProcessing
267-
Future(processNodes(newState, result)).pipeTo(self)
266+
Task(processNodes(newState, result)).runToFuture.pipeTo(self)
268267
context.become(syncing(newState))
269268
}
270269

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.Status.Failure
4-
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Scheduler}
4+
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
55
import akka.pattern.{ask, pipe}
66
import akka.util.{ByteString, Timeout}
77
import cats.data.NonEmptyList
8-
import cats.instances.future._
98
import cats.instances.option._
109
import cats.syntax.either._
1110
import io.iohk.ethereum.consensus.validators.BlockValidator
@@ -28,25 +27,24 @@ import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
2827
import io.iohk.ethereum.utils.ByteStringUtils
2928
import io.iohk.ethereum.utils.Config.SyncConfig
3029
import io.iohk.ethereum.utils.FunctorOps._
31-
import io.iohk.ethereum.utils.FutureOps._
30+
import monix.eval.Task
31+
import monix.execution.{Scheduler => MonixScheduler}
3232
import mouse.all._
3333

3434
import scala.concurrent.duration._
35-
import scala.concurrent.{ExecutionContext, Future}
3635

3736
class BlockFetcher(
3837
val peersClient: ActorRef,
3938
val peerEventBus: ActorRef,
4039
val supervisor: ActorRef,
4140
val syncConfig: SyncConfig,
42-
val blockValidator: BlockValidator,
43-
implicit val scheduler: Scheduler
41+
val blockValidator: BlockValidator
4442
) extends Actor
4543
with ActorLogging {
4644

4745
import BlockFetcher._
4846

49-
implicit val ec: ExecutionContext = context.dispatcher
47+
implicit val ec: MonixScheduler = MonixScheduler(context.dispatcher)
5048
implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication
5149

5250
override def receive: Receive = idle()
@@ -279,10 +277,10 @@ class BlockFetcher(
279277
val blockNr = state.nextBlockToFetch
280278
val amount = syncConfig.blockHeadersPerRequest
281279

282-
fetchHeadersFrom(blockNr, amount) pipeTo self
280+
fetchHeadersFrom(blockNr, amount).runToFuture pipeTo self
283281
}
284282

285-
private def fetchHeadersFrom(blockNr: BigInt, amount: Int): Future[Any] = {
283+
private def fetchHeadersFrom(blockNr: BigInt, amount: Int): Task[Any] = {
286284
log.debug("Fetching headers from block {}", blockNr)
287285
val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false)
288286

@@ -301,37 +299,38 @@ class BlockFetcher(
301299
log.debug("Fetching bodies")
302300

303301
val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest)
304-
requestBlockBodies(hashes) pipeTo self
302+
requestBlockBodies(hashes).runToFuture pipeTo self
305303
}
306304

307305
private def fetchStateNode(hash: ByteString, originalSender: ActorRef, state: BlockFetcherState): Unit = {
308306
log.debug("Fetching state node for hash {}", ByteStringUtils.hash2string(hash))
309-
requestStateNode(hash, originalSender) pipeTo self
307+
requestStateNode(hash).runToFuture pipeTo self
310308
val newState = state.fetchingStateNode(hash, originalSender)
311309

312310
context become started(newState)
313311
}
314312

315-
private def requestBlockHeaders(msg: GetBlockHeaders): Future[Any] =
313+
private def requestBlockHeaders(msg: GetBlockHeaders): Task[Any] =
316314
makeRequest(Request.create(msg, BestPeer), RetryHeadersRequest)
317315
.flatMap {
318316
case Response(_, BlockHeaders(headers)) if headers.isEmpty =>
319317
log.debug("Empty BlockHeaders response. Retry in {}", syncConfig.syncRetryInterval)
320-
Future.successful(RetryHeadersRequest).delayedBy(syncConfig.syncRetryInterval)
321-
case res => Future.successful(res)
318+
Task.now(RetryHeadersRequest).delayResult(syncConfig.syncRetryInterval)
319+
case res => Task.now(res)
322320
}
323321

324-
private def requestBlockBodies(hashes: Seq[ByteString]): Future[Any] =
322+
private def requestBlockBodies(hashes: Seq[ByteString]): Task[Any] =
325323
makeRequest(Request.create(GetBlockBodies(hashes), BestPeer), RetryBodiesRequest)
326324

327-
private def requestStateNode(hash: ByteString, requestor: ActorRef): Future[Any] =
325+
private def requestStateNode(hash: ByteString): Task[Any] =
328326
makeRequest(Request.create(GetNodeData(List(hash)), BestPeer), RetryFetchStateNode)
329327

330-
private def makeRequest(request: Request[_], responseFallback: FetchMsg): Future[Any] =
331-
(peersClient ? request)
328+
private def makeRequest(request: Request[_], responseFallback: FetchMsg): Task[Any] =
329+
Task
330+
.deferFuture(peersClient ? request)
332331
.tap(blacklistPeerOnFailedRequest)
333332
.flatMap(handleRequestResult(responseFallback))
334-
.recover { case error =>
333+
.onErrorHandle { error =>
335334
log.error(error, "Unexpected error while doing a request")
336335
responseFallback
337336
}
@@ -341,16 +340,17 @@ class BlockFetcher(
341340
case _ => ()
342341
}
343342

344-
private def handleRequestResult(fallback: FetchMsg)(msg: Any): Future[Any] = msg match {
343+
private def handleRequestResult(fallback: FetchMsg)(msg: Any): Task[Any] = msg match {
345344
case failed: RequestFailed =>
346345
log.debug("Request failed due to {}", failed)
347-
Future.successful(fallback)
346+
Task.now(fallback)
347+
case NoSuitablePeer =>
348+
Task.now(fallback).delayExecution(syncConfig.syncRetryInterval)
348349
case Failure(cause) =>
349350
log.error(cause, "Unexpected error on the request result")
350-
Future.successful(fallback)
351-
case NoSuitablePeer =>
352-
Future.successful(fallback).delayedBy(syncConfig.syncRetryInterval)
353-
case m => Future.successful(m)
351+
Task.now(fallback)
352+
case m =>
353+
Task.now(m)
354354
}
355355
}
356356

@@ -361,10 +361,9 @@ object BlockFetcher {
361361
peerEventBus: ActorRef,
362362
supervisor: ActorRef,
363363
syncConfig: SyncConfig,
364-
blockValidator: BlockValidator,
365-
scheduler: Scheduler
364+
blockValidator: BlockValidator
366365
): Props =
367-
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, scheduler))
366+
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator))
368367

369368
sealed trait FetchMsg
370369
case class Start(importer: ActorRef, fromBlock: BigInt) extends FetchMsg

0 commit comments

Comments
 (0)