Skip to content

Commit 7a3f038

Browse files
committed
Fix conflicts in stream publishing
To include the internal ID for publishers.
1 parent 35ca976 commit 7a3f038

File tree

2 files changed

+29
-96
lines changed

2 files changed

+29
-96
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,9 @@
9999
outstanding_requests :: #{integer() => #request{}},
100100
deliver_version :: rabbit_stream_core:command_version(),
101101
request_timeout :: pos_integer(),
102-
<<<<<<< HEAD
103-
outstanding_requests_timer :: undefined | erlang:reference()}).
104-
=======
105102
outstanding_requests_timer :: undefined | erlang:reference(),
106-
filtering_supported :: boolean(),
107103
%% internal sequence used for publishers
108104
internal_sequence = 0 :: integer()}).
109-
>>>>>>> 42bdcfb8e4 (Add incremental ID for non-dedup stream publishers)
110105
-record(configuration,
111106
{initial_credits :: integer(),
112107
credits_required_for_unblocking :: integer(),

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 29 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,7 @@
1818

1919
%% API
2020
-export([enforce_correct_name/1,
21-
<<<<<<< HEAD
22-
write_messages/4,
23-
=======
24-
write_messages/6,
25-
>>>>>>> 42bdcfb8e4 (Add incremental ID for non-dedup stream publishers)
21+
write_messages/5,
2622
parse_map/2,
2723
auth_mechanisms/1,
2824
auth_mechanism_to_module/2,
@@ -58,30 +54,26 @@ check_name(<<"">>) ->
5854
check_name(_Name) ->
5955
ok.
6056

61-
<<<<<<< HEAD
62-
write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) ->
63-
=======
64-
write_messages(_Version, _ClusterLeader, _PublisherRef, _PublisherId, _InternalId, <<>>) ->
65-
>>>>>>> 42bdcfb8e4 (Add incremental ID for non-dedup stream publishers)
57+
write_messages(_ClusterLeader, undefined, _PublisherId, _InternalId, <<>>) ->
6658
ok;
6759
write_messages(ClusterLeader,
6860
undefined,
6961
PublisherId,
62+
InternalId,
7063
<<PublishingId:64,
7164
0:1,
7265
MessageSize:31,
7366
Message:MessageSize/binary,
7467
Rest/binary>>) ->
75-
% FIXME handle write error
76-
ok =
77-
osiris:write(ClusterLeader,
78-
undefined,
79-
{PublisherId, PublishingId},
80-
Message),
81-
write_messages(ClusterLeader, undefined, PublisherId, Rest);
68+
ok = osiris:write(ClusterLeader,
69+
undefined,
70+
{PublisherId, InternalId, PublishingId},
71+
Message),
72+
write_messages(ClusterLeader, undefined, PublisherId, InternalId, Rest);
8273
write_messages(ClusterLeader,
8374
undefined,
8475
PublisherId,
76+
InternalId,
8577
<<PublishingId:64,
8678
1:1,
8779
CompressionType:3,
@@ -91,18 +83,16 @@ write_messages(ClusterLeader,
9183
BatchSize:32,
9284
Batch:BatchSize/binary,
9385
Rest/binary>>) ->
94-
% FIXME handle write error
95-
ok =
96-
osiris:write(ClusterLeader,
97-
undefined,
98-
{PublisherId, PublishingId},
99-
{batch,
100-
MessageCount,
101-
CompressionType,
102-
UncompressedSize,
103-
Batch}),
104-
write_messages(ClusterLeader, undefined, PublisherId, Rest);
105-
write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
86+
ok = osiris:write(ClusterLeader,
87+
undefined,
88+
{PublisherId, InternalId, PublishingId},
89+
{batch,
90+
MessageCount,
91+
CompressionType,
92+
UncompressedSize,
93+
Batch}),
94+
write_messages(ClusterLeader, undefined, PublisherId, InternalId, Rest);
95+
write_messages(_ClusterLeader, _PublisherRef, _PublisherId, _InternalId, <<>>) ->
10696
ok;
10797
write_messages(ClusterLeader,
10898
PublisherRef,
@@ -113,16 +103,9 @@ write_messages(ClusterLeader,
113103
MessageSize:31,
114104
Message:MessageSize/binary,
115105
Rest/binary>>) ->
116-
<<<<<<< HEAD
117-
% FIXME handle write error
118106
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
119-
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
107+
write_messages(ClusterLeader, PublisherRef, PublisherId, InternalId, Rest);
120108
write_messages(ClusterLeader,
121-
=======
122-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
123-
PublishingId, Message, Rest);
124-
write_messages(?VERSION_1 = V, ClusterLeader,
125-
>>>>>>> 42bdcfb8e4 (Add incremental ID for non-dedup stream publishers)
126109
PublisherRef,
127110
PublisherId,
128111
InternalId,
@@ -135,60 +118,15 @@ write_messages(?VERSION_1 = V, ClusterLeader,
135118
BatchSize:32,
136119
Batch:BatchSize/binary,
137120
Rest/binary>>) ->
138-
<<<<<<< HEAD
139-
% FIXME handle write error
140-
ok =
141-
osiris:write(ClusterLeader,
142-
PublisherRef,
143-
PublishingId,
144-
{batch,
145-
MessageCount,
146-
CompressionType,
147-
UncompressedSize,
148-
Batch}),
149-
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
150-
=======
151-
Data = {batch, MessageCount, CompressionType, UncompressedSize, Batch},
152-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
153-
PublishingId, Data, Rest);
154-
write_messages(?VERSION_2 = V, ClusterLeader,
155-
PublisherRef,
156-
PublisherId,
157-
InternalId,
158-
<<PublishingId:64,
159-
-1:16/signed,
160-
0:1,
161-
MessageSize:31,
162-
Message:MessageSize/binary,
163-
Rest/binary>>) ->
164-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
165-
PublishingId, Message, Rest);
166-
write_messages(?VERSION_2 = V, ClusterLeader,
167-
PublisherRef,
168-
PublisherId,
169-
InternalId,
170-
<<PublishingId:64,
171-
FilterValueLength:16, FilterValue:FilterValueLength/binary,
172-
0:1,
173-
MessageSize:31,
174-
Message:MessageSize/binary,
175-
Rest/binary>>) ->
176-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
177-
PublishingId, {FilterValue, Message}, Rest).
178-
179-
write_messages0(Vsn, ClusterLeader, PublisherRef, PublisherId, InternalId, PublishingId, Data, Rest) ->
180-
Corr = case PublisherRef of
181-
undefined ->
182-
%% we add the internal ID to detect late confirms from a stale publisher
183-
{PublisherId, InternalId, PublishingId};
184-
_ ->
185-
%% we cannot add the internal ID because the correlation ID must be an integer
186-
%% when deduplication is activated.
187-
PublishingId
188-
end,
189-
ok = osiris:write(ClusterLeader, PublisherRef, Corr, Data),
190-
write_messages(Vsn, ClusterLeader, PublisherRef, PublisherId, InternalId, Rest).
191-
>>>>>>> 42bdcfb8e4 (Add incremental ID for non-dedup stream publishers)
121+
ok = osiris:write(ClusterLeader,
122+
PublisherRef,
123+
PublishingId,
124+
{batch,
125+
MessageCount,
126+
CompressionType,
127+
UncompressedSize,
128+
Batch}),
129+
write_messages(ClusterLeader, PublisherRef, PublisherId, InternalId, Rest).
192130

193131
parse_map(<<>>, _Count) ->
194132
{#{}, <<>>};

0 commit comments

Comments
 (0)