Skip to content

Commit 391eb24

Browse files
Merge branch 'master' into rabbitmq-server-1873-binding-recovery
2 parents 7521ab4 + 1117739 commit 391eb24

12 files changed

+80
-144
lines changed

docs/rabbitmq.conf.example

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -509,10 +509,10 @@
509509
# net_ticktime = 60
510510

511511
## Inter-node communication port range.
512+
## The parameters inet_dist_listen_min and inet_dist_listen_max
513+
## can be configured in the classic config format only.
512514
## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.
513-
##
514-
# inet_dist_listen_min = 25672
515-
# inet_dist_listen_max = 25692
515+
516516

517517
## ----------------------------------------------------------------------------
518518
## RabbitMQ Management Plugin

priv/schema/rabbit.schema

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,16 +1352,6 @@ end}.
13521352
{validators, ["non_zero_positive_integer"]}
13531353
]}.
13541354

1355-
{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[
1356-
{datatype, [integer]},
1357-
{validators, ["non_zero_positive_integer"]}
1358-
]}.
1359-
1360-
{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[
1361-
{datatype, [integer]},
1362-
{validators, ["non_zero_positive_integer"]}
1363-
]}.
1364-
13651355
% ==========================
13661356
% sysmon_handler section
13671357
% ==========================

scripts/rabbitmq-server

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ RABBITMQ_PRELAUNCH_NODENAME="rabbitmqprelaunch${$}@localhost"
187187
NOTIFY_SOCKET= \
188188
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
189189
ERL_CRASH_DUMP=$ERL_CRASH_DUMP \
190+
RABBITMQ_CONFIG_ARG_FILE=$RABBITMQ_CONFIG_ARG_FILE \
190191
RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
191192
${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
192193
-boot "${CLEAN_BOOT_FILE}" \

src/rabbit_binding.erl

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949

5050
-type bind_ok_or_error() :: 'ok' | bind_errors() |
5151
rabbit_types:error(
52-
'binding_not_found' |
5352
{'binding_invalid', string(), [any()]}).
5453
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
5554
-type inner_fun() ::
@@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) ->
178177
lock_resource(Src),
179178
lock_resource(Dst),
180179
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
181-
case (SrcDurable andalso DstDurable andalso
182-
mnesia:read({rabbit_durable_route, B}) =/= []) of
183-
false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
184-
fun mnesia:write/3),
185-
x_callback(transaction, Src, add_binding, B),
186-
Serial = rabbit_exchange:serial(Src),
187-
fun () ->
188-
x_callback(Serial, Src, add_binding, B),
189-
ok = rabbit_event:notify(
190-
binding_created,
191-
info(B) ++ [{user_who_performed_action, ActingUser}])
192-
end;
193-
true -> rabbit_misc:const({error, binding_not_found})
180+
ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
181+
fun mnesia:write/3),
182+
x_callback(transaction, Src, add_binding, B),
183+
Serial = rabbit_exchange:serial(Src),
184+
fun () ->
185+
x_callback(Serial, Src, add_binding, B),
186+
ok = rabbit_event:notify(
187+
binding_created,
188+
info(B) ++ [{user_who_performed_action, ActingUser}])
194189
end.
195190

196191
-spec remove(rabbit_types:binding()) -> bind_res().
@@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) ->
208203
case mnesia:read(rabbit_route, B, write) of
209204
[] -> case mnesia:read(rabbit_durable_route, B, write) of
210205
[] -> rabbit_misc:const(ok);
211-
_ -> rabbit_misc:const({error, binding_not_found})
206+
%% We still delete the binding and run
207+
%% all post-delete functions if there is only
208+
%% a durable route in the database
209+
_ -> remove(Src, Dst, B, ActingUser)
212210
end;
213211
_ -> case InnerFun(Src, Dst) of
214212
ok -> remove(Src, Dst, B, ActingUser);
@@ -275,17 +273,20 @@ list_for_source(SrcName) ->
275273
-spec list_for_destination
276274
(rabbit_types:binding_destination()) -> bindings().
277275

