Skip to content

Commit 2dae35f

Browse files
committed
refactor: update benchmark tests and allow streaming to putMany
1 parent 9e3f12e commit 2dae35f

File tree

10 files changed

+228
-300
lines changed

10 files changed

+228
-300
lines changed

benchmarks/index.js

Lines changed: 55 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -2,109 +2,76 @@
22
/* eslint-disable no-console */
33
'use strict'
44

5-
const series = require('async/series')
6-
const parallel = require('async/parallel')
7-
const map = require('async/map')
8-
const mapSeries = require('async/mapSeries')
9-
const each = require('async/each')
10-
const _ = require('lodash')
11-
const Block = require('ipfs-block')
125
const assert = require('assert')
13-
const crypto = require('crypto')
14-
const CID = require('cids')
15-
const multihashing = require('multihashing-async')
6+
const range = require('lodash.range')
167

17-
const utils = require('../test/utils')
8+
const makeBlock = require('../test/utils/make-block')
9+
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork
1810

1911
const nodes = [2, 5, 10, 20]
2012
const blockFactors = [1, 10, 100]
2113

22-
console.log('-- start')
23-
mapSeries(nodes, (n, cb) => {
24-
mapSeries(blockFactors, (blockFactor, cb) => {
25-
utils.genBitswapNetwork(n, (err, nodeArr) => {
26-
if (err) {
27-
return cb(err)
28-
}
29-
30-
round(nodeArr, blockFactor, n, (err) => {
31-
if (err) {
32-
return cb(err)
33-
}
34-
35-
shutdown(nodeArr, cb)
36-
})
14+
;(async function () {
15+
console.log('-- start')
16+
await Promise.all(
17+
nodes.map(async nodeCount => {
18+
await Promise.all(
19+
blockFactors.map(async blockFactor => {
20+
const nodeArr = await genBitswapNetwork(nodeCount)
21+
await round(nodeArr, blockFactor, nodeCount)
22+
await shutdown(nodeArr)
23+
})
24+
)
3725
})
38-
}, cb)
39-
}, (err) => {
40-
if (err) {
41-
throw err
42-
}
26+
)
27+
4328
console.log('-- finished')
44-
})
29+
})()
4530

46-
function shutdown (nodeArr, cb) {
47-
each(nodeArr, (node, cb) => {
48-
node.bitswap.stop()
49-
node.libp2p.stop(cb)
50-
}, cb)
31+
async function shutdown (nodeArr) {
32+
await Promise.all(
33+
nodeArr.map(async node => {
34+
await node.bitswap.stop()
35+
await node.libp2p.stop()
36+
})
37+
)
5138
}
5239

53-
function round (nodeArr, blockFactor, n, cb) {
54-
createBlocks(n, blockFactor, (err, blocks) => {
55-
if (err) {
56-
return cb(err)
57-
}
58-
const cids = blocks.map((b) => b.cid)
59-
let d
60-
series([
61-
// put blockFactor amount of blocks per node
62-
(cb) => parallel(_.map(nodeArr, (node, i) => (callback) => {
63-
node.bitswap.start()
64-
65-
const data = _.map(_.range(blockFactor), (j) => {
40+
async function round (nodeArr, blockFactor, n) {
41+
const blocks = await makeBlock(n * blockFactor)
42+
const cids = blocks.map((b) => b.cid)
43+
44+
console.info('put blockFactor amount of blocks per node')
45+
46+
await Promise.all(
47+
nodeArr.map(async (node, i) => {
48+
await node.bitswap.start()
49+
50+
await Promise.all(
51+
range(blockFactor).map(async j => {
6652
const index = i * blockFactor + j
67-
return blocks[index]
68-
})
69-
each(
70-
data,
71-
(d, cb) => node.bitswap.put(d, cb),
72-
callback
73-
)
74-
}), cb),
75-
(cb) => {
76-
d = (new Date()).getTime()
77-
cb()
78-
},
79-
// fetch all blocks on every node
80-
(cb) => parallel(_.map(nodeArr, (node, i) => (callback) => {
81-
map(cids, (cid, cb) => node.bitswap.get(cid, cb), (err, res) => {
82-
if (err) {
83-
return callback(err)
84-
}
85-
86-
assert(res.length === blocks.length)
87-
callback()
53+
54+
await node.bitswap.put(blocks[index])
8855
})
89-
}), cb)
90-
], (err) => {
91-
if (err) {
92-
return cb(err)
93-
}
94-
console.log(' %s nodes - %s blocks/node - %sms', n, blockFactor, (new Date()).getTime() - d)
95-
cb()
56+
)
9657
})
97-
})
98-
}
58+
)
59+
60+
console.info('fetch all blocks on every node')
9961

100-
function createBlocks (n, blockFactor, callback) {
101-
map(_.range(n * blockFactor), (i, cb) => {
102-
const data = crypto.randomBytes(n * blockFactor)
103-
multihashing(data, 'sha2-256', (err, hash) => {
104-
if (err) {
105-
return cb(err)
62+
const d = Date.now()
63+
64+
await Promise.all(
65+
nodeArr.map(async node => {
66+
let count = 0
67+
68+
for await (const _ of node.bitswap.getMany(cids)) { // eslint-disable-line no-unused-vars
69+
count++
10670
}
107-
cb(null, new Block(data, new CID(hash)))
71+
72+
assert(count === blocks.length)
10873
})
109-
}, callback)
74+
)
75+
76+
console.log(' %s nodes - %s blocks/node - %sms', n, blockFactor, Date.now() - d)
11077
}

benchmarks/put-get.js

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,34 @@
33
'use strict'
44

55
const Benchmark = require('benchmark')
6-
const _ = require('lodash')
7-
const Block = require('ipfs-block')
86
const assert = require('assert')
9-
const series = require('async/series')
10-
const map = require('async/map')
11-
const crypto = require('crypto')
12-
const CID = require('cids')
13-
const multihashing = require('multihashing-async')
14-
15-
const utils = require('../test/utils')
7+
const all = require('async-iterator-all')
8+
const makeBlock = require('../test/utils/make-block')
9+
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork
1610

1711
const suite = new Benchmark.Suite('put-get')
1812

1913
const blockCounts = [1, 10, 1000]
2014
const blockSizes = [10, 1024, 10 * 1024]
2115

22-
utils.genBitswapNetwork(1, (err, nodes) => {
23-
if (err) {
24-
throw err
25-
}
26-
const node = nodes[0]
16+
;(async function () {
17+
const [
18+
node
19+
] = await genBitswapNetwork(1)
20+
2721
const bitswap = node.bitswap
2822

2923
blockCounts.forEach((n) => blockSizes.forEach((k) => {
30-
suite.add(`put-get ${n} blocks of size ${k}`, (defer) => {
31-
createBlocks(n, k, (err, blocks) => {
32-
if (err) {
33-
throw err
34-
}
35-
series([
36-
(cb) => bitswap.putMany(blocks, cb),
37-
(cb) => get(blocks, bitswap, cb)
38-
], (err) => {
39-
if (err) {
40-
throw err
41-
}
42-
defer.resolve()
43-
})
44-
})
24+
suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => {
25+
const blocks = await makeBlock(n, k)
26+
27+
await bitswap.putMany(blocks)
28+
29+
const res = await all(bitswap.getMany(blocks.map(block => block.cid)))
30+
31+
assert(res.length === blocks.length)
32+
33+
defer.resolve()
4534
}, {
4635
defer: true
4736
})
@@ -57,29 +46,4 @@ utils.genBitswapNetwork(1, (err, nodes) => {
5746
.run({
5847
async: true
5948
})
60-
})
61-
62-
function createBlocks (n, k, callback) {
63-
map(_.range(n), (i, cb) => {
64-
const data = crypto.randomBytes(k)
65-
multihashing(data, 'sha2-256', (err, hash) => {
66-
if (err) {
67-
return cb(err)
68-
}
69-
cb(null, new Block(data, new CID(hash)))
70-
})
71-
}, callback)
72-
}
73-
74-
function get (blocks, bs, callback) {
75-
map(blocks, (b, cb) => {
76-
bs.get(b.cid, cb)
77-
}, (err, res) => {
78-
if (err) {
79-
return callback(err)
80-
}
81-
82-
assert(res.length === blocks.length)
83-
callback()
84-
})
85-
}
49+
})()

package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,11 @@
4444
"devDependencies": {
4545
"@nodeutils/defaults-deep": "^1.1.0",
4646
"aegir": "^20.3.1",
47-
"async": "^2.6.1",
4847
"async-iterator-all": "^1.0.0",
4948
"benchmark": "^2.1.4",
5049
"chai": "^4.2.0",
5150
"dirty-chai": "^2.0.1",
52-
"ipfs-repo": "^0.27.1",
51+
"ipfs-repo": "ipfs/js-ipfs-repo#blockstore-accept-async-iterator",
5352
"libp2p": "^0.26.1",
5453
"libp2p-kad-dht": "^0.16.0",
5554
"libp2p-mplex": "^0.8.0",
@@ -83,7 +82,6 @@
8382
"protons": "^1.0.1",
8483
"pull-length-prefixed": "^1.3.1",
8584
"pull-stream": "^3.6.9",
86-
"typical": "^5.1.0",
8785
"varint-decoder": "~0.1.1"
8886
},
8987
"pre-push": [

src/index.js

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -264,41 +264,43 @@ class Bitswap {
264264
* @param {function(Error)} callback
265265
* @returns {void}
266266
*/
267-
async put (block) {
268-
this._log('putting block')
269-
270-
const has = await this.blockstore.has(block.cid)
271-
272-
if (has) {
273-
return
267+
async put (block) { // eslint-disable-line require-await
268+
if (!Array.isArray(block)) {
269+
block = [
270+
block
271+
]
274272
}
275273

276-
await this._putBlock(block)
274+
return this.putMany(block)
277275
}
278276

279277
/**
280278
* Put the given blocks to the underlying blockstore and
281279
* send it to nodes that have it them their wantlist.
282280
*
283-
* @param {Array<Block>} blocks
281+
* @param {AsyncIterable<Block>} blocks
284282
* @param {function(Error)} callback
285283
* @returns {void}
286284
*/
287-
async putMany (blocks) {
288-
const newBlocks = await Promise.all(blocks.map(async (b) => {
289-
return !(await this.blockstore.has(b.cid))
290-
})).filter(Boolean)
291-
292-
await this.blockstore.putMany(newBlocks)
293-
294-
for (const block of newBlocks) {
295-
this.notifications.hasBlock(block)
296-
this.engine.receivedBlocks([block.cid])
297-
// Note: Don't wait for provide to finish before returning
298-
this.network.provide(block.cid).catch((err) => {
299-
this._log.error('Failed to provide: %s', err.message)
300-
})
301-
}
285+
async putMany (blocks) { // eslint-disable-line require-await
286+
const self = this
287+
288+
return this.blockstore.putMany(async function * () {
289+
for await (const block of blocks) {
290+
if (await self.blockstore.has(block.cid)) {
291+
continue
292+
}
293+
294+
yield block
295+
296+
self.notifications.hasBlock(block)
297+
self.engine.receivedBlocks([block.cid])
298+
// Note: Don't wait for provide to finish before returning
299+
self.network.provide(block.cid).catch((err) => {
300+
self._log.error('Failed to provide: %s', err.message)
301+
})
302+
}
303+
}())
302304
}
303305

304306
/**

test/benchmarks/get-many.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44

55
const distributionTest = require('../utils/distribution-test')
66
const print = require('./helpers/print-swarm-results')
7+
const EventEmitter = require('events')
78

8-
print('10 nodes, 10 blocks, 5 iterations', distributionTest(10, 10, 5, (err) => {
9-
if (err) {
10-
throw err
11-
}
9+
;(async function () {
10+
const emitter = new EventEmitter()
11+
12+
print('10 nodes, 10 blocks, 5 iterations', emitter)
13+
14+
await distributionTest(10, 10, 5, emitter)
1215

1316
console.log('Finished. Can kill now...')
14-
}))
17+
})()

0 commit comments

Comments
 (0)