Skip to content

Commit b65aa0c

Browse files
authored
Support responding to new message types (#204)
* feat: update message protobuf to support new message types * fix: case of protobuf fields * feat: include pending bytes in response message (#205) * Feat/have new msg types (#211) * feat: SortedMap * refactor: make SortedMap more efficient * feat: add request queue * feat: support want-have & want-block in engine * fix: block presences deserialization * fix: ensure blocks are in blockstore before adding them to engine request queue * fix: reduce engine queue contention * feat: listen on bitswap/1.2.0 (#212) * fix: several engine fixes * refactor: rename some things * docs: add some docs to task merger * refactor: more efficient block receive * docs: add message comments * refactor: better var names in merger * fix: merge with master
1 parent 936f899 commit b65aa0c

24 files changed

+2639
-280
lines changed

src/decision-engine/index.js

Lines changed: 228 additions & 116 deletions
Large diffs are not rendered by default.

src/decision-engine/ledger.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ class Ledger {
2828
this.accounting.bytesRecv += n
2929
}
3030

31-
wants (cid, priority) {
32-
this.wantlist.add(cid, priority)
31+
wants (cid, priority, wantType) {
32+
this.wantlist.add(cid, priority, wantType)
3333
}
3434

3535
cancelWant (cid) {

src/decision-engine/req-queue.js

Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
'use strict'
2+
3+
const SortedMap = require('../utils/sorted-map')
4+
5+
/**
6+
* @typedef {Object} Task
7+
* @property {string} topic - a name for the Task (like an id but not necessarily unique)
8+
* @property {number} priority - tasks are ordered by priority per peer
9+
* @property {number} size - the size of the task, eg the number of bytes in a block
10+
*/
11+
12+
/**
13+
* @typedef {Object} TaskMerger
14+
* @property {function(task, tasksWithTopic)} hasNewInfo - given the existing
15+
* tasks with the same topic, does the task add some new information?
16+
* Used to decide whether to merge the task or ignore it.
17+
* @property {function(task, existingTask)} merge - merge the information from
18+
* the given task into the existing task (with the same topic)
19+
*/
20+
21+
/**
22+
* The task merger that is used by default.
23+
* Assumes that new tasks do not add any information over existing tasks,
24+
* and doesn't try to merge.
25+
*/
26+
const DefaultTaskMerger = {
27+
hasNewInfo () {
28+
return false
29+
},
30+
31+
merge () {}
32+
}
33+
34+
/**
35+
* Queue of requests to be processed by the engine.
36+
* The requests from each peer are added to the peer's queue, sorted by
37+
* priority.
38+
* Tasks are popped in priority order from the best peer - see popTasks()
39+
* for more details.
40+
*/
41+
class RequestQueue {
42+
/**
43+
* @param {TaskMerger} taskMerger
44+
*/
45+
constructor (taskMerger) {
46+
this._taskMerger = taskMerger || DefaultTaskMerger
47+
this._byPeer = new SortedMap([], PeerTasks.compare, true)
48+
}
49+
50+
/**
51+
* Push tasks onto the queue for the given peer
52+
* @param {PeerId} peerId
53+
* @param {Task} tasks
54+
*/
55+
pushTasks (peerId, tasks) {
56+
let peerTasks = this._byPeer.get(peerId.toB58String())
57+
if (peerTasks) {
58+
peerTasks.pushTasks(tasks)
59+
return
60+
}
61+
62+
peerTasks = new PeerTasks(peerId, this._taskMerger)
63+
peerTasks.pushTasks(tasks)
64+
this._byPeer.set(peerId.toB58String(), peerTasks)
65+
}
66+
67+
/**
68+
* Choose the peer with the least active work (or if all have the same active
69+
* work, the most pending tasks) and pop off the highest priority tasks until
70+
* the total size is at least targetMinBytes.
71+
* This puts the popped tasks into the "active" state, meaning they are
72+
* actively being processed (and cannot be modified).
73+
* @param {number} targetMinBytes - the minimum total size of tasks to pop
74+
* @returns {Object}
75+
*/
76+
popTasks (targetMinBytes) {
77+
if (this._byPeer.size === 0) {
78+
return { tasks: [], pendingSize: 0 }
79+
}
80+
81+
// Get the queue of tasks for the best peer and pop off tasks up to
82+
// targetMinBytes
83+
const peerTasks = this._head()
84+
const { tasks, pendingSize } = peerTasks.popTasks(targetMinBytes)
85+
if (tasks.length === 0) {
86+
return { tasks, pendingSize }
87+
}
88+
89+
const peerId = peerTasks.peerId
90+
if (peerTasks.isIdle()) {
91+
// If there are no more tasks for the peer, free up its memory
92+
this._byPeer.delete(peerId.toB58String())
93+
} else {
94+
// If there are still tasks remaining, update the sort order of peerTasks
95+
// (because it depends on the number of pending tasks)
96+
this._byPeer.update(0)
97+
}
98+
99+
return {
100+
peerId, tasks, pendingSize
101+
}
102+
}
103+
104+
_head () {
105+
for (const [, v] of this._byPeer) {
106+
return v
107+
}
108+
return undefined
109+
}
110+
111+
/**
112+
* Remove the task with the given topic for the given peer.
113+
* @param {string} topic
114+
* @param {PeerId} peerId
115+
*/
116+
remove (topic, peerId) {
117+
const peerTasks = this._byPeer.get(peerId.toB58String())
118+
peerTasks && peerTasks.remove(topic)
119+
}
120+
121+
/**
122+
* Called when the tasks for the given peer complete.
123+
* @param {PeerId} peerId
124+
* @param {Task[]} tasks
125+
*/
126+
tasksDone (peerId, tasks) {
127+
const peerTasks = this._byPeer.get(peerId.toB58String())
128+
if (!peerTasks) {
129+
return
130+
}
131+
132+
const i = this._byPeer.indexOf(peerId.toB58String())
133+
for (const task of tasks) {
134+
peerTasks.taskDone(task)
135+
}
136+
137+
// Marking the tasks as done takes them out of the "active" state, and the
138+
// sort order depends on the size of the active tasks, so we need to update
139+
// the order.
140+
this._byPeer.update(i)
141+
}
142+
}
143+
144+
/**
145+
* Queue of tasks for a particular peer, sorted by priority.
146+
*/
147+
class PeerTasks {
148+
/**
149+
* @param {PeerId} peerId
150+
* @param {TaskMerger} taskMerger
151+
*/
152+
constructor (peerId, taskMerger) {
153+
this.peerId = peerId
154+
this._taskMerger = taskMerger
155+
this._activeTotalSize = 0
156+
this._pending = new PendingTasks()
157+
this._active = new Set()
158+
}
159+
160+
/**
161+
* Push tasks onto the queue.
162+
* @param {Task[]} tasks
163+
*/
164+
pushTasks (tasks) {
165+
for (const t of tasks) {
166+
this._pushTask(t)
167+
}
168+
}
169+
170+
_pushTask (task) {
171+
// If the new task doesn't add any more information over what we
172+
// already have in the active queue, then we can skip the new task
173+
if (!this._taskHasMoreInfoThanActiveTasks(task)) {
174+
return
175+
}
176+
177+
// If there is already a non-active (pending) task with this topic
178+
const existingTask = this._pending.get(task.topic)
179+
if (existingTask) {
180+
// If the new task has a higher priority than the old task,
181+
if (task.priority > existingTask.priority) {
182+
// Update the priority and the task's position in the queue
183+
this._pending.updatePriority(task.topic, task.priority)
184+
}
185+
186+
// Merge the information from the new task into the existing task
187+
this._taskMerger.merge(task, existingTask)
188+
189+
// A task with the topic exists, so we don't need to add
190+
// the new task to the queue
191+
return
192+
}
193+
194+
// Push the new task onto the queue
195+
this._pending.add(task)
196+
}
197+
198+
// Indicates whether the new task adds any more information over tasks that are
199+
// already in the active task queue
200+
_taskHasMoreInfoThanActiveTasks (task) {
201+
const tasksWithTopic = []
202+
for (const activeTask of this._active) {
203+
if (activeTask.topic === task.topic) {
204+
tasksWithTopic.push(activeTask)
205+
}
206+
}
207+
208+
// No tasks with that topic, so the new task adds information
209+
if (tasksWithTopic.length === 0) {
210+
return true
211+
}
212+
213+
return this._taskMerger.hasNewInfo(task, tasksWithTopic)
214+
}
215+
216+
/**
217+
* Pop tasks off the queue such that the total size is at least targetMinBytes
218+
* @param {number} targetMinBytes
219+
* @returns {Object}
220+
*/
221+
popTasks (targetMinBytes) {
222+
let size = 0
223+
const tasks = []
224+
225+
// Keep popping tasks until we get up to targetMinBytes (or one item over
226+
// targetMinBytes)
227+
const pendingTasks = this._pending.tasks()
228+
for (let i = 0; i < pendingTasks.length && size < targetMinBytes; i++) {
229+
const task = pendingTasks[i]
230+
tasks.push(task)
231+
size += task.size
232+
233+
// Move tasks from pending to active
234+
this._pending.delete(task.topic)
235+
this._activeTotalSize += task.size
236+
this._active.add(task)
237+
}
238+
239+
return {
240+
tasks, pendingSize: this._pending.totalSize
241+
}
242+
}
243+
244+
/**
245+
* Called when a task completes.
246+
* Note: must be the same reference as returned from popTasks.
247+
* @param {Task} task
248+
*/
249+
taskDone (task) {
250+
if (this._active.has(task)) {
251+
this._activeTotalSize -= task.size
252+
this._active.delete(task)
253+
}
254+
}
255+
256+
/**
257+
* Remove pending tasks with the given topic
258+
* @param {string} topic
259+
*/
260+
remove (topic) {
261+
this._pending.delete(topic)
262+
}
263+
264+
/**
265+
* No work to be done, this PeerTasks object can be freed.
266+
* @returns {boolean}
267+
*/
268+
isIdle () {
269+
return this._pending.length === 0 && this._active.length === 0
270+
}
271+
272+
// Compare PeerTasks
273+
static compare (a, b) {
274+
// Move peers with no pending tasks to the back of the queue
275+
if (a[1]._pending.length === 0) {
276+
return 1
277+
}
278+
if (b[1]._pending.length === 0) {
279+
return -1
280+
}
281+
282+
// If the amount of active work is the same
283+
if (a[1]._activeTotalSize === b[1]._activeTotalSize) {
284+
// Choose the peer with the most pending work
285+
return b[1]._pending.length - a[1]._pending.length
286+
}
287+
288+
// Choose the peer with the least amount of active work ("keep peers busy")
289+
return a[1]._activeTotalSize - b[1]._activeTotalSize
290+
}
291+
}
292+
293+
/**
294+
* Queue of pending tasks for a particular peer, sorted by priority.
295+
*/
296+
class PendingTasks {
297+
constructor () {
298+
this._tasks = new SortedMap([], this._compare)
299+
}
300+
301+
get length () {
302+
return this._tasks.size
303+
}
304+
305+
// Sum of the size of all pending tasks
306+
get totalSize () {
307+
return [...this._tasks.values()].reduce((a, t) => a + t.task.size, 0)
308+
}
309+
310+
get (topic) {
311+
return (this._tasks.get(topic) || {}).task
312+
}
313+
314+
add (task) {
315+
this._tasks.set(task.topic, {
316+
created: Date.now(),
317+
task
318+
})
319+
}
320+
321+
delete (topic) {
322+
this._tasks.delete(topic)
323+
}
324+
325+
// All pending tasks, in priority order
326+
tasks () {
327+
return [...this._tasks.values()].map(i => i.task)
328+
}
329+
330+
// Update the priority of the task with the given topic, and update the order
331+
updatePriority (topic, priority) {
332+
const obj = this._tasks.get(topic)
333+
if (!obj) {
334+
return
335+
}
336+
337+
const i = this._tasks.indexOf(topic)
338+
obj.task.priority = priority
339+
this._tasks.update(i)
340+
}
341+
342+
// Sort by priority desc then FIFO
343+
_compare (a, b) {
344+
if (a[1].task.priority === b[1].task.priority) {
345+
// FIFO
346+
return a[1].created - b[1].created
347+
}
348+
// Priority high -> low
349+
return b[1].task.priority - a[1].task.priority
350+
}
351+
}
352+
353+
module.exports = RequestQueue

0 commit comments

Comments
 (0)