Skip to content

Commit dd25bf9

Browse files
Merge pull request #857 from rabbitmq/rabbitmq-server-802
Infinity loop in synchronisation of priority queues
2 parents ebd33ba + b6aaf53 commit dd25bf9

File tree

4 files changed

+310
-79
lines changed

4 files changed

+310
-79
lines changed

src/rabbit_mirror_queue_slave.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
120120
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
121121
{ok, BQ} = application:get_env(backing_queue_module),
122122
Q1 = Q #amqqueue { pid = QPid },
123-
ok = rabbit_queue_index:erase(QName), %% For crash recovery
123+
_ = BQ:delete_crashed(Q), %% For crash recovery
124124
BQS = bq_init(BQ, Q1, new),
125125
State = #state { q = Q1,
126126
gm = GM,

src/rabbit_priority_queue.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
4444
zip_msgs_and_acks/4]).
4545

46-
-record(state, {bq, bqss}).
46+
-record(state, {bq, bqss, max_priority}).
4747
-record(passthrough, {bq, bqs}).
4848

4949
%% See 'note on suffixes' below
@@ -157,7 +157,8 @@ init(Q, Recover, AsyncCallback) ->
157157
[{P, Init(P, Term)} || {P, Term} <- PsTerms]
158158
end,
159159
#state{bq = BQ,
160-
bqss = BQSs}
160+
bqss = BQSs,
161+
max_priority = hd(Ps)}
161162
end.
162163
%% [0] collapse_recovery has the effect of making a list of recovery
163164
%% terms in priority order, even for non priority queues. It's easier
@@ -419,6 +420,8 @@ info(Item, #passthrough{bq = BQ, bqs = BQS}) ->
419420

420421
invoke(Mod, {P, Fun}, State = #state{bq = BQ}) ->
421422
pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
423+
invoke(Mod, Fun, State = #state{bq = BQ, max_priority = P}) ->
424+
pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
422425
invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) ->
423426
?passthrough1(invoke(Mod, Fun, BQS)).
424427

0 commit comments

Comments
 (0)