Skip to content

Commit 761af0a

Browse files
committed
Extract publishing IDs from batch publishing
In stream plugin, to e.g. send publish errors in case the stream does not exist. Batches were not taken into account.
1 parent e91c918 commit 761af0a

File tree

5 files changed

+50
-31
lines changed

5 files changed

+50
-31
lines changed

deps/rabbitmq_stream/src/rabbit_stream_core.erl

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
%% holds static or rarely changing fields
1313
-record(cfg, {}).
1414
-record(?MODULE,
15-
{cfg :: #cfg{}, frames = [] :: [iodata()],
15+
{cfg :: #cfg{},
16+
frames = [] :: [iodata()],
1617
%% partial data
1718
data ::
1819
undefined |
@@ -145,52 +146,61 @@ all_commands(#?MODULE{commands = Commands0} = State) ->
145146
%% returns frames
146147
-spec incoming_data(binary(), state()) -> state().
147148
%% TODO: check max frame size
148-
incoming_data(<<>>, #?MODULE{frames = Frames, commands = Commands} = State) ->
149+
incoming_data(<<>>,
150+
#?MODULE{frames = Frames, commands = Commands} = State) ->
149151
State#?MODULE{frames = [], commands = parse_frames(Frames, Commands)};
150152
incoming_data(<<Size:32, Frame:Size/binary, Rem/binary>>,
151153
#?MODULE{frames = Frames, data = undefined} = State) ->
152154
incoming_data(Rem,
153155
State#?MODULE{frames = [Frame | Frames], data = undefined});
154156
incoming_data(<<Size:32, Rem/binary>>,
155-
#?MODULE{frames = Frames, data = undefined,
156-
commands = Commands} = State) ->
157+
#?MODULE{frames = Frames,
158+
data = undefined,
159+
commands = Commands} =
160+
State) ->
157161
%% not enough data to complete frame, stash and await more data
158-
State#?MODULE{frames = [], data = {Size - byte_size(Rem), Rem},
162+
State#?MODULE{frames = [],
163+
data = {Size - byte_size(Rem), Rem},
159164
commands = parse_frames(Frames, Commands)};
160165
incoming_data(Data,
161-
#?MODULE{frames = Frames, data = undefined,
162-
commands = Commands} = State)
163-
when byte_size(Data) < 4 ->
166+
#?MODULE{frames = Frames,
167+
data = undefined,
168+
commands = Commands} =
169+
State)
170+
when byte_size(Data) < 4 ->
164171
%% not enough data to even know the size required
165172
%% just stash binary and hit last clause next
166-
State#?MODULE{frames = [], data = Data,
173+
State#?MODULE{frames = [],
174+
data = Data,
167175
commands = parse_frames(Frames, Commands)};
168176
incoming_data(Data,
169-
#?MODULE{frames = Frames, data = {Size, Partial},
170-
commands = Commands} = State) ->
177+
#?MODULE{frames = Frames,
178+
data = {Size, Partial},
179+
commands = Commands} =
180+
State) ->
171181
case Data of
172182
<<Part:Size/binary, Rem/binary>> ->
173183
incoming_data(Rem,
174184
State#?MODULE{frames =
175-
[append_data(Partial, Part)
176-
| Frames],
185+
[append_data(Partial, Part)
186+
| Frames],
177187
data = undefined});
178188
Rem ->
179189
State#?MODULE{frames = [],
180190
data =
181-
{Size - byte_size(Rem),
182-
append_data(Partial, Rem)},
191+
{Size - byte_size(Rem),
192+
append_data(Partial, Rem)},
183193
commands = parse_frames(Frames, Commands)}
184194
end;
185195
incoming_data(Data, #?MODULE{data = Partial} = State)
186-
when is_binary(Partial) ->
196+
when is_binary(Partial) ->
187197
incoming_data(<<Partial/binary, Data/binary>>,
188198
State#?MODULE{data = undefined}).
189199

190200
parse_frames(Frames, Queue) ->
191-
lists:foldr(fun(Frame, Acc) ->
192-
queue:in(parse_command(Frame), Acc)
193-
end, Queue, Frames).
201+
lists:foldr(fun(Frame, Acc) -> queue:in(parse_command(Frame), Acc)
202+
end,
203+
Queue, Frames).
194204

195205
-spec frame(command()) -> iodata().
196206
frame({publish_confirm, PublisherId, PublishingIds}) ->

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -836,18 +836,27 @@ handle_inbound_data(Transport,
836836
CoreState1 = rabbit_stream_core:incoming_data(Data, CoreState0),
837837
{Commands, CoreState} = rabbit_stream_core:all_commands(CoreState1),
838838
lists:foldl(fun(Command, {C, S}) ->
839-
HandleFrameFun(Transport, C, S, Command)
839+
HandleFrameFun(Transport, C, S, Command)
840840
end,
841-
{Connection,
842-
State#stream_connection_state{data = CoreState}},
841+
{Connection, State#stream_connection_state{data = CoreState}},
843842
Commands).
844843

845844
publishing_ids_from_messages(<<>>) ->
846845
[];
847846
publishing_ids_from_messages(<<PublishingId:64,
848-
MessageSize:32,
847+
0:1,
848+
MessageSize:31,
849849
_Message:MessageSize/binary,
850850
Rest/binary>>) ->
851+
[PublishingId | publishing_ids_from_messages(Rest)];
852+
publishing_ids_from_messages(<<PublishingId:64,
853+
1:1,
854+
_CompressionType:3,
855+
_Unused:4,
856+
_MessageCount:16,
857+
BatchSize:32,
858+
_Batch:BatchSize/binary,
859+
Rest/binary>>) ->
851860
[PublishingId | publishing_ids_from_messages(Rest)].
852861

853862
handle_frame_pre_auth(Transport,
@@ -1077,7 +1086,7 @@ handle_frame_pre_auth(Transport,
10771086
#{<<"advertised_host">> => AdvertisedHost,
10781087
<<"advertised_port">> => AdvertisedPort},
10791088

1080-
rabbit_log:info("sending open response ok ~s", [VirtualHost]),
1089+
rabbit_log:info("sending open response ok ~s", [VirtualHost]),
10811090
Frame =
10821091
rabbit_stream_core:frame({response, CorrelationId,
10831092
{open, ?RESPONSE_CODE_OK,

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
-include("rabbit_stream.hrl").
1919

2020
-define(WAIT, 5000).
21-
2221
-define(COMMAND_LIST_CONNECTIONS,
2322
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
2423
-define(COMMAND_LIST_CONSUMERS,

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,8 @@ test_authenticate(Transport, S, C0) ->
265265
OpenFrame =
266266
rabbit_stream_core:frame({request, 3, {open, VirtualHost}}),
267267
ok = Transport:send(S, OpenFrame),
268-
{{response, 3, {open, ?RESPONSE_CODE_OK, _ConnectionProperties}}, C4} =
268+
{{response, 3, {open, ?RESPONSE_CODE_OK, _ConnectionProperties}},
269+
C4} =
269270
receive_commands(Transport, S, C3),
270271
C4.
271272

@@ -322,7 +323,6 @@ test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
322323
C.
323324

324325
test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
325-
326326
ct:pal("test_deliver ", []),
327327
{{deliver, SubscriptionId, Chunk}, C} =
328328
receive_commands(Transport, S, C0),
@@ -372,9 +372,9 @@ receive_commands(Transport, S, C0) ->
372372
C1 = rabbit_stream_core:incoming_data(Data, C0),
373373
case rabbit_stream_core:next_command(C1) of
374374
empty ->
375-
{ok, Data2} = Transport:recv(S, 0, 5000),
375+
{ok, Data2} = Transport:recv(S, 0, 5000),
376376
rabbit_stream_core:next_command(
377-
rabbit_stream_core:incoming_data(Data2, C1));
377+
rabbit_stream_core:incoming_data(Data2, C1));
378378
Res ->
379379
Res
380380
end;

deps/rabbitmq_stream/test/rabbit_stream_core_SUITE.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ roundtrip_metadata_no_leader(_Config) ->
163163
test_roundtrip(Cmd) ->
164164
Init = rabbit_stream_core:init(undefined),
165165
Frame = iolist_to_binary(rabbit_stream_core:frame(Cmd)),
166-
{[Cmd], _} = rabbit_stream_core:all_commands(
167-
rabbit_stream_core:incoming_data(Frame, Init)),
166+
{[Cmd], _} =
167+
rabbit_stream_core:all_commands(
168+
rabbit_stream_core:incoming_data(Frame, Init)),
168169
ok.

0 commit comments

Comments
 (0)