Skip to content

[ETCM-177] Fix Ommers pool #724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class BlockImporter(
start()
}

private def idle: Receive = {
case Start => start()
private def idle: Receive = { case Start =>
start()
}

private def handleTopMessages(state: ImporterState, currentBehavior: Behavior): Receive = {
Expand All @@ -63,8 +63,6 @@ class BlockImporter(
case MinedBlock(block) =>
if (!state.importing) {
importMinedBlock(block, state)
} else {
ommersPool ! AddOmmers(block.header)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plus how ommers pool previously behave, it was the Testnet issue.

}
case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)
case ImportDone(newBehavior) =>
Expand Down Expand Up @@ -96,9 +94,10 @@ class BlockImporter(
}

private def pickBlocks(state: ImporterState): Unit = {
val msg = state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(
from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber))
val msg =
state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(from =>
BlockFetcher.StrictPickBlocks(from, startingBlockNumber)
)

fetcher ! msg
}
Expand Down Expand Up @@ -142,8 +141,9 @@ class BlockImporter(
}
}

private def tryImportBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil)(
implicit ec: ExecutionContext): Future[(List[Block], Option[Any])] =
private def tryImportBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil)(implicit
ec: ExecutionContext
): Future[(List[Block], Option[Any])] =
if (blocks.isEmpty) {
Future.successful((importedBlocks, None))
} else {
Expand Down Expand Up @@ -290,9 +290,11 @@ object BlockImporter {
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
pendingTransactionsManager: ActorRef): Props =
pendingTransactionsManager: ActorRef
): Props =
Props(
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager))
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager)
)

