Skip to content
This repository was archived by the owner on Nov 17, 2020. It is now read-only.

Commit 8bbff60

Browse files
authored
Merge pull request #274 from rabbitmq/quorum-queue
Quorum queue
2 parents 8833755 + b1ea14d commit 8bbff60

File tree

5 files changed

+45
-30
lines changed

5 files changed

+45
-30
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ DEP_PLUGINS = $(PROJECT)/mk/rabbitmq-build.mk \
4747

4848
WITHOUT = plugins/proper
4949

50+
PLT_APPS += mnesia crypto ssl
51+
5052
include mk/rabbitmq-components.mk
5153
include erlang.mk
5254

include/rabbit.hrl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@
143143
policy_version,
144144
slave_pids_pending_shutdown,
145145
vhost, %% secondary index
146-
options = #{}}).
146+
options = #{},
147+
type = classic,
148+
quorum_nodes }).
147149

148150
-record(exchange_serial, {name, next}).
149151

@@ -234,8 +236,8 @@
234236

235237
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2018 Pivotal Software, Inc.").
236238
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
237-
-define(OTP_MINIMUM, "19.3").
238-
-define(ERTS_MINIMUM, "8.3").
239+
-define(OTP_MINIMUM, "21.0").
240+
-define(ERTS_MINIMUM, "10.0").
239241

240242
%% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
241243
%% - 1 byte of frame type

mk/rabbitmq-run.mk

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ node_pid_file = $(call node_tmpdir,$(1))/$(1).pid
6464
node_log_base = $(call node_tmpdir,$(1))/log
6565
node_mnesia_base = $(call node_tmpdir,$(1))/mnesia
6666
node_mnesia_dir = $(call node_mnesia_base,$(1))/$(1)
67+
node_quorum_dir = $(call node_mnesia_dir,$(1))/quorum
6768
node_schema_dir = $(call node_tmpdir,$(1))/schema
6869
node_plugins_expand_dir = $(call node_tmpdir,$(1))/plugins
6970
node_generated_config_dir = $(call node_tmpdir,$(1))/config
@@ -78,6 +79,7 @@ RABBITMQ_PID_FILE ?= $(call node_pid_file,$(RABBITMQ_NODENAME_FOR_PATHS))
7879
RABBITMQ_LOG_BASE ?= $(call node_log_base,$(RABBITMQ_NODENAME_FOR_PATHS))
7980
RABBITMQ_MNESIA_BASE ?= $(call node_mnesia_base,$(RABBITMQ_NODENAME_FOR_PATHS))
8081
RABBITMQ_MNESIA_DIR ?= $(call node_mnesia_dir,$(RABBITMQ_NODENAME_FOR_PATHS))
82+
RABBITMQ_QUORUM_DIR ?= $(call node_quorum_dir,$(RABBITMQ_NODENAME_FOR_PATHS))
8183
RABBITMQ_SCHEMA_DIR ?= $(call node_schema_dir,$(RABBITMQ_NODENAME_FOR_PATHS))
8284
RABBITMQ_PLUGINS_EXPAND_DIR ?= $(call node_plugins_expand_dir,$(RABBITMQ_NODENAME_FOR_PATHS))
8385
RABBITMQ_GENERATED_CONFIG_DIR ?= $(call node_generated_config_dir,$(RABBITMQ_NODENAME_FOR_PATHS))
@@ -105,6 +107,7 @@ RABBITMQ_PID_FILE="$(call node_pid_file,$(2))" \
105107
RABBITMQ_LOG_BASE="$(call node_log_base,$(2))" \
106108
RABBITMQ_MNESIA_BASE="$(call node_mnesia_base,$(2))" \
107109
RABBITMQ_MNESIA_DIR="$(call node_mnesia_dir,$(2))" \
110+
RABBITMQ_QUORUM_DIR="$(call node_quorum_dir,$(2))" \
108111
RABBITMQ_SCHEMA_DIR="$(call node_schema_dir,$(2))" \
109112
RABBITMQ_GENERATED_CONFIG_DIR="$(call node_generated_config_dir,$(2))" \
110113
RABBITMQ_PLUGINS_DIR="$(RMQ_PLUGINS_DIR)" \
@@ -162,6 +165,9 @@ define test_rabbitmq_config
162165
[
163166
{rabbit, [
164167
{loopback_users, []}
168+
]},
169+
{ra, [
170+
{data_dir, "$(RABBITMQ_QUORUM_DIR)"}
165171
]}
166172
].
167173
endef
@@ -193,7 +199,10 @@ define test_rabbitmq_config_with_tls
193199
{fail_if_no_peer_cert, false},
194200
{honor_cipher_order, true}]}
195201
]}
196-
]}
202+
]},
203+
{ra, [
204+
{data_dir, "$(RABBITMQ_QUORUM_DIR)"}
205+
]}
197206
].
198207
endef
199208

