Skip to content

Commit 7d5f460

Browse files
committed
Support channel options like highwatermark
1 parent 6d1d528 commit 7d5f460

File tree

4 files changed

+83
-7
lines changed

4 files changed

+83
-7
lines changed

lib/callback_model.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,13 @@ class CallbackModel extends EventEmitter {
2828
this.connection._updateSecret(newSecret, reason, cb);
2929
}
3030

31-
createChannel (cb) {
31+
createChannel (options, cb) {
32+
if (arguments.length === 1) {
33+
cb = options;
34+
options = undefined;
35+
}
3236
var ch = new Channel(this.connection);
37+
ch.setOptions(options);
3338
ch.open(function (err, ok) {
3439
if (err === null)
3540
cb && cb(null, ch);
@@ -39,8 +44,13 @@ class CallbackModel extends EventEmitter {
3944
return ch;
4045
}
4146

42-
createConfirmChannel (cb) {
47+
createConfirmChannel (options, cb) {
48+
if (arguments.length === 1) {
49+
cb = options;
50+
options = undefined;
51+
}
4352
var ch = new ConfirmChannel(this.connection);
53+
ch.setOptions(options);
4454
ch.open(function (err) {
4555
if (err !== null)
4656
return cb && cb(err);

lib/channel.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,12 @@ class Channel extends EventEmitter {
4747
this.handleMessage = acceptDeliveryOrReturn;
4848
}
4949

50+
setOptions(options) {
51+
this.options = options;
52+
}
53+
5054
allocate () {
51-
this.ch = this.connection.freshChannel(this);
55+
this.ch = this.connection.freshChannel(this, this.options);
5256
return this;
5357
}
5458

lib/channel_model.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ class ChannelModel extends EventEmitter {
3030
return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
3131
}
3232

33-
async createChannel() {
33+
async createChannel(options) {
3434
const channel = new Channel(this.connection);
35+
channel.setOptions(options);
3536
await channel.open();
3637
return channel;
3738
}
3839

39-
async createConfirmChannel() {
40+
async createConfirmChannel(options) {
4041
const channel = new ConfirmChannel(this.connection);
42+
channel.setOptions(options);
4143
await channel.open();
4244
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
4345
return channel;

test/callback_api.js

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,14 @@ suite('updateSecret', function() {
7272
});
7373

7474
function channel_test_fn(method) {
75-
return function(name, chfun) {
75+
return function(name, options, chfun) {
76+
if (arguments.length === 2) {
77+
chfun = options;
78+
options = {};
79+
}
7680
test(name, function(done) {
7781
connect(kCallback(function(c) {
78-
c[method](kCallback(function(ch) {
82+
c[method](options, kCallback(function(ch) {
7983
chfun(ch, done);
8084
}, done));
8185
}, done));
@@ -210,6 +214,34 @@ suite('sending messages', function() {
210214
});
211215
});
212216

217+
channel_test('saturate buffer', function(ch, done) {
218+
var msg = randomString();
219+
ch.assertQueue('', {exclusive: true}, function(e, q) {
220+
if (e !== null) return done(e);
221+
let ok;
222+
for (let i = 0; i < 2047; i++) {
223+
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
224+
if (!ok) break;
225+
}
226+
227+
assert.equal(ok, false);
228+
done();
229+
});
230+
});
231+
232+
channel_test('set high watermark (making it harder to saturate the buffer', { highWaterMark: 4092 }, function(ch, done) {
233+
var msg = randomString();
234+
ch.assertQueue('', {exclusive: true}, function(e, q) {
235+
if (e !== null) return done(e);
236+
let ok;
237+
for (let i = 0; i < 4092; i++) {
238+
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
239+
assert.equal(ok, true);
240+
}
241+
done();
242+
});
243+
});
244+
213245
});
214246

215247
suite('ConfirmChannel', function() {
@@ -228,6 +260,34 @@ suite('ConfirmChannel', function() {
228260
ch.waitForConfirms(done);
229261
});
230262

263+
confirm_channel_test('saturate buffer', function(ch, done) {
264+
var msg = randomString();
265+
ch.assertQueue('', {exclusive: true}, function(e, q) {
266+
if (e !== null) return done(e);
267+
let ok;
268+
for (let i = 0; i < 2047; i++) {
269+
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
270+
if (!ok) break;
271+
}
272+
273+
assert.equal(ok, false);
274+
done();
275+
});
276+
});
277+
278+
confirm_channel_test('set high watermark (making it harder to saturate the buffer', { highWaterMark: 4092 }, function(ch, done) {
279+
var msg = randomString();
280+
ch.assertQueue('', {exclusive: true}, function(e, q) {
281+
if (e !== null) return done(e);
282+
let ok;
283+
for (let i = 0; i < 4092; i++) {
284+
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
285+
assert.equal(ok, true);
286+
}
287+
done();
288+
});
289+
});
290+
231291
});
232292

233293
suite("Error handling", function() {

0 commit comments

Comments
 (0)