278-
list_for_destination(DstName) ->
279-
implicit_for_destination(DstName) ++
280-
mnesia:async_dirty(
276+
list_for_destination(DstName = #resource{virtual_host = VHostPath}) ->
277+
AllBindings = mnesia:async_dirty(
281278
fun() ->
282279
Route = #route{binding = #binding{destination = DstName,
283280
_ = '_'}},
284281
[reverse_binding(B) ||
285282
#reverse_route{reverse_binding = B} <-
286283
mnesia:match_object(rabbit_reverse_route,
287284
reverse_route(Route), read)]
288-
end).
285+
end),
286+
Filtered = lists:filter(fun(#binding{source = S}) ->
287+
S =/= ?DEFAULT_EXCHANGE(VHostPath)
288+
end, AllBindings),
289+
implicit_for_destination(DstName) ++ Filtered.
289290

290291
implicit_bindings(VHostPath) ->
291292
DstQueues = rabbit_amqqueue:list_names(VHostPath),

test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -580,19 +580,6 @@ credential_validator.regexp = ^abc\\d+",
580580
]}],
581581
[]},
582582

583-
{kernel_inet_dist_listen_min,
584-
"inet_dist_listen_min = 16000",
585-
[{kernel, [
586-
{inet_dist_listen_min, 16000}
587-
]}],
588-
[]},
589-
{kernel_inet_dist_listen_max,
590-
"inet_dist_listen_max = 16100",
591-
[{kernel, [
592-
{inet_dist_listen_max, 16100}
593-
]}],
594-
[]},
595-
596583
{log_syslog_settings,
597584
"log.syslog = true
598585
log.syslog.identity = rabbitmq

test/dead_lettering_SUITE.erl

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,23 +93,14 @@ init_per_group(classic_queue, Config) ->
9393
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
9494
{queue_durable, false}]);
9595
init_per_group(quorum_queue, Config) ->
96-
Nodes = rabbit_ct_broker_helpers:get_node_configs(
97-
Config, nodename),
98-
Ret = rabbit_ct_broker_helpers:rpc(
99-
Config, 0,
100-
rabbit_feature_flags,
101-
is_supported_remotely,
102-
[Nodes, [quorum_queue], 60000]),
103-
case Ret of
104-
true ->
105-
ok = rabbit_ct_broker_helpers:rpc(
106-
Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
96+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
97+
ok ->
10798
rabbit_ct_helpers:set_config(
10899
Config,
109100
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
110101
{queue_durable, true}]);
111-
false ->
112-
{skip, "Quorum queues are unsupported"}
102+
Skip ->
103+
Skip
113104
end;
114105
init_per_group(mirrored_queue, Config) ->
115106
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,

test/dynamic_ha_SUITE.erl

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ groups() ->
6161
promote_on_shutdown,
6262
promote_on_failure,
6363
slave_recovers_after_vhost_failure,
64-
slave_recovers_after_vhost_down_an_up,
64+
slave_recovers_after_vhost_down_and_up,
6565
master_migrates_on_vhost_down,
6666
slave_recovers_after_vhost_down_and_master_migrated,
6767
queue_survive_adding_dead_vhost_mirror
@@ -133,7 +133,7 @@ change_policy(Config) ->
133133

134134
%% When we first declare a queue with no policy, it's not HA.
135135
amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME}),
136-
timer:sleep(100),
136+
timer:sleep(200),
137137
assert_slaves(A, ?QNAME, {A, ''}),
138138

139139
%% Give it policy "all", it becomes HA and gets all mirrors
@@ -417,7 +417,7 @@ slave_recovers_after_vhost_failure(Config) ->
417417
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
418418
QName = <<"slave_recovers_after_vhost_failure-q">>,
419419
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
420-
timer:sleep(300),
420+
timer:sleep(500),
421421
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
422422

423423
%% Crash vhost on a node hosting a mirror
@@ -426,22 +426,25 @@ slave_recovers_after_vhost_failure(Config) ->
426426

427427
assert_slaves(A, QName, {A, [B]}, [{A, []}]).
428428

429-
slave_recovers_after_vhost_down_an_up(Config) ->
429+
slave_recovers_after_vhost_down_and_up(Config) ->
430430
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
431431
rabbit_ct_broker_helpers:set_ha_policy_all(Config),
432432
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
433-
QName = <<"slave_recovers_after_vhost_down_an_up-q">>,
433+
QName = <<"slave_recovers_after_vhost_down_and_up-q">>,
434434
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
435-
timer:sleep(100),
435+
timer:sleep(200),
436436
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
437437

438438
%% Crash vhost on a node hosting a mirror
439439
rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>),
440-
%% Vhost is down now
441-
false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]),
442-
timer:sleep(300),
440+
%% rabbit_ct_broker_helpers:force_vhost_failure/2 will retry up to 10 times to
441+
%% make sure that the top vhost supervision tree process did go down. MK.
442+
timer:sleep(500),
443443
%% Vhost is back up
444-
{ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
444+
case rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]) of
445+
{ok, _Sup} -> ok;
446+
{error,{already_started, _Sup}} -> ok
447+
end,
445448

