Skip to content

Commit e0b5bec

Browse files
author
Michal Mrozek
committed
[ECTM-104] Pivot block selection algorithm
1 parent f421f79 commit e0b5bec

File tree

6 files changed

+1172
-419
lines changed

6 files changed

+1172
-419
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 215 additions & 128 deletions
Large diffs are not rendered by default.
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
4+
import io.iohk.ethereum.domain.BlockHeader
5+
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
6+
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
7+
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
8+
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
9+
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
10+
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
11+
import io.iohk.ethereum.utils.Config.SyncConfig
12+
import scala.concurrent.duration.FiniteDuration
13+
import scala.concurrent.ExecutionContext.Implicits.global
14+
15+
class FastSyncPivotBlockSelector(
16+
val etcPeerManager: ActorRef,
17+
val peerEventBus: ActorRef,
18+
val syncConfig: SyncConfig,
19+
val scheduler: Scheduler
20+
) extends Actor
21+
with ActorLogging
22+
with PeerListSupport
23+
with BlacklistSupport {
24+
25+
import FastSyncPivotBlockSelector._
26+
import syncConfig._
27+
28+
val fastSync: ActorRef = context.parent
29+
30+
def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages
31+
32+
override def receive: Receive = idle
33+
34+
def idle: Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
35+
val peersUsedToChooseTarget = peersToDownloadFrom.filter(_._2.forkAccepted)
36+
37+
if (peersUsedToChooseTarget.size >= minPeersToChooseTargetBlock) {
38+
peersUsedToChooseTarget.foreach { case (peer, PeerInfo(status, _, _, _, _)) =>
39+
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
40+
etcPeerManager ! EtcPeerManagerActor.SendMessage(
41+
GetBlockHeaders(Right(status.bestHash), 1, 0, reverse = false),
42+
peer.id
43+
)
44+
}
45+
log.debug("Asking {} peers for block headers", peersUsedToChooseTarget.size)
46+
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, BlockHeadersTimeout)
47+
context become waitingForBlockHeaders(peersUsedToChooseTarget.keySet, Map.empty, timeout)
48+
} else {
49+
log.info(
50+
"Cannot pick pivot block. Need at least {} peers, but there are only {} available at the moment. Retrying in {}",
51+
minPeersToChooseTargetBlock,
52+
peersUsedToChooseTarget.size,
53+
startRetryInterval
54+
)
55+
scheduleRetry(startRetryInterval)
56+
context become idle
57+
}
58+
}
59+
60+
def waitingForBlockHeaders(waitingFor: Set[Peer], received: Map[Peer, BlockHeader], timeout: Cancellable): Receive =
61+
handleCommonMessages orElse {
62+
case MessageFromPeer(BlockHeaders(Seq(blockHeader)), peerId) =>
63+
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
64+
65+
val newWaitingFor = waitingFor.filterNot(_.id == peerId)
66+
67+
waitingFor.find(_.id == peerId).foreach { peer =>
68+
val newReceived = received + (peer -> blockHeader)
69+
70+
if (newWaitingFor.isEmpty) {
71+
timeout.cancel()
72+
tryChooseTargetBlock(newReceived)
73+
} else context become waitingForBlockHeaders(newWaitingFor, newReceived, timeout)
74+
}
75+
76+
case MessageFromPeer(BlockHeaders(blockHeaders), peerId) =>
77+
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
78+
blacklist(
79+
peerId,
80+
blacklistDuration,
81+
s"did not respond with 1 header but with ${blockHeaders.size}, blacklisting for $blacklistDuration"
82+
)
83+
waitingFor.find(_.id == peerId).foreach { peer =>
84+
context become waitingForBlockHeaders(waitingFor - peer, received, timeout)
85+
}
86+
87+
case BlockHeadersTimeout =>
88+
waitingFor.foreach { peer =>
89+
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
90+
blacklist(
91+
peer.id,
92+
blacklistDuration,
93+
s"did not respond within required time with block header, blacklisting for $blacklistDuration"
94+
)
95+
}
96+
tryChooseTargetBlock(received)
97+
}
98+
99+
def tryChooseTargetBlock(receivedHeaders: Map[Peer, BlockHeader]): Unit = {
100+
log.debug("Trying to choose fast sync pivot block. Received {} block headers", receivedHeaders.size)
101+
if (receivedHeaders.size >= minPeersToChooseTargetBlock) {
102+
103+
val peersWithBestHeaders = receivedHeaders.toList.sortBy(-_._2.number)
104+
105+
val bestPeerBestBlockNumber = peersWithBestHeaders.head._2.number
106+
val targetBlock = bestPeerBestBlockNumber - syncConfig.targetBlockOffset
107+
108+
val peersToAsk = peersWithBestHeaders.takeWhile(_._2.number >= targetBlock).map(_._1)
109+
110+
peersToAsk.foreach { peer =>
111+
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
112+
etcPeerManager ! EtcPeerManagerActor.SendMessage(
113+
GetBlockHeaders(Left(targetBlock), 1, 0, reverse = false),
114+
peer.id
115+
)
116+
}
117+
118+
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, PivotBlockTimeout)
119+
context become waitingForTargetBlock(peersToAsk.map(_.id).toSet, targetBlock, timeout, Map.empty)
120+
121+
} else {
122+
log.info(
123+
"Cannot pick pivot block. Need to receive block headers from at least {} peers, but received only from {}. Retrying in {}",
124+
minPeersToChooseTargetBlock,
125+
receivedHeaders.size,
126+
startRetryInterval
127+
)
128+
scheduleRetry(startRetryInterval)
129+
context become idle
130+
}
131+
}
132+
133+
def waitingForTargetBlock(
134+
peersToAsk: Set[PeerId],
135+
targetBlockNumber: BigInt,
136+
timeout: Cancellable,
137+
headers: Map[BlockHeader, Int]
138+
): Receive =
139+
handleCommonMessages orElse {
140+
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
141+
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
142+
val updatedPeersToAsk = peersToAsk - peerId
143+
val targetBlockHeaderOpt = blockHeaders.headers.find(header => header.number == targetBlockNumber)
144+
145+
targetBlockHeaderOpt match {
146+
case Some(targetBlockHeader) =>
147+
val newValue = headers.find(_._1 == targetBlockHeader).map(_._2 + 1).getOrElse(1)
148+
val updatedHeaders = headers.updated(targetBlockHeader, newValue)
149+
val (mostPopularBlockHeader, votes) = updatedHeaders.maxBy(_._2)
150+
if (votes >= minPeersToChooseTargetBlock) {
151+
timeout.cancel()
152+
sendResponseAndCleanup(mostPopularBlockHeader)
153+
} else if (updatedPeersToAsk.isEmpty) {
154+
timeout.cancel()
155+
log.info("Not enough votes for pivot block. Retrying in {}", startRetryInterval)
156+
scheduleRetry(startRetryInterval)
157+
context become idle
158+
} else {
159+
context become waitingForTargetBlock(updatedPeersToAsk, targetBlockNumber, timeout, updatedHeaders)
160+
}
161+
case None =>
162+
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header, blacklisting")
163+
context become waitingForTargetBlock(updatedPeersToAsk, targetBlockNumber, timeout, headers)
164+
}
165+
case PivotBlockTimeout =>
166+
peersToAsk.foreach { peerId =>
167+
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header (timeout), blacklisting")
168+
}
169+
peerEventBus ! Unsubscribe()
170+
log.info("Pivot block header receive timeout. Retrying in {}", startRetryInterval)
171+
scheduleRetry(startRetryInterval)
172+
context become idle
173+
}
174+
175+
def scheduleRetry(interval: FiniteDuration): Unit = {
176+
scheduler.scheduleOnce(interval, self, ChoosePivotBlock)
177+
}
178+
179+
def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
180+
log.info("Found pivot block: {} hash: {}", pivotBlockHeader.number, pivotBlockHeader.hashAsHexString)
181+
fastSync ! Result(pivotBlockHeader)
182+
peerEventBus ! Unsubscribe()
183+
context stop self
184+
}
185+
186+
}
187+
188+
object FastSyncPivotBlockSelector {
189+
def props(etcPeerManager: ActorRef, peerEventBus: ActorRef, syncConfig: SyncConfig, scheduler: Scheduler): Props =
190+
Props(new FastSyncPivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler))
191+
192+
case object ChoosePivotBlock
193+
case class Result(targetBlockHeader: BlockHeader)
194+
195+
private case object BlockHeadersTimeout
196+
private case object PivotBlockTimeout
197+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala

Lines changed: 0 additions & 147 deletions
This file was deleted.

0 commit comments

Comments
 (0)