Skip to content

Commit 8f207e3

Browse files
committed
Make stream protocol route command return several streams
We expect to have 1 stream for each routing key, but as binding can return several queues for a given key we let that possibility open in the stream protocol.
1 parent 3d7afcd commit 8f207e3

File tree

5 files changed

+62
-58
lines changed

5 files changed

+62
-58
lines changed

deps/rabbitmq_stream/docs/PROTOCOL.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream
587587
RoutingKey => string
588588
SuperStream => string
589589

590-
RouteResponse => Key Version CorrelationId Stream
590+
RouteResponse => Key Version CorrelationId [Stream]
591591
Key => uint16 // 24
592592
Version => uint16
593593
CorrelationId => uint32

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,8 @@ kill_connection(ConnectionName) ->
118118
{ConnectionPid,
119119
#{<<"connection_name">> := ConnectionNameBin}} ->
120120
exit(ConnectionPid, kill);
121-
{ConnectionPid, _ClientProperties} ->
122-
ok
123-
after 1000 ->
124-
ok
121+
{ConnectionPid, _ClientProperties} -> ok
122+
after 1000 -> ok
125123
end
126124
end,
127125
pg_local:get_members(rabbit_stream_connections)).

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ topology(VirtualHost, Stream) ->
7575
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
7676

