Skip to content

Commit 36fb616

Browse files
committed
feat: include pending bytes in response message (#205)
1 parent 11f5ff7 commit 36fb616

File tree

5 files changed

+46
-10
lines changed

5 files changed

+46
-10
lines changed

src/decision-engine/index.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,17 @@ class DecisionEngine {
2828

2929
async _sendBlocks (peer, blocks) {
3030
// split into messages of max 512 * 1024 bytes
31-
const total = blocks.reduce((acc, b) => {
31+
let total = blocks.reduce((acc, b) => {
3232
return acc + b.data.byteLength
3333
}, 0)
3434

35+
// If the blocks fit into one message, send the message right away
3536
if (total < MAX_MESSAGE_SIZE) {
36-
await this._sendSafeBlocks(peer, blocks)
37+
await this._sendSafeBlocks(peer, blocks, 0)
3738
return
3839
}
3940

41+
// The blocks don't all fit into one message so we need to split them up
4042
let size = 0
4143
let batch = []
4244
let outstanding = blocks.length
@@ -45,6 +47,7 @@ class DecisionEngine {
4547
outstanding--
4648
batch.push(b)
4749
size += b.data.byteLength
50+
total -= b.data.byteLength
4851

4952
if (size >= MAX_MESSAGE_SIZE ||
5053
// need to ensure the last remaining items get sent
@@ -53,7 +56,7 @@ class DecisionEngine {
5356
const nextBatch = batch.slice()
5457
batch = []
5558
try {
56-
await this._sendSafeBlocks(peer, nextBatch)
59+
await this._sendSafeBlocks(peer, nextBatch, total)
5760
} catch (err) {
5861
// catch the error so as to send as many blocks as we can
5962
this._log('sendblock error: %s', err.message)
@@ -62,9 +65,10 @@ class DecisionEngine {
6265
}
6366
}
6467

65-
async _sendSafeBlocks (peer, blocks) {
68+
async _sendSafeBlocks (peer, blocks, pendingBytes) {
6669
const msg = new Message(false)
6770
blocks.forEach((b) => msg.addBlock(b))
71+
msg.setPendingBytes(pendingBytes)
6872

6973
await this.network.sendMessage(peer, msg)
7074
}

src/types/message/index.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class BitswapMessage {
1515
this.full = full
1616
this.wantlist = new Map()
1717
this.blocks = new Map()
18+
this.pendingBytes = 0
1819
}
1920

2021
get empty () {
@@ -46,6 +47,10 @@ class BitswapMessage {
4647
this.addEntry(cid, 0, true)
4748
}
4849

50+
setPendingBytes (size) {
51+
this.pendingBytes = size
52+
}
53+
4954
/*
5055
* Serializes to Bitswap Message protobuf of
5156
* version 1.0.0
@@ -101,6 +106,10 @@ class BitswapMessage {
101106
})
102107
})
103108

109+
if (this.pendingBytes > 0) {
110+
msg.pendingBytes = this.pendingBytes
111+
}
112+
104113
return pbm.Message.encode(msg)
105114
}
106115

@@ -162,6 +171,7 @@ BitswapMessage.deserialize = async (raw) => {
162171
const cid = new CID(cidVersion, getName(multicodec), hash)
163172
msg.addBlock(new Block(p.data, cid))
164173
}))
174+
msg.setPendingBytes(decoded.pendingBytes)
165175
return msg
166176
}
167177

src/types/message/message.proto.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ module.exports = `
4141
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
4242
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
4343
repeated BlockPresence blockPresences = 4;
44-
int32 PendingBytes = 5;
44+
int32 pendingBytes = 5;
4545
}
4646
`

test/decision-engine/decision-engine.js

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ describe('Engine', () => {
159159
})
160160

161161
it('splits large block messages', () => {
162+
const sum = (nums) => nums.reduce((a, b) => a + b, 0)
163+
164+
const getMessageSizes = (messages) => {
165+
const sizes = []
166+
for (const [, msg] of messages) {
167+
const blocks = [...msg.blocks.values()]
168+
sizes.push(sum(blocks.map(b => b.data.byteLength)))
169+
}
170+
return sizes
171+
}
172+
162173
const data = range(10).map((i) => {
163174
const b = Buffer.alloc(1024 * 256)
164175
b.fill(i)
@@ -167,11 +178,16 @@ describe('Engine', () => {
167178

168179
return new Promise((resolve, reject) => {
169180
const net = mockNetwork(5, (res) => {
170-
res.messages.forEach((message) => {
181+
const messageSizes = getMessageSizes(res.messages)
182+
for (let i = 0; i < res.messages.length; i++) {
183+
const [, message] = res.messages[i]
171184
// The batch size is big enough to hold two blocks, so every
172185
// message should contain two blocks
173-
expect(message[1].blocks.size).to.eql(2)
174-
})
186+
expect(message.blocks.size).to.eql(2)
187+
// The pending bytes should be the sum of the size of blocks in the
188+
// remaining messages
189+
expect(message.pendingBytes).to.eql(sum(messageSizes.slice(i + 1)))
190+
}
175191
resolve()
176192
})
177193

@@ -189,10 +205,12 @@ describe('Engine', () => {
189205
const blocks = res[1]
190206
const cids = blocks.map((b) => b.cid)
191207

208+
// Put blocks into the node's blockstore
192209
await Promise.all((blocks.map((b) => sf.blockstore.put(b))))
210+
211+
// Simulate receiving a wantlist for all the blocks
193212
const msg = new Message(false)
194213
cids.forEach((c, i) => msg.addEntry(c, Math.pow(2, 32) - 1 - i))
195-
196214
sf.messageReceived(id, msg)
197215
})
198216
.catch(reject)

test/types/message.spec.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,13 @@ describe('BitswapMessage', () => {
5353
const block = blocks[1]
5454
const msg = new BitswapMessage(true)
5555
msg.addBlock(block)
56+
msg.setPendingBytes(10)
5657

5758
const serialized = msg.serializeToBitswap110()
5859
const decoded = pbm.Message.decode(serialized)
5960

6061
expect(decoded.payload[0].data).to.eql(block.data)
62+
expect(decoded.pendingBytes).to.eql(10)
6163
})
6264

6365
it('.deserialize a Bitswap100 Message', async () => {
@@ -120,7 +122,8 @@ describe('BitswapMessage', () => {
120122
}, {
121123
data: b2.data,
122124
prefix: cid2.prefix
123-
}]
125+
}],
126+
pendingBytes: 10
124127
})
125128

126129
const msg = await BitswapMessage.deserialize(raw)
@@ -137,6 +140,7 @@ describe('BitswapMessage', () => {
137140
[cid1.toString('base58btc'), b1.data],
138141
[cid2.toString('base58btc'), b2.data]
139142
])
143+
expect(msg.pendingBytes).to.equal(10)
140144
})
141145

142146
it('ignores duplicates', () => {

0 commit comments

Comments
 (0)