Skip to content

Commit 92ee470

Browse files
Merge branch 'main' into rin/add-looking_glass
2 parents 3f1e913 + 17e0349 commit 92ee470

File tree

12 files changed

+75
-89
lines changed

12 files changed

+75
-89
lines changed

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ handle_call(_Request, _From, State) ->
413413
{noreply, State}.
414414

415415
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
416-
Nodes = rabbit_nodes:list_running() -- [node()],
416+
Nodes = rabbit_nodes:list_reachable() -- [node()],
417417
gen_server:abcast(Nodes, ?SERVER,
418418
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
419419
%% register other active rabbits with this rabbit

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,17 +1744,17 @@ force_shrink_member_to_current_member(VHost, Name) ->
17441744
force_all_queues_shrink_member_to_current_member() ->
17451745
rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"),
17461746
Node = node(),
1747-
[begin
1748-
QName = amqqueue:get_name(Q),
1749-
{RaName, _} = amqqueue:get_pid(Q),
1750-
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
1751-
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
1752-
Fun = fun (QQ) ->
1753-
TS0 = amqqueue:get_type_state(QQ),
1754-
TS = TS0#{nodes => [Node]},
1755-
amqqueue:set_type_state(QQ, TS)
1756-
end,
1757-
_ = rabbit_amqqueue:update(QName, Fun)
1758-
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
1747+
_ = [begin
1748+
QName = amqqueue:get_name(Q),
1749+
{RaName, _} = amqqueue:get_pid(Q),
1750+
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
1751+
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
1752+
Fun = fun (QQ) ->
1753+
TS0 = amqqueue:get_type_state(QQ),
1754+
TS = TS0#{nodes => [Node]},
1755+
amqqueue:set_type_state(QQ, TS)
1756+
end,
1757+
_ = rabbit_amqqueue:update(QName, Fun)
1758+
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
17591759
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
17601760
ok.

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,9 @@ update_metadata(Name, Metadata0, ActingUser) ->
252252
Error
253253
end.
254254

255-
-spec update(vhost:name(), binary(), [atom()], rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
256-
update(Name, Description, Tags, ActingUser) ->
257-
Metadata = #{description => Description, tags => Tags},
255+
-spec update(vhost:name(), binary(), [atom()], rabbit_queue_type:queue_type() | 'undefined', rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
256+
update(Name, Description, Tags, DefaultQueueType, ActingUser) ->
257+
Metadata = #{description => Description, tags => Tags, default_queue_type => DefaultQueueType},
258258
update_metadata(Name, Metadata, ActingUser).
259259

260260
-spec delete(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
@@ -325,7 +325,7 @@ put_vhost(Name, Description, Tags0, DefaultQueueType, Trace, Username) ->
325325
rabbit_log:debug("Parsed tags ~tp to ~tp", [Tags, ParsedTags]),
326326
Result = case exists(Name) of
327327
true ->
328-
update(Name, Description, ParsedTags, Username);
328+
update(Name, Description, ParsedTags, DefaultQueueType, Username);
329329
false ->
330330
Metadata0 = #{description => Description,
331331
tags => ParsedTags},

deps/rabbitmq_federation/src/rabbit_federation_link_util.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,9 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
140140
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
141141
rabbit_federation_status:report(
142142
Upstream, UParams, XorQName, clean_reason(E)),
143-
Reason = case E of
144-
{error, Value} -> Value;
145-
Other -> Other
146-
end,
147143
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
148144
[rabbit_federation_upstream:params_to_string(UParams),
149-
Reason]),
145+
E]),
150146
{stop, {shutdown, restart}, State};
151147

152148
connection_error(remote, E, Upstream, UParams, XorQName, State) ->

deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,7 @@ handle_cast(pause, State = #state{run = false}) ->
104104
handle_cast(pause, State = #not_started{}) ->
105105
{noreply, State#not_started{run = false}};
106106

107-
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{
108-
name = UpName, queue_name = QName
109-
}}) ->
110-
rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause",
111-
[QName, UpName]),
107+
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
112108
cancel(Ch, Upstream),
113109
{noreply, State#state{run = false}};
114110

@@ -309,22 +305,18 @@ visit_match(_ ,_) ->
309305
consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
310306
ConsumerTag.
311307

312-
consume(Ch, Upstream = #upstream{name = UpName}, UQueue) ->
308+
consume(Ch, Upstream, UQueue) ->
313309
ConsumerTag = consumer_tag(Upstream),
314310
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
315-
rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'",
316-
[rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]),
317311
amqp_channel:cast(
318312
Ch, #'basic.consume'{queue = name(UQueue),
319313
no_ack = NoAck,
320314
nowait = true,
321315
consumer_tag = ConsumerTag,
322316
arguments = [{<<"x-priority">>, long, -1}]}).
323317

324-
cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) ->
318+
cancel(Ch, Upstream) ->
325319
ConsumerTag = consumer_tag(Upstream),
326-
rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'",
327-
[QName, ConsumerTag, UpName]),
328320
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
329321
consumer_tag = ConsumerTag}).
330322

deps/rabbitmq_federation/src/rabbit_federation_upstream.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ params_table(SafeURI, XorQ) ->
6161

6262
params_to_string(#upstream_params{safe_uri = SafeURI,
6363
x_or_q = XorQ}) ->
64-
print("~ts on '~ts'", [rabbit_misc:rs(r(XorQ)), SafeURI]).
64+
print("~ts on ~ts", [rabbit_misc:rs(r(XorQ)), SafeURI]).
6565

6666
remove_credentials(URI) ->
6767
list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).

deps/rabbitmq_web_dispatch/src/rabbit_web_dispatch_registry.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,17 @@ listener_info(Listener) ->
157157
P
158158
end,
159159
Port = pget(port, Listener),
160-
[{IPAddress, _Port, _Family} | _]
161-
= rabbit_networking:tcp_listener_addresses(Port),
160+
IPAddress = case rabbit_misc:pget(ip, Listener) of
161+
undefined ->
162+
[{AutoIPAddress, _Port, _Family} | _]
163+
= rabbit_networking:tcp_listener_addresses(Port),
164+
AutoIPAddress;
165+
IP when is_tuple(IP) ->
166+
IP;
167+
IP when is_list(IP) ->
168+
{ok, ParsedIP} = inet_parse:address(IP),
169+
ParsedIP
170+
end,
162171
[{Protocol, IPAddress, Port}].
163172

164173
lookup_dispatch(Lsnr) ->

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,20 @@ start_tls_listener(TLSConf0, CowboyOpts) ->
143143

144144
listener_started(Protocol, Listener) ->
145145
Port = rabbit_misc:pget(port, Listener),
146-
[rabbit_networking:tcp_listener_started(Protocol, Listener,
147-
IPAddress, Port)
148-
|| {IPAddress, _Port, _Family}
149-
<- rabbit_networking:tcp_listener_addresses(Port)],
146+
_ = case rabbit_misc:pget(ip, Listener) of
147+
undefined ->
148+
[rabbit_networking:tcp_listener_started(Protocol, Listener,
149+
IPAddress, Port)
150+
|| {IPAddress, _Port, _Family}
151+
<- rabbit_networking:tcp_listener_addresses(Port)];
152+
IP when is_tuple(IP) ->
153+
rabbit_networking:tcp_listener_started(Protocol, Listener,
154+
IP, Port);
155+
IP when is_list(IP) ->
156+
{ok, ParsedIP} = inet_parse:address(IP),
157+
rabbit_networking:tcp_listener_started(Protocol, Listener,
158+
ParsedIP, Port)
159+
end,
150160
ok.
151161

152162
get_tcp_conf(TCPConf0) ->

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,20 @@ start_tls_listener(TLSConf0, CowboyOpts0, Routes) ->
165165

166166
listener_started(Protocol, Listener) ->
167167
Port = rabbit_misc:pget(port, Listener),
168-
[rabbit_networking:tcp_listener_started(Protocol, Listener,
169-
IPAddress, Port)
170-
|| {IPAddress, _Port, _Family}
171-
<- rabbit_networking:tcp_listener_addresses(Port)],
168+
_ = case rabbit_misc:pget(ip, Listener) of
169+
undefined ->
170+
[rabbit_networking:tcp_listener_started(Protocol, Listener,
171+
IPAddress, Port)
172+
|| {IPAddress, _Port, _Family}
173+
<- rabbit_networking:tcp_listener_addresses(Port)];
174+
IP when is_tuple(IP) ->
175+
rabbit_networking:tcp_listener_started(Protocol, Listener,
176+
IP, Port);
177+
IP when is_list(IP) ->
178+
{ok, ParsedIP} = inet_parse:address(IP),
179+
rabbit_networking:tcp_listener_started(Protocol, Listener,
180+
ParsedIP, Port)
181+
end,
172182
ok.
173183

174184
get_env(Key, Default) ->

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ dep_accept = hex 0.3.5
114114
dep_cowboy = hex 2.10.0
115115
dep_cowlib = hex 2.12.1
116116
dep_credentials_obfuscation = hex 3.4.0
117-
dep_looking_glass = git https://github.com/rabbitmq/looking_glass.git master
117+
dep_looking_glass = git https://github.com/rabbitmq/looking_glass.git main
118118
dep_prometheus = hex 4.10.0
119119
dep_ra = hex 2.6.1
120120
dep_ranch = hex 2.1.0

release-notes/3.12.0.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,12 @@ This release includes all bug fixes shipped in the `3.11.x` series.
243243

244244
GitHub issue: [#7743](https://github.com/rabbitmq/rabbitmq-server/issues/7743)
245245

246+
* Quorum queues now have a mechanism of shrinking down to just one replica.
247+
This is meant to be used **exclusively** for disaster recovery when a majority
248+
of nodes hosting replicas of a queue were permanently lost.
249+
250+
GitHub issue: [#8322](https://github.com/rabbitmq/rabbitmq-server/pull/8322)
251+
246252
* Classic mirrored queues that had a [length limit](https://rabbitmq.com/maxlength.html) defined on them
247253
handled the overflow of messages differently from the current elected leader,
248254
eventually causing exceptions and replica restarts.

release-notes/3.13.0.md

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
## RabbitMQ 3.13.0
22

3-
RabbitMQ `3.12.0` is a new feature release.
3+
RabbitMQ `3.13.0` is a new feature release.
44

55
## Highlights
66

77
This release includes several new features, optimizations, and graduates (makes mandatory) a number of feature flags.
88

99
The user-facing areas that have seen the biggest improvements in this release are
1010

11-
* Classic queues use version 2 of the format format. This should significantly improve performance.
11+
* Classic queues use version 2 of the storage implementation (CQv2).
12+
This should significantly improve performance.
1213

1314
This release also features many internal API improvements in preparation to 4.0
1415
with [Khepri](https://www.youtube.com/watch?v=huT-zmXvfuM).
@@ -43,6 +44,7 @@ for release notes of other releases.
4344

4445
### Required Feature Flags
4546

47+
TBD
4648

4749
### Mixed version cluster compatibility
4850

@@ -59,7 +61,7 @@ periods of time (no more than a few hours).
5961

6062
## Compatibility Notes
6163

62-
### More Feature Flags Gratuate to Core Features ("Always Enabled")
64+
TBD
6365

6466

6567
### Minimum Supported Erlang Version
@@ -83,7 +85,7 @@ Any questions about this release, upgrades or RabbitMQ in general are welcome on
8385

8486
## Changes Worth Mentioning
8587

86-
Release notes are kept under [rabbitmq-server/release-notes](https://github.com/rabbitmq/rabbitmq-server/tree/v3.11.x/release-notes).
88+
Release notes are kept under [rabbitmq-server/release-notes](https://github.com/rabbitmq/rabbitmq-server/tree/main/release-notes).
8789

8890
### Core Server
8991

@@ -107,48 +109,9 @@ Release notes are kept under [rabbitmq-server/release-notes](https://github.com/
107109
This release includes all bug fixes shipped in the `3.12.x` series.
108110

109111

110-
111-
### CLI Tools
112-
113-
#### Enhancements
114-
115-
116-
#### Bug Fixes
117-
118-
119-
120-
### MQTT Plugin
121-
122-
#### Enhancements
123-
124-
125-
### Management Plugin
126-
127-
#### Enhancements
128-
129-
130-
#### Bug Fixes
131-
132-
133-
### OAuth 2 AuthN/AuthZ Backend Plugin
134-
135-
#### Enhancement
136-
137-
138-
### HTTPS AuthN/AuthZ Backend Plugin
139-
140-
#### Bug Fixes
141-
142-
143-
### Consul Peer Discovery Plugin
144-
145-
#### Bug Fixes
146-
147-
148-
149112
### Dependency Changes
150113

151-
114+
TBD
152115

153116
## Source Code Archives
154117

0 commit comments

Comments
 (0)