Skip to content

Commit f79bc1c

Browse files
Merge branch 'master' into stream-reader-close-in-terminate
Conflicts: deps/rabbitmq_stream/src/rabbit_stream_reader.erl
2 parents 9e45060 + 8f207e3 commit f79bc1c

File tree

6 files changed

+68
-75
lines changed

6 files changed

+68
-75
lines changed

BAZEL.md

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,17 @@ Otherwise:
2525

2626
https://docs.bazel.build/versions/master/install-bazelisk.html
2727

28-
### Create `.bazelrc`
28+
### Create `user.bazelrc`
2929

30-
Create a `.bazelrc` file with at least:
31-
32-
```
33-
build --@bazel-erlang//:erlang_home=/path/to/erlang/installation
34-
build --@bazel-erlang//:erlang_version=23.1
35-
build --@bazel-erlang//:elixir_home=/path/to/elixir/installation
36-
build --test_strategy=exclusive
37-
build --incompatible_strict_action_env
38-
```
39-
40-
Additionally, on **macOS**, you will likely need to add
41-
42-
```
43-
build --spawn_strategy=local
44-
```
45-
46-
for certain `rabbitmq_cli` tests to pass. This is because `rabbitmqctl wait` shells out to 'ps', which is broken in the bazel macOS (https://github.com/bazelbuild/bazel/issues/7448).
30+
Create a `user.bazelrc` by making a copy of `user-template.bazelrc` and updating the paths in the first few lines.
4731

4832
### Run the broker
4933

5034
`bazel run broker`
5135

5236
### Running tests
5337

54-
Many rabbit tests spawn single or clustered rabbit nodes, and therefore it's best to run test suites sequentially on a single machine. Hence the `--test_strategy=exclusive` flag used in `.bazelrc` above. Naturally that restriction does not hold if utilizing remote execution (as is the case for RabbitMQ's CI pipelines).
38+
Many rabbit tests spawn single or clustered rabbit nodes, and therefore it's best to run test suites sequentially on a single machine. Hence the `build --local_test_jobs=1` flag used in `.bazelrc`. Naturally that restriction does not hold if utilizing remote execution (as is the case for RabbitMQ's CI pipelines).
5539

5640
Erlang Common Test logs will not be placed in the logs directory when run with bazel. They can be found under `bazel-testlogs`. For instance, those of the rabbit application's backing_queue suite will be under `bazel-testlogs/deps/rabbit/backing_queue_SUITE/test.outputs/`.
5741

deps/rabbitmq_stream/docs/PROTOCOL.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream
587587
RoutingKey => string
588588
SuperStream => string
589589

590-
RouteResponse => Key Version CorrelationId Stream
590+
RouteResponse => Key Version CorrelationId [Stream]
591591
Key => uint16 // 24
592592
Version => uint16
593593
CorrelationId => uint32

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,8 @@ kill_connection(ConnectionName) ->
118118
{ConnectionPid,
119119
#{<<"connection_name">> := ConnectionNameBin}} ->
120120
exit(ConnectionPid, kill);
121-
{ConnectionPid, _ClientProperties} ->
122-
ok
123-
after 1000 ->
124-
ok
121+
{ConnectionPid, _ClientProperties} -> ok
122+
after 1000 -> ok
125123
end
126124
end,
127125
pg_local:get_members(rabbit_stream_connections)).

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ topology(VirtualHost, Stream) ->
7575
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
7676

