Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support channel options like highwatermark #778

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ class CallbackModel extends EventEmitter {
this.connection._updateSecret(newSecret, reason, cb);
}

createChannel (cb) {
createChannel (options, cb) {
if (arguments.length === 1) {
cb = options;
options = undefined;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maintain backwards compatibility if no options are specified

var ch = new Channel(this.connection);
ch.setOptions(options);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a setter to specify the channel options to avoid making a breaking change to the Channel constructor

ch.open(function (err, ok) {
if (err === null)
cb && cb(null, ch);
Expand All @@ -39,8 +44,13 @@ class CallbackModel extends EventEmitter {
return ch;
}

createConfirmChannel (cb) {
createConfirmChannel (options, cb) {
if (arguments.length === 1) {
cb = options;
options = undefined;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maintain backwards compatibility if no options are specified

var ch = new ConfirmChannel(this.connection);
ch.setOptions(options);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a setter to specify the channel options to avoid making a breaking change to the Channel constructor

ch.open(function (err) {
if (err !== null)
return cb && cb(err);
Expand Down
6 changes: 5 additions & 1 deletion lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ class Channel extends EventEmitter {
this.handleMessage = acceptDeliveryOrReturn;
}

setOptions(options) {
this.options = options;
}

allocate () {
this.ch = this.connection.freshChannel(this);
this.ch = this.connection.freshChannel(this, this.options);
return this;
}

Expand Down
6 changes: 4 additions & 2 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ class ChannelModel extends EventEmitter {
return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
}

async createChannel() {
async createChannel(options) {
const channel = new Channel(this.connection);
channel.setOptions(options);
await channel.open();
return channel;
}

async createConfirmChannel() {
async createConfirmChannel(options) {
const channel = new ConfirmChannel(this.connection);
channel.setOptions(options);
await channel.open();
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
return channel;
Expand Down
63 changes: 61 additions & 2 deletions test/callback_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ suite('updateSecret', function() {
});

function channel_test_fn(method) {
return function(name, chfun) {
return function(name, options, chfun) {
if (arguments.length === 2) {
chfun = options;
options = {};
}
test(name, function(done) {
connect(kCallback(function(c) {
c[method](kCallback(function(ch) {
c[method](options, kCallback(function(ch) {
chfun(ch, done);
}, done));
}, done));
Expand Down Expand Up @@ -210,6 +214,33 @@ suite('sending messages', function() {
});
});

var channelOptions = {};

channel_test('find high watermark', function(ch, done) {
var msg = randomString();
var baseline = 0;
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
while (ch.sendToQueue(q.queue, Buffer.from(msg))) {
baseline++;
};
channelOptions.highWaterMark = baseline * 2;
done();
})
});
Copy link
Collaborator Author

@cressie176 cressie176 Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This "test" finds how much the local environment running the test can take before the buffer is saturated, then updates the channel options for the next test. It's more than a little hacky, but I couldn't think of a better way to get the test working reliably.


channel_test('set high watermark', channelOptions, function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
var ok;
for (var i = 0; i < channelOptions.highWaterMark; i++) {
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
assert.equal(ok, true);
}
done();
});
});
});

suite('ConfirmChannel', function() {
Expand All @@ -228,6 +259,34 @@ suite('ConfirmChannel', function() {
ch.waitForConfirms(done);
});

var channelOptions = {};

confirm_channel_test('find high watermark', function(ch, done) {
var msg = randomString();
var baseline = 0;
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
while (ch.sendToQueue(q.queue, Buffer.from(msg))) {
baseline++;
};
channelOptions.highWaterMark = baseline * 2;
done();
})
});

confirm_channel_test('set high watermark', channelOptions, function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
var ok;
for (var i = 0; i < channelOptions.highWaterMark; i++) {
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
assert.equal(ok, true);
}
done();
});
});

});

suite("Error handling", function() {
Expand Down
Loading