Skip to content

Commit 410386a

Browse files
author
Christos KK Loverdos
committed
Decouple Consensus II (move get-transaction-from-pool-timeout)
1 parent 3282958 commit 410386a

File tree

14 files changed

+66
-50
lines changed

14 files changed

+66
-50
lines changed

src/main/resources/application.conf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,12 @@ mantis {
198198
pending-tx-manager-query-timeout = 5.seconds
199199

200200
transaction-timeout = 2.minutes
201-
}
202201

203-
consensus {
204-
# New timeout to replace `ommer-pool-query-timeout` for uses outside ethash mining
202+
# Used in mining (ethash) / forging (atomix-raft)
205203
get-transaction-from-pool-timeout = 5.seconds
204+
}
206205

206+
consensus {
207207
# Miner's coinbase address
208208
# Also used in non-Ethash consensus.
209209
coinbase = "0011223344556677889900112233445566778899"

src/main/scala/io/iohk/ethereum/consensus/ConsensusConfig.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import io.iohk.ethereum.domain.Address
77
import io.iohk.ethereum.nodebuilder.ShutdownHookBuilder
88
import io.iohk.ethereum.utils.Logger
99

10-
import scala.concurrent.duration.{FiniteDuration, _}
1110

1211
/**
1312
* Provides generic consensus configuration. Each consensus protocol implementation
@@ -22,7 +21,6 @@ final case class ConsensusConfig(
2221
coinbase: Address,
2322
headerExtraData: ByteString, // only used in BlockGenerator
2423
blockCacheSize: Int, // only used in BlockGenerator
25-
getTransactionFromPoolTimeout: FiniteDuration,
2624
miningEnabled: Boolean
2725
)
2826

@@ -34,7 +32,6 @@ object ConsensusConfig extends Logger {
3432
final val HeaderExtraData = "header-extra-data"
3533
final val BlockCacheSize = "block-cashe-size"
3634
final val MiningEnabled = "mining-enabled"
37-
final val GetTransactionFromPoolTimeout = "get-transaction-from-pool-timeout"
3835
}
3936

4037

@@ -65,8 +62,6 @@ object ConsensusConfig extends Logger {
6562
def apply(mantisConfig: TypesafeConfig)(shutdownHook: ShutdownHookBuilder): ConsensusConfig = {
6663
val config = mantisConfig.getConfig(Keys.Consensus)
6764

68-
def millis(path: String): FiniteDuration = config.getDuration(path).toMillis.millis
69-
7065
val protocol = readProtocol(config)
7166
val coinbase = Address(config.getString(Keys.Coinbase))
7267

@@ -75,14 +70,11 @@ object ConsensusConfig extends Logger {
7570
val blockCacheSize = config.getInt(Keys.BlockCacheSize)
7671
val miningEnabled = config.getBoolean(Keys.MiningEnabled)
7772

78-
val getTransactionFromPoolTimeout = millis(Keys.GetTransactionFromPoolTimeout)
79-
8073
new ConsensusConfig(
8174
protocol = protocol,
8275
coinbase = coinbase,
8376
headerExtraData = headerExtraData,
8477
blockCacheSize = blockCacheSize,
85-
getTransactionFromPoolTimeout = getTransactionFromPoolTimeout,
8678
miningEnabled = miningEnabled
8779
)
8880
}

src/main/scala/io/iohk/ethereum/consensus/atomixraft/AtomixRaftForger.scala

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class AtomixRaftForger(
2222
blockchain: Blockchain,
2323
pendingTransactionsManager: ActorRef,
2424
syncController: ActorRef,
25-
consensus: AtomixRaftConsensus
25+
consensus: AtomixRaftConsensus,
26+
getTransactionFromPoolTimeout: FiniteDuration
2627
) extends Actor with ActorLogging {
2728

2829
def receive: Receive = stopped
@@ -37,35 +38,35 @@ class AtomixRaftForger(
3738

3839
private def stopped: Receive = {
3940
case Init
40-
log.info("***** EthashMiner initialized")
41+
log.info("***** Forger initialized")
4142

4243
case IAmTheLeader
43-
log.info("***** I am the leader, will start mining")
44-
context become mining
45-
self ! StartMining
44+
log.info("***** I am the leader, will start forging blocks")
45+
context become forging
46+
self ! StartForging
4647
}
4748

48-
private def mining: Receive = {
49-
case StopMining context become stopped
50-
case StartMining startMining()
49+
private def forging: Receive = {
50+
case StopForging context become stopped
51+
case StartForging startForging()
5152
}
5253

5354
private def lostLeadership(): Unit = {
5455
log.info("***** Ouch, lost leadership")
55-
self ! StopMining
56+
self ! StopForging
5657
}
5758

58-
private def startMining(): Unit = {
59+
private def startForging(): Unit = {
5960
if(isLeader) {
6061
val parentBlock = blockchain.getBestBlock()
6162

62-
getBlockForMining(parentBlock) onComplete {
63+
getBlock(parentBlock) onComplete {
6364
case Success(PendingBlock(block, _))
6465
syncTheBlock(block)
6566

6667
case Failure(ex)
67-
log.error(ex, "Unable to get block for mining")
68-
scheduleOnce(10.seconds, StartMining)
68+
log.error(ex, "Unable to get block")
69+
scheduleOnce(10.seconds, StartForging)
6970
}
7071
}
7172
else {
@@ -75,17 +76,16 @@ class AtomixRaftForger(
7576

7677
private def syncTheBlock(block: Block): Unit = {
7778
if(isLeader) {
78-
log.info("***** Mined block " + block.header.number)
79+
log.info("***** Forged block " + block.header.number)
7980
syncController ! RegularSync.MinedBlock(block)
80-
self ! StartMining
81+
self ! StartForging
8182
}
8283
else {
8384
lostLeadership()
8485
}
8586
}
8687

87-
//noinspection ScalaStyle
88-
private def getBlockForMining(parentBlock: Block): Future[PendingBlock] = {
88+
private def getBlock(parentBlock: Block): Future[PendingBlock] = {
8989
Thread.sleep(AtomixRaftForger.ArtificialDelay)
9090

9191
val ffPendingBlock: Future[Future[PendingBlock]] =
@@ -102,7 +102,7 @@ class AtomixRaftForger(
102102
)
103103
errorOrPendingBlock match {
104104
case Left(error)
105-
Future.failed(new RuntimeException(s"Error while generating block for mining: $error"))
105+
Future.failed(new RuntimeException(s"Error while generating block: $error"))
106106

107107
case Right(pendingBlock)
108108
Future.successful(pendingBlock)
@@ -113,11 +113,11 @@ class AtomixRaftForger(
113113
}
114114

115115
private def getTransactionsFromPool = {
116-
implicit val timeout: Timeout = consensusCofig.getTransactionFromPoolTimeout
116+
implicit val timeout: Timeout = getTransactionFromPoolTimeout
117117

118118
(pendingTransactionsManager ? PendingTransactionsManager.GetPendingTransactions).mapTo[PendingTransactionsResponse]
119119
.recover { case ex =>
120-
log.error(ex, "Failed to get transactions, mining block with empty transactions list")
120+
log.error(ex, "Failed to get transactions, forging block with empty transactions list")
121121
PendingTransactionsResponse(Nil)
122122
}
123123
}
@@ -129,17 +129,21 @@ object AtomixRaftForger {
129129
sealed trait Msg
130130
case object Init extends Msg
131131
case object IAmTheLeader extends Msg
132-
case object StartMining extends Msg
133-
case object StopMining extends Msg
132+
case object StartForging extends Msg
133+
case object StopForging extends Msg
134134

135135
private def props(
136136
blockchain: Blockchain,
137137
pendingTransactionsManager: ActorRef,
138138
syncController: ActorRef,
139-
consensus: AtomixRaftConsensus
139+
consensus: AtomixRaftConsensus,
140+
getTransactionFromPoolTimeout: FiniteDuration
140141
): Props =
141142
Props(
142-
new AtomixRaftForger(blockchain, pendingTransactionsManager, syncController, consensus)
143+
new AtomixRaftForger(
144+
blockchain, pendingTransactionsManager, syncController, consensus,
145+
getTransactionFromPoolTimeout
146+
)
143147
)
144148

145149
private[atomixraft] def apply(node: Node): ActorRef = {
@@ -149,7 +153,8 @@ object AtomixRaftForger {
149153
blockchain = node.blockchain,
150154
pendingTransactionsManager = node.pendingTransactionsManager,
151155
syncController = node.syncController,
152-
consensus = consensus
156+
consensus = consensus,
157+
getTransactionFromPoolTimeout = node.txPoolConfig.getTransactionFromPoolTimeout
153158
)
154159

155160
node.system.actorOf(minerProps)

src/main/scala/io/iohk/ethereum/consensus/ethash/EthashMiner.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ class EthashMiner(
3232
pendingTransactionsManager: ActorRef,
3333
syncController: ActorRef,
3434
ethService: EthService,
35-
consensus: EthashConsensus
35+
consensus: EthashConsensus,
36+
getTransactionFromPoolTimeout: FiniteDuration
3637
) extends Actor with ActorLogging {
3738

3839
import EthashMiner._
@@ -196,7 +197,7 @@ class EthashMiner(
196197
}
197198

198199
private def getTransactionsFromPool: Future[PendingTransactionsResponse] = {
199-
implicit val timeout = Timeout(consensusConfig.getTransactionFromPoolTimeout)
200+
implicit val timeout = Timeout(getTransactionFromPoolTimeout)
200201

201202
(pendingTransactionsManager ? PendingTransactionsManager.GetPendingTransactions).mapTo[PendingTransactionsResponse]
202203
.recover { case ex =>
@@ -213,10 +214,15 @@ object EthashMiner {
213214
pendingTransactionsManager: ActorRef,
214215
syncController: ActorRef,
215216
ethService: EthService,
216-
consensus: EthashConsensus
217+
consensus: EthashConsensus,
218+
getTransactionFromPoolTimeout: FiniteDuration
217219
): Props =
218-
Props(new EthashMiner(blockchain, ommersPool,
219-
pendingTransactionsManager, syncController, ethService, consensus))
220+
Props(
221+
new EthashMiner(
222+
blockchain, ommersPool, pendingTransactionsManager, syncController, ethService, consensus,
223+
getTransactionFromPoolTimeout
224+
)
225+
)
220226

221227
def apply(node: Node): ActorRef = {
222228
node.consensus match {
@@ -227,7 +233,8 @@ object EthashMiner {
227233
pendingTransactionsManager = node.pendingTransactionsManager,
228234
syncController = node.syncController,
229235
ethService = node.ethService,
230-
consensus = consensus
236+
consensus = consensus,
237+
getTransactionFromPoolTimeout = node.txPoolConfig.getTransactionFromPoolTimeout
231238
)
232239

233240
node.system.actorOf(minerProps)

src/main/scala/io/iohk/ethereum/jsonrpc/EthService.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ class EthService(
180180
filterConfig: FilterConfig,
181181
blockchainConfig: BlockchainConfig,
182182
protocolVersion: Int,
183-
activeTimeout: FiniteDuration)
183+
activeTimeout: FiniteDuration,
184+
getTransactionFromPoolTimeout: FiniteDuration)
184185
extends Logger {
185186

186187
import EthService._
@@ -486,7 +487,7 @@ class EthService(
486487

487488
// TODO This seems to be re-implemented elsewhere, probably move to a better place? Also generalize the error message.
488489
private def getTransactionsFromPool: Future[PendingTransactionsResponse] = {
489-
implicit val timeout = Timeout(consensusConfig.getTransactionFromPoolTimeout)
490+
implicit val timeout = Timeout(getTransactionFromPoolTimeout)
490491

491492
(pendingTransactionsManager ? PendingTransactionsManager.GetPendingTransactions).mapTo[PendingTransactionsResponse]
492493
.recover { case ex =>

src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,11 +303,13 @@ trait EthServiceBuilder {
303303
ConsensusBuilder with
304304
ConsensusConfigBuilder with
305305
FilterManagerBuilder with
306-
FilterConfigBuilder =>
306+
FilterConfigBuilder with
307+
TxPoolConfigBuilder =>
307308

308309
lazy val ethService = new EthService(blockchain, storagesInstance.storages.appStateStorage,
309310
ledger, keyStore, pendingTransactionsManager, syncController, ommersPool, filterManager, filterConfig,
310-
blockchainConfig, Config.Network.protocolVersion, Config.Network.Rpc.activeTimeout)
311+
blockchainConfig, Config.Network.protocolVersion, Config.Network.Rpc.activeTimeout,
312+
txPoolConfig.getTransactionFromPoolTimeout)
311313
}
312314

313315
trait PersonalServiceBuilder {

src/main/scala/io/iohk/ethereum/utils/Config.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ trait TxPoolConfig {
238238
val txPoolSize: Int
239239
val pendingTxManagerQueryTimeout: FiniteDuration
240240
val transactionTimeout: FiniteDuration
241+
val getTransactionFromPoolTimeout: FiniteDuration
241242
}
242243

243244
object TxPoolConfig {
@@ -248,6 +249,7 @@ object TxPoolConfig {
248249
val txPoolSize: Int = txPoolConfig.getInt("tx-pool-size")
249250
val pendingTxManagerQueryTimeout: FiniteDuration = txPoolConfig.getDuration("pending-tx-manager-query-timeout").toMillis.millis
250251
val transactionTimeout: FiniteDuration = txPoolConfig.getDuration("transaction-timeout").toMillis.millis
252+
val getTransactionFromPoolTimeout: FiniteDuration = txPoolConfig.getDuration("get-transaction-from-pool-timeout").toMillis.millis
251253
}
252254
}
253255
}

src/test/scala/io/iohk/ethereum/consensus/ConsensusConfigs.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ object ConsensusConfigs {
2424
coinbase = coinbase,
2525
headerExtraData = ByteString.empty,
2626
blockCacheSize = blockCacheSize,
27-
getTransactionFromPoolTimeout = ethashConfig.ommerPoolQueryTimeout,
2827
miningEnabled = false
2928
)
3029

src/test/scala/io/iohk/ethereum/consensus/ethash/EthashMinerSpec.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,12 @@ class EthashMinerSpec extends FlatSpec with Matchers {
143143
val syncController = TestProbe()
144144

145145
val ethService = mock[EthService]
146+
val getTransactionFromPoolTimeout: FiniteDuration = 5.seconds
147+
146148

147149
val miner = TestActorRef(EthashMiner.props(
148150
blockchain, ommersPool.ref, pendingTransactionsManager.ref,
149-
syncController.ref, ethService, consensus))
151+
syncController.ref, ethService, consensus, getTransactionFromPoolTimeout))
150152

151153
def waitForMinedBlock(): Block = {
152154
syncController.expectMsgPF[Block](10.minutes) {

src/test/scala/io/iohk/ethereum/jsonrpc/EthServiceSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -898,6 +898,7 @@ class EthServiceSpec extends FlatSpec with Matchers with ScalaFutures with MockF
898898
val miningConfig = ConsensusConfigs.ethashConfig
899899
val fullConsensusConfig = ConsensusConfigs.fullConsensusConfig
900900
val activeTimeout: FiniteDuration = 5.seconds
901+
val getTransactionFromPoolTimeout: FiniteDuration = 5.seconds
901902

902903
val filterConfig = new FilterConfig {
903904
override val filterTimeout: FiniteDuration = Timeouts.normalTimeout
@@ -908,7 +909,7 @@ class EthServiceSpec extends FlatSpec with Matchers with ScalaFutures with MockF
908909

909910
val ethService = new EthService(blockchain, appStateStorage, ledger,
910911
keyStore, pendingTransactionsManager.ref, syncingController.ref, ommersPool.ref, filterManager.ref, filterConfig,
911-
blockchainConfig, currentProtocolVersion, activeTimeout)
912+
blockchainConfig, currentProtocolVersion, activeTimeout, getTransactionFromPoolTimeout)
912913

913914
val blockToRequest = Block(Fixtures.Blocks.Block3125369.header, Fixtures.Blocks.Block3125369.body)
914915
val blockToRequestNumber = blockToRequest.header.number

src/test/scala/io/iohk/ethereum/jsonrpc/FilterManagerSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ class FilterManagerSpec extends FlatSpec with Matchers with ScalaFutures with No
383383
override val txPoolSize: Int = 30
384384
override val pendingTxManagerQueryTimeout: FiniteDuration = Timeouts.longTimeout
385385
override val transactionTimeout: FiniteDuration = Timeouts.normalTimeout
386+
override val getTransactionFromPoolTimeout: FiniteDuration = Timeouts.normalTimeout
386387
}
387388

388389
val time = new VirtualTime

src/test/scala/io/iohk/ethereum/jsonrpc/JsonRpcControllerSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1456,6 +1456,7 @@ class JsonRpcControllerSpec extends FlatSpec with Matchers with PropertyChecks w
14561456
override lazy val consensusConfig = ConsensusConfigs.consensusConfig
14571457
val fullConsensusConfig = ConsensusConfigs.fullConsensusConfig
14581458
val activeTimeout: FiniteDuration = 5.seconds
1459+
val getTransactionFromPoolTimeout: FiniteDuration = 5.seconds
14591460

14601461
val filterConfig = new FilterConfig {
14611462
override val filterTimeout: FiniteDuration = Timeouts.normalTimeout
@@ -1471,7 +1472,7 @@ class JsonRpcControllerSpec extends FlatSpec with Matchers with PropertyChecks w
14711472
val ethService = new EthService(
14721473
blockchain, appStateStorage, ledger,
14731474
keyStore, pendingTransactionsManager.ref, syncingController.ref, ommersPool.ref, filterManager.ref, filterConfig,
1474-
blockchainConfig, currentProtocolVersion, activeTimeout)
1475+
blockchainConfig, currentProtocolVersion, activeTimeout, getTransactionFromPoolTimeout)
14751476
val jsonRpcController = new JsonRpcController(web3Service, netService, ethService, personalService, config)
14761477

14771478
val blockHeader = BlockHeader(

src/test/scala/io/iohk/ethereum/jsonrpc/PersonalServiceSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@ class PersonalServiceSpec extends FlatSpec with Matchers with MockFactory with S
440440
override val txPoolSize: Int = 30
441441
override val pendingTxManagerQueryTimeout: FiniteDuration = Timeouts.normalTimeout
442442
override val transactionTimeout: FiniteDuration = Timeouts.normalTimeout
443+
override val getTransactionFromPoolTimeout: FiniteDuration = Timeouts.normalTimeout
443444
}
444445

445446
val time = new VirtualTime

src/test/scala/io/iohk/ethereum/transactions/PendingTransactionsManagerSpec.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ class PendingTransactionsManagerSpec extends FlatSpec with Matchers with ScalaFu
159159
override val txPoolConfig = new TxPoolConfig {
160160
override val txPoolSize: Int = 300
161161
override val transactionTimeout: FiniteDuration = 500.millis
162+
override val getTransactionFromPoolTimeout: FiniteDuration = Timeouts.normalTimeout
162163

163164
//unused
164165
override val pendingTxManagerQueryTimeout: FiniteDuration = Timeouts.veryLongTimeout
@@ -203,6 +204,7 @@ class PendingTransactionsManagerSpec extends FlatSpec with Matchers with ScalaFu
203204
//unused
204205
override val pendingTxManagerQueryTimeout: FiniteDuration = Timeouts.veryLongTimeout
205206
override val transactionTimeout: FiniteDuration = Timeouts.veryLongTimeout
207+
override val getTransactionFromPoolTimeout: FiniteDuration = Timeouts.veryLongTimeout
206208
}
207209

208210
val peerManager = TestProbe()

0 commit comments

Comments
 (0)