Skip to content

[ETCM-454] Migrate Scala Futures to Monix Tasks #855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package io.iohk.ethereum.ets.blockchain

import java.util.concurrent.Executors

import akka.actor.ActorSystem
import io.iohk.ethereum.domain.Block
import io.iohk.ethereum.ets.common.TestOptions
import io.iohk.ethereum.extvm.ExtVMInterface
import io.iohk.ethereum.ledger.Ledger.VMImpl
import io.iohk.ethereum.nodebuilder.VmSetup
import io.iohk.ethereum.utils.{Config, Logger, VmConfig}
import monix.eval.Task
import monix.execution.Scheduler
import org.scalatest.{Args, BeforeAndAfterAll, Status}
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object BlockchainSuite {
  // Shared actor system for all blockchain ETS scenarios.
  implicit lazy val actorSystem: ActorSystem = ActorSystem("mantis_system")
  // Monix scheduler backed by a fixed 4-thread pool; replaces the former
  // ExecutionContext-based pool as part of the Future -> Task migration.
  // (The diff view showed both the old and new definition; only the
  // Scheduler-based one belongs in the merged source.)
  implicit val testContext: Scheduler = Scheduler.fixedPool("blockchain-suite-pool", 4)
  // The external VM is expensive to set up, so build it lazily and share it.
  lazy val extvm: VMImpl = VmSetup.vm(VmConfig(Config.config), Config.blockchains.blockchainConfig, testMode = true)
}

Expand Down Expand Up @@ -94,9 +91,9 @@ class BlockchainSuite extends AnyFreeSpec with Matchers with BeforeAndAfterAll w

import setup._

def importBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil): Future[List[Block]] = {
def importBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil): Task[List[Block]] = {
if (blocks.isEmpty) {
Future.successful(importedBlocks)
Task.now(importedBlocks)
} else {
val blockToImport = blocks.head
ledger.importBlock(blockToImport).flatMap { _ =>
Expand All @@ -109,9 +106,9 @@ class BlockchainSuite extends AnyFreeSpec with Matchers with BeforeAndAfterAll w

val blocksToProcess = getBlocks(scenario.blocks)

val invalidBlocks = getBlocks(getInvalid)
getBlocks(getInvalid)

val ready = Await.result(importBlocks(blocksToProcess), Duration.Inf)
importBlocks(blocksToProcess).runSyncUnsafe()

val lastBlock = getBestBlock

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.iohk.ethereum.ets.blockchain

import akka.util.ByteString
import java.util.concurrent.Executors

import io.iohk.ethereum.consensus.Protocol.NoAdditionalEthashData
import io.iohk.ethereum.consensus.ethash.EthashConsensus
import io.iohk.ethereum.consensus.ethash.validators.ValidatorsExecutor
Expand All @@ -18,12 +16,12 @@ import io.iohk.ethereum.ledger._
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.utils.BigIntExtensionMethods._
import io.iohk.ethereum.utils.{BlockchainConfig, Config}
import monix.execution.Scheduler
import org.bouncycastle.util.encoders.Hex
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

object ScenarioSetup {
val testContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
val testContext = Scheduler.fixedPool("scenario-setup-pool", 4)
val specificConfig = ethash.EthashConfig(Config.config)
val fullConfig = FullConsensusConfig(ConsensusConfig(Config.config), specificConfig)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import io.iohk.ethereum.transactions.PendingTransactionsManager
import io.iohk.ethereum.utils._
import io.iohk.ethereum.vm.EvmConfig
import monix.eval.Task

import scala.concurrent.ExecutionContext
import monix.execution.Scheduler
import scala.concurrent.duration._
object RegularSyncItSpecUtils {

Expand Down Expand Up @@ -56,7 +55,7 @@ object RegularSyncItSpecUtils {
system.actorOf(PeersClient.props(etcPeerManager, peerEventBus, testSyncConfig, system.scheduler), "peers-client")

lazy val ledger: Ledger =
new LedgerImpl(bl, blockchainConfig, syncConfig, buildEthashConsensus, ExecutionContext.global)
new LedgerImpl(bl, blockchainConfig, syncConfig, buildEthashConsensus, Scheduler.global)

lazy val ommersPool: ActorRef = system.actorOf(OmmersPool.props(bl, 1), "ommers-pool")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import akka.pattern.pipe
import io.iohk.ethereum.blockchain.sync.fast.FastSync.SyncState
import io.iohk.ethereum.blockchain.sync.fast.StateStorageActor.GetStorage
import io.iohk.ethereum.db.storage.FastSyncStateStorage

import scala.concurrent.Future
import monix.eval.Task
import monix.execution.Scheduler
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -43,8 +43,9 @@ class StateStorageActor extends Actor with ActorLogging {
}

private def persistState(storage: FastSyncStateStorage, syncState: SyncState): Unit = {
import context.dispatcher
val persistingQueues: Future[Try[FastSyncStateStorage]] = Future {
implicit val scheduler: Scheduler = Scheduler(context.dispatcher)

val persistingQueues: Task[Try[FastSyncStateStorage]] = Task {
lazy val result = Try { storage.putSyncState(syncState) }
if (log.isDebugEnabled) {
val now = System.currentTimeMillis()
Expand All @@ -56,7 +57,7 @@ class StateStorageActor extends Actor with ActorLogging {
result
}
}
persistingQueues pipeTo self
persistingQueues.runToFuture pipeTo self
context become busy(storage, None)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.Future
import scala.concurrent.duration._

class SyncStateSchedulerActor(
Expand Down Expand Up @@ -211,7 +210,7 @@ class SyncStateSchedulerActor(
)
val (requests, newState1) = newState.assignTasksToPeers(peers, syncConfig.nodesPerRequest)
requests.foreach(req => requestNodes(req))
Future(processNodes(newState1, nodes)).pipeTo(self)
Task(processNodes(newState1, nodes)).runToFuture.pipeTo(self)
context.become(syncing(newState1))

case (Some((nodes, newState)), None) =>
Expand All @@ -220,7 +219,7 @@ class SyncStateSchedulerActor(
newState.numberOfRemainingRequests
)
// we do not have any peers and cannot assign new tasks, but we can still process remaining requests
Future(processNodes(newState, nodes)).pipeTo(self)
Task(processNodes(newState, nodes)).runToFuture.pipeTo(self)
context.become(syncing(newState))

case (None, Some(peers)) =>
Expand Down Expand Up @@ -264,7 +263,7 @@ class SyncStateSchedulerActor(
} else {
log.debug("Response received while idle. Initiating response processing")
val newState = currentState.initProcessing
Future(processNodes(newState, result)).pipeTo(self)
Task(processNodes(newState, result)).runToFuture.pipeTo(self)
context.become(syncing(newState))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Scheduler}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.{ask, pipe}
import akka.util.{ByteString, Timeout}
import cats.data.NonEmptyList
import cats.instances.future._
import cats.instances.option._
import cats.syntax.either._
import io.iohk.ethereum.consensus.validators.BlockValidator
Expand All @@ -28,25 +27,24 @@ import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.FunctorOps._
import io.iohk.ethereum.utils.FutureOps._
import monix.eval.Task
import monix.execution.{Scheduler => MonixScheduler}
import mouse.all._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class BlockFetcher(
val peersClient: ActorRef,
val peerEventBus: ActorRef,
val supervisor: ActorRef,
val syncConfig: SyncConfig,
val blockValidator: BlockValidator,
implicit val scheduler: Scheduler
val blockValidator: BlockValidator
) extends Actor
with ActorLogging {

import BlockFetcher._

implicit val ec: ExecutionContext = context.dispatcher
implicit val ec: MonixScheduler = MonixScheduler(context.dispatcher)
implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication

override def receive: Receive = idle()
Expand Down Expand Up @@ -279,10 +277,10 @@ class BlockFetcher(
val blockNr = state.nextBlockToFetch
val amount = syncConfig.blockHeadersPerRequest

fetchHeadersFrom(blockNr, amount) pipeTo self
fetchHeadersFrom(blockNr, amount).runToFuture pipeTo self
}

private def fetchHeadersFrom(blockNr: BigInt, amount: Int): Future[Any] = {
private def fetchHeadersFrom(blockNr: BigInt, amount: Int): Task[Any] = {
log.debug("Fetching headers from block {}", blockNr)
val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false)

Expand All @@ -301,37 +299,38 @@ class BlockFetcher(
log.debug("Fetching bodies")

val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest)
requestBlockBodies(hashes) pipeTo self
requestBlockBodies(hashes).runToFuture pipeTo self
}

private def fetchStateNode(hash: ByteString, originalSender: ActorRef, state: BlockFetcherState): Unit = {
log.debug("Fetching state node for hash {}", ByteStringUtils.hash2string(hash))
requestStateNode(hash, originalSender) pipeTo self
requestStateNode(hash).runToFuture pipeTo self
val newState = state.fetchingStateNode(hash, originalSender)

context become started(newState)
}

/** Requests block headers from the best peer.
  *
  * An empty `BlockHeaders` response is treated as a miss: the retry message is
  * delayed by `syncConfig.syncRetryInterval` (via `delayResult`) so the fetcher
  * does not spin in a tight retry loop. Any other reply is passed through.
  */
private def requestBlockHeaders(msg: GetBlockHeaders): Task[Any] =
  makeRequest(Request.create(msg, BestPeer), RetryHeadersRequest)
    .flatMap {
      case Response(_, BlockHeaders(headers)) if headers.isEmpty =>
        log.debug("Empty BlockHeaders response. Retry in {}", syncConfig.syncRetryInterval)
        Task.now(RetryHeadersRequest).delayResult(syncConfig.syncRetryInterval)
      case res => Task.now(res)
    }

/** Requests the bodies for the given block hashes from the best available peer,
  * falling back to `RetryBodiesRequest` on failure (see `makeRequest`).
  */
private def requestBlockBodies(hashes: Seq[ByteString]): Task[Any] =
  makeRequest(Request.create(GetBlockBodies(hashes), BestPeer), RetryBodiesRequest)

private def requestStateNode(hash: ByteString, requestor: ActorRef): Future[Any] =
private def requestStateNode(hash: ByteString): Task[Any] =
makeRequest(Request.create(GetNodeData(List(hash)), BestPeer), RetryFetchStateNode)

private def makeRequest(request: Request[_], responseFallback: FetchMsg): Future[Any] =
(peersClient ? request)
private def makeRequest(request: Request[_], responseFallback: FetchMsg): Task[Any] =
Task
.deferFuture(peersClient ? request)
.tap(blacklistPeerOnFailedRequest)
.flatMap(handleRequestResult(responseFallback))
.recover { case error =>
.onErrorHandle { error =>
log.error(error, "Unexpected error while doing a request")
responseFallback
}
Expand All @@ -341,16 +340,17 @@ class BlockFetcher(
case _ => ()
}

/** Translates a raw peers-client reply into the next fetcher message.
  *
  * Known failures (`RequestFailed`, actor `Failure`) resolve to `fallback`;
  * `NoSuitablePeer` also delays execution by `syncConfig.syncRetryInterval`
  * so we do not hammer the peer pool while no peer is available; anything
  * else is passed through unchanged.
  */
private def handleRequestResult(fallback: FetchMsg)(msg: Any): Task[Any] = msg match {
  case failed: RequestFailed =>
    log.debug("Request failed due to {}", failed)
    Task.now(fallback)
  case NoSuitablePeer =>
    // No peer to ask right now — wait before retrying.
    Task.now(fallback).delayExecution(syncConfig.syncRetryInterval)
  case Failure(cause) =>
    log.error(cause, "Unexpected error on the request result")
    Task.now(fallback)
  case m =>
    Task.now(m)
}
}

Expand All @@ -361,10 +361,9 @@ object BlockFetcher {
peerEventBus: ActorRef,
supervisor: ActorRef,
syncConfig: SyncConfig,
blockValidator: BlockValidator,
scheduler: Scheduler
blockValidator: BlockValidator
): Props =
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, scheduler))
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator))

sealed trait FetchMsg
case class Start(importer: ActorRef, fromBlock: BigInt) extends FetchMsg
Expand Down
Loading