Skip to content

Commit 0300390

Browse files
KonradStanieckapke
authored andcommitted
[ETCM-275] Pr comments
Move DownloaderState to separate file Extract Actor state to separate class Call Future.apply at call site Improve synccontrollerspec autopilot
1 parent 56a9ea9 commit 0300390

File tree

6 files changed

+527
-548
lines changed

6 files changed

+527
-548
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import akka.util.ByteString
4+
import cats.data.NonEmptyList
5+
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.SyncResponse
6+
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
7+
NoUsefulDataInResponse,
8+
PeerRequest,
9+
ResponseProcessingResult,
10+
UnrequestedResponse,
11+
UsefulData
12+
}
13+
import io.iohk.ethereum.crypto.kec256
14+
import io.iohk.ethereum.network.{Peer, PeerId}
15+
import io.iohk.ethereum.network.p2p.messages.PV63.NodeData
16+
17+
import scala.annotation.tailrec
18+
19+
final case class DownloaderState(
20+
activeRequests: Map[PeerId, NonEmptyList[ByteString]],
21+
nodesToGet: Map[ByteString, Option[PeerId]]
22+
) {
23+
lazy val nonDownloadedNodes = nodesToGet.collect {
24+
case (hash, maybePeer) if maybePeer.isEmpty => hash
25+
}.toSeq
26+
27+
def scheduleNewNodesForRetrieval(nodes: Seq[ByteString]): DownloaderState = {
28+
val newNodesToGet = nodes.foldLeft(nodesToGet) { case (map, node) =>
29+
if (map.contains(node)) {
30+
map
31+
} else {
32+
map + (node -> None)
33+
}
34+
}
35+
36+
copy(nodesToGet = newNodesToGet)
37+
}
38+
39+
private def addActiveRequest(peerRequest: PeerRequest): DownloaderState = {
40+
val newNodesToget = peerRequest.nodes.foldLeft(nodesToGet) { case (map, node) =>
41+
map + (node -> Some(peerRequest.peer.id))
42+
}
43+
44+
copy(activeRequests = activeRequests + (peerRequest.peer.id -> peerRequest.nodes), nodesToGet = newNodesToget)
45+
}
46+
47+
def handleRequestFailure(from: Peer): DownloaderState = {
48+
activeRequests
49+
.get(from.id)
50+
.map { requestedNodes =>
51+
val newNodesToGet = requestedNodes.foldLeft(nodesToGet) { case (map, node) =>
52+
map + (node -> None)
53+
}
54+
55+
copy(activeRequests = activeRequests - from.id, nodesToGet = newNodesToGet)
56+
}
57+
.getOrElse(this)
58+
}
59+
60+
/**
61+
* Responses from peers should be delivered in order, but can contain gaps or can be not full, so we cannot fail
62+
* on first not matching response.
63+
* Matched responses are returned in correct order, the hashes to be rescheduled are returned in no particular order
64+
* as they will either way end up in map of hashes to be re-downloaded
65+
*/
66+
def process(
67+
requested: NonEmptyList[ByteString],
68+
received: NonEmptyList[ByteString]
69+
): (List[ByteString], List[SyncResponse]) = {
70+
@tailrec
71+
def go(
72+
remainingRequestedHashes: List[ByteString],
73+
nextResponse: SyncResponse,
74+
remainingResponses: List[ByteString],
75+
nonReceivedRequested: List[ByteString],
76+
processed: List[SyncResponse]
77+
): (List[ByteString], List[SyncResponse]) = {
78+
if (remainingRequestedHashes.isEmpty) {
79+
(nonReceivedRequested, processed.reverse)
80+
} else {
81+
val nextRequestedHash = remainingRequestedHashes.head
82+
if (nextRequestedHash == nextResponse.hash) {
83+
if (remainingResponses.isEmpty) {
84+
val finalNonReceived = remainingRequestedHashes.tail ::: nonReceivedRequested
85+
val finalProcessed = nextResponse :: processed
86+
(finalNonReceived, finalProcessed.reverse)
87+
} else {
88+
val nexExpectedResponse = SyncResponse(kec256(remainingResponses.head), remainingResponses.head)
89+
go(
90+
remainingRequestedHashes.tail,
91+
nexExpectedResponse,
92+
remainingResponses.tail,
93+
nonReceivedRequested,
94+
nextResponse :: processed
95+
)
96+
}
97+
} else {
98+
go(
99+
remainingRequestedHashes.tail,
100+
nextResponse,
101+
remainingResponses,
102+
nextRequestedHash :: nonReceivedRequested,
103+
processed
104+
)
105+
}
106+
}
107+
}
108+
109+
val firstReceivedResponse = SyncResponse(kec256(received.head), received.head)
110+
111+
go(requested.toList, firstReceivedResponse, received.tail, List.empty, List.empty)
112+
}
113+
114+
def handleRequestSuccess(from: Peer, receivedMessage: NodeData): (ResponseProcessingResult, DownloaderState) = {
115+
activeRequests
116+
.get(from.id)
117+
.map { requestedHashes =>
118+
if (receivedMessage.values.isEmpty) {
119+
val rescheduleRequestedHashes = requestedHashes.foldLeft(nodesToGet) { case (map, hash) =>
120+
map + (hash -> None)
121+
}
122+
(
123+
NoUsefulDataInResponse,
124+
copy(activeRequests = activeRequests - from.id, nodesToGet = rescheduleRequestedHashes)
125+
)
126+
} else {
127+
val (notReceived, received) =
128+
process(requestedHashes, NonEmptyList.fromListUnsafe(receivedMessage.values.toList))
129+
if (received.isEmpty) {
130+
val rescheduleRequestedHashes = notReceived.foldLeft(nodesToGet) { case (map, hash) =>
131+
map + (hash -> None)
132+
}
133+
(
134+
NoUsefulDataInResponse,
135+
copy(activeRequests = activeRequests - from.id, nodesToGet = rescheduleRequestedHashes)
136+
)
137+
} else {
138+
val afterNotReceive = notReceived.foldLeft(nodesToGet) { case (map, hash) => map + (hash -> None) }
139+
val afterReceived = received.foldLeft(afterNotReceive) { case (map, received) => map - received.hash }
140+
(UsefulData(received), copy(activeRequests = activeRequests - from.id, nodesToGet = afterReceived))
141+
}
142+
}
143+
}
144+
.getOrElse((UnrequestedResponse, this))
145+
}
146+
147+
def assignTasksToPeers(
148+
peers: NonEmptyList[Peer],
149+
newNodes: Option[Seq[ByteString]],
150+
nodesPerPeerCapacity: Int
151+
): (Seq[PeerRequest], DownloaderState) = {
152+
@tailrec
153+
def go(
154+
peersRemaining: List[Peer],
155+
nodesRemaining: Seq[ByteString],
156+
createdRequests: List[PeerRequest],
157+
currentState: DownloaderState
158+
): (Seq[PeerRequest], DownloaderState) = {
159+
if (peersRemaining.isEmpty || nodesRemaining.isEmpty) {
160+
(createdRequests.reverse, currentState.scheduleNewNodesForRetrieval(nodesRemaining))
161+
} else {
162+
val nextPeer = peersRemaining.head
163+
val (nodes, nodesAfterAssignment) = nodesRemaining.splitAt(nodesPerPeerCapacity)
164+
val peerRequest = PeerRequest(nextPeer, NonEmptyList.fromListUnsafe(nodes.toList))
165+
go(
166+
peersRemaining.tail,
167+
nodesAfterAssignment,
168+
peerRequest :: createdRequests,
169+
currentState.addActiveRequest(peerRequest)
170+
)
171+
}
172+
}
173+
174+
val currentNodesToDeliver = newNodes.map(nodes => nonDownloadedNodes ++ nodes).getOrElse(nonDownloadedNodes)
175+
if (currentNodesToDeliver.isEmpty) {
176+
(Seq(), this)
177+
} else {
178+
go(peers.toList, currentNodesToDeliver, List.empty, this)
179+
}
180+
}
181+
182+
}
183+
184+
object DownloaderState {
185+
def apply(): DownloaderState = new DownloaderState(Map.empty, Map.empty)
186+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import akka.actor.ActorRef
4+
import akka.util.ByteString
5+
import cats.data.NonEmptyList
6+
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{ProcessingStatistics, SchedulerState}
7+
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{PeerRequest, RequestResult}
8+
import io.iohk.ethereum.network.{Peer, PeerId}
9+
10+
import scala.collection.immutable.Queue
11+
12+
case class SyncSchedulerActorState(
13+
currentSchedulerState: SchedulerState,
14+
currentDownloaderState: DownloaderState,
15+
currentStats: ProcessingStatistics,
16+
targetBlock: BigInt,
17+
syncInitiator: ActorRef,
18+
nodesToProcess: Queue[RequestResult],
19+
processing: Boolean,
20+
restartRequested: Option[ActorRef]
21+
) {
22+
def hasRemainingPendingRequests: Boolean = currentSchedulerState.numberOfPendingRequests > 0
23+
def isProcessing: Boolean = processing
24+
def restartHasBeenRequested: Boolean = restartRequested.isDefined
25+
def withNewRequestResult(requestResult: RequestResult): SyncSchedulerActorState =
26+
copy(nodesToProcess = nodesToProcess.enqueue(requestResult))
27+
28+
def withNewProcessingResults(
29+
newSchedulerState: SchedulerState,
30+
newDownloaderState: DownloaderState,
31+
newStats: ProcessingStatistics
32+
): SyncSchedulerActorState = {
33+
copy(
34+
currentSchedulerState = newSchedulerState,
35+
currentDownloaderState = newDownloaderState,
36+
currentStats = newStats
37+
)
38+
}
39+
40+
def withNewDownloaderState(newDownloaderState: DownloaderState): SyncSchedulerActorState = {
41+
copy(currentDownloaderState = newDownloaderState)
42+
}
43+
44+
def withRestartRequested(restartRequester: ActorRef): SyncSchedulerActorState = {
45+
copy(restartRequested = Some(restartRequester))
46+
}
47+
48+
def initProcessing: SyncSchedulerActorState = {
49+
copy(processing = true)
50+
}
51+
52+
def finishProcessing: SyncSchedulerActorState = {
53+
copy(processing = false)
54+
}
55+
56+
def assignTasksToPeers(
57+
freePeers: NonEmptyList[Peer],
58+
nodesPerPeer: Int
59+
): (Seq[PeerRequest], SyncSchedulerActorState) = {
60+
val retryQueue = currentDownloaderState.nonDownloadedNodes
61+
val maxNewNodes = ((freePeers.size * nodesPerPeer) - retryQueue.size).max(0)
62+
val (newNodes, newState) = currentSchedulerState.getMissingHashes(maxNewNodes)
63+
val (requests, newDownloaderState) =
64+
currentDownloaderState.assignTasksToPeers(
65+
NonEmptyList.fromListUnsafe(freePeers.toList),
66+
Some(newNodes),
67+
nodesPerPeer
68+
)
69+
(requests, copy(currentSchedulerState = newState, currentDownloaderState = newDownloaderState))
70+
}
71+
72+
def getRequestToProcess: Option[(RequestResult, SyncSchedulerActorState)] = {
73+
nodesToProcess.dequeueOption.map { case (result, restOfResults) =>
74+
(result, copy(nodesToProcess = restOfResults))
75+
}
76+
}
77+
78+
def numberOfRemainingRequests: Int = nodesToProcess.size
79+
80+
def memBatch: Map[ByteString, (ByteString, SyncStateScheduler.RequestType)] = currentSchedulerState.memBatch
81+
82+
def activePeerRequests: Map[PeerId, NonEmptyList[ByteString]] = currentDownloaderState.activeRequests
83+
84+
override def toString: String = {
85+
s""" Status of mpt state sync:
86+
| Number of Pending requests: ${currentSchedulerState.numberOfPendingRequests},
87+
| Number of Missing hashes waiting to be retrieved: ${currentSchedulerState.queue.size()},
88+
| Number of Requests waiting for processing: ${nodesToProcess.size},
89+
| Number of Mpt nodes saved to database: ${currentStats.saved},
90+
| Number of duplicated hashes: ${currentStats.duplicatedHashes},
91+
| Number of not requested hashes: ${currentStats.notRequestedHashes},
92+
| Number of active peer requests: ${currentDownloaderState.activeRequests.size}
93+
""".stripMargin
94+
}
95+
}
96+
97+
object SyncSchedulerActorState {
98+
def initial(
99+
initialSchedulerState: SchedulerState,
100+
initialStats: ProcessingStatistics,
101+
targetBlock: BigInt,
102+
syncInitiator: ActorRef
103+
): SyncSchedulerActorState = {
104+
SyncSchedulerActorState(
105+
initialSchedulerState,
106+
DownloaderState(),
107+
initialStats,
108+
targetBlock,
109+
syncInitiator,
110+
Queue(),
111+
processing = false,
112+
restartRequested = None
113+
)
114+
115+
}
116+
}

0 commit comments

Comments
 (0)