Skip to content

Commit 36c0368

Browse files
committed
[CGKIELE-154] stabilize mantis for iele_testnet - fixes for block synchronization
1 parent 4388d98 commit 36c0368

File tree

13 files changed

+68
-23
lines changed

13 files changed

+68
-23
lines changed

src/main/resources/application.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,9 @@ mantis {
249249

250250
heartbeat-interval = 300.millis
251251

252+
# Determines how often new blocks will be forged
253+
block-forging-delay = 15.seconds
254+
252255
# Represents this node.
253256
#
254257
# ID and PORT are not mandatory.
@@ -375,6 +378,8 @@ mantis {
375378
peers-scan-interval = 3.seconds
376379

377380
# Duration for blacklisting a peer. Blacklisting reason include: invalid response from peer, response time-out, etc.
381+
# 0 value is a valid duration and it will disable blacklisting completely (which can be useful when all nodes are
382+
# are controlled by a single party, eg. private networks)
378383
blacklist-duration = 200.seconds
379384

380385
# Retry interval when not having enough peers to start fast-sync
@@ -447,6 +452,10 @@ mantis {
447452
# Maximum number of hashes processed form NewBlockHashes packet
448453
max-new-hashes = 64
449454

455+
# Set to false to disable broadcasting the NewBlockHashes message, as its usefulness is debatable,
456+
# especially in the context of private networks
457+
broadcast-new-block-hashes = true
458+
450459
# This a recovery mechanism for the issue of missing state nodes during blocks execution:
451460
# off - missing state node will result in an exception
452461
# on - missing state node will be redownloaded from a peer and block execution will be retried. This can repeat

src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.iohk.ethereum.blockchain.sync
22

3-
import scala.concurrent.duration.FiniteDuration
3+
import scala.concurrent.duration.{Duration, FiniteDuration}
44
import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler}
55
import io.iohk.ethereum.network.PeerId
66

@@ -16,10 +16,14 @@ trait BlacklistSupport {
1616
var blacklistedPeers: Seq[(PeerId, Cancellable)] = Nil
1717

1818
def blacklist(peerId: PeerId, duration: FiniteDuration, reason: String): Unit = {
19-
undoBlacklist(peerId)
20-
log.debug(s"Blacklisting peer ($peerId), $reason")
21-
val unblacklistCancellable = scheduler.scheduleOnce(duration, self, UnblacklistPeer(peerId))
22-
blacklistedPeers :+= (peerId, unblacklistCancellable)
19+
if (duration > Duration.Zero) {
20+
undoBlacklist(peerId)
21+
log.debug(s"Blacklisting peer ($peerId), $reason")
22+
val unblacklistCancellable = scheduler.scheduleOnce(duration, self, UnblacklistPeer(peerId))
23+
blacklistedPeers :+= (peerId, unblacklistCancellable)
24+
} else {
25+
log.debug(s"Peer ($peerId) would be blacklisted (reason: $reason), but blacklisting duration is zero")
26+
}
2327
}
2428

2529
def undoBlacklist(peerId: PeerId): Unit = {

src/main/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcast.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
66
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
77
import io.iohk.ethereum.network.p2p.messages.PV62
88
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
9+
import io.iohk.ethereum.utils.Config.SyncConfig
910

1011
import scala.util.Random
1112

12-
class BlockBroadcast(val etcPeerManager: ActorRef) {
13+
class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {
1314

1415
/**
1516
* Broadcasts various NewBlock's messages to handshaked peers, considering that a block should not be sent to a peer
@@ -25,7 +26,11 @@ class BlockBroadcast(val etcPeerManager: ActorRef) {
2526
case (peer, peerInfo) if shouldSendNewBlock(newBlock, peerInfo) => peer }.toSet
2627

2728
broadcastNewBlock(newBlock, peersWithoutBlock)
28-
broadcastNewBlockHash(newBlock, peersWithoutBlock)
29+
30+
if (syncConfig.broadcastNewBlockHashes) {
31+
// NOTE: the usefulness of this message is debatable, especially in private networks
32+
broadcastNewBlockHash(newBlock, peersWithoutBlock)
33+
}
2934
}
3035

3136
private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class SyncController(
7676

7777
def startRegularSync(): Unit = {
7878
val regularSync = context.actorOf(RegularSync.props(appStateStorage, etcPeerManager,
79-
peerEventBus, ommersPool, pendingTransactionsManager, new BlockBroadcast(etcPeerManager),
79+
peerEventBus, ommersPool, pendingTransactionsManager, new BlockBroadcast(etcPeerManager, syncConfig),
8080
ledger, blockchain, syncConfig, scheduler), "regular-sync")
8181
regularSync ! RegularSync.Start
8282
context become runningRegularSync(regularSync)

src/main/scala/io/iohk/ethereum/consensus/atomixraft/AtomixRaftConfig.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ case class AtomixRaftConfig private(
1616
bootstrapNodes: List[AtomixNode],
1717
dataDir: File,
1818
electionTimeout: FiniteDuration,
19-
heartbeatInterval: FiniteDuration
19+
heartbeatInterval: FiniteDuration,
20+
blockForgingDelay: FiniteDuration
2021
)
2122

2223
object AtomixRaftConfig extends Logger {
@@ -26,6 +27,7 @@ object AtomixRaftConfig extends Logger {
2627
final val DataDir = "data-dir"
2728
final val ElectionTimeout = "election-timeout"
2829
final val HeartbeatInterval = "heartbeat-interval"
30+
final val BlockForgingDelay = "block-forging-delay"
2931
}
3032

3133
def parseNodeId(parts: Array[String]): AtomixNodeId =
@@ -67,6 +69,7 @@ object AtomixRaftConfig extends Logger {
6769
val dataDir = new File(config.getString(Keys.DataDir))
6870
val electionTimeout = config.getDuration(Keys.ElectionTimeout).toMillis.millis
6971
val heartbeatInterval = config.getDuration(Keys.HeartbeatInterval).toMillis.millis
72+
val blockForgingDelay = config.getDuration(Keys.BlockForgingDelay).toMillis.millis
7073

7174
log.info("***** local-node = " + localNode)
7275
log.info("***** bootstrap-nodes = " + bootstrapNodes)
@@ -76,7 +79,8 @@ object AtomixRaftConfig extends Logger {
7679
bootstrapNodes = bootstrapNodes,
7780
dataDir = dataDir,
7881
electionTimeout = electionTimeout,
79-
heartbeatInterval = heartbeatInterval
82+
heartbeatInterval = heartbeatInterval,
83+
blockForgingDelay = blockForgingDelay
8084
)
8185
}
8286
}

src/main/scala/io/iohk/ethereum/consensus/atomixraft/AtomixRaftForger.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ class AtomixRaftForger(
2828

2929
def receive: Receive = stopped
3030

31-
private def consensusCofig: ConsensusConfig = consensus.config.generic
32-
private def coinbase: Address = consensusCofig.coinbase
31+
private def consensusConfig: ConsensusConfig = consensus.config.generic
32+
private def atomixRaftConfig: AtomixRaftConfig = consensus.config.specific
33+
private def coinbase: Address = consensusConfig.coinbase
3334
private def isLeader: Boolean = consensus.isLeader.getOrElse(false)
3435
private def blockGenerator: AtomixRaftBlockGenerator = consensus.blockGenerator
3536

@@ -66,7 +67,7 @@ class AtomixRaftForger(
6667

6768
case Failure(ex)
6869
log.error(ex, "Unable to get block")
69-
scheduleOnce(10.seconds, StartForging)
70+
scheduleOnce(atomixRaftConfig.blockForgingDelay, StartForging)
7071
}
7172
}
7273
else {
@@ -78,16 +79,14 @@ class AtomixRaftForger(
7879
if(isLeader) {
7980
log.info("***** Forged block " + block.header.number)
8081
syncController ! RegularSync.MinedBlock(block)
81-
self ! StartForging
82+
scheduleOnce(atomixRaftConfig.blockForgingDelay, StartForging)
8283
}
8384
else {
8485
lostLeadership()
8586
}
8687
}
8788

8889
private def getBlock(parentBlock: Block): Future[PendingBlock] = {
89-
Thread.sleep(AtomixRaftForger.ArtificialDelay)
90-
9190
val ffPendingBlock: Future[Future[PendingBlock]] =
9291
for {
9392
pendingTxResponse getTransactionsFromPool
@@ -124,8 +123,6 @@ class AtomixRaftForger(
124123
}
125124

126125
object AtomixRaftForger {
127-
final val ArtificialDelay = 3001 // FIXME Delete
128-
129126
sealed trait Msg
130127
case object Init extends Msg
131128
case object IAmTheLeader extends Msg

src/main/scala/io/iohk/ethereum/consensus/atomixraft/difficulty/AtomixRaftDifficulty.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ package io.iohk.ethereum.consensus.atomixraft.difficulty
22

33
import io.iohk.ethereum.consensus.difficulty.ConstantDifficulty
44

5-
object AtomixRaftDifficulty extends ConstantDifficulty(0)
5+
object AtomixRaftDifficulty extends ConstantDifficulty(1)

src/main/scala/io/iohk/ethereum/utils/Config.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ object Config {
144144

145145
maxQueuedBlockNumberAhead: Int,
146146
maxQueuedBlockNumberBehind: Int,
147+
broadcastNewBlockHashes: Boolean,
147148

148149
maxNewBlockHashAge: Int,
149150
maxNewHashes: Int,
@@ -187,6 +188,7 @@ object Config {
187188
maxQueuedBlockNumberAhead = syncConfig.getInt("max-queued-block-number-ahead"),
188189
maxNewBlockHashAge = syncConfig.getInt("max-new-block-hash-age"),
189190
maxNewHashes = syncConfig.getInt("max-new-hashes"),
191+
broadcastNewBlockHashes = syncConfig.getBoolean("broadcast-new-block-hashes"),
190192

191193
redownloadMissingStateNodes = syncConfig.getBoolean("redownload-missing-state-nodes"),
192194

src/test/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcastSpec.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
1111
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, Status}
1212
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBody, NewBlockHashes}
1313
import io.iohk.ethereum.network.p2p.messages.{PV62, Versions}
14+
import io.iohk.ethereum.utils.Config
1415
import org.scalatest.{FlatSpec, Matchers}
16+
import scala.concurrent.duration._
1517

1618
class BlockBroadcastSpec extends FlatSpec with Matchers {
1719

@@ -104,12 +106,27 @@ class BlockBroadcastSpec extends FlatSpec with Matchers {
104106
etcPeerManagerProbe.expectNoMsg()
105107
}
106108

109+
it should "not broadcast NewBlockHashes message when disable by configuration" in new TestSetup {
110+
val updatedConfig = syncConfig.copy(broadcastNewBlockHashes = false)
111+
override val blockBroadcast = new BlockBroadcast(etcPeerManagerProbe.ref, updatedConfig)
112+
113+
val blockHeader: BlockHeader = baseBlockHeader.copy(number = initialPeerInfo.maxBlockNumber + 1)
114+
val newBlock = NewBlock(Block(blockHeader, BlockBody(Nil, Nil)), initialPeerInfo.totalDifficulty + 1)
115+
116+
blockBroadcast.broadcastBlock(newBlock, Map(peer -> initialPeerInfo))
117+
118+
etcPeerManagerProbe.expectMsg(EtcPeerManagerActor.SendMessage(newBlock, peer.id))
119+
etcPeerManagerProbe.expectNoMsg(100.millis)
120+
}
121+
107122
trait TestSetup {
108123
implicit val system = ActorSystem("BlockBroadcastSpec_System")
109124

110125
val etcPeerManagerProbe = TestProbe()
111126

112-
val blockBroadcast = new BlockBroadcast(etcPeerManagerProbe.ref)
127+
val syncConfig = Config.SyncConfig(Config.config)
128+
129+
val blockBroadcast = new BlockBroadcast(etcPeerManagerProbe.ref, syncConfig)
113130

114131
val baseBlockHeader = Fixtures.Blocks.Block3125369.header
115132

src/test/scala/io/iohk/ethereum/blockchain/sync/RegularSyncSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ class RegularSyncSpec extends TestKit(ActorSystem("RegularSync_system")) with Wo
526526
maxQueuedBlockNumberBehind = 10,
527527
maxNewBlockHashAge = 20,
528528
maxNewHashes = 64,
529+
broadcastNewBlockHashes = true,
529530
redownloadMissingStateNodes = true,
530531
fastSyncBlockValidationK = 100,
531532
fastSyncBlockValidationN = 2048,

src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,8 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
458458
) extends EphemBlockchainTestSetup {
459459

460460
//+ cake overrides
461+
override implicit lazy val system: ActorSystem = SyncControllerSpec.this.system
462+
461463
override lazy val vm: VMImpl = new VMImpl
462464

463465
override lazy val validators: Validators = _validators
@@ -512,6 +514,7 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
512514
maxQueuedBlockNumberBehind = 10,
513515
maxNewBlockHashAge = 20,
514516
maxNewHashes = 64,
517+
broadcastNewBlockHashes = true,
515518
redownloadMissingStateNodes = false,
516519
fastSyncBlockValidationK = 100,
517520
fastSyncBlockValidationN = 2048,

src/test/scala/io/iohk/ethereum/consensus/atomixraft/difficulty/DifficultySpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class DifficultySpec extends FlatSpec with Matchers with PropertyChecks {
1818
transactionsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
1919
receiptsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
2020
logsBloom = ByteString(Hex.decode("00" * 256)),
21-
difficulty = 0,
21+
difficulty = 1, // this must be equal to the result of whatever calculation atomix-raft applies
2222
number = 20,
2323
gasLimit = 131620495,
2424
gasUsed = 0,
@@ -36,7 +36,7 @@ class DifficultySpec extends FlatSpec with Matchers with PropertyChecks {
3636
transactionsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
3737
receiptsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
3838
logsBloom = ByteString(Hex.decode("00" * 256)),
39-
difficulty = 0,
39+
difficulty = 0, // does not matter for parent
4040
number = 19,
4141
gasLimit = 131749155,
4242
gasUsed = 0,
@@ -57,6 +57,6 @@ class DifficultySpec extends FlatSpec with Matchers with PropertyChecks {
5757
val result = blockHeaderValidator.validate(blockHeader, parentHeader)
5858

5959
result shouldBe Right(BlockHeaderValid)
60-
difficulty shouldBe 0
60+
difficulty shouldBe 1
6161
}
6262
}

src/universal/conf/consensus.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ mantis {
3232
# This is the section dedicated to `atomix-raft` consensus.
3333
# This consensus protocol is selected by setting `mantis.consensus.protocol = atomix-raft`.
3434
atomix-raft {
35+
# Determines how often new blocks will be forged
36+
# block-forging-delay = 15.seconds
37+
3538
# Represents this node.
3639
#
3740
# ID and PORT are not mandatory.

0 commit comments

Comments
 (0)