446449
assert_slaves(A, QName, {A, [B]}, [{A, []}]).
447450

@@ -451,12 +454,12 @@ master_migrates_on_vhost_down(Config) ->
451454
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
452455
QName = <<"master_migrates_on_vhost_down-q">>,
453456
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
454-
timer:sleep(100),
457+
timer:sleep(200),
455458
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
456459

457460
%% Crash vhost on the node hosting queue master
458461
rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
459-
timer:sleep(300),
462+
timer:sleep(500),
460463
assert_slaves(A, QName, {B, []}).
461464

462465
slave_recovers_after_vhost_down_and_master_migrated(Config) ->
@@ -465,16 +468,19 @@ slave_recovers_after_vhost_down_and_master_migrated(Config) ->
465468
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
466469
QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>,
467470
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
468-
timer:sleep(100),
471+
timer:sleep(200),
469472
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
470473
%% Crash vhost on the node hosting queue master
471474
rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
472-
timer:sleep(300),
475+
timer:sleep(500),
473476
assert_slaves(B, QName, {B, []}),
474477

475478
%% Restart the vhost on the node (previously) hosting queue master
476-
{ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
477-
timer:sleep(300),
479+
case rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]) of
480+
{ok, _Sup} -> ok;
481+
{error,{already_started, _Sup}} -> ok
482+
end,
483+
timer:sleep(500),
478484
assert_slaves(B, QName, {B, [A]}, [{B, []}]).
479485

480486
random_policy(Config) ->
@@ -569,7 +575,7 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Att
569575
State ->
570576
ct:pal("Waiting to leave state ~p~n Waiting for ~p~n",
571577
[State, {ExpMNode, ExpSNodes}]),
572-
timer:sleep(100),
578+
timer:sleep(200),
573579
assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes},
574580
PermittedIntermediate,
575581
Attempts - 1)

test/dynamic_qq_SUITE.erl

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,21 +87,12 @@ init_per_testcase(Testcase, Config) ->
8787
Config1,
8888
rabbit_ct_broker_helpers:setup_steps() ++
8989
rabbit_ct_client_helpers:setup_steps()),
90-
Nodes = rabbit_ct_broker_helpers:get_node_configs(
91-
Config2, nodename),
92-
Ret = rabbit_ct_broker_helpers:rpc(
93-
Config2, 0,
94-
rabbit_feature_flags,
95-
is_supported_remotely,
96-
[Nodes, [quorum_queue], 60000]),
97-
case Ret of
98-
true ->
99-
ok = rabbit_ct_broker_helpers:rpc(
100-
Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
90+
case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
91+
ok ->
10192
Config2;
102-
false ->
93+
Skip ->
10394
end_per_testcase(Testcase, Config2),
104-
{skip, "Quorum queues are unsupported"}
95+
Skip
10596
end.
10697

10798
end_per_testcase(Testcase, Config) ->

test/publisher_confirms_parallel_SUITE.erl

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -74,23 +74,14 @@ init_per_group(classic_queue, Config) ->
7474
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
7575
{queue_durable, true}]);
7676
init_per_group(quorum_queue, Config) ->
77-
Nodes = rabbit_ct_broker_helpers:get_node_configs(
78-
Config, nodename),
79-
Ret = rabbit_ct_broker_helpers:rpc(
80-
Config, 0,
81-
rabbit_feature_flags,
82-
is_supported_remotely,
83-
[Nodes, [quorum_queue], 60000]),
84-
case Ret of
85-
true ->
86-
ok = rabbit_ct_broker_helpers:rpc(
87-
Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
77+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
78+
ok ->
8879
rabbit_ct_helpers:set_config(
8980
Config,
9081
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
9182
{queue_durable, true}]);
92-
false ->
93-
{skip, "Quorum queues are unsupported"}
83+
Skip ->
84+
Skip
9485
end;
9586
init_per_group(mirrored_queue, Config) ->
9687
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,

test/queue_parallel_SUITE.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,15 @@ init_per_group(classic_queue, Config) ->
8888
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
8989
{queue_durable, true}]);
9090
init_per_group(quorum_queue, Config) ->
91-
rabbit_ct_helpers:set_config(
92-
Config,
93-
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
94-
{queue_durable, true}]);
91+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
92+
ok ->
93+
rabbit_ct_helpers:set_config(
94+
Config,
95+
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
96+
{queue_durable, true}]);
97+
Skip ->
98+
Skip
99+
end;
95100
init_per_group(mirrored_queue, Config) ->
96101
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
97102
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),

0 commit comments

Comments
 (0)