Skip to content

Commit 57185ea

Browse files
committed
fix: race condition when requesting the same block twice
When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while - if another request for a given block comes in before `blockstore.has` returns `true` it'll get added to the want list. If the block then finishes it's batch and finally a remote peer supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang idefinately. The change made here is to separate the sending of notifications out from putting things into the blockstore. If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block.
1 parent 7e7f36c commit 57185ea

File tree

3 files changed

+75
-22
lines changed

3 files changed

+75
-22
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
"peer-info": "^0.17.0",
7272
"promisify-es6": "^1.0.3",
7373
"rimraf": "^3.0.0",
74+
"sinon": "^9.0.0",
7475
"stats-lite": "^2.2.0",
7576
"uuid": "^3.3.2"
7677
},

src/index.js

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ class Bitswap {
104104
this._updateReceiveCounters(peerId.toB58String(), block, has)
105105

106106
if (has || !wasWanted) {
107+
if (wasWanted) {
108+
this._sendHaveBlockNotifications(block)
109+
}
110+
107111
return
108112
}
109113

@@ -282,32 +286,31 @@ class Bitswap {
282286
async putMany (blocks) { // eslint-disable-line require-await
283287
const self = this
284288

285-
// Add any new blocks to the blockstore
286-
const newBlocks = []
287-
await this.blockstore.putMany(async function * () {
288-
for await (const block of blocks) {
289-
if (await self.blockstore.has(block.cid)) {
290-
continue
291-
}
292-
293-
yield block
294-
newBlocks.push(block)
289+
for await (const block of blocks) {
290+
if (await self.blockstore.has(block.cid)) {
291+
continue
295292
}
296-
}())
297-
298-
// Notify engine that we have new blocks
299-
this.engine.receivedBlocks(newBlocks)
300-
301-
// Notify listeners that we have received the new blocks
302-
for (const block of newBlocks) {
303-
this.notifications.hasBlock(block)
304-
// Note: Don't wait for provide to finish before returning
305-
this.network.provide(block.cid).catch((err) => {
306-
self._log.error('Failed to provide: %s', err.message)
307-
})
293+
294+
await this.blockstore.put(block)
295+
296+
self._sendHaveBlockNotifications(block)
308297
}
309298
}
310299

300+
/**
301+
* Sends notifications about the arrival of a block
302+
*
303+
* @param {Block} block
304+
*/
305+
_sendHaveBlockNotifications (block) {
306+
this.notifications.hasBlock(block)
307+
this.engine.receivedBlocks([block])
308+
// Note: Don't wait for provide to finish before returning
309+
this.network.provide(block.cid).catch((err) => {
310+
this._log.error('Failed to provide: %s', err.message)
311+
})
312+
}
313+
311314
/**
312315
* Get the current list of wants.
313316
*

test/bitswap.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ const chai = require('chai')
55
chai.use(require('dirty-chai'))
66
const expect = chai.expect
77
const delay = require('delay')
8+
const PeerId = require('peer-id')
9+
const sinon = require('sinon')
810

911
const Bitswap = require('../src')
1012

1113
const createTempRepo = require('./utils/create-temp-repo-nodejs')
1214
const createLibp2pNode = require('./utils/create-libp2p-node')
1315
const makeBlock = require('./utils/make-block')
1416
const orderedFinish = require('./utils/helpers').orderedFinish
17+
const Message = require('../src/types/message')
1518

1619
// Creates a repo + libp2pNode + Bitswap with or without DHT
1720
async function createThing (dht) {
@@ -70,6 +73,52 @@ describe('bitswap without DHT', function () {
7073

7174
finish.assert()
7275
})
76+
77+
it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => {
78+
// the block we want
79+
const block = await makeBlock()
80+
81+
// id of a peer with the block we want
82+
const peerId = await PeerId.create({ bits: 512 })
83+
84+
// incoming message with requested block from the other peer
85+
const message = new Message(false)
86+
message.addEntry(block.cid, 1, false)
87+
message.addBlock(block)
88+
89+
// slow blockstore
90+
nodes[0].bitswap.blockstore = {
91+
has: sinon.stub().withArgs(block.cid).returns(false),
92+
put: sinon.stub()
93+
}
94+
95+
// add the block to our want list
96+
const wantBlockPromise1 = nodes[0].bitswap.get(block.cid)
97+
98+
// oh look, a peer has sent it to us - this will trigger a `blockstore.put` which
99+
// is an async operation so `self.blockstore.has(cid)` will still return false
100+
// until the write has completed
101+
await nodes[0].bitswap._receiveMessage(peerId, message)
102+
103+
// block store did not have it
104+
expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true()
105+
106+
// another context wants the same block
107+
const wantBlockPromise2 = nodes[0].bitswap.get(block.cid)
108+
109+
// meanwhile the blockstore finishes it's batch
110+
nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true)
111+
112+
// here it comes again
113+
await nodes[0].bitswap._receiveMessage(peerId, message)
114+
115+
// block store had it this time
116+
expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true()
117+
118+
// both requests should get the block
119+
expect(await wantBlockPromise1).to.deep.equal(block)
120+
expect(await wantBlockPromise2).to.deep.equal(block)
121+
})
73122
})
74123

75124
describe('bitswap with DHT', function () {

0 commit comments

Comments
 (0)