@@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine')
6
6
const Notifications = require ( './notifications' )
7
7
const logger = require ( './utils' ) . logger
8
8
const Stats = require ( './stats' )
9
+ const AbortController = require ( 'abort-controller' )
10
+ const anySignal = require ( 'any-signal' )
9
11
10
12
const defaultOptions = {
11
13
statsEnabled : false ,
@@ -101,9 +103,10 @@ class Bitswap {
101
103
this . _log ( 'received block' )
102
104
103
105
const has = await this . blockstore . has ( block . cid )
106
+
104
107
this . _updateReceiveCounters ( peerId . toB58String ( ) , block , has )
105
108
106
- if ( has || ! wasWanted ) {
109
+ if ( ! wasWanted ) {
107
110
return
108
111
}
109
112
@@ -176,65 +179,88 @@ class Bitswap {
176
179
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
177
180
*
178
181
* @param {CID } cid
182
+ * @param {Object } options
183
+ * @param {AbortSignal } options.abortSignal
179
184
* @returns {Promise<Block> }
180
185
*/
181
- async get ( cid ) {
182
- for await ( const block of this . getMany ( [ cid ] ) ) {
183
- return block
186
+ async get ( cid , options = { } ) {
187
+ const fetchFromNetwork = ( cid , options ) => {
188
+ // add it to the want list - n.b. later we will abort the AbortSignal
189
+ // so no need to remove the blocks from the wantlist after we have it
190
+ this . wm . wantBlocks ( [ cid ] , options )
191
+
192
+ return this . notifications . wantBlock ( cid , options )
184
193
}
185
- }
186
194
187
- /**
188
- * Fetch a a list of blocks by cid. If the blocks are in the local
189
- * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
190
- *
191
- * @param {Iterable<CID> } cids
192
- * @returns {Promise<AsyncIterator<Block>> }
193
- */
194
- async * getMany ( cids ) {
195
- let pendingStart = cids . length
196
- const wantList = [ ]
197
195
let promptedNetwork = false
198
196
199
- const fetchFromNetwork = async ( cid ) => {
200
- wantList . push ( cid )
197
+ const loadOrFetchFromNetwork = async ( cid , options ) => {
198
+ try {
199
+ // have to await here as we want to handle ERR_NOT_FOUND
200
+ const block = await this . blockstore . get ( cid , options )
201
201
202
- const blockP = this . notifications . wantBlock ( cid )
202
+ return block
203
+ } catch ( err ) {
204
+ if ( err . code !== 'ERR_NOT_FOUND' ) {
205
+ throw err
206
+ }
203
207
204
- if ( ! pendingStart ) {
205
- this . wm . wantBlocks ( wantList )
206
- }
208
+ if ( ! promptedNetwork ) {
209
+ promptedNetwork = true
207
210
208
- const block = await blockP
209
- this . wm . cancelWants ( [ cid ] )
211
+ this . network . findAndConnect ( cid )
212
+ . catch ( ( err ) => this . _log . error ( err ) )
213
+ }
210
214
211
- return block
215
+ // we don't have the block locally so fetch it from the network
216
+ return fetchFromNetwork ( cid , options )
217
+ }
212
218
}
213
219
214
- for ( const cid of cids ) {
215
- const has = await this . blockstore . has ( cid )
216
- pendingStart --
217
- if ( has ) {
218
- if ( ! pendingStart ) {
219
- this . wm . wantBlocks ( wantList )
220
- }
221
- yield this . blockstore . get ( cid )
220
+ // depending on implementation it's possible for blocks to come in while
221
+ // we do the async operations to get them from the blockstore leading to
222
+ // a race condition, so register for incoming block notifications as well
223
+ // as trying to get it from the datastore
224
+ const controller = new AbortController ( )
225
+ const signal = anySignal ( [ options . signal , controller . signal ] )
226
+
227
+ const block = await Promise . race ( [
228
+ this . notifications . wantBlock ( cid , {
229
+ signal
230
+ } ) ,
231
+ loadOrFetchFromNetwork ( cid , {
232
+ signal
233
+ } )
234
+ ] )
222
235
223
- continue
224
- }
236
+ // since we have the block we can now remove our listener
237
+ controller . abort ( )
225
238
226
- if ( ! promptedNetwork ) {
227
- promptedNetwork = true
228
- this . network . findAndConnect ( cids [ 0 ] ) . catch ( ( err ) => this . _log . error ( err ) )
229
- }
239
+ return block
240
+ }
230
241
231
- // we don't have the block locally so fetch it from the network
232
- yield fetchFromNetwork ( cid )
242
+ /**
243
+ * Fetch a a list of blocks by cid. If the blocks are in the local
244
+ * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
245
+ *
246
+ * @param {AsyncIterator<CID> } cids
247
+ * @param {Object } options
248
+ * @param {AbortSignal } options.abortSignal
249
+ * @returns {Promise<AsyncIterator<Block>> }
250
+ */
251
+ async * getMany ( cids , options = { } ) {
252
+ for await ( const cid of cids ) {
253
+ yield this . get ( cid , options )
233
254
}
234
255
}
235
256
236
257
/**
237
- * Removes the given CIDs from the wantlist independent of any ref counts
258
+ * Removes the given CIDs from the wantlist independent of any ref counts.
259
+ *
260
+ * This will cause all outstanding promises for a given block to reject.
261
+ *
262
+ * If you want to cancel the want for a block without doing that, pass an
263
+ * AbortSignal in to `.get` or `.getMany` and abort it.
238
264
*
239
265
* @param {Iterable<CID> } cids
240
266
* @returns {void }
@@ -249,7 +275,9 @@ class Bitswap {
249
275
}
250
276
251
277
/**
252
- * Removes the given keys from the want list
278
+ * Removes the given keys from the want list. This may cause pending promises
279
+ * for blocks to never resolve. If you wish these promises to abort instead
280
+ * call `unwant(cids)` instead.
253
281
*
254
282
* @param {Iterable<CID> } cids
255
283
* @returns {void }
@@ -268,46 +296,40 @@ class Bitswap {
268
296
* @param {Block } block
269
297
* @returns {Promise<void> }
270
298
*/
271
- async put ( block ) { // eslint-disable-line require-await
272
- return this . putMany ( [ block ] )
299
+ async put ( block ) {
300
+ await this . blockstore . put ( block )
301
+ this . _sendHaveBlockNotifications ( block )
273
302
}
274
303
275
304
/**
276
305
* Put the given blocks to the underlying blockstore and
277
306
* send it to nodes that have it them their wantlist.
278
307
*
279
- * @param {AsyncIterable<Block>|Iterable<Block> } blocks
280
- * @returns {Promise<void > }
308
+ * @param {AsyncIterable<Block> } blocks
309
+ * @returns {AsyncIterable<Block > }
281
310
*/
282
- async putMany ( blocks ) { // eslint-disable-line require-await
283
- const self = this
284
-
285
- // Add any new blocks to the blockstore
286
- const newBlocks = [ ]
287
- await this . blockstore . putMany ( async function * ( ) {
288
- for await ( const block of blocks ) {
289
- if ( await self . blockstore . has ( block . cid ) ) {
290
- continue
291
- }
292
-
293
- yield block
294
- newBlocks . push ( block )
295
- }
296
- } ( ) )
297
-
298
- // Notify engine that we have new blocks
299
- this . engine . receivedBlocks ( newBlocks )
311
+ async * putMany ( blocks ) {
312
+ for await ( const block of this . blockstore . putMany ( blocks ) ) {
313
+ this . _sendHaveBlockNotifications ( block )
300
314
301
- // Notify listeners that we have received the new blocks
302
- for ( const block of newBlocks ) {
303
- this . notifications . hasBlock ( block )
304
- // Note: Don't wait for provide to finish before returning
305
- this . network . provide ( block . cid ) . catch ( ( err ) => {
306
- self . _log . error ( 'Failed to provide: %s' , err . message )
307
- } )
315
+ yield block
308
316
}
309
317
}
310
318
319
+ /**
320
+ * Sends notifications about the arrival of a block
321
+ *
322
+ * @param {Block } block
323
+ */
324
+ _sendHaveBlockNotifications ( block ) {
325
+ this . notifications . hasBlock ( block )
326
+ this . engine . receivedBlocks ( [ block ] )
327
+ // Note: Don't wait for provide to finish before returning
328
+ this . network . provide ( block . cid ) . catch ( ( err ) => {
329
+ this . _log . error ( 'Failed to provide: %s' , err . message )
330
+ } )
331
+ }
332
+
311
333
/**
312
334
* Get the current list of wants.
313
335
*
0 commit comments