Skip to content

Commit ee6fd9c

Browse files
author
Michał Mrożek
authored
Merge branch 'develop' into ectm-104-target-block-selection
2 parents 8932dc1 + 8e45269 commit ee6fd9c

File tree

12 files changed

+376
-117
lines changed

12 files changed

+376
-117
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,17 @@ class BlockFetcher(
6868
private def handleCommands(state: BlockFetcherState): Receive = {
6969
case PickBlocks(amount) => state.pickBlocks(amount) |> handlePickedBlocks(state) |> fetchBlocks
7070
case StrictPickBlocks(from, atLeastWith) =>
71-
val minBlock = from.min(atLeastWith).max(1)
72-
log.debug("Strict Pick blocks from {} to {}", from, atLeastWith)
71+
// FIXME: Consider having StrictPickBlocks calls guaranteeing this
72+
// from parameter could be negative or 0 so we should cap it to 1 if that's the case
73+
val fromCapped = from.max(1)
74+
val minBlock = fromCapped.min(atLeastWith).max(1)
75+
log.debug("Strict Pick blocks from {} to {}", fromCapped, atLeastWith)
7376
log.debug("Lowest available block is {}", state.lowestBlock)
7477

7578
val newState = if (minBlock < state.lowestBlock) {
7679
state.invalidateBlocksFrom(minBlock, None)._2
7780
} else {
78-
state.strictPickBlocks(from, atLeastWith) |> handlePickedBlocks(state)
81+
state.strictPickBlocks(fromCapped, atLeastWith) |> handlePickedBlocks(state)
7982
}
8083

8184
fetchBlocks(newState)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,15 @@ case class BlockFetcherState(
107107
None
108108
}
109109

110+
/**
111+
* Returns all the ready blocks but only if it includes blocks with number:
112+
* - lower = min(from, atLeastWith)
113+
* - upper = max(from, atLeastWith)
114+
*/
110115
def strictPickBlocks(from: BigInt, atLeastWith: BigInt): Option[(NonEmptyList[Block], BlockFetcherState)] = {
111116
val lower = from.min(atLeastWith)
112117
val upper = from.max(atLeastWith)
118+
113119
readyBlocks.some
114120
.filter(_.headOption.exists(block => block.number <= lower))
115121
.filter(_.lastOption.exists(block => block.number >= upper))

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ class BlockImporter(
4545
start()
4646
}
4747

48-
private def idle: Receive = {
49-
case Start => start()
48+
private def idle: Receive = { case Start =>
49+
start()
5050
}
5151

5252
private def handleTopMessages(state: ImporterState, currentBehavior: Behavior): Receive = {
@@ -63,8 +63,6 @@ class BlockImporter(
6363
case MinedBlock(block) =>
6464
if (!state.importing) {
6565
importMinedBlock(block, state)
66-
} else {
67-
ommersPool ! AddOmmers(block.header)
6866
}
6967
case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)
7068
case ImportDone(newBehavior) =>
@@ -96,16 +94,21 @@ class BlockImporter(
9694
}
9795

9896
private def pickBlocks(state: ImporterState): Unit = {
99-
val msg = state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](
100-
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(
101-
from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber))
97+
val msg =
98+
state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(from =>
99+
BlockFetcher.StrictPickBlocks(from, startingBlockNumber)
100+
)
102101

103102
fetcher ! msg
104103
}
105104

106105
private def importBlocks(blocks: NonEmptyList[Block]): ImportFn =
107106
importWith {
108-
log.debug("Attempting to import blocks starting from {}", blocks.head.number)
107+
log.debug(
108+
"Attempting to import blocks starting from {} and ending with {}",
109+
blocks.head.number,
110+
blocks.last.number
111+
)
109112
Future
110113
.successful(resolveBranch(blocks))
111114
.flatMap {
@@ -142,8 +145,9 @@ class BlockImporter(
142145
}
143146
}
144147

145-
private def tryImportBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil)(
146-
implicit ec: ExecutionContext): Future[(List[Block], Option[Any])] =
148+
private def tryImportBlocks(blocks: List[Block], importedBlocks: List[Block] = Nil)(implicit
149+
ec: ExecutionContext
150+
): Future[(List[Block], Option[Any])] =
147151
if (blocks.isEmpty) {
148152
Future.successful((importedBlocks, None))
149153
} else {
@@ -290,9 +294,11 @@ object BlockImporter {
290294
syncConfig: SyncConfig,
291295
ommersPool: ActorRef,
292296
broadcaster: ActorRef,
293-
pendingTransactionsManager: ActorRef): Props =
297+
pendingTransactionsManager: ActorRef
298+
): Props =
294299
Props(
295-
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager))
300+
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager)
301+
)
296302

297303
type Behavior = ImporterState => Receive
298304
type ImportFn = ImporterState => Unit

src/main/scala/io/iohk/ethereum/consensus/ethash/EthashBlockCreator.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package io.iohk.ethereum.consensus.ethash
22

33
import akka.actor.ActorRef
44
import akka.pattern.ask
5-
import akka.util.Timeout
5+
import akka.util.{Timeout, ByteString}
66
import io.iohk.ethereum.consensus.blocks.PendingBlock
77
import io.iohk.ethereum.consensus.ethash.blocks.EthashBlockGenerator
88
import io.iohk.ethereum.domain.{Address, Block}
@@ -28,23 +28,21 @@ class EthashBlockCreator(
2828
def getBlockForMining(parentBlock: Block, withTransactions: Boolean = true): Future[PendingBlock] = {
2929
val transactions =
3030
if (withTransactions) getTransactionsFromPool else Future.successful(PendingTransactionsResponse(Nil))
31-
getOmmersFromPool(parentBlock.header.number + 1).zip(transactions).flatMap {
32-
case (ommers, pendingTxs) =>
33-
blockGenerator
34-
.generateBlock(parentBlock, pendingTxs.pendingTransactions.map(_.stx.tx), coinbase, ommers.headers) match {
35-
case Right(pb) => Future.successful(pb)
36-
case Left(err) => Future.failed(new RuntimeException(s"Error while generating block for mining: $err"))
37-
}
31+
getOmmersFromPool(parentBlock.hash).zip(transactions).flatMap { case (ommers, pendingTxs) =>
32+
blockGenerator
33+
.generateBlock(parentBlock, pendingTxs.pendingTransactions.map(_.stx.tx), coinbase, ommers.headers) match {
34+
case Right(pb) => Future.successful(pb)
35+
case Left(err) => Future.failed(new RuntimeException(s"Error while generating block for mining: $err"))
36+
}
3837
}
3938
}
4039

41-
private def getOmmersFromPool(blockNumber: BigInt): Future[OmmersPool.Ommers] = {
42-
(ommersPool ? OmmersPool.GetOmmers(blockNumber))(Timeout(miningConfig.ommerPoolQueryTimeout))
40+
private def getOmmersFromPool(parentBlockHash: ByteString): Future[OmmersPool.Ommers] = {
41+
(ommersPool ? OmmersPool.GetOmmers(parentBlockHash))(Timeout(miningConfig.ommerPoolQueryTimeout))
4342
.mapTo[OmmersPool.Ommers]
44-
.recover {
45-
case ex =>
46-
log.error("Failed to get ommers, mining block with empty ommers list", ex)
47-
OmmersPool.Ommers(Nil)
43+
.recover { case ex =>
44+
log.error("Failed to get ommers, mining block with empty ommers list", ex)
45+
OmmersPool.Ommers(Nil)
4846
}
4947
}
5048

src/main/scala/io/iohk/ethereum/domain/BlockHeader.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ case class BlockHeader(
2727

2828
override def toString: String = {
2929
s"""BlockHeader {
30+
|hash: $hashAsHexString
3031
|parentHash: ${Hex.toHexString(parentHash.toArray[Byte])}
3132
|ommersHash: ${Hex.toHexString(ommersHash.toArray[Byte])}
3233
|beneficiary: ${Hex.toHexString(beneficiary.toArray[Byte])}

src/main/scala/io/iohk/ethereum/jsonrpc/EthService.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ class EthService(
507507
import io.iohk.ethereum.consensus.ethash.EthashUtils.{epoch, seed}
508508

509509
val bestBlock = blockchain.getBestBlock()
510-
getOmmersFromPool(bestBlock.header.number + 1).zip(getTransactionsFromPool).map { case (ommers, pendingTxs) =>
510+
getOmmersFromPool(bestBlock.hash).zip(getTransactionsFromPool).map { case (ommers, pendingTxs) =>
511511
val blockGenerator = ethash.blockGenerator
512512
blockGenerator.generateBlock(
513513
bestBlock,
@@ -530,12 +530,12 @@ class EthService(
530530
}
531531
})(Future.successful(Left(JsonRpcErrors.ConsensusIsNotEthash)))
532532

533-
private def getOmmersFromPool(blockNumber: BigInt): Future[OmmersPool.Ommers] =
533+
private def getOmmersFromPool(parentBlockHash: ByteString): Future[OmmersPool.Ommers] =
534534
consensus.ifEthash(ethash => {
535535
val miningConfig = ethash.config.specific
536536
implicit val timeout: Timeout = Timeout(miningConfig.ommerPoolQueryTimeout)
537537

538-
(ommersPool ? OmmersPool.GetOmmers(blockNumber))
538+
(ommersPool ? OmmersPool.GetOmmers(parentBlockHash))
539539
.mapTo[OmmersPool.Ommers]
540540
.recover { case ex =>
541541
log.error("failed to get ommer, mining block with empty ommers list", ex)
Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,79 @@
11
package io.iohk.ethereum.ommers
22

3-
import akka.actor.{Actor, Props}
3+
import akka.util.ByteString
4+
import akka.actor.{Actor, ActorLogging, Props}
5+
import org.bouncycastle.util.encoders.Hex
46
import io.iohk.ethereum.domain.{BlockHeader, Blockchain}
57
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, GetOmmers, RemoveOmmers}
8+
import scala.annotation.tailrec
69

7-
class OmmersPool(blockchain: Blockchain, ommersPoolSize: Int) extends Actor {
10+
class OmmersPool(blockchain: Blockchain, ommersPoolSize: Int, ommerGenerationLimit: Int, returnedOmmersSizeLimit: Int)
11+
extends Actor
12+
with ActorLogging {
813

914
var ommersPool: Seq[BlockHeader] = Nil
1015

11-
val ommerGenerationLimit: Int = 6 //Stated on section 11.1, eq. (143) of the YP
12-
val ommerSizeLimit: Int = 2
13-
1416
override def receive: Receive = {
1517
case AddOmmers(ommers) =>
1618
ommersPool = (ommers ++ ommersPool).take(ommersPoolSize).distinct
19+
logStatus(event = "Ommers after add", ommers = ommersPool)
1720

1821
case RemoveOmmers(ommers) =>
1922
val toDelete = ommers.map(_.hash).toSet
2023
ommersPool = ommersPool.filter(b => !toDelete.contains(b.hash))
24+
logStatus(event = "Ommers after remove", ommers = ommersPool)
2125

22-
case GetOmmers(blockNumber) =>
23-
val ommers = ommersPool.filter { b =>
24-
val generationDifference = blockNumber - b.number
25-
generationDifference > 0 && generationDifference <= ommerGenerationLimit
26-
}.filter { b =>
27-
blockchain.getBlockHeaderByHash(b.parentHash).isDefined
28-
}.take(ommerSizeLimit)
26+
case GetOmmers(parentBlockHash) =>
27+
val ancestors = collectAncestors(parentBlockHash, ommerGenerationLimit)
28+
val ommers = ommersPool
29+
.filter { b =>
30+
val notAncestor = ancestors.find(_.hash == b.hash).isEmpty
31+
ancestors.find(_.hash == b.parentHash).isDefined && notAncestor
32+
}
33+
.take(returnedOmmersSizeLimit)
34+
logStatus(event = s"Ommers given parent block ${Hex.toHexString(parentBlockHash.toArray)}", ommers)
2935
sender() ! OmmersPool.Ommers(ommers)
3036
}
37+
38+
private def collectAncestors(parentHash: ByteString, generationLimit: Int): List[BlockHeader] = {
39+
@tailrec
40+
def rec(hash: ByteString, limit: Int, acc: List[BlockHeader]): List[BlockHeader] = {
41+
if (limit > 0) {
42+
blockchain.getBlockHeaderByHash(hash) match {
43+
case Some(bh) => rec(bh.parentHash, limit - 1, acc :+ bh)
44+
case None => acc
45+
}
46+
} else {
47+
acc
48+
}
49+
}
50+
rec(parentHash, generationLimit, List.empty)
51+
}
52+
53+
private def logStatus(event: String, ommers: Seq[BlockHeader]): Unit = {
54+
lazy val ommersAsString: Seq[String] = ommers.map { bh => s"[number = ${bh.number}, hash = ${bh.hashAsHexString}]" }
55+
log.debug(s"$event ${ommersAsString}")
56+
}
3157
}
3258

3359
object OmmersPool {
34-
def props(blockchain: Blockchain, ommersPoolSize: Int): Props = Props(new OmmersPool(blockchain, ommersPoolSize))
60+
61+
/**
62+
* As is stated on section 11.1, eq. (143) of the YP
63+
*
64+
* @param ommerGenerationLimit should be === 6
65+
* @param returnedOmmersSizeLimit should be === 2
66+
*
67+
* ^ Probably not worthy but those params could be placed in consensus config.
68+
*/
69+
def props(
70+
blockchain: Blockchain,
71+
ommersPoolSize: Int,
72+
ommerGenerationLimit: Int = 6,
73+
returnedOmmersSizeLimit: Int = 2
74+
): Props = Props(
75+
new OmmersPool(blockchain, ommersPoolSize, ommerGenerationLimit, returnedOmmersSizeLimit)
76+
)
3577

3678
case class AddOmmers(ommers: List[BlockHeader])
3779

@@ -45,7 +87,7 @@ object OmmersPool {
4587
def apply(b: BlockHeader*): RemoveOmmers = RemoveOmmers(b.toList)
4688
}
4789

48-
case class GetOmmers(blockNumber: BigInt)
90+
case class GetOmmers(parentBlockHash: ByteString)
4991

5092
case class Ommers(headers: Seq[BlockHeader])
5193
}

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,5 @@ class BlockFetcherStateSpec extends TestKit(ActorSystem()) with AnyWordSpecLike
3333
newState.knownTop shouldEqual newBestBlock.number
3434
}
3535
}
36-
3736
}
3837
}

0 commit comments

Comments
 (0)