type Behavior = ImporterState => Receive
type ImportFn = ImporterState => Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.ethereum.consensus.ethash

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import akka.util.{Timeout, ByteString}
import io.iohk.ethereum.consensus.blocks.PendingBlock
import io.iohk.ethereum.consensus.ethash.blocks.EthashBlockGenerator
import io.iohk.ethereum.domain.{Address, Block}
Expand All @@ -28,23 +28,21 @@ class EthashBlockCreator(
def getBlockForMining(parentBlock: Block, withTransactions: Boolean = true): Future[PendingBlock] = {
val transactions =
if (withTransactions) getTransactionsFromPool else Future.successful(PendingTransactionsResponse(Nil))
getOmmersFromPool(parentBlock.header.number + 1).zip(transactions).flatMap {
case (ommers, pendingTxs) =>
blockGenerator
.generateBlock(parentBlock, pendingTxs.pendingTransactions.map(_.stx.tx), coinbase, ommers.headers) match {
case Right(pb) => Future.successful(pb)
case Left(err) => Future.failed(new RuntimeException(s"Error while generating block for mining: $err"))
}
getOmmersFromPool(parentBlock.hash).zip(transactions).flatMap { case (ommers, pendingTxs) =>
blockGenerator
.generateBlock(parentBlock, pendingTxs.pendingTransactions.map(_.stx.tx), coinbase, ommers.headers) match {
case Right(pb) => Future.successful(pb)
case Left(err) => Future.failed(new RuntimeException(s"Error while generating block for mining: $err"))
}
}
}

private def getOmmersFromPool(blockNumber: BigInt): Future[OmmersPool.Ommers] = {
(ommersPool ? OmmersPool.GetOmmers(blockNumber))(Timeout(miningConfig.ommerPoolQueryTimeout))
private def getOmmersFromPool(parentBlockHash: ByteString): Future[OmmersPool.Ommers] = {
(ommersPool ? OmmersPool.GetOmmers(parentBlockHash))(Timeout(miningConfig.ommerPoolQueryTimeout))
.mapTo[OmmersPool.Ommers]
.recover {
case ex =>
log.error("Failed to get ommers, mining block with empty ommers list", ex)
OmmersPool.Ommers(Nil)
.recover { case ex =>
log.error("Failed to get ommers, mining block with empty ommers list", ex)
OmmersPool.Ommers(Nil)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/io/iohk/ethereum/jsonrpc/EthService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ class EthService(
import io.iohk.ethereum.consensus.ethash.EthashUtils.{epoch, seed}

val bestBlock = blockchain.getBestBlock()
getOmmersFromPool(bestBlock.header.number + 1).zip(getTransactionsFromPool).map { case (ommers, pendingTxs) =>
getOmmersFromPool(bestBlock.hash).zip(getTransactionsFromPool).map { case (ommers, pendingTxs) =>
val blockGenerator = ethash.blockGenerator
blockGenerator.generateBlock(
bestBlock,
Expand All @@ -530,12 +530,12 @@ class EthService(
}
})(Future.successful(Left(JsonRpcErrors.ConsensusIsNotEthash)))

private def getOmmersFromPool(blockNumber: BigInt): Future[OmmersPool.Ommers] =
private def getOmmersFromPool(parentBlockHash: ByteString): Future[OmmersPool.Ommers] =
consensus.ifEthash(ethash => {
val miningConfig = ethash.config.specific
implicit val timeout: Timeout = Timeout(miningConfig.ommerPoolQueryTimeout)

(ommersPool ? OmmersPool.GetOmmers(blockNumber))
(ommersPool ? OmmersPool.GetOmmers(parentBlockHash))
.mapTo[OmmersPool.Ommers]
.recover { case ex =>
log.error("failed to get ommer, mining block with empty ommers list", ex)
Expand Down
70 changes: 56 additions & 14 deletions src/main/scala/io/iohk/ethereum/ommers/OmmersPool.scala
Original file line number Diff line number Diff line change
@@ -1,37 +1,79 @@
package io.iohk.ethereum.ommers

import akka.actor.{Actor, Props}
import akka.util.ByteString
import akka.actor.{Actor, ActorLogging, Props}
import org.bouncycastle.util.encoders.Hex
import io.iohk.ethereum.domain.{BlockHeader, Blockchain}
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, GetOmmers, RemoveOmmers}
import scala.annotation.tailrec

class OmmersPool(blockchain: Blockchain, ommersPoolSize: Int) extends Actor {
class OmmersPool(blockchain: Blockchain, ommersPoolSize: Int, ommerGenerationLimit: Int, returnedOmmersSizeLimit: Int)
extends Actor
with ActorLogging {

var ommersPool: Seq[BlockHeader] = Nil

val ommerGenerationLimit: Int = 6 //Stated on section 11.1, eq. (143) of the YP
val ommerSizeLimit: Int = 2

override def receive: Receive = {
case AddOmmers(ommers) =>
ommersPool = (ommers ++ ommersPool).take(ommersPoolSize).distinct
logStatus(event = "Ommers after add", ommers = ommersPool)

case RemoveOmmers(ommers) =>
val toDelete = ommers.map(_.hash).toSet
ommersPool = ommersPool.filter(b => !toDelete.contains(b.hash))
logStatus(event = "Ommers after remove", ommers = ommersPool)

case GetOmmers(blockNumber) =>
val ommers = ommersPool.filter { b =>
val generationDifference = blockNumber - b.number
generationDifference > 0 && generationDifference <= ommerGenerationLimit
}.filter { b =>
blockchain.getBlockHeaderByHash(b.parentHash).isDefined
}.take(ommerSizeLimit)
case GetOmmers(parentBlockHash) =>
val ancestors = collectAncestors(parentBlockHash, ommerGenerationLimit)
val ommers = ommersPool
.filter { b =>
val notAncestor = ancestors.find(_.hash == b.hash).isEmpty
ancestors.find(_.hash == b.parentHash).isDefined && notAncestor
}
.take(returnedOmmersSizeLimit)
logStatus(event = s"Ommers given parent block ${Hex.toHexString(parentBlockHash.toArray)}", ommers)
sender() ! OmmersPool.Ommers(ommers)
}

private def collectAncestors(parentHash: ByteString, generationLimit: Int): List[BlockHeader] = {
@tailrec
def rec(hash: ByteString, limit: Int, acc: List[BlockHeader]): List[BlockHeader] = {
if (limit > 0) {
blockchain.getBlockHeaderByHash(hash) match {
case Some(bh) => rec(bh.parentHash, limit - 1, acc :+ bh)
case None => acc
}
} else {
acc
}
}
rec(parentHash, generationLimit, List.empty)
}

private def logStatus(event: String, ommers: Seq[BlockHeader]): Unit = {
lazy val ommersAsString: Seq[String] = ommers.map { bh => s"[number = ${bh.number}, hash = ${bh.hashAsHexString}]" }
log.debug(s"$event ${ommersAsString}")
}
}

object OmmersPool {
def props(blockchain: Blockchain, ommersPoolSize: Int): Props = Props(new OmmersPool(blockchain, ommersPoolSize))

/**
* As is stated on section 11.1, eq. (143) of the YP
*
* @param ommerGenerationLimit should be === 6
* @param returnedOmmersSizeLimit should be === 2
*
* ^ Probably not worthy but those params could be placed in consensus config.
*/
def props(
blockchain: Blockchain,
ommersPoolSize: Int,
ommerGenerationLimit: Int = 6,
returnedOmmersSizeLimit: Int = 2
): Props = Props(
new OmmersPool(blockchain, ommersPoolSize, ommerGenerationLimit, returnedOmmersSizeLimit)
)

case class AddOmmers(ommers: List[BlockHeader])

Expand All @@ -45,7 +87,7 @@ object OmmersPool {
def apply(b: BlockHeader*): RemoveOmmers = RemoveOmmers(b.toList)
}

case class GetOmmers(blockNumber: BigInt)
case class GetOmmers(parentBlockHash: ByteString)

case class Ommers(headers: Seq[BlockHeader])
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ class EthServiceSpec
pendingTransactionsManager.expectMsg(PendingTransactionsManager.GetPendingTransactions)
pendingTransactionsManager.reply(PendingTransactionsManager.PendingTransactionsResponse(Nil))

ommersPool.expectMsg(OmmersPool.GetOmmers(1))
ommersPool.expectMsg(OmmersPool.GetOmmers(parentBlock.hash))
ommersPool.reply(OmmersPool.Ommers(Nil))

response.futureValue shouldEqual Right(GetWorkResponse(powHash, seedHash, target))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ class JsonRpcControllerSpec
val blockToRequest = Block(Fixtures.Blocks.Block3125369.header, Fixtures.Blocks.Block3125369.body)
val blockTd = blockToRequest.header.difficulty

blockchain.storeBlock(blockToRequest)
blockchain
.storeBlock(blockToRequest)
.and(blockchain.storeTotalDifficulty(blockToRequest.header.hash, blockTd))
.commit()

Expand All @@ -250,7 +251,8 @@ class JsonRpcControllerSpec
val blockToRequest = Block(Fixtures.Blocks.Block3125369.header, Fixtures.Blocks.Block3125369.body)
val blockTd = blockToRequest.header.difficulty

blockchain.storeBlock(blockToRequest)
blockchain
.storeBlock(blockToRequest)
.and(blockchain.storeTotalDifficulty(blockToRequest.header.hash, blockTd))
.commit()

Expand Down Expand Up @@ -628,7 +630,7 @@ class JsonRpcControllerSpec
pendingTransactionsManager.expectMsg(PendingTransactionsManager.GetPendingTransactions)
pendingTransactionsManager.reply(PendingTransactionsManager.PendingTransactionsResponse(Nil))

ommersPool.expectMsg(OmmersPool.GetOmmers(2))
ommersPool.expectMsg(OmmersPool.GetOmmers(parentBlock.hash))
ommersPool.reply(Ommers(Nil))

val response = result.futureValue
Expand Down Expand Up @@ -674,7 +676,7 @@ class JsonRpcControllerSpec
val result: Future[JsonRpcResponse] = jsonRpcController.handleRequest(request)

pendingTransactionsManager.expectMsg(PendingTransactionsManager.GetPendingTransactions)
ommersPool.expectMsg(OmmersPool.GetOmmers(2))
ommersPool.expectMsg(OmmersPool.GetOmmers(parentBlock.hash))
//on time out it should respond with empty list

val response = result.futureValue(timeout(Timeouts.longTimeout))
Expand Down Expand Up @@ -786,7 +788,9 @@ class JsonRpcControllerSpec
}

it should "eth_gasPrice" in new TestSetup {
blockchain.storeBlock(Block(Fixtures.Blocks.Block3125369.header.copy(number = 42), Fixtures.Blocks.Block3125369.body)).commit()
blockchain
.storeBlock(Block(Fixtures.Blocks.Block3125369.header.copy(number = 42), Fixtures.Blocks.Block3125369.body))
.commit()
blockchain.saveBestKnownBlock(42)

val request: JsonRpcRequest = JsonRpcRequest(
Expand Down
Loading