Skip to content

Include pending bytes in response message #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ class DecisionEngine {

async _sendBlocks (peer, blocks) {
// split into messages of max 512 * 1024 bytes
const total = blocks.reduce((acc, b) => {
let total = blocks.reduce((acc, b) => {
return acc + b.data.byteLength
}, 0)

// If the blocks fit into one message, send the message right away
if (total < MAX_MESSAGE_SIZE) {
await this._sendSafeBlocks(peer, blocks)
await this._sendSafeBlocks(peer, blocks, 0)
return
}

// The blocks don't all fit into one message so we need to split them up
let size = 0
let batch = []
let outstanding = blocks.length
Expand All @@ -45,6 +47,7 @@ class DecisionEngine {
outstanding--
batch.push(b)
size += b.data.byteLength
total -= b.data.byteLength

if (size >= MAX_MESSAGE_SIZE ||
// need to ensure the last remaining items get sent
Expand All @@ -53,7 +56,7 @@ class DecisionEngine {
const nextBatch = batch.slice()
batch = []
try {
await this._sendSafeBlocks(peer, nextBatch)
await this._sendSafeBlocks(peer, nextBatch, total)
} catch (err) {
// catch the error so as to send as many blocks as we can
this._log('sendblock error: %s', err.message)
Expand All @@ -62,9 +65,10 @@ class DecisionEngine {
}
}

async _sendSafeBlocks (peer, blocks) {
async _sendSafeBlocks (peer, blocks, pendingBytes) {
const msg = new Message(false)
blocks.forEach((b) => msg.addBlock(b))
msg.setPendingBytes(pendingBytes)

await this.network.sendMessage(peer, msg)
}
Expand Down
10 changes: 10 additions & 0 deletions src/types/message/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class BitswapMessage {
this.full = full
this.wantlist = new Map()
this.blocks = new Map()
this.pendingBytes = 0
}

get empty () {
Expand Down Expand Up @@ -47,6 +48,10 @@ class BitswapMessage {
this.addEntry(cid, 0, true)
}

setPendingBytes (size) {
this.pendingBytes = size
}

/*
* Serializes to Bitswap Message protobuf of
* version 1.0.0
Expand Down Expand Up @@ -102,6 +107,10 @@ class BitswapMessage {
})
})

if (this.pendingBytes > 0) {
msg.pendingBytes = this.pendingBytes
}

return pbm.Message.encode(msg)
}

Expand Down Expand Up @@ -169,6 +178,7 @@ BitswapMessage.deserialize = async (raw) => {
const cid = new CID(cidVersion, codecName[multicodec.toString('16')], hash)
msg.addBlock(new Block(p.data, cid))
}))
msg.setPendingBytes(decoded.pendingBytes)
return msg
}

Expand Down
2 changes: 1 addition & 1 deletion src/types/message/message.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ module.exports = `
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated BlockPresence blockPresences = 4;
int32 PendingBytes = 5;
int32 pendingBytes = 5;
}
`
26 changes: 22 additions & 4 deletions test/decision-engine/decision-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ describe('Engine', () => {
})

it('splits large block messages', () => {
const sum = (nums) => nums.reduce((a, b) => a + b, 0)

const getMessageSizes = (messages) => {
const sizes = []
for (const [, msg] of messages) {
const blocks = [...msg.blocks.values()]
sizes.push(sum(blocks.map(b => b.data.byteLength)))
}
return sizes
}

const data = range(10).map((i) => {
const b = Buffer.alloc(1024 * 256)
b.fill(i)
Expand All @@ -167,11 +178,16 @@ describe('Engine', () => {

return new Promise((resolve, reject) => {
const net = mockNetwork(5, (res) => {
res.messages.forEach((message) => {
const messageSizes = getMessageSizes(res.messages)
for (let i = 0; i < res.messages.length; i++) {
const [, message] = res.messages[i]
// The batch size is big enough to hold two blocks, so every
// message should contain two blocks
expect(message[1].blocks.size).to.eql(2)
})
expect(message.blocks.size).to.eql(2)
// The pending bytes should be the sum of the size of blocks in the
// remaining messages
expect(message.pendingBytes).to.eql(sum(messageSizes.slice(i + 1)))
}
resolve()
})

Expand All @@ -189,10 +205,12 @@ describe('Engine', () => {
const blocks = res[1]
const cids = blocks.map((b) => b.cid)

// Put blocks into the node's blockstore
await Promise.all((blocks.map((b) => sf.blockstore.put(b))))

// Simulate receiving a wantlist for all the blocks
const msg = new Message(false)
cids.forEach((c, i) => msg.addEntry(c, Math.pow(2, 32) - 1 - i))

sf.messageReceived(id, msg)
})
.catch(reject)
Expand Down
6 changes: 5 additions & 1 deletion test/types/message.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ describe('BitswapMessage', () => {
const block = blocks[1]
const msg = new BitswapMessage(true)
msg.addBlock(block)
msg.setPendingBytes(10)

const serialized = msg.serializeToBitswap110()
const decoded = pbm.Message.decode(serialized)

expect(decoded.payload[0].data).to.eql(block.data)
expect(decoded.pendingBytes).to.eql(10)
})

it('.deserialize a Bitswap100 Message', async () => {
Expand Down Expand Up @@ -120,7 +122,8 @@ describe('BitswapMessage', () => {
}, {
data: b2.data,
prefix: cid2.prefix
}]
}],
pendingBytes: 10
})

const msg = await BitswapMessage.deserialize(raw)
Expand All @@ -137,6 +140,7 @@ describe('BitswapMessage', () => {
[cid1.toString('base58btc'), b1.data],
[cid2.toString('base58btc'), b2.data]
])
expect(msg.pendingBytes).to.equal(10)
})

it('ignores duplicates', () => {
Expand Down