Skip to content

Commit 3307428

Browse files
committed
Fix crash in consistent hash exchange
Prior to this commit, a crash occurred when a consistent hash exchange got declared with a `hash-header` argument, but the publishing client didn't set that header on the message. This bug is present in RabbitMQ 3.13.0 - 3.13.6. Fixes #11671 (cherry picked from commit cdc5b88)
1 parent 54c9b96 commit 3307428

File tree

3 files changed

+71
-17
lines changed

3 files changed

+71
-17
lines changed

deps/rabbitmq_consistent_hash_exchange/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ ring partitions, and thus queues according to their binding weights.
8484
#### One Binding Per Queue
8585

8686
This exchange type **assumes a single binding between a queue and an exchange**.
87-
Starting with RabbitMQ `3.10.6` and `3.9.21` this will be enforced in the code:
87+
This will be enforced in the code:
8888
when multiple bindings are created, only the first one will actually update the ring.
8989

9090
This limitation makes most semantic sense: the purpose is to achieve
@@ -376,7 +376,7 @@ exchange to route based on a named header instead. To do this, declare the
376376
exchange with a string argument called "hash-header" naming the header to
377377
be used.
378378

379-
When a `"hash-header"` is specified, the chosen header **must be provided**.
379+
When a `"hash-header"` is specified, the chosen header should be provided.
380380
If published messages do not contain the header, they will all get
381381
routed to the same **arbitrarily chosen** queue.
382382

@@ -579,7 +579,7 @@ declare the exchange with a string argument called ``"hash-property"`` naming th
579579
property to be used.
580580
The `"hash-header"` and `"hash-property"` are mutually exclusive.
581581

582-
When a `"hash-property"` is specified, the chosen property **must be provided**.
582+
When a `"hash-property"` is specified, the chosen property should be provided.
583583
If published messages do not contain the property, they will all get
584584
routed to the same **arbitrarily chosen** queue.
585585

deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,9 @@ jump_consistent_hash_value(_B0, J0, NumberOfBuckets, SeedState0) ->
261261

262262
value_to_hash(undefined, Msg) ->
263263
mc:routing_keys(Msg);
264-
value_to_hash({header, Header}, Msg0) ->
265-
maps:get(Header, mc:routing_headers(Msg0, [x_headers]));
264+
value_to_hash({header, Header}, Msg) ->
265+
Headers = mc:routing_headers(Msg, [x_headers]),
266+
maps:get(Header, Headers, undefined);
266267
value_to_hash({property, Property}, Msg) ->
267268
case Property of
268269
<<"correlation_id">> ->

deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ routing_tests() ->
4040
[
4141
routing_key_hashing_test,
4242
custom_header_hashing_test,
43+
custom_header_undefined,
4344
message_id_hashing_test,
4445
correlation_id_hashing_test,
4546
timestamp_hashing_test,
@@ -121,7 +122,7 @@ end_per_testcase(Testcase, Config) ->
121122

122123
%% N.B. lowering this value below 100K increases the probability
123124
%% of failing the Chi squared test in some environments
124-
-define(DEFAULT_SAMPLE_COUNT, 150000).
125+
-define(DEFAULT_SAMPLE_COUNT, 150_000).
125126

126127
routing_key_hashing_test(Config) ->
127128
ok = test_with_rk(Config, ?RoutingTestQs).
@@ -145,6 +146,43 @@ other_routing_test(Config) ->
145146
ok = test_mutually_exclusive_arguments(Config),
146147
ok.
147148

149+
%% Test case for
150+
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11671
151+
%% According to our docs, it's allowed (although not recommended)
152+
%% for the publishing client to omit the header:
153+
%% "If published messages do not contain the header,
154+
%% they will all get routed to the same arbitrarily chosen queue."
155+
custom_header_undefined(Config) ->
156+
Exchange = <<"my exchange">>,
157+
Queue = <<"my queue">>,
158+
159+
Ch = rabbit_ct_client_helpers:open_channel(Config),
160+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
161+
#'exchange.declare_ok'{} = amqp_channel:call(
162+
Ch, #'exchange.declare' {
163+
exchange = Exchange,
164+
type = <<"x-consistent-hash">>,
165+
arguments = [{<<"hash-header">>, longstr, <<"hashme">>}]
166+
}),
167+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Queue}),
168+
#'queue.bind_ok'{} = amqp_channel:call(
169+
Ch, #'queue.bind'{queue = Queue,
170+
exchange = Exchange,
171+
routing_key = <<"1">>}),
172+
173+
amqp_channel:call(Ch,
174+
#'basic.publish'{exchange = Exchange},
175+
%% We leave the "hashme" header undefined.
176+
#amqp_msg{}),
177+
amqp_channel:wait_for_confirms(Ch, 10),
178+
179+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
180+
amqp_channel:call(Ch, #'basic.get'{queue = Queue})),
181+
182+
rabbit_ct_client_helpers:close_channel(Ch),
183+
clean_up_test_topology(Config, Exchange, [Queue]),
184+
ok.
185+
148186
%% Test that messages originally published with AMQP to a quorum queue
149187
%% can be dead lettered via the consistent hash exchange to a stream.
150188
amqp_dead_letter(Config) ->
@@ -280,45 +318,60 @@ wait_for_accepts(N) ->
280318
%% -------------------------------------------------------------------
281319

282320
test_with_rk(Config, Qs) ->
283-
test0(Config, fun (E) ->
321+
test0(Config,
322+
fun (E) ->
284323
#'basic.publish'{exchange = E, routing_key = rnd()}
285324
end,
286325
fun() ->
287326
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
288-
end, [], Qs).
327+
end,
328+
[],
329+
Qs).
289330

290331
test_with_header(Config, Qs) ->
291-
test0(Config, fun (E) ->
332+
test0(Config,
333+
fun (E) ->
292334
#'basic.publish'{exchange = E}
293335
end,
294336
fun() ->
295337
H = [{<<"hashme">>, longstr, rnd()}],
296338
#amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
297-
end, [{<<"hash-header">>, longstr, <<"hashme">>}], Qs).
339+
end,
340+
[{<<"hash-header">>, longstr, <<"hashme">>}],
341+
Qs).
298342

299343
test_with_correlation_id(Config, Qs) ->
300-
test0(Config, fun(E) ->
344+
test0(Config,
345+
fun(E) ->
301346
#'basic.publish'{exchange = E}
302347
end,
303348
fun() ->
304349
#amqp_msg{props = #'P_basic'{correlation_id = rnd()}, payload = <<>>}
305-
end, [{<<"hash-property">>, longstr, <<"correlation_id">>}], Qs).
350+
end,
351+
[{<<"hash-property">>, longstr, <<"correlation_id">>}],
352+
Qs).
306353

307354
test_with_message_id(Config, Qs) ->
308-
test0(Config, fun(E) ->
355+
test0(Config,
356+
fun(E) ->
309357
#'basic.publish'{exchange = E}
310358
end,
311359
fun() ->
312360
#amqp_msg{props = #'P_basic'{message_id = rnd()}, payload = <<>>}
313-
end, [{<<"hash-property">>, longstr, <<"message_id">>}], Qs).
361+
end,
362+
[{<<"hash-property">>, longstr, <<"message_id">>}],
363+
Qs).
314364

315365
test_with_timestamp(Config, Qs) ->
316-
test0(Config, fun(E) ->
366+
test0(Config,
367+
fun(E) ->
317368
#'basic.publish'{exchange = E}
318369
end,
319370
fun() ->
320371
#amqp_msg{props = #'P_basic'{timestamp = rnd_int()}, payload = <<>>}
321-
end, [{<<"hash-property">>, longstr, <<"timestamp">>}], Qs).
372+
end,
373+
[{<<"hash-property">>, longstr, <<"timestamp">>}],
374+
Qs).
322375

323376
test_mutually_exclusive_arguments(Config) ->
324377
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
@@ -359,7 +412,7 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues) ->
359412
test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT).
360413

361414
test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) ->
362-
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
415+
Chan = rabbit_ct_client_helpers:open_channel(Config),
363416
#'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),
364417

365418
CHX = <<"e">>,

0 commit comments

Comments
 (0)