Skip to content

Commit 23eb985

Browse files
Test SAC with consumres w/o priorities
1 parent 870c667 commit 23eb985

File tree

2 files changed

+138
-42
lines changed

2 files changed

+138
-42
lines changed

selenium/test/amqp.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ var connectionOptions = getConnectionOptions()
77

88
function getAmqpConnectionOptions() {
99
return {
10+
'scheme': process.env.RABBITMQ_AMQP_SCHEME || 'amqp',
1011
'host': process.env.RABBITMQ_HOSTNAME || 'rabbitmq',
1112
'port': process.env.RABBITMQ_AMQP_PORT || 5672,
1213
'username' : process.env.RABBITMQ_AMQP_USERNAME || 'guest',
@@ -39,7 +40,12 @@ function getConnectionOptions() {
3940
}
4041
}
4142
module.exports = {
42-
43+
getAmqpConnectionOptions: () => { return connectionOptions },
44+
getAmqpUrl: () => {
45+
return connectionOptions.scheme + '://' +
46+
connectionOptions.username + ":" + connectionOptions.password + "@" +
47+
connectionOptions.host + ":" + connectionOptions.port
48+
},
4349
open: (queueName = "my-queue") => {
4450
let promise = new Promise((resolve, reject) => {
4551
container.on('connection_open', function(context) {

selenium/test/queuesAndStreams/view-qq-consumers.js

Lines changed: 131 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ require('chromedriver')
33
const assert = require('assert')
44
const { buildDriver, goToHome, captureScreensFor, teardown, doWhile, goToQueue,delay } = require('../utils')
55
const { createQueue, deleteQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
6-
const { open: openAmqp, once: onceAmqp, on: onAmqp, close: closeAmqp,
7-
openReceiver : openReceiver} = require('../amqp')
6+
const { getAmqpUrl : getAmqpUrl } = require('../amqp')
87
const amqplib = require('amqplib');
98

109
const LoginPage = require('../pageobjects/LoginPage')
@@ -13,12 +12,6 @@ const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
1312
const QueuePage = require('../pageobjects/QueuePage')
1413
const StreamPage = require('../pageobjects/StreamPage')
1514

16-
var untilConnectionEstablished = new Promise((resolve, reject) => {
17-
onAmqp('connection_open', function(context) {
18-
console.log("Amqp connection opened")
19-
resolve()
20-
})
21-
})
2215

2316
describe('Given a quorum queue configured with SAC', function () {
2417
let login
@@ -44,7 +37,6 @@ describe('Given a quorum queue configured with SAC', function () {
4437
throw new Error('Failed to login')
4538
}
4639
await overview.selectRefreshOption("Do not refresh")
47-
await overview.clickOnQueuesTab()
4840
queueName = "test_" + Math.floor(Math.random() * 1000)
4941

5042
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
@@ -77,16 +69,21 @@ describe('Given a quorum queue configured with SAC', function () {
7769
assert.equal("Consumers (0)", await queuePage.getConsumersSectionTitle())
7870
})
7971

80-
describe("given there is a consumer attached to the queue", function () {
81-
let amqp
72+
describe("given there is a consumer (without priority) attached to the queue", function () {
8273
let amqp091conn
74+
let ch1
75+
let ch1Consumer
76+
let ch2
77+
let ch2Consumer
8378

8479
before(async function() {
85-
amqp = openAmqp(queueName)
86-
await untilConnectionEstablished
80+
let amqpUrl = getAmqpUrl() + "?frameMax=0"
81+
amqp091conn = await amqplib.connect(amqpUrl)
82+
ch1 = await amqp091conn.createChannel()
83+
ch1Consumer = ch1.consume(queueName, (msg) => {}, {consumerTag: "one"})
8784
})
8885

89-
it('it should have one consumer', async function() {
86+
it('it should have one consumer as active', async function() {
9087
await doWhile(async function() {
9188
await queuePage.refresh()
9289
await queuePage.isLoaded()
@@ -100,53 +97,146 @@ describe('Given a quorum queue configured with SAC', function () {
10097
let consumerTable = await doWhile(async function() {
10198
return queuePage.getConsumersTable()
10299
}, function(table) {
103-
return table[0][6].localeCompare("single active") == 0
100+
return table[0][6].localeCompare("single active") == 0 &&
101+
table[0][1].localeCompare("one") == 0
104102
})
105103
assert.equal("single active", consumerTable[0][6])
104+
assert.equal("one", consumerTable[0][1])
106105

107106
})
108107

109-
it('it should have two consumers, after adding a second subscriber', async function() {
110-
amqp091conn = await amqplib.connect('amqp://guest:guest@localhost?frameMax=0')
111-
const ch1 = await amqp091conn.createChannel()
112-
// Listener
113-
114-
ch1.consume(queueName, (msg) => {}, {priority: 10})
108+
describe("given another consumer is added with priority", function () {
109+
before(async function() {
110+
ch2 = await amqp091conn.createChannel()
111+
ch2Consumer = ch2.consume(queueName, (msg) => {}, {consumerTag: "two", priority: 10})
112+
})
113+
114+
it('the latter consumer should be active and the former waiting', async function() {
115+
116+
await doWhile(async function() {
117+
await queuePage.refresh()
118+
await queuePage.isLoaded()
119+
return queuePage.getConsumerCount()
120+
}, function(count) {
121+
return count.localeCompare("2") == 0
122+
}, 5000)
123+
124+
assert.equal("2", await queuePage.getConsumerCount())
125+
assert.equal("Consumers (2)", await queuePage.getConsumersSectionTitle())
126+
await queuePage.clickOnConsumerSection()
127+
let consumerTable = await doWhile(async function() {
128+
return queuePage.getConsumersTable()
129+
}, function(table) {
130+
return table.length == 2 && table[0][1] != "" && table[1][1] != ""
131+
}, 5000)
132+
133+
let activeConsumer = consumerTable[1][6].localeCompare("single active") == 0 ?
134+
1 : 0
135+
let nonActiveConsumer = activeConsumer == 1 ? 0 : 1
136+
137+
assert.equal("waiting", consumerTable[nonActiveConsumer][6])
138+
assert.equal("one", consumerTable[nonActiveConsumer][1])
139+
assert.equal("single active", consumerTable[activeConsumer][6])
140+
assert.equal("two", consumerTable[activeConsumer][1])
141+
await delay(5000)
142+
})
143+
})
144+
145+
after(async function() {
146+
try {
147+
if (amqp091conn != null) {
148+
amqp091conn.close()
149+
}
150+
} catch (error) {
151+
error("Failed to close amqp091 connection due to " + error);
152+
}
153+
// ensure there are no more consumers
154+
await doWhile(async function() {
155+
await queuePage.refresh()
156+
await queuePage.isLoaded()
157+
return queuePage.getConsumerCount()
158+
}, function(count) {
159+
return count.localeCompare("0") == 0
160+
}, 5000)
161+
115162

163+
})
164+
})
165+
166+
describe("given there is a consumer (with priority) attached to the queue", function () {
167+
let amqp091conn
168+
let ch1
169+
let ch1Consumer
170+
let ch2
171+
let ch2Consumer
172+
173+
before(async function() {
174+
let amqpUrl = getAmqpUrl() + "?frameMax=0"
175+
amqp091conn = await amqplib.connect(amqpUrl)
176+
ch1 = await amqp091conn.createChannel()
177+
ch1Consumer = ch1.consume(queueName, (msg) => {}, {consumerTag: "one", priority: 10})
178+
})
179+
180+
it('it should have one consumer as active', async function() {
116181
await doWhile(async function() {
117182
await queuePage.refresh()
118183
await queuePage.isLoaded()
119184
return queuePage.getConsumerCount()
120185
}, function(count) {
121-
return count.localeCompare("2") == 0
186+
return count.localeCompare("0") == 1
122187
}, 5000)
123-
124-
assert.equal("2", await queuePage.getConsumerCount())
125-
assert.equal("Consumers (2)", await queuePage.getConsumersSectionTitle())
188+
assert.equal("1", await queuePage.getConsumerCount())
189+
assert.equal("Consumers (1)", await queuePage.getConsumersSectionTitle())
126190
await queuePage.clickOnConsumerSection()
127191
let consumerTable = await doWhile(async function() {
128192
return queuePage.getConsumersTable()
129193
}, function(table) {
130-
return table.length == 2
131-
}, 5000)
194+
return table[0][6].localeCompare("single active") == 0 &&
195+
table[0][1].localeCompare("one") == 0
196+
})
197+
assert.equal("single active", consumerTable[0][6])
198+
assert.equal("one", consumerTable[0][1])
199+
200+
})
201+
202+
describe("given another consumer is added without priority", function () {
203+
before(async function() {
204+
ch2 = await amqp091conn.createChannel()
205+
ch2Consumer = ch2.consume(queueName, (msg) => {}, {consumerTag: "two"})
206+
})
132207

133-
let activeConsumer = consumerTable[1][6].localeCompare("single active") == 0 ?
134-
1 : 0
135-
let nonActiveConsumer = activeConsumer == 1 ? 0 : 1
208+
it('the former consumer should still be active and the latter be waiting', async function() {
209+
210+
await doWhile(async function() {
211+
await queuePage.refresh()
212+
await queuePage.isLoaded()
213+
return queuePage.getConsumerCount()
214+
}, function(count) {
215+
return count.localeCompare("2") == 0
216+
}, 5000)
217+
218+
assert.equal("2", await queuePage.getConsumerCount())
219+
assert.equal("Consumers (2)", await queuePage.getConsumersSectionTitle())
220+
await queuePage.clickOnConsumerSection()
221+
let consumerTable = await doWhile(async function() {
222+
return queuePage.getConsumersTable()
223+
}, function(table) {
224+
return table.length == 2 && table[0][1] != "" && table[1][1] != ""
225+
}, 5000)
136226

137-
assert.equal("waiting", consumerTable[nonActiveConsumer][6])
138-
assert.equal("single active", consumerTable[activeConsumer][6])
139-
await delay(5000)
227+
let activeConsumer = consumerTable[1][6].localeCompare("single active") == 0 ?
228+
1 : 0
229+
let nonActiveConsumer = activeConsumer == 1 ? 0 : 1
230+
231+
assert.equal("waiting", consumerTable[nonActiveConsumer][6])
232+
assert.equal("two", consumerTable[nonActiveConsumer][1])
233+
assert.equal("single active", consumerTable[activeConsumer][6])
234+
assert.equal("one", consumerTable[activeConsumer][1])
235+
await delay(5000)
236+
})
140237
})
141238

142-
after(function() {
143-
try {
144-
if (amqp != null) {
145-
closeAmqp(amqp.connection)
146-
}
147-
} catch (error) {
148-
error("Failed to close amqp10 connection due to " + error);
149-
}
239+
after(function() {
150240
try {
151241
if (amqp091conn != null) {
152242
amqp091conn.close()

0 commit comments

Comments
 (0)