Skip to content

[CGKIELE-154] stabilize sync for iele testnet #442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ mantis {

heartbeat-interval = 300.millis

# Determines how often new blocks will be forged
block-forging-delay = 15.seconds

# Represents this node.
#
# ID and PORT are not mandatory.
Expand Down Expand Up @@ -375,6 +378,8 @@ mantis {
peers-scan-interval = 3.seconds

# Duration for blacklisting a peer. Blacklisting reason include: invalid response from peer, response time-out, etc.
# 0 value is a valid duration and it will disable blacklisting completely (which can be useful when all nodes are
# are controlled by a single party, eg. private networks)
blacklist-duration = 200.seconds

# Retry interval when not having enough peers to start fast-sync
Expand Down Expand Up @@ -447,6 +452,10 @@ mantis {
# Maximum number of hashes processed form NewBlockHashes packet
max-new-hashes = 64

# Set to false to disable broadcasting the NewBlockHashes message, as its usefulness is debatable,
# especially in the context of private networks
broadcast-new-block-hashes = true

# This a recovery mechanism for the issue of missing state nodes during blocks execution:
# off - missing state node will result in an exception
# on - missing state node will be redownloaded from a peer and block execution will be retried. This can repeat
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iohk.ethereum.blockchain.sync

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler}
import io.iohk.ethereum.network.PeerId

Expand All @@ -16,10 +16,14 @@ trait BlacklistSupport {
var blacklistedPeers: Seq[(PeerId, Cancellable)] = Nil

def blacklist(peerId: PeerId, duration: FiniteDuration, reason: String): Unit = {
undoBlacklist(peerId)
log.debug(s"Blacklisting peer ($peerId), $reason")
val unblacklistCancellable = scheduler.scheduleOnce(duration, self, UnblacklistPeer(peerId))
blacklistedPeers :+= (peerId, unblacklistCancellable)
if (duration > Duration.Zero) {
undoBlacklist(peerId)
log.debug(s"Blacklisting peer ($peerId), $reason")
val unblacklistCancellable = scheduler.scheduleOnce(duration, self, UnblacklistPeer(peerId))
blacklistedPeers :+= (peerId, unblacklistCancellable)
} else {
log.debug(s"Peer ($peerId) would be blacklisted (reason: $reason), but blacklisting duration is zero")
}
}

def undoBlacklist(peerId: PeerId): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.network.p2p.messages.PV62
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
import io.iohk.ethereum.utils.Config.SyncConfig

import scala.util.Random

class BlockBroadcast(val etcPeerManager: ActorRef) {
class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {

/**
* Broadcasts various NewBlock's messages to handshaked peers, considering that a block should not be sent to a peer
Expand All @@ -25,7 +26,11 @@ class BlockBroadcast(val etcPeerManager: ActorRef) {
case (peer, peerInfo) if shouldSendNewBlock(newBlock, peerInfo) => peer }.toSet

broadcastNewBlock(newBlock, peersWithoutBlock)
broadcastNewBlockHash(newBlock, peersWithoutBlock)

if (syncConfig.broadcastNewBlockHashes) {
// NOTE: the usefulness of this message is debatable, especially in private networks
broadcastNewBlockHash(newBlock, peersWithoutBlock)
}
}

private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SyncController(

def startRegularSync(): Unit = {
val regularSync = context.actorOf(RegularSync.props(appStateStorage, etcPeerManager,
peerEventBus, ommersPool, pendingTransactionsManager, new BlockBroadcast(etcPeerManager),
peerEventBus, ommersPool, pendingTransactionsManager, new BlockBroadcast(etcPeerManager, syncConfig),
ledger, blockchain, syncConfig, scheduler), "regular-sync")
regularSync ! RegularSync.Start
context become runningRegularSync(regularSync)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ case class AtomixRaftConfig private(
bootstrapNodes: List[AtomixNode],
dataDir: File,
electionTimeout: FiniteDuration,
heartbeatInterval: FiniteDuration
heartbeatInterval: FiniteDuration,
blockForgingDelay: FiniteDuration
)

object AtomixRaftConfig extends Logger {
Expand All @@ -26,6 +27,7 @@ object AtomixRaftConfig extends Logger {
final val DataDir = "data-dir"
final val ElectionTimeout = "election-timeout"
final val HeartbeatInterval = "heartbeat-interval"
final val BlockForgingDelay = "block-forging-delay"
}

def parseNodeId(parts: Array[String]): AtomixNodeId =
Expand Down Expand Up @@ -67,6 +69,7 @@ object AtomixRaftConfig extends Logger {
val dataDir = new File(config.getString(Keys.DataDir))
val electionTimeout = config.getDuration(Keys.ElectionTimeout).toMillis.millis
val heartbeatInterval = config.getDuration(Keys.HeartbeatInterval).toMillis.millis
val blockForgingDelay = config.getDuration(Keys.BlockForgingDelay).toMillis.millis

log.info("***** local-node = " + localNode)
log.info("***** bootstrap-nodes = " + bootstrapNodes)
Expand All @@ -76,7 +79,8 @@ object AtomixRaftConfig extends Logger {
bootstrapNodes = bootstrapNodes,
dataDir = dataDir,
electionTimeout = electionTimeout,
heartbeatInterval = heartbeatInterval
heartbeatInterval = heartbeatInterval,
blockForgingDelay = blockForgingDelay
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class AtomixRaftForger(

def receive: Receive = stopped

private def consensusCofig: ConsensusConfig = consensus.config.generic
private def coinbase: Address = consensusCofig.coinbase
private def consensusConfig: ConsensusConfig = consensus.config.generic
private def atomixRaftConfig: AtomixRaftConfig = consensus.config.specific
private def coinbase: Address = consensusConfig.coinbase
private def isLeader: Boolean = consensus.isLeader.getOrElse(false)
private def blockGenerator: AtomixRaftBlockGenerator = consensus.blockGenerator

Expand Down Expand Up @@ -66,7 +67,7 @@ class AtomixRaftForger(

case Failure(ex) ⇒
log.error(ex, "Unable to get block")
scheduleOnce(10.seconds, StartForging)
scheduleOnce(atomixRaftConfig.blockForgingDelay, StartForging)
}
}
else {
Expand All @@ -78,16 +79,14 @@ class AtomixRaftForger(
if(isLeader) {
log.info("***** Forged block " + block.header.number)
syncController ! RegularSync.MinedBlock(block)
self ! StartForging
scheduleOnce(atomixRaftConfig.blockForgingDelay, StartForging)
}
else {
lostLeadership()
}
}

private def getBlock(parentBlock: Block): Future[PendingBlock] = {
Thread.sleep(AtomixRaftForger.ArtificialDelay)

val ffPendingBlock: Future[Future[PendingBlock]] =
for {
pendingTxResponse ← getTransactionsFromPool
Expand Down Expand Up @@ -124,8 +123,6 @@ class AtomixRaftForger(
}

object AtomixRaftForger {
final val ArtificialDelay = 3001 // FIXME Delete

sealed trait Msg
case object Init extends Msg
case object IAmTheLeader extends Msg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ package io.iohk.ethereum.consensus.atomixraft.difficulty

import io.iohk.ethereum.consensus.difficulty.ConstantDifficulty

object AtomixRaftDifficulty extends ConstantDifficulty(0)
object AtomixRaftDifficulty extends ConstantDifficulty(1)
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/utils/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ object Config {

maxQueuedBlockNumberAhead: Int,
maxQueuedBlockNumberBehind: Int,
broadcastNewBlockHashes: Boolean,

maxNewBlockHashAge: Int,
maxNewHashes: Int,
Expand Down Expand Up @@ -187,6 +188,7 @@ object Config {
maxQueuedBlockNumberAhead = syncConfig.getInt("max-queued-block-number-ahead"),
maxNewBlockHashAge = syncConfig.getInt("max-new-block-hash-age"),
maxNewHashes = syncConfig.getInt("max-new-hashes"),
broadcastNewBlockHashes = syncConfig.getBoolean("broadcast-new-block-hashes"),

redownloadMissingStateNodes = syncConfig.getBoolean("redownload-missing-state-nodes"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, Status}
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBody, NewBlockHashes}
import io.iohk.ethereum.network.p2p.messages.{PV62, Versions}
import io.iohk.ethereum.utils.Config
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._

class BlockBroadcastSpec extends FlatSpec with Matchers {

Expand Down Expand Up @@ -104,12 +106,27 @@ class BlockBroadcastSpec extends FlatSpec with Matchers {
etcPeerManagerProbe.expectNoMsg()
}

it should "not broadcast NewBlockHashes message when disable by configuration" in new TestSetup {
val updatedConfig = syncConfig.copy(broadcastNewBlockHashes = false)
override val blockBroadcast = new BlockBroadcast(etcPeerManagerProbe.ref, updatedConfig)

val blockHeader: BlockHeader = baseBlockHeader.copy(number = initialPeerInfo.maxBlockNumber + 1)
val newBlock = NewBlock(Block(blockHeader, BlockBody(Nil, Nil)), initialPeerInfo.totalDifficulty + 1)

blockBroadcast.broadcastBlock(newBlock, Map(peer -> initialPeerInfo))

etcPeerManagerProbe.expectMsg(EtcPeerManagerActor.SendMessage(newBlock, peer.id))
etcPeerManagerProbe.expectNoMsg(100.millis)
}

trait TestSetup {
implicit val system = ActorSystem("BlockBroadcastSpec_System")

val etcPeerManagerProbe = TestProbe()

val blockBroadcast = new BlockBroadcast(etcPeerManagerProbe.ref)
val syncConfig = Config.SyncConfig(Config.config)

val blockBroadcast = new BlockBroadcast(etcPeerManagerProbe.ref, syncConfig)

val baseBlockHeader = Fixtures.Blocks.Block3125369.header

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ class RegularSyncSpec extends TestKit(ActorSystem("RegularSync_system")) with Wo
maxQueuedBlockNumberBehind = 10,
maxNewBlockHashAge = 20,
maxNewHashes = 64,
broadcastNewBlockHashes = true,
redownloadMissingStateNodes = true,
fastSyncBlockValidationK = 100,
fastSyncBlockValidationN = 2048,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
) extends EphemBlockchainTestSetup {

//+ cake overrides
override implicit lazy val system: ActorSystem = SyncControllerSpec.this.system

override lazy val vm: VMImpl = new VMImpl

override lazy val validators: Validators = _validators
Expand Down Expand Up @@ -512,6 +514,7 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
maxQueuedBlockNumberBehind = 10,
maxNewBlockHashAge = 20,
maxNewHashes = 64,
broadcastNewBlockHashes = true,
redownloadMissingStateNodes = false,
fastSyncBlockValidationK = 100,
fastSyncBlockValidationN = 2048,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DifficultySpec extends FlatSpec with Matchers with PropertyChecks {
transactionsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
receiptsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
logsBloom = ByteString(Hex.decode("00" * 256)),
difficulty = 0,
difficulty = 1, // this must be equal to the result of whatever calculation atomix-raft applies
number = 20,
gasLimit = 131620495,
gasUsed = 0,
Expand All @@ -36,7 +36,7 @@ class DifficultySpec extends FlatSpec with Matchers with PropertyChecks {
transactionsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
receiptsRoot = ByteString(Hex.decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")),
logsBloom = ByteString(Hex.decode("00" * 256)),
difficulty = 0,
difficulty = 0, // does not matter for parent
number = 19,
gasLimit = 131749155,
gasUsed = 0,
Expand All @@ -57,6 +57,6 @@ class DifficultySpec extends FlatSpec with Matchers with PropertyChecks {
val result = blockHeaderValidator.validate(blockHeader, parentHeader)

result shouldBe Right(BlockHeaderValid)
difficulty shouldBe 0
difficulty shouldBe 1
}
}
3 changes: 3 additions & 0 deletions src/universal/conf/consensus.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ mantis {
# This is the section dedicated to `atomix-raft` consensus.
# This consensus protocol is selected by setting `mantis.consensus.protocol = atomix-raft`.
atomix-raft {
# Determines how often new blocks will be forged
# block-forging-delay = 15.seconds

# Represents this node.
#
# ID and PORT are not mandatory.
Expand Down