Skip to content

Commit 78ce032

Browse files
achingbraindirkmc
andauthored
fix: race condition when requesting the same block twice (#214)
When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while - if another request for a given block comes in before `blockstore.has` returns `true` it'll get added to the want list. If the block then finishes it's batch and finally a remote peer supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang indefinitely. The change made here is to separate the sending of notifications out from putting things into the blockstore. If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block. Also: - Upgrade to use the streaming API from interface-datastore - Does not assume that only arrays of CIDs are being passed any more, only uses the AsyncIterable interface contract to access data - Actually dial remote nodes with bitswap 1.2.0 Co-authored-by: dirkmc <[email protected]>
1 parent 46490f5 commit 78ce032

25 files changed

+419
-230
lines changed

benchmarks/put-get.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
const Benchmark = require('benchmark')
66
const assert = require('assert')
7-
const all = require('async-iterator-all')
7+
const all = require('it-all')
8+
const drain = require('it-drain')
89
const makeBlock = require('../test/utils/make-block')
910
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork
1011

@@ -24,7 +25,7 @@ const blockSizes = [10, 1024, 10 * 1024]
2425
suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => {
2526
const blocks = await makeBlock(n, k)
2627

27-
await bitswap.putMany(blocks)
28+
await drain(bitswap.putMany(blocks))
2829

2930
const res = await all(bitswap.getMany(blocks.map(block => block.cid)))
3031

package.json

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,15 @@
4343
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
4444
"devDependencies": {
4545
"@nodeutils/defaults-deep": "^1.1.0",
46-
"aegir": "^21.10.1",
47-
"async-iterator-all": "^1.0.0",
46+
"aegir": "^22.0.0",
4847
"benchmark": "^2.1.4",
4948
"buffer": "^5.6.0",
50-
"chai": "^4.2.0",
5149
"delay": "^4.3.0",
52-
"dirty-chai": "^2.0.1",
53-
"ipfs-repo": "^2.0.0",
50+
"ipfs-repo": "^3.0.1",
5451
"ipfs-utils": "^2.2.0",
5552
"iso-random-stream": "^1.1.1",
53+
"it-all": "^1.0.2",
54+
"it-drain": "^1.0.1",
5655
"libp2p": "^0.27.0",
5756
"libp2p-kad-dht": "^0.18.3",
5857
"libp2p-mplex": "^0.9.2",
@@ -71,10 +70,13 @@
7170
"peer-info": "^0.17.0",
7271
"promisify-es6": "^1.0.3",
7372
"rimraf": "^3.0.0",
73+
"sinon": "^9.0.0",
7474
"stats-lite": "^2.2.0",
75-
"uuid": "^3.3.2"
75+
"uuid": "^8.0.0"
7676
},
7777
"dependencies": {
78+
"abort-controller": "^3.0.0",
79+
"any-signal": "^1.1.0",
7880
"bignumber.js": "^9.0.0",
7981
"cids": "~0.8.0",
8082
"debug": "^4.1.0",

src/index.js

Lines changed: 93 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine')
66
const Notifications = require('./notifications')
77
const logger = require('./utils').logger
88
const Stats = require('./stats')
9+
const AbortController = require('abort-controller')
10+
const anySignal = require('any-signal')
911

1012
const defaultOptions = {
1113
statsEnabled: false,
@@ -101,9 +103,10 @@ class Bitswap {
101103
this._log('received block')
102104

103105
const has = await this.blockstore.has(block.cid)
106+
104107
this._updateReceiveCounters(peerId.toB58String(), block, has)
105108

106-
if (has || !wasWanted) {
109+
if (!wasWanted) {
107110
return
108111
}
109112

@@ -176,65 +179,88 @@ class Bitswap {
176179
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
177180
*
178181
* @param {CID} cid
182+
* @param {Object} options
183+
* @param {AbortSignal} options.abortSignal
179184
* @returns {Promise<Block>}
180185
*/
181-
async get (cid) {
182-
for await (const block of this.getMany([cid])) {
183-
return block
186+
async get (cid, options = {}) {
187+
const fetchFromNetwork = (cid, options) => {
188+
// add it to the want list - n.b. later we will abort the AbortSignal
189+
// so no need to remove the blocks from the wantlist after we have it
190+
this.wm.wantBlocks([cid], options)
191+
192+
return this.notifications.wantBlock(cid, options)
184193
}
185-
}
186194

187-
/**
188-
* Fetch a a list of blocks by cid. If the blocks are in the local
189-
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
190-
*
191-
* @param {Iterable<CID>} cids
192-
* @returns {Promise<AsyncIterator<Block>>}
193-
*/
194-
async * getMany (cids) {
195-
let pendingStart = cids.length
196-
const wantList = []
197195
let promptedNetwork = false
198196

199-
const fetchFromNetwork = async (cid) => {
200-
wantList.push(cid)
197+
const loadOrFetchFromNetwork = async (cid, options) => {
198+
try {
199+
// have to await here as we want to handle ERR_NOT_FOUND
200+
const block = await this.blockstore.get(cid, options)
201201

202-
const blockP = this.notifications.wantBlock(cid)
202+
return block
203+
} catch (err) {
204+
if (err.code !== 'ERR_NOT_FOUND') {
205+
throw err
206+
}
203207

204-
if (!pendingStart) {
205-
this.wm.wantBlocks(wantList)
206-
}
208+
if (!promptedNetwork) {
209+
promptedNetwork = true
207210

208-
const block = await blockP
209-
this.wm.cancelWants([cid])
211+
this.network.findAndConnect(cid)
212+
.catch((err) => this._log.error(err))
213+
}
210214

211-
return block
215+
// we don't have the block locally so fetch it from the network
216+
return fetchFromNetwork(cid, options)
217+
}
212218
}
213219

214-
for (const cid of cids) {
215-
const has = await this.blockstore.has(cid)
216-
pendingStart--
217-
if (has) {
218-
if (!pendingStart) {
219-
this.wm.wantBlocks(wantList)
220-
}
221-
yield this.blockstore.get(cid)
220+
// depending on implementation it's possible for blocks to come in while
221+
// we do the async operations to get them from the blockstore leading to
222+
// a race condition, so register for incoming block notifications as well
223+
// as trying to get it from the datastore
224+
const controller = new AbortController()
225+
const signal = anySignal([options.signal, controller.signal])
226+
227+
const block = await Promise.race([
228+
this.notifications.wantBlock(cid, {
229+
signal
230+
}),
231+
loadOrFetchFromNetwork(cid, {
232+
signal
233+
})
234+
])
222235

223-
continue
224-
}
236+
// since we have the block we can now remove our listener
237+
controller.abort()
225238

226-
if (!promptedNetwork) {
227-
promptedNetwork = true
228-
this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err))
229-
}
239+
return block
240+
}
230241

231-
// we don't have the block locally so fetch it from the network
232-
yield fetchFromNetwork(cid)
242+
/**
243+
* Fetch a a list of blocks by cid. If the blocks are in the local
244+
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
245+
*
246+
* @param {AsyncIterator<CID>} cids
247+
* @param {Object} options
248+
* @param {AbortSignal} options.abortSignal
249+
* @returns {Promise<AsyncIterator<Block>>}
250+
*/
251+
async * getMany (cids, options = {}) {
252+
for await (const cid of cids) {
253+
yield this.get(cid, options)
233254
}
234255
}
235256

236257
/**
237-
* Removes the given CIDs from the wantlist independent of any ref counts
258+
* Removes the given CIDs from the wantlist independent of any ref counts.
259+
*
260+
* This will cause all outstanding promises for a given block to reject.
261+
*
262+
* If you want to cancel the want for a block without doing that, pass an
263+
* AbortSignal in to `.get` or `.getMany` and abort it.
238264
*
239265
* @param {Iterable<CID>} cids
240266
* @returns {void}
@@ -249,7 +275,9 @@ class Bitswap {
249275
}
250276

251277
/**
252-
* Removes the given keys from the want list
278+
* Removes the given keys from the want list. This may cause pending promises
279+
* for blocks to never resolve. If you wish these promises to abort instead
280+
* call `unwant(cids)` instead.
253281
*
254282
* @param {Iterable<CID>} cids
255283
* @returns {void}
@@ -268,46 +296,40 @@ class Bitswap {
268296
* @param {Block} block
269297
* @returns {Promise<void>}
270298
*/
271-
async put (block) { // eslint-disable-line require-await
272-
return this.putMany([block])
299+
async put (block) {
300+
await this.blockstore.put(block)
301+
this._sendHaveBlockNotifications(block)
273302
}
274303

275304
/**
276305
* Put the given blocks to the underlying blockstore and
277306
* send it to nodes that have it them their wantlist.
278307
*
279-
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
280-
* @returns {Promise<void>}
308+
* @param {AsyncIterable<Block>} blocks
309+
* @returns {AsyncIterable<Block>}
281310
*/
282-
async putMany (blocks) { // eslint-disable-line require-await
283-
const self = this
284-
285-
// Add any new blocks to the blockstore
286-
const newBlocks = []
287-
await this.blockstore.putMany(async function * () {
288-
for await (const block of blocks) {
289-
if (await self.blockstore.has(block.cid)) {
290-
continue
291-
}
292-
293-
yield block
294-
newBlocks.push(block)
295-
}
296-
}())
297-
298-
// Notify engine that we have new blocks
299-
this.engine.receivedBlocks(newBlocks)
311+
async * putMany (blocks) {
312+
for await (const block of this.blockstore.putMany(blocks)) {
313+
this._sendHaveBlockNotifications(block)
300314

301-
// Notify listeners that we have received the new blocks
302-
for (const block of newBlocks) {
303-
this.notifications.hasBlock(block)
304-
// Note: Don't wait for provide to finish before returning
305-
this.network.provide(block.cid).catch((err) => {
306-
self._log.error('Failed to provide: %s', err.message)
307-
})
315+
yield block
308316
}
309317
}
310318

319+
/**
320+
* Sends notifications about the arrival of a block
321+
*
322+
* @param {Block} block
323+
*/
324+
_sendHaveBlockNotifications (block) {
325+
this.notifications.hasBlock(block)
326+
this.engine.receivedBlocks([block])
327+
// Note: Don't wait for provide to finish before returning
328+
this.network.provide(block.cid).catch((err) => {
329+
this._log.error('Failed to provide: %s', err.message)
330+
})
331+
}
332+
311333
/**
312334
* Get the current list of wants.
313335
*

src/network.js

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,17 @@ class Network {
105105
*
106106
* @param {CID} cid
107107
* @param {number} maxProviders
108-
* @returns {Promise<Result<Array>>}
108+
* @param {Object} options
109+
* @param {AbortSignal} options.abortSignal
110+
* @returns {AsyncIterable<PeerInfo>}
109111
*/
110-
findProviders (cid, maxProviders) {
112+
findProviders (cid, maxProviders, options = {}) {
111113
return this.libp2p.contentRouting.findProviders(
112114
cid,
113115
{
114116
maxTimeout: CONSTANTS.providerRequestTimeout,
115-
maxNumProviders: maxProviders
117+
maxNumProviders: maxProviders,
118+
signal: options.signal
116119
}
117120
)
118121
}
@@ -121,19 +124,29 @@ class Network {
121124
* Find the providers of a given `cid` and connect to them.
122125
*
123126
* @param {CID} cid
127+
* @param {Object} options
128+
* @param {AbortSignal} options.abortSignal
124129
* @returns {void}
125130
*/
126-
async findAndConnect (cid) {
131+
async findAndConnect (cid, options) {
127132
const connectAttempts = []
128-
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)) {
133+
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, options)) {
129134
this._log('connecting to providers', provider.id.toB58String())
130-
connectAttempts.push(this.connectTo(provider))
135+
connectAttempts.push(this.connectTo(provider, options))
131136
}
132137
await Promise.all(connectAttempts)
133138
}
134139

135-
async provide (cid) {
136-
await this.libp2p.contentRouting.provide(cid)
140+
/**
141+
* Tell the network we can provide content for the passed CID
142+
*
143+
* @param {CID} cid
144+
* @param {Object} options
145+
* @param {AbortSignal} options.abortSignal
146+
* @returns {Promise<void>}
147+
*/
148+
async provide (cid, options) {
149+
await this.libp2p.contentRouting.provide(cid, options)
137150
}
138151

139152
// Connect to the given peer
@@ -169,19 +182,21 @@ class Network {
169182
* Connects to another peer
170183
*
171184
* @param {PeerInfo|PeerId|Multiaddr} peer
172-
* @returns {Promise.<Connection>}
185+
* @param {Object} options
186+
* @param {AbortSignal} options.abortSignal
187+
* @returns {Promise<Connection>}
173188
*/
174-
async connectTo (peer) { // eslint-disable-line require-await
189+
async connectTo (peer, options) { // eslint-disable-line require-await
175190
if (!this._running) {
176191
throw new Error('network isn\'t running')
177192
}
178193

179-
return this.libp2p.dial(peer)
194+
return this.libp2p.dial(peer, options)
180195
}
181196

182197
// Dial to the peer and try to use the most recent Bitswap
183198
_dialPeer (peer) {
184-
return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100])
199+
return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100])
185200
}
186201

187202
_updateSentStats (peer, blocks) {

0 commit comments

Comments
 (0)