Skip to content

Commit 8156282

Browse files
committed
feat: add request queue
1 parent ce382d4 commit 8156282

File tree

6 files changed

+1197
-8
lines changed

6 files changed

+1197
-8
lines changed

src/decision-engine/req-queue.js

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
'use strict'
2+
3+
const SortedMap = require('../utils/sorted-map')
4+
5+
/**
6+
* @typedef {Object} Task
7+
* @property {string} topic - a name for the Task (like an id but not necessarily unique)
8+
* @property {number} priority - tasks are ordered by priority per peer
9+
* @property {number} size - the size of the task, eg the number of bytes in a block
10+
*/
11+
12+
/**
13+
* @typedef {Object} TaskMerger
14+
* @property {function(task, tasksWithTopic)} hasNewInfo - given the existing
15+
* tasks with the same topic, does the task add some new information?
16+
* Used to decide whether to merge the task or ignore it.
17+
* @property {function(task, existingTask)} merge - merge the information from
18+
* the given task into the existing task (with the same topic)
19+
*/
20+
21+
/**
22+
* The task merger that is used by default.
23+
* Assumes that new tasks do not add any information over existing tasks,
24+
* and doesn't try to merge.
25+
*/
26+
const DefaultTaskMerger = {
27+
hasNewInfo () {
28+
return false
29+
},
30+
31+
merge () {}
32+
}
33+
34+
/**
35+
* Queue of requests to be processed by the engine.
36+
* The requests from each peer are added to the peer's queue, sorted by
37+
* priority.
38+
* Tasks are popped in priority order from the best peer - see popTasks()
39+
* for more details.
40+
*/
41+
class RequestQueue {
42+
/**
43+
* @param {TaskMerger} taskMerger
44+
*/
45+
constructor (taskMerger) {
46+
this._taskMerger = taskMerger || DefaultTaskMerger
47+
this._peerTasks = new SortedMap([], PeerTasks.compare, true)
48+
}
49+
50+
/**
51+
* Push tasks onto the queue for the given peer
52+
* @param {PeerId} peerId
53+
* @param {Task} tasks
54+
*/
55+
pushTasks (peerId, tasks) {
56+
let peerTasks = this._peerTasks.get(peerId.toB58String())
57+
if (peerTasks) {
58+
peerTasks.pushTasks(tasks)
59+
return
60+
}
61+
62+
peerTasks = new PeerTasks(peerId, this._taskMerger)
63+
peerTasks.pushTasks(tasks)
64+
this._peerTasks.set(peerId.toB58String(), peerTasks)
65+
}
66+
67+
/**
68+
* Choose the peer with the least active work (or if all have the same active
69+
* work, the most pending tasks) and pop off the highest priority tasks until
70+
* the total size is at least targetMinSize.
71+
* This puts the popped tasks into the "active" state, meaning they are
72+
* actively being processed (and cannot be modified).
73+
* @param {number} targetMinSize - the minimum total size of tasks to pop
74+
* @returns {Object}
75+
*/
76+
popTasks (targetMinSize) {
77+
if (this._peerTasks.size === 0) {
78+
return { tasks: [], pendingSize: 0 }
79+
}
80+
81+
// Get the queue of tasks for the best peer and pop off tasks up to
82+
// targetMinSize
83+
const peerTasks = this._head()
84+
const { tasks, pendingSize } = peerTasks.popTasks(targetMinSize)
85+
if (tasks.length === 0) {
86+
return { tasks, pendingSize }
87+
}
88+
89+
const peerId = peerTasks.peerId
90+
if (peerTasks.isIdle()) {
91+
// If there are no more tasks for the peer, free up its memory
92+
this._peerTasks.delete(peerId.toB58String())
93+
} else {
94+
// If there are still tasks remaining, update the sort order of peerTasks
95+
// (because it depends on the number of pending tasks)
96+
this._peerTasks.update(0)
97+
}
98+
99+
return {
100+
peerId, tasks, pendingSize
101+
}
102+
}
103+
104+
_head () {
105+
for (const [, v] of this._peerTasks) {
106+
return v
107+
}
108+
return undefined
109+
}
110+
111+
/**
112+
* Remove the task with the given topic for the given peer.
113+
* @param {string} topic
114+
* @param {PeerId} peerId
115+
*/
116+
remove (topic, peerId) {
117+
const peerTasks = this._peerTasks.get(peerId.toB58String())
118+
peerTasks && peerTasks.remove(topic)
119+
}
120+
121+
/**
122+
* Called when the tasks for the given peer complete.
123+
* @param {PeerId} peerId
124+
* @param {Task[]} tasks
125+
*/
126+
tasksDone (peerId, tasks) {
127+
const peerTasks = this._peerTasks.get(peerId.toB58String())
128+
if (!peerTasks) {
129+
return
130+
}
131+
132+
const i = this._peerTasks.indexOf(peerId.toB58String())
133+
for (const task of tasks) {
134+
peerTasks.taskDone(task)
135+
}
136+
137+
// Marking the tasks as done takes them out of the "active" state, and the
138+
// sort order depends on the size of the active tasks, so we need to update
139+
// the order.
140+
this._peerTasks.update(i)
141+
}
142+
}
143+
144+
/**
145+
* Queue of tasks for a particular peer, sorted by priority.
146+
*/
147+
class PeerTasks {
148+
/**
149+
* @param {PeerId} peerId
150+
* @param {TaskMerger} taskMerger
151+
*/
152+
constructor (peerId, taskMerger) {
153+
this.peerId = peerId
154+
this._taskMerger = taskMerger
155+
this._activeSize = 0
156+
this._pending = new PendingTasks()
157+
this._active = new Set()
158+
}
159+
160+
/**
161+
* Push tasks onto the queue.
162+
* @param {Task[]} tasks
163+
*/
164+
pushTasks (tasks) {
165+
for (const t of tasks) {
166+
this._pushTask(t)
167+
}
168+
}
169+
170+
_pushTask (task) {
171+
// If the new task doesn't add any more information over what we
172+
// already have in the active queue, then we can skip the new task
173+
if (!this._taskHasMoreInfoThanActiveTasks(task)) {
174+
return
175+
}
176+
177+
// If there is already a non-active (pending) task with this topic
178+
const existingTask = this._pending.get(task.topic)
179+
if (existingTask) {
180+
// If the new task has a higher priority than the old task,
181+
if (task.priority > existingTask.priority) {
182+
// Update the priority and the task's position in the queue
183+
this._pending.updatePriority(task.topic, task.priority)
184+
}
185+
186+
// Merge the information from the new task into the existing task
187+
this._taskMerger.merge(task, existingTask)
188+
189+
// A task with the topic exists, so we don't need to add
190+
// the new task to the queue
191+
return
192+
}
193+
194+
// Push the new task onto the queue
195+
this._pending.add(task)
196+
}
197+
198+
// Indicates whether the new task adds any more information over tasks that are
199+
// already in the active task queue
200+
_taskHasMoreInfoThanActiveTasks (task) {
201+
const tasksWithTopic = []
202+
for (const activeTask of this._active) {
203+
if (activeTask.topic === task.topic) {
204+
tasksWithTopic.push(activeTask)
205+
}
206+
}
207+
208+
// No tasks with that topic, so the new task adds information
209+
if (tasksWithTopic.length === 0) {
210+
return true
211+
}
212+
213+
return this._taskMerger.hasNewInfo(task, tasksWithTopic)
214+
}
215+
216+
/**
217+
* Pop tasks off the queue such that the total size is at least targetMinSize
218+
* @param {number} targetMinSize
219+
* @returns {Object}
220+
*/
221+
popTasks (targetMinSize) {
222+
let size = 0
223+
const tasks = []
224+
225+
// Keep popping tasks until we get to targetMinSize
226+
const pendingTasks = this._pending.tasks()
227+
for (let i = 0; i < pendingTasks.length && size < targetMinSize; i++) {
228+
const task = pendingTasks[i]
229+
tasks.push(task)
230+
size += task.size
231+
232+
// Move tasks from pending to active
233+
this._pending.delete(task.topic)
234+
this._activeSize += task.size
235+
this._active.add(task)
236+
}
237+
238+
return {
239+
tasks, pendingSize: this._pending.totalSize
240+
}
241+
}
242+
243+
/**
244+
* Called when a task completes.
245+
* Note: must be the same reference as returned from popTasks.
246+
* @param {Task} task
247+
*/
248+
taskDone (task) {
249+
if (this._active.has(task)) {
250+
this._activeSize -= task.size
251+
this._active.delete(task)
252+
}
253+
}
254+
255+
/**
256+
* Remove pending tasks with the given topic
257+
* @param {string} topic
258+
*/
259+
remove (topic) {
260+
this._pending.delete(topic)
261+
}
262+
263+
/**
264+
* No work to be done, this PeerTasks object can be freed.
265+
* @returns {boolean}
266+
*/
267+
isIdle () {
268+
return this._pending.length === 0 && this._active.length === 0
269+
}
270+
271+
// Compare PeerTasks
272+
static compare (a, b) {
273+
// Move peers with no pending tasks to the back of the queue
274+
if (a[1]._pending.length === 0) {
275+
return 1
276+
}
277+
if (b[1]._pending.length === 0) {
278+
return -1
279+
}
280+
281+
// If the amount of active work is the same
282+
if (a[1]._activeSize === b[1]._activeSize) {
283+
// Choose the peer with the most pending work
284+
return b[1]._pending.length - a[1]._pending.length
285+
}
286+
287+
// Choose the peer with the least amount of active work ("keep peers busy")
288+
return a[1]._activeSize - b[1]._activeSize
289+
}
290+
}
291+
292+
/**
293+
* Queue of pending tasks for a particular peer, sorted by priority.
294+
*/
295+
class PendingTasks {
296+
constructor () {
297+
this._tasks = new SortedMap([], this._compare)
298+
}
299+
300+
get length () {
301+
return this._tasks.size
302+
}
303+
304+
// Sum of the size of all pending tasks
305+
get totalSize () {
306+
return [...this._tasks.values()].reduce((a, t) => a + t.task.size, 0)
307+
}
308+
309+
get (topic) {
310+
return (this._tasks.get(topic) || {}).task
311+
}
312+
313+
add (task) {
314+
this._tasks.set(task.topic, {
315+
created: Date.now(),
316+
task
317+
})
318+
}
319+
320+
delete (topic) {
321+
this._tasks.delete(topic)
322+
}
323+
324+
// All pending tasks, in priority order
325+
tasks () {
326+
return [...this._tasks.values()].map(i => i.task)
327+
}
328+
329+
// Update the priority of the task with the given topic, and update the order
330+
updatePriority (topic, priority) {
331+
const obj = this._tasks.get(topic)
332+
if (!obj) {
333+
return
334+
}
335+
336+
const i = this._tasks.indexOf(topic)
337+
obj.task.priority = priority
338+
this._tasks.update(i)
339+
}
340+
341+
// Sort by priority desc then FIFO
342+
_compare (a, b) {
343+
if (a[1].task.priority === b[1].task.priority) {
344+
// FIFO
345+
return a[1].created - b[1].created
346+
}
347+
// Priority high -> low
348+
return b[1].task.priority - a[1].task.priority
349+
}
350+
}
351+
352+
module.exports = RequestQueue

0 commit comments

Comments
 (0)