@@ -310,6 +319,7 @@ start-brokers start-cluster:
310319
RABBITMQ_NODE_PORT="$$((5672 + $$n - 1))" \
311320
RABBITMQ_SERVER_START_ARGS=" \
312321
-rabbit loopback_users [] \
322+
-ra data_dir "$(RABBITMQ_QUORUM_DIR)"
313323
-rabbitmq_management listener [{port,$$((15672 + $$n - 1))}] \
314324
"; \
315325
if test '$@' = 'start-cluster' && test "$$nodename1"; then \

src/credit_flow.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262

6363
-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
6464
-export([peer_down/1]).
65+
-export([block/1, unblock/1]).
6566

6667
%%----------------------------------------------------------------------------
6768

src/pmon.erl

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,62 +34,62 @@
3434

3535
-compile({no_auto_import, [monitor/2]}).
3636

37-
-record(state, {dict, module}).
37+
-record(state, {monitors = #{} :: #{item() => reference()},
38+
module = erlang :: module()}).
3839

3940
%%----------------------------------------------------------------------------
4041

4142
-export_type([?MODULE/0]).
4243

43-
-opaque(?MODULE() :: #state{dict :: dict:dict(),
44-
module :: atom()}).
44+
-opaque(?MODULE() :: #state{}).
4545

46-
-type(item() :: pid() | {atom(), node()}).
46+
-type(item() :: pid() | {atom(), node()}).
4747

48-
-spec new() -> ?MODULE().
49-
-spec new('erlang' | 'delegate') -> ?MODULE().
50-
-spec monitor(item(), ?MODULE()) -> ?MODULE().
51-
-spec monitor_all([item()], ?MODULE()) -> ?MODULE().
52-
-spec demonitor(item(), ?MODULE()) -> ?MODULE().
53-
-spec is_monitored(item(), ?MODULE()) -> boolean().
54-
-spec erase(item(), ?MODULE()) -> ?MODULE().
55-
-spec monitored(?MODULE()) -> [item()].
56-
-spec is_empty(?MODULE()) -> boolean().
5748

49+
-spec new() -> ?MODULE().
5850
new() -> new(erlang).
5951

60-
new(Module) -> #state{dict = dict:new(),
61-
module = Module}.
52+
-spec new('erlang' | 'delegate') -> ?MODULE().
53+
new(Module) -> #state{module = Module}.
6254

63-
monitor(Item, S = #state{dict = M, module = Module}) ->
64-
case dict:is_key(Item, M) of
55+
-spec monitor(item(), ?MODULE()) -> ?MODULE().
56+
monitor(Item, S = #state{monitors = M, module = Module}) ->
57+
case maps:is_key(Item, M) of
6558
true -> S;
6659
false -> case node_alive_shortcut(Item) of
6760
true -> Ref = Module:monitor(process, Item),
68-
S#state{dict = dict:store(Item, Ref, M)};
61+
S#state{monitors = maps:put(Item, Ref, M)};
6962
false -> self() ! {'DOWN', fake_ref, process, Item,
7063
nodedown},
7164
S
7265
end
7366
end.
7467

68+
-spec monitor_all([item()], ?MODULE()) -> ?MODULE().
7569
monitor_all([], S) -> S; %% optimisation
7670
monitor_all([Item], S) -> monitor(Item, S); %% optimisation
7771
monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
7872

79-
demonitor(Item, S = #state{dict = M, module = Module}) ->
80-
case dict:find(Item, M) of
81-
{ok, MRef} -> Module:demonitor(MRef),
82-
S#state{dict = dict:erase(Item, M)};
73+
-spec demonitor(item(), ?MODULE()) -> ?MODULE().
74+
demonitor(Item, S = #state{monitors = M0, module = Module}) ->
75+
case maps:take(Item, M0) of
76+
{MRef, M} -> Module:demonitor(MRef),
77+
S#state{monitors = M};
8378
error -> S
8479
end.
8580

86-
is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
81+
-spec is_monitored(item(), ?MODULE()) -> boolean().
82+
is_monitored(Item, #state{monitors = M}) -> maps:is_key(Item, M).
8783

88-
erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
84+
-spec erase(item(), ?MODULE()) -> ?MODULE().
85+
erase(Item, S = #state{monitors = M}) ->
86+
S#state{monitors = maps:remove(Item, M)}.
8987

90-
monitored(#state{dict = M}) -> dict:fetch_keys(M).
88+
-spec monitored(?MODULE()) -> [item()].
89+
monitored(#state{monitors = M}) -> maps:keys(M).
9190

92-
is_empty(#state{dict = M}) -> dict:size(M) == 0.
91+
-spec is_empty(?MODULE()) -> boolean().
92+
is_empty(#state{monitors = M}) -> maps:size(M) == 0.
9393

9494
%%----------------------------------------------------------------------------
9595

0 commit comments

Comments
 (0)