7777
-spec route(binary(), binary(), binary()) ->
78-
{ok, binary() | no_route} | {error, stream_not_found}.
78+
{ok, [binary()] | no_route} | {error, stream_not_found}.
7979
route(RoutingKey, VirtualHost, SuperStream) ->
8080
gen_server:call(?MODULE,
8181
{route, RoutingKey, VirtualHost, SuperStream}).
@@ -368,10 +368,9 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
368368
case rabbit_exchange:route(Exchange, Delivery) of
369369
[] ->
370370
{ok, no_route};
371-
[#resource{name = Stream}] ->
372-
{ok, Stream};
373-
[#resource{name = Stream} | _] ->
374-
{ok, Stream}
371+
Routes ->
372+
%% FIXME filter non-stream resources
373+
{ok, [Stream || #resource{name = Stream} <- Routes]}
375374
end
376375
catch
377376
exit:Error ->

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 52 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,8 @@ tuned(info, Msg, StateData) ->
445445
end).
446446

447447
state_timeout(State, Transport, Socket) ->
448-
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
448+
rabbit_log_connection:warning("Closing connection because of timeout in state "
449+
"'~s' likely due to lack of client action.",
449450
[State]),
450451
close_immediately(Transport, Socket),
451452
stop.
@@ -524,7 +525,8 @@ transition_to_opened(Transport,
524525
config = Configuration}}.
525526

526527
invalid_transition(Transport, Socket, From, To) ->
527-
rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s to ~s.",
528+
rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s "
529+
"to ~s.",
528530
[Socket, From, To]),
529531
close_immediately(Transport, Socket),
530532
stop.
@@ -882,8 +884,7 @@ open(cast,
882884
Ids ->
883885
Acc#{PublisherId => [PublishingId | Ids]}
884886
end;
885-
false ->
886-
Acc
887+
false -> Acc
887888
end
888889
end,
889890
#{}, CorrelationList),
@@ -963,7 +964,8 @@ open(cast,
963964
{queue_event, #resource{name = StreamName},
964965
{osiris_offset, _QueueResource, -1}},
965966
_StatemData) ->
966-
rabbit_log:debug("Stream protocol connection received osiris offset event for ~p with offset ~p",
967+
rabbit_log:debug("Stream protocol connection received osiris offset "
968+
"event for ~p with offset ~p",
967969
[StreamName, -1]),
968970
keep_state_and_data;
969971
open(cast,
@@ -982,11 +984,14 @@ open(cast,
982984
{Connection1, State1} =
983985
case maps:get(StreamName, StreamSubscriptions, undefined) of
984986
undefined ->
985-
rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)",
987+
rabbit_log:debug("Stream protocol connection: osiris offset event "
988+
"for ~p, but no subscription (leftover messages "
989+
"after unsubscribe?)",
986990
[StreamName]),
987991
{Connection, State};
988992
[] ->
989-
rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no registered consumers!",
993+
rabbit_log:debug("Stream protocol connection: osiris offset event "
994+
"for ~p, but no registered consumers!",
990995
[StreamName]),
991996
{Connection#stream_connection{stream_subscriptions =
992997
maps:remove(StreamName,
@@ -999,15 +1004,15 @@ open(cast,
9991004
#consumer{credit = Credit} = Consumer,
10001005
Consumer1 =
10011006
case Credit of
1002-
0 ->
1003-
Consumer;
1007+
0 -> Consumer;
10041008
_ ->
10051009
case send_chunks(Transport,
10061010
Consumer,
10071011
SendFileOct)
10081012
of
10091013
{error, closed} ->
1010-
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1014+
rabbit_log_connection:info("Stream protocol connection has been closed by "
1015+
"peer",
10111016
[]),
10121017
throw({stop, normal});
10131018
{error, Reason} ->
@@ -1058,7 +1063,8 @@ close_sent(state_timeout, close,
10581063
#statem_data{transport = Transport,
10591064
connection = #stream_connection{socket = Socket},
10601065
connection_state = State}) ->
1061-
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
1066+
rabbit_log_connection:warning("Closing connection because of timeout in state "
1067+
"'~s' likely due to lack of client action.",
10621068
[?FUNCTION_NAME]),
10631069
close(Transport, Socket, State),
10641070
stop;
@@ -1089,13 +1095,15 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
10891095
stop;
10901096
close_sent(info, {tcp_error, S, Reason},
10911097
#statem_data{transport = Transport, connection_state = State}) ->
1092-
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
1098+
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
1099+
"[~w]",
10931100
[Reason, S, self()]),
10941101
close(Transport, S, State),
10951102
stop;
10961103
close_sent(info, {resource_alarm, IsThereAlarm},
10971104
StatemData = #statem_data{connection = Connection}) ->
1098-
rabbit_log:warning("Stream protocol connection ignored a resource alarm ~p in state ~s",
1105+
rabbit_log:warning("Stream protocol connection ignored a resource "
1106+
"alarm ~p in state ~s",
10991107
[IsThereAlarm, ?FUNCTION_NAME]),
11001108
{keep_state,
11011109
StatemData#statem_data{connection =
@@ -1828,7 +1836,8 @@ handle_frame_post_auth(Transport,
18281836
SendFileOct)
18291837
of
18301838
{error, closed} ->
1831-
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1839+
rabbit_log_connection:info("Stream protocol connection has been closed by "
1840+
"peer",
18321841
[]),
18331842
throw({stop, normal});
18341843
{{segment, Segment1}, {credit, Credit1}} ->
@@ -1909,7 +1918,8 @@ handle_frame_post_auth(Transport,
19091918
SendFileOct)
19101919
of
19111920
{error, closed} ->
1912-
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1921+
rabbit_log_connection:info("Stream protocol connection has been closed by "
1922+
"peer",
19131923
[]),
19141924
throw({stop, normal});
19151925
{{segment, Segment1}, {credit, Credit1}} ->
@@ -2061,7 +2071,8 @@ handle_frame_post_auth(Transport,
20612071
{ok,
20622072
#{leader_node := LeaderPid,
20632073
replica_nodes := ReturnedReplicas}} ->
2064-
rabbit_log:debug("Created stream cluster with leader on ~p and replicas on ~p",
2074+
rabbit_log:debug("Created stream cluster with leader on ~p and "
2075+
"replicas on ~p",
20652076
[LeaderPid, ReturnedReplicas]),
20662077
response_ok(Transport,
20672078
Connection,
@@ -2222,8 +2233,7 @@ handle_frame_post_auth(Transport,
22222233
NodesAcc)
22232234
end,
22242235
Acc1, ReplicaNodes);
2225-
{error, _} ->
2226-
Acc
2236+
{error, _} -> Acc
22272237
end
22282238
end,
22292239
#{}, Streams),
@@ -2235,16 +2245,13 @@ handle_frame_post_auth(Transport,
22352245
lists:foldr(fun(Node, Acc) ->
22362246
PortFunction =
22372247
case TransportLayer of
2238-
tcp ->
2239-
port;
2240-
ssl ->
2241-
tls_port
2248+
tcp -> port;
2249+
ssl -> tls_port
22422250
end,
22432251
Host = rpc:call(Node, rabbit_stream, host, []),
22442252
Port = rpc:call(Node, rabbit_stream, PortFunction, []),
22452253
case {is_binary(Host), is_integer(Port)} of
2246-
{true, true} ->
2247-
Acc#{Node => {Host, Port}};
2254+
{true, true} -> Acc#{Node => {Host, Port}};
22482255
_ ->
22492256
rabbit_log:warning("Error when retrieving broker metadata: ~p ~p",
22502257
[Host, Port]),
@@ -2256,25 +2263,21 @@ handle_frame_post_auth(Transport,
22562263
Metadata =
22572264
lists:foldl(fun(Stream, Acc) ->
22582265
case maps:get(Stream, Topology) of
2259-
{error, Err} ->
2260-
Acc#{Stream => Err};
2266+
{error, Err} -> Acc#{Stream => Err};
22612267
{ok,
22622268
#{leader_node := LeaderNode,
22632269
replica_nodes := Replicas}} ->
22642270
LeaderInfo =
22652271
case NodeEndpoints of
2266-
#{LeaderNode := Info} ->
2267-
Info;
2268-
_ ->
2269-
undefined
2272+
#{LeaderNode := Info} -> Info;
2273+
_ -> undefined
22702274
end,
22712275
ReplicaInfos =
22722276
lists:foldr(fun(Replica, A) ->
22732277
case NodeEndpoints of
22742278
#{Replica := I} ->
22752279
[I | A];
2276-
_ ->
2277-
A
2280+
_ -> A
22782281
end
22792282
end,
22802283
[], Replicas),
@@ -2301,16 +2304,21 @@ handle_frame_post_auth(Transport,
23012304
case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream)
23022305
of
23032306
{ok, no_route} ->
2304-
{?RESPONSE_CODE_OK, <<(-1):16>>};
2305-
{ok, Stream} ->
2306-
StreamSize = byte_size(Stream),
2307-
{?RESPONSE_CODE_OK,
2308-
<<StreamSize:16, Stream:StreamSize/binary>>};
2307+
{?RESPONSE_CODE_OK, <<0:32>>};
2308+
{ok, Streams} ->
2309+
StreamCount = length(Streams),
2310+
Bin = lists:foldl(fun(Stream, Acc) ->
2311+
StreamSize = byte_size(Stream),
2312+
<<Acc/binary, StreamSize:16,
2313+
Stream:StreamSize/binary>>
2314+
end,
2315+
<<StreamCount:32>>, Streams),
2316+
{?RESPONSE_CODE_OK, Bin};
23092317
{error, _} ->
23102318
rabbit_global_counters:increase_protocol_counter(stream,
23112319
?STREAM_DOES_NOT_EXIST,
23122320
1),
2313-
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>}
2321+
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>}
23142322
end,
23152323

23162324
Frame =
@@ -2362,7 +2370,8 @@ handle_frame_post_auth(Transport,
23622370
State,
23632371
{request, CorrelationId,
23642372
{close, ClosingCode, ClosingReason}}) ->
2365-
rabbit_log:debug("Stream protocol reader received close command ~p ~p",
2373+
rabbit_log:debug("Stream protocol reader received close command "
2374+
"~p ~p",
23662375
[ClosingCode, ClosingReason]),
23672376
Frame =
23682377
rabbit_stream_core:frame({response, CorrelationId,
@@ -2485,8 +2494,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
24852494
PubId),
24862495
{maps:remove(PubId, Pubs),
24872496
maps:remove({Stream, Ref}, PubToIds)};
2488-
_ ->
2489-
{Pubs, PubToIds}
2497+
_ -> {Pubs, PubToIds}
24902498
end
24912499
end,
24922500
{Publishers, PublisherToIds}, Publishers),
@@ -2603,8 +2611,7 @@ demonitor_stream(Stream,
26032611
Stream ->
26042612
demonitor(MonitorRef, [flush]),
26052613
Acc;
2606-
_ ->
2607-
maps:put(MonitorRef, Strm, Acc)
2614+
_ -> maps:put(MonitorRef, Strm, Acc)
26082615
end
26092616
end,
26102617
#{}, Monitors0),
@@ -2625,10 +2632,8 @@ stream_has_publishers(Stream,
26252632
#stream_connection{publishers = Publishers}) ->
26262633
lists:any(fun(#publisher{stream = S}) ->
26272634
case S of
2628-
Stream ->
2629-
true;
2630-
_ ->
2631-
false
2635+
Stream -> true;
2636+
_ -> false
26322637
end
26332638
end,
26342639
maps:values(Publishers)).

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ groups() ->
4040
init_per_suite(Config) ->
4141
case rabbit_ct_helpers:is_mixed_versions() of
4242
true ->
43-
{skip, "mixed version clusters are not supported for this suite"};
43+
{skip,
44+
"mixed version clusters are not supported for "
45+
"this suite"};
4446
_ ->
4547
Config1 =
4648
rabbit_ct_helpers:set_config(Config,

0 commit comments

Comments
 (0)