@@ -5,15 +5,15 @@ import { Notifications } from './notifications.js'
5
5
import { logger } from './utils/index.js'
6
6
import { Stats } from './stats/index.js'
7
7
import { anySignal } from 'any-signal'
8
- import { BaseBlockstore } from 'blockstore-core/base'
9
8
import { CID } from 'multiformats/cid'
10
- import type { BitswapOptions , Bitswap , MultihashHasherLoader , WantListEntry } from './index.js'
9
+ import type { BitswapOptions , Bitswap , MultihashHasherLoader , WantListEntry , BitswapWantProgressEvents , BitswapNotifyProgressEvents } from './index.js'
11
10
import type { Libp2p } from '@libp2p/interface-libp2p'
12
11
import type { Blockstore , Options , Pair } from 'interface-blockstore'
13
12
import type { Logger } from '@libp2p/logger'
14
13
import type { PeerId } from '@libp2p/interface-peer-id'
15
14
import type { BitswapMessage } from './message/index.js'
16
15
import type { AbortOptions } from '@multiformats/multiaddr'
16
+ import type { ProgressOptions } from 'progress-events'
17
17
18
18
const hashLoader : MultihashHasherLoader = {
19
19
async getHasher ( ) {
@@ -46,35 +46,33 @@ const statsKeys = [
46
46
* JavaScript implementation of the Bitswap 'data exchange' protocol
47
47
* used by IPFS.
48
48
*/
49
- export class DefaultBitswap extends BaseBlockstore implements Bitswap {
49
+ export class DefaultBitswap implements Bitswap {
50
50
private readonly _libp2p : Libp2p
51
51
private readonly _log : Logger
52
52
private readonly _options : Required < BitswapOptions >
53
- private readonly _stats : Stats
53
+ public readonly stats : Stats
54
54
public network : Network
55
55
public blockstore : Blockstore
56
56
public engine : DecisionEngine
57
57
public wm : WantManager
58
58
public notifications : Notifications
59
- public started : boolean
59
+ private started : boolean
60
60
61
61
constructor ( libp2p : Libp2p , blockstore : Blockstore , options : BitswapOptions = { } ) {
62
- super ( )
63
-
64
62
this . _libp2p = libp2p
65
63
this . _log = logger ( this . peerId )
66
64
67
65
this . _options = Object . assign ( { } , defaultOptions , options )
68
66
69
67
// stats
70
- this . _stats = new Stats ( libp2p , statsKeys , {
68
+ this . stats = new Stats ( libp2p , statsKeys , {
71
69
enabled : this . _options . statsEnabled ,
72
70
computeThrottleTimeout : this . _options . statsComputeThrottleTimeout ,
73
71
computeThrottleMaxQueueSize : this . _options . statsComputeThrottleMaxQueueSize
74
72
} )
75
73
76
- // the network delivers messages
77
- this . network = new Network ( libp2p , this , this . _stats , {
74
+ // the network delivers a messages
75
+ this . network = new Network ( libp2p , this , this . stats , {
78
76
hashLoader : options . hashLoader ,
79
77
maxInboundStreams : options . maxInboundStreams ,
80
78
maxOutboundStreams : options . maxOutboundStreams ,
@@ -84,10 +82,10 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
84
82
// local database
85
83
this . blockstore = blockstore
86
84
87
- this . engine = new DecisionEngine ( this . peerId , blockstore , this . network , this . _stats , libp2p )
85
+ this . engine = new DecisionEngine ( this . peerId , blockstore , this . network , this . stats , libp2p )
88
86
89
87
// handle message sending
90
- this . wm = new WantManager ( this . peerId , this . network , this . _stats , libp2p )
88
+ this . wm = new WantManager ( this . peerId , this . network , this . stats , libp2p )
91
89
this . notifications = new Notifications ( this . peerId )
92
90
this . started = false
93
91
}
@@ -162,12 +160,12 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
162
160
}
163
161
164
162
_updateReceiveCounters ( peerIdStr : string , cid : CID , data : Uint8Array , exists : boolean ) : void {
165
- this . _stats . push ( peerIdStr , 'blocksReceived' , 1 )
166
- this . _stats . push ( peerIdStr , 'dataReceived' , data . length )
163
+ this . stats . push ( peerIdStr , 'blocksReceived' , 1 )
164
+ this . stats . push ( peerIdStr , 'dataReceived' , data . length )
167
165
168
166
if ( exists ) {
169
- this . _stats . push ( peerIdStr , 'dupBlksReceived' , 1 )
170
- this . _stats . push ( peerIdStr , 'dupDataReceived' , data . length )
167
+ this . stats . push ( peerIdStr , 'dupBlksReceived' , 1 )
168
+ this . stats . push ( peerIdStr , 'dupDataReceived' , data . length )
171
169
}
172
170
}
173
171
@@ -191,15 +189,15 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
191
189
_onPeerDisconnected ( peerId : PeerId ) : void {
192
190
this . wm . disconnected ( peerId )
193
191
this . engine . peerDisconnected ( peerId )
194
- this . _stats . disconnected ( peerId )
192
+ this . stats . disconnected ( peerId )
195
193
}
196
194
197
195
enableStats ( ) : void {
198
- this . _stats . enable ( )
196
+ this . stats . enable ( )
199
197
}
200
198
201
199
disableStats ( ) : void {
202
- this . _stats . disable ( )
200
+ this . stats . disable ( )
203
201
}
204
202
205
203
/**
@@ -220,8 +218,8 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
220
218
* Fetch a given block by cid. If the block is in the local
221
219
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
222
220
*/
223
- async get ( cid : CID , options : AbortOptions = { } ) : Promise < Uint8Array > {
224
- const fetchFromNetwork = async ( cid : CID , options : AbortOptions ) : Promise < Uint8Array > => {
221
+ async want ( cid : CID , options : AbortOptions & ProgressOptions < BitswapWantProgressEvents > = { } ) : Promise < Uint8Array > {
222
+ const fetchFromNetwork = async ( cid : CID , options : AbortOptions & ProgressOptions < BitswapWantProgressEvents > ) : Promise < Uint8Array > => {
225
223
// add it to the want list - n.b. later we will abort the AbortSignal
226
224
// so no need to remove the blocks from the wantlist after we have it
227
225
this . wm . wantBlocks ( [ cid ] , options )
@@ -231,7 +229,7 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
231
229
232
230
let promptedNetwork = false
233
231
234
- const loadOrFetchFromNetwork = async ( cid : CID , options : AbortOptions ) : Promise < Uint8Array > => {
232
+ const loadOrFetchFromNetwork = async ( cid : CID , options : AbortOptions & ProgressOptions < BitswapWantProgressEvents > ) : Promise < Uint8Array > => {
235
233
try {
236
234
// have to await here as we want to handle ERR_NOT_FOUND
237
235
const block = await this . blockstore . get ( cid , options )
@@ -266,30 +264,23 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
266
264
try {
267
265
const block = await Promise . race ( [
268
266
this . notifications . wantBlock ( cid , {
267
+ ...options ,
269
268
signal
270
269
} ) ,
271
270
loadOrFetchFromNetwork ( cid , {
271
+ ...options ,
272
272
signal
273
273
} )
274
274
] )
275
275
276
276
return block
277
277
} finally {
278
- // since we have the block we can now remove our listener
278
+ // since we have the block we can now abort any outstanding attempts to
279
+ // fetch it
279
280
controller . abort ( )
280
281
}
281
282
}
282
283
283
- /**
284
- * Fetch a a list of blocks by cid. If the blocks are in the local
285
- * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
286
- */
287
- async * getMany ( cids : AsyncIterable < CID > | Iterable < CID > , options : AbortOptions = { } ) : AsyncGenerator < Uint8Array > {
288
- for await ( const cid of cids ) {
289
- yield this . get ( cid , options )
290
- }
291
- }
292
-
293
284
/**
294
285
* Removes the given CIDs from the wantlist independent of any ref counts.
295
286
*
@@ -320,29 +311,29 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
320
311
*/
321
312
async put ( cid : CID , block : Uint8Array , _options ?: any ) : Promise < void > {
322
313
await this . blockstore . put ( cid , block )
323
- this . _sendHaveBlockNotifications ( cid , block )
314
+ this . notify ( cid , block )
324
315
}
325
316
326
317
/**
327
318
* Put the given blocks to the underlying blockstore and
328
319
* send it to nodes that have it them their wantlist.
329
320
*/
330
321
async * putMany ( source : Iterable < Pair > | AsyncIterable < Pair > , options ?: Options ) : AsyncGenerator < Pair > {
331
- for await ( const { key , value } of this . blockstore . putMany ( source , options ) ) {
332
- this . _sendHaveBlockNotifications ( key , value )
322
+ for await ( const { cid , block } of this . blockstore . putMany ( source , options ) ) {
323
+ this . notify ( cid , block )
333
324
334
- yield { key , value }
325
+ yield { cid , block }
335
326
}
336
327
}
337
328
338
329
/**
339
330
* Sends notifications about the arrival of a block
340
331
*/
341
- _sendHaveBlockNotifications ( cid : CID , data : Uint8Array ) : void {
342
- this . notifications . hasBlock ( cid , data )
343
- this . engine . receivedBlocks ( [ { cid, data } ] )
332
+ notify ( cid : CID , block : Uint8Array , options : ProgressOptions < BitswapNotifyProgressEvents > = { } ) : void {
333
+ this . notifications . hasBlock ( cid , block )
334
+ this . engine . receivedBlocks ( [ { cid, block } ] )
344
335
// Note: Don't wait for provide to finish before returning
345
- this . network . provide ( cid ) . catch ( ( err ) => {
336
+ this . network . provide ( cid , options ) . catch ( ( err ) => {
346
337
this . _log . error ( 'Failed to provide: %s' , err . message )
347
338
} )
348
339
}
@@ -357,17 +348,10 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
357
348
/**
358
349
* Get the current list of partners
359
350
*/
360
- peers ( ) : PeerId [ ] {
351
+ get peers ( ) : PeerId [ ] {
361
352
return this . engine . peers ( )
362
353
}
363
354
364
- /**
365
- * Get stats about the bitswap node
366
- */
367
- stat ( ) : Stats {
368
- return this . _stats
369
- }
370
-
371
355
/**
372
356
* Start the bitswap node
373
357
*/
@@ -382,18 +366,10 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
382
366
* Stop the bitswap node
383
367
*/
384
368
async stop ( ) : Promise < void > {
385
- this . _stats . stop ( )
369
+ this . stats . stop ( )
386
370
this . wm . stop ( )
387
371
await this . network . stop ( )
388
372
this . engine . stop ( )
389
373
this . started = false
390
374
}
391
-
392
- unwrap ( ) : Blockstore {
393
- return this . blockstore
394
- }
395
-
396
- async has ( cid : CID ) : Promise < boolean > {
397
- return await this . blockstore . has ( cid )
398
- }
399
375
}
0 commit comments