Skip to content

Commit 98d6514

Browse files
committed
fix: blocks may not be in the blockstore yet
Because some blockstore implementations batch up write before comitting them to disk or wherever, they may not be available when we think they should be. I noticed this module throwing unhandledPromiseRejections around this so the change here handles missing blocks by putting the task back into the queue to process later, with a fairly arbitrary 5x limit on processing a given task.
1 parent 936f899 commit 98d6514

File tree

3 files changed

+91
-3
lines changed

3 files changed

+91
-3
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"promisify-es6": "^1.0.3",
7070
"rimraf": "^3.0.0",
7171
"safe-buffer": "^5.1.2",
72+
"sinon": "^9.0.0",
7273
"stats-lite": "^2.2.0",
7374
"uuid": "^3.3.2"
7475
},

src/decision-engine/index.js

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const Ledger = require('./ledger')
88
const { logger, groupBy, pullAllWith, uniqWith } = require('../utils')
99

1010
const MAX_MESSAGE_SIZE = 512 * 1024
11+
const MAX_TASK_ATTEMPTS = 5
1112

1213
class DecisionEngine {
1314
constructor (peerId, blockstore, network, stats) {
@@ -80,8 +81,41 @@ class DecisionEngine {
8081
const cids = entries.map((e) => e.cid)
8182
const uniqCids = uniqWith((a, b) => a.equals(b), cids)
8283
const groupedTasks = groupBy(task => task.target.toB58String(), tasks)
84+
const unresolvedCids = []
8385

84-
const blocks = await Promise.all(uniqCids.map(cid => this.blockstore.get(cid)))
86+
const blocks = (await Promise.all(
87+
uniqCids.map(async cid => {
88+
try {
89+
const block = await this.blockstore.get(cid)
90+
91+
return block
92+
} catch (err) {
93+
this._log.error(`Could not load block for cid ${cid}`, err)
94+
unresolvedCids.push(cid)
95+
}
96+
}))
97+
).filter(Boolean)
98+
99+
if (blocks.length !== uniqCids.length) {
100+
// put any tasks with unresolved CIDs back into the task queue
101+
unresolvedCids.forEach(cid => {
102+
this._tasks = this._tasks.concat(
103+
tasks
104+
.filter(task => task.entry.cid.equals(cid))
105+
.map(task => {
106+
task.attempt = (task.attempt || 0) + 1
107+
108+
if (task.attempt < MAX_TASK_ATTEMPTS) {
109+
return task
110+
}
111+
})
112+
.filter(Boolean)
113+
)
114+
})
115+
116+
// run queue again later
117+
this._outbox()
118+
}
85119

86120
await Promise.all(Object.values(groupedTasks).map(async (tasks) => {
87121
// all tasks in the group have the same target
@@ -99,8 +133,6 @@ class DecisionEngine {
99133
this.messageSent(peer, block)
100134
}
101135
}))
102-
103-
this._tasks = []
104136
}
105137

106138
wantlistForPeer (peerId) {

test/decision-engine/decision-engine.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ const range = require('lodash.range')
99
const difference = require('lodash.difference')
1010
const flatten = require('lodash.flatten')
1111
const Block = require('ipfs-block')
12+
const makeBlock = require('../utils/make-block')
1213
const CID = require('cids')
1314
const multihashing = require('multihashing-async')
1415
const Buffer = require('safe-buffer').Buffer
16+
const sinon = require('sinon')
1517

1618
const Message = require('../../src/types/message')
1719
const DecisionEngine = require('../../src/decision-engine')
@@ -197,4 +199,57 @@ describe('Engine', () => {
197199
.catch(reject)
198200
})
199201
})
202+
203+
it('handles the blockstore not having all the blocks', async () => {
204+
const block = await makeBlock()
205+
206+
const blockstore = {
207+
get: sinon.stub().withArgs(block.cid).throws(new Error('Not found'))
208+
}
209+
210+
const engine = new DecisionEngine(await PeerId.create({ bits: 512 }), blockstore)
211+
engine._running = true
212+
213+
engine._tasks.push({
214+
entry: {
215+
cid: block.cid
216+
},
217+
target: await PeerId.create({ bits: 512 })
218+
})
219+
220+
await engine._processTasks()
221+
222+
expect(engine).to.have.nested.deep.property('_tasks[0].entry.cid', block.cid)
223+
expect(engine).to.have.nested.deep.property('_tasks[0].attempt', 1)
224+
})
225+
226+
it('does not try to resolve blocks forever', async () => {
227+
const block = await makeBlock()
228+
229+
const blockstore = {
230+
get: sinon.stub().withArgs(block.cid).throws(new Error('Not found'))
231+
}
232+
233+
const engine = new DecisionEngine(await PeerId.create({ bits: 512 }), blockstore)
234+
engine._running = true
235+
236+
engine._tasks.push({
237+
entry: {
238+
cid: block.cid
239+
},
240+
target: await PeerId.create({ bits: 512 })
241+
})
242+
243+
await engine._processTasks()
244+
await engine._processTasks()
245+
await engine._processTasks()
246+
await engine._processTasks()
247+
248+
expect(engine).to.have.nested.deep.property('_tasks[0].entry.cid', block.cid)
249+
expect(engine).to.have.nested.deep.property('_tasks[0].attempt', 4)
250+
251+
await engine._processTasks()
252+
253+
expect(engine).to.have.property('_tasks').that.has.lengthOf(0)
254+
})
200255
})

0 commit comments

Comments
 (0)