7777
-spec route(binary(), binary(), binary()) ->
78-
{ok, binary() | no_route} | {error, stream_not_found}.
78+
{ok, [binary()] | no_route} | {error, stream_not_found}.
7979
route(RoutingKey, VirtualHost, SuperStream) ->
8080
gen_server:call(?MODULE,
8181
{route, RoutingKey, VirtualHost, SuperStream}).
@@ -368,10 +368,9 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
368368
case rabbit_exchange:route(Exchange, Delivery) of
369369
[] ->
370370
{ok, no_route};
371-
[#resource{name = Stream}] ->
372-
{ok, Stream};
373-
[#resource{name = Stream} | _] ->
374-
{ok, Stream}
371+
Routes ->
372+
%% FIXME filter non-stream resources
373+
{ok, [Stream || #resource{name = Stream} <- Routes]}
375374
end
376375
catch
377376
exit:Error ->

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,8 @@ tuned(info, Msg, StateData) ->
447447
end).
448448

449449
state_timeout(State, Transport, Socket) ->
450-
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
450+
rabbit_log_connection:warning("Closing connection because of timeout in state "
451+
"'~s' likely due to lack of client action.",
451452
[State]),
452453
close_immediately(Transport, Socket),
453454
stop.
@@ -526,7 +527,8 @@ transition_to_opened(Transport,
526527
config = Configuration}}.
527528

528529
invalid_transition(Transport, Socket, From, To) ->
529-
rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s to ~s.",
530+
rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s "
531+
"to ~s.",
530532
[Socket, From, To]),
531533
close_immediately(Transport, Socket),
532534
stop.
@@ -875,8 +877,7 @@ open(cast,
875877
Ids ->
876878
Acc#{PublisherId => [PublishingId | Ids]}
877879
end;
878-
false ->
879-
Acc
880+
false -> Acc
880881
end
881882
end,
882883
#{}, CorrelationList),
@@ -956,7 +957,8 @@ open(cast,
956957
{queue_event, #resource{name = StreamName},
957958
{osiris_offset, _QueueResource, -1}},
958959
_StatemData) ->
959-
rabbit_log:debug("Stream protocol connection received osiris offset event for ~p with offset ~p",
960+
rabbit_log:debug("Stream protocol connection received osiris offset "
961+
"event for ~p with offset ~p",
960962
[StreamName, -1]),
961963
keep_state_and_data;
962964
open(cast,
@@ -975,11 +977,14 @@ open(cast,
975977
{Connection1, State1} =
976978
case maps:get(StreamName, StreamSubscriptions, undefined) of
977979
undefined ->
978-
rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)",
980+
rabbit_log:debug("Stream protocol connection: osiris offset event "
981+
"for ~p, but no subscription (leftover messages "
982+
"after unsubscribe?)",
979983
[StreamName]),
980984
{Connection, State};
981985
[] ->
982-
rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no registered consumers!",
986+
rabbit_log:debug("Stream protocol connection: osiris offset event "
987+
"for ~p, but no registered consumers!",
983988
[StreamName]),
984989
{Connection#stream_connection{stream_subscriptions =
985990
maps:remove(StreamName,
@@ -992,15 +997,15 @@ open(cast,
992997
#consumer{credit = Credit} = Consumer,
993998
Consumer1 =
994999
case Credit of
995-
0 ->
996-
Consumer;
1000+
0 -> Consumer;
9971001
_ ->
9981002
case send_chunks(Transport,
9991003
Consumer,
10001004
SendFileOct)
10011005
of
10021006
{error, closed} ->
1003-
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1007+
rabbit_log_connection:info("Stream protocol connection has been closed by "
1008+
"peer",
10041009
[]),
10051010
throw({stop, normal});
10061011
{error, Reason} ->
@@ -1075,13 +1080,21 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
10751080
rabbit_log_connection:debug("Stream protocol connection socket ~w closed [~w]",
10761081
[S, self()]),
10771082
stop;
1083+
<<<<<<< HEAD
10781084
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
10791085
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
1086+
=======
1087+
close_sent(info, {tcp_error, S, Reason},
1088+
#statem_data{transport = Transport, connection_state = State}) ->
1089+
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
1090+
"[~w]",
1091+
>>>>>>> master
10801092
[Reason, S, self()]),
10811093
stop;
10821094
close_sent(info, {resource_alarm, IsThereAlarm},
10831095
StatemData = #statem_data{connection = Connection}) ->
1084-
rabbit_log:warning("Stream protocol connection ignored a resource alarm ~p in state ~s",
1096+
rabbit_log:warning("Stream protocol connection ignored a resource "
1097+
"alarm ~p in state ~s",
10851098
[IsThereAlarm, ?FUNCTION_NAME]),
10861099
{keep_state,
10871100
StatemData#statem_data{connection =
@@ -1814,7 +1827,8 @@ handle_frame_post_auth(Transport,
18141827
SendFileOct)
18151828
of
18161829
{error, closed} ->
1817-
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1830+
rabbit_log_connection:info("Stream protocol connection has been closed by "
1831+
"peer",
18181832
[]),
18191833
throw({stop, normal});
18201834
{{segment, Segment1}, {credit, Credit1}} ->
@@ -1895,7 +1909,8 @@ handle_frame_post_auth(Transport,
18951909
SendFileOct)
18961910
of
18971911
{error, closed} ->
1898-
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1912+
rabbit_log_connection:info("Stream protocol connection has been closed by "
1913+
"peer",
18991914
[]),
19001915
throw({stop, normal});
19011916
{{segment, Segment1}, {credit, Credit1}} ->
@@ -2047,7 +2062,8 @@ handle_frame_post_auth(Transport,
20472062
{ok,
20482063
#{leader_node := LeaderPid,
20492064
replica_nodes := ReturnedReplicas}} ->
2050-
rabbit_log:debug("Created stream cluster with leader on ~p and replicas on ~p",
2065+
rabbit_log:debug("Created stream cluster with leader on ~p and "
2066+
"replicas on ~p",
20512067
[LeaderPid, ReturnedReplicas]),
20522068
response_ok(Transport,
20532069
Connection,
@@ -2208,8 +2224,7 @@ handle_frame_post_auth(Transport,
22082224
NodesAcc)
22092225
end,
22102226
Acc1, ReplicaNodes);
2211-
{error, _} ->
2212-
Acc
2227+
{error, _} -> Acc
22132228
end
22142229
end,
22152230
#{}, Streams),
@@ -2221,16 +2236,13 @@ handle_frame_post_auth(Transport,
22212236
lists:foldr(fun(Node, Acc) ->
22222237
PortFunction =
22232238
case TransportLayer of
2224-
tcp ->
2225-
port;
2226-
ssl ->
2227-
tls_port
2239+
tcp -> port;
2240+
ssl -> tls_port
22282241
end,
22292242
Host = rpc:call(Node, rabbit_stream, host, []),
22302243
Port = rpc:call(Node, rabbit_stream, PortFunction, []),
22312244
case {is_binary(Host), is_integer(Port)} of
2232-
{true, true} ->
2233-
Acc#{Node => {Host, Port}};
2245+
{true, true} -> Acc#{Node => {Host, Port}};
22342246
_ ->
22352247
rabbit_log:warning("Error when retrieving broker metadata: ~p ~p",
22362248
[Host, Port]),
@@ -2242,25 +2254,21 @@ handle_frame_post_auth(Transport,
22422254
Metadata =
22432255
lists:foldl(fun(Stream, Acc) ->
22442256
case maps:get(Stream, Topology) of
2245-
{error, Err} ->
2246-
Acc#{Stream => Err};
2257+
{error, Err} -> Acc#{Stream => Err};
22472258
{ok,
22482259
#{leader_node := LeaderNode,
22492260
replica_nodes := Replicas}} ->
22502261
LeaderInfo =
22512262
case NodeEndpoints of
2252-
#{LeaderNode := Info} ->
2253-
Info;
2254-
_ ->
2255-
undefined
2263+
#{LeaderNode := Info} -> Info;
2264+
_ -> undefined
22562265
end,
22572266
ReplicaInfos =
22582267
lists:foldr(fun(Replica, A) ->
22592268
case NodeEndpoints of
22602269
#{Replica := I} ->
22612270
[I | A];
2262-
_ ->
2263-
A
2271+
_ -> A
22642272
end
22652273
end,
22662274
[], Replicas),
@@ -2287,16 +2295,21 @@ handle_frame_post_auth(Transport,
22872295
case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream)
22882296
of
22892297
{ok, no_route} ->
2290-
{?RESPONSE_CODE_OK, <<(-1):16>>};
2291-
{ok, Stream} ->
2292-
StreamSize = byte_size(Stream),
2293-
{?RESPONSE_CODE_OK,
2294-
<<StreamSize:16, Stream:StreamSize/binary>>};
2298+
{?RESPONSE_CODE_OK, <<0:32>>};
2299+
{ok, Streams} ->
2300+
StreamCount = length(Streams),
2301+
Bin = lists:foldl(fun(Stream, Acc) ->
2302+
StreamSize = byte_size(Stream),
2303+
<<Acc/binary, StreamSize:16,
2304+
Stream:StreamSize/binary>>
2305+
end,
2306+
<<StreamCount:32>>, Streams),
2307+
{?RESPONSE_CODE_OK, Bin};
22952308
{error, _} ->
22962309
rabbit_global_counters:increase_protocol_counter(stream,
22972310
?STREAM_DOES_NOT_EXIST,
22982311
1),
2299-
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>}
2312+
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>}
23002313
end,
23012314

23022315
Frame =
@@ -2348,7 +2361,8 @@ handle_frame_post_auth(Transport,
23482361
State,
23492362
{request, CorrelationId,
23502363
{close, ClosingCode, ClosingReason}}) ->
2351-
rabbit_log:debug("Stream protocol reader received close command ~p ~p",
2364+
rabbit_log:debug("Stream protocol reader received close command "
2365+
"~p ~p",
23522366
[ClosingCode, ClosingReason]),
23532367
Frame =
23542368
rabbit_stream_core:frame({response, CorrelationId,
@@ -2471,8 +2485,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
24712485
PubId),
24722486
{maps:remove(PubId, Pubs),
24732487
maps:remove({Stream, Ref}, PubToIds)};
2474-
_ ->
2475-
{Pubs, PubToIds}
2488+
_ -> {Pubs, PubToIds}
24762489
end
24772490
end,
24782491
{Publishers, PublisherToIds}, Publishers),
@@ -2589,8 +2602,7 @@ demonitor_stream(Stream,
25892602
Stream ->
25902603
demonitor(MonitorRef, [flush]),
25912604
Acc;
2592-
_ ->
2593-
maps:put(MonitorRef, Strm, Acc)
2605+
_ -> maps:put(MonitorRef, Strm, Acc)
25942606
end
25952607
end,
25962608
#{}, Monitors0),
@@ -2611,10 +2623,8 @@ stream_has_publishers(Stream,
26112623
#stream_connection{publishers = Publishers}) ->
26122624
lists:any(fun(#publisher{stream = S}) ->
26132625
case S of
2614-
Stream ->
2615-
true;
2616-
_ ->
2617-
false
2626+
Stream -> true;
2627+
_ -> false
26182628
end
26192629
end,
26202630
maps:values(Publishers)).

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ groups() ->
4040
init_per_suite(Config) ->
4141
case rabbit_ct_helpers:is_mixed_versions() of
4242
true ->
43-
{skip, "mixed version clusters are not supported for this suite"};
43+
{skip,
44+
"mixed version clusters are not supported for "
45+
"this suite"};
4446
_ ->
4547
Config1 =
4648
rabbit_ct_helpers:set_config(Config,

0 commit comments

Comments
 (0)