Skip to content

Commit 1a50622

Browse files
committed
feat: include pending bytes in response message (#205)
1 parent 1655585 commit 1a50622

File tree

5 files changed

+46
-10
lines changed

5 files changed

+46
-10
lines changed

src/decision-engine/index.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,17 @@ class DecisionEngine {
2828

2929
async _sendBlocks (peer, blocks) {
3030
// split into messages of max 512 * 1024 bytes
31-
const total = blocks.reduce((acc, b) => {
31+
let total = blocks.reduce((acc, b) => {
3232
return acc + b.data.byteLength
3333
}, 0)
3434

35+
// If the blocks fit into one message, send the message right away
3536
if (total < MAX_MESSAGE_SIZE) {
36-
await this._sendSafeBlocks(peer, blocks)
37+
await this._sendSafeBlocks(peer, blocks, 0)
3738
return
3839
}
3940

41+
// The blocks don't all fit into one message so we need to split them up
4042
let size = 0
4143
let batch = []
4244
let outstanding = blocks.length
@@ -45,6 +47,7 @@ class DecisionEngine {
4547
outstanding--
4648
batch.push(b)
4749
size += b.data.byteLength
50+
total -= b.data.byteLength
4851

4952
if (size >= MAX_MESSAGE_SIZE ||
5053
// need to ensure the last remaining items get sent
@@ -53,7 +56,7 @@ class DecisionEngine {
5356
const nextBatch = batch.slice()
5457
batch = []
5558
try {
56-
await this._sendSafeBlocks(peer, nextBatch)
59+
await this._sendSafeBlocks(peer, nextBatch, total)
5760
} catch (err) {
5861
// catch the error so as to send as many blocks as we can
5962
this._log('sendblock error: %s', err.message)
@@ -62,9 +65,10 @@ class DecisionEngine {
6265
}
6366
}
6467

65-
async _sendSafeBlocks (peer, blocks) {
68+
async _sendSafeBlocks (peer, blocks, pendingBytes) {
6669
const msg = new Message(false)
6770
blocks.forEach((b) => msg.addBlock(b))
71+
msg.setPendingBytes(pendingBytes)
6872

6973
await this.network.sendMessage(peer, msg)
7074
}

src/types/message/index.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class BitswapMessage {
1414
this.full = full
1515
this.wantlist = new Map()
1616
this.blocks = new Map()
17+
this.pendingBytes = 0
1718
}
1819

1920
get empty () {
@@ -45,6 +46,10 @@ class BitswapMessage {
4546
this.addEntry(cid, 0, true)
4647
}
4748

49+
setPendingBytes (size) {
50+
this.pendingBytes = size
51+
}
52+
4853
/*
4954
* Serializes to Bitswap Message protobuf of
5055
* version 1.0.0
@@ -100,6 +105,10 @@ class BitswapMessage {
100105
})
101106
})
102107

108+
if (this.pendingBytes > 0) {
109+
msg.pendingBytes = this.pendingBytes
110+
}
111+
103112
return Message.encode(msg)
104113
}
105114

@@ -161,6 +170,7 @@ BitswapMessage.deserialize = async (raw) => {
161170
const cid = new CID(cidVersion, getName(multicodec), hash)
162171
msg.addBlock(new Block(p.data, cid))
163172
}))
173+
msg.setPendingBytes(decoded.pendingBytes)
164174
return msg
165175
}
166176

src/types/message/message.proto.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ module.exports = protons(`
4141
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
4242
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
4343
repeated BlockPresence blockPresences = 4;
44-
int32 PendingBytes = 5;
44+
int32 pendingBytes = 5;
4545
}
4646
`)

test/decision-engine/decision-engine.js

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,17 @@ describe('Engine', () => {
158158
})
159159

160160
it('splits large block messages', () => {
161+
const sum = (nums) => nums.reduce((a, b) => a + b, 0)
162+
163+
const getMessageSizes = (messages) => {
164+
const sizes = []
165+
for (const [, msg] of messages) {
166+
const blocks = [...msg.blocks.values()]
167+
sizes.push(sum(blocks.map(b => b.data.byteLength)))
168+
}
169+
return sizes
170+
}
171+
161172
const data = range(10).map((i) => {
162173
const b = Buffer.alloc(1024 * 256)
163174
b.fill(i)
@@ -166,11 +177,16 @@ describe('Engine', () => {
166177

167178
return new Promise((resolve, reject) => {
168179
const net = mockNetwork(5, (res) => {
169-
res.messages.forEach((message) => {
180+
const messageSizes = getMessageSizes(res.messages)
181+
for (let i = 0; i < res.messages.length; i++) {
182+
const [, message] = res.messages[i]
170183
// The batch size is big enough to hold two blocks, so every
171184
// message should contain two blocks
172-
expect(message[1].blocks.size).to.eql(2)
173-
})
185+
expect(message.blocks.size).to.eql(2)
186+
// The pending bytes should be the sum of the size of blocks in the
187+
// remaining messages
188+
expect(message.pendingBytes).to.eql(sum(messageSizes.slice(i + 1)))
189+
}
174190
resolve()
175191
})
176192

@@ -188,10 +204,12 @@ describe('Engine', () => {
188204
const blocks = res[1]
189205
const cids = blocks.map((b) => b.cid)
190206

207+
// Put blocks into the node's blockstore
191208
await Promise.all((blocks.map((b) => sf.blockstore.put(b))))
209+
210+
// Simulate receiving a wantlist for all the blocks
192211
const msg = new Message(false)
193212
cids.forEach((c, i) => msg.addEntry(c, Math.pow(2, 32) - 1 - i))
194-
195213
sf.messageReceived(id, msg)
196214
})
197215
.catch(reject)

test/types/message.spec.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ describe('BitswapMessage', () => {
5252
const block = blocks[1]
5353
const msg = new BitswapMessage(true)
5454
msg.addBlock(block)
55+
msg.setPendingBytes(10)
5556

5657
const serialized = msg.serializeToBitswap110()
5758
const decoded = Message.decode(serialized)
5859

5960
expect(decoded.payload[0].data).to.eql(block.data)
61+
expect(decoded.pendingBytes).to.eql(10)
6062
})
6163

6264
it('.deserialize a Bitswap100 Message', async () => {
@@ -119,7 +121,8 @@ describe('BitswapMessage', () => {
119121
}, {
120122
data: b2.data,
121123
prefix: cid2.prefix
122-
}]
124+
}],
125+
pendingBytes: 10
123126
})
124127

125128
const msg = await BitswapMessage.deserialize(raw)
@@ -136,6 +139,7 @@ describe('BitswapMessage', () => {
136139
[cid1.toString('base58btc'), b1.data],
137140
[cid2.toString('base58btc'), b2.data]
138141
])
142+
expect(msg.pendingBytes).to.equal(10)
139143
})
140144

141145
it('ignores duplicates', () => {

0 commit comments

Comments
 (0)