Skip to content

Commit b9cac83

Browse files
authored
Merge pull request #10158 from rabbitmq/stream-publisher-internal-id
Add internal incremental ID for stream publishers
2 parents f206ebb + 42bdcfb commit b9cac83

File tree

2 files changed

+42
-24
lines changed

2 files changed

+42
-24
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
-type publisher_id() :: byte().
2626
-type publisher_reference() :: binary().
2727
-type subscription_id() :: byte().
28+
-type internal_id() :: integer().
2829

2930
-record(publisher,
3031
{publisher_id :: publisher_id(),
3132
stream :: stream(),
3233
reference :: undefined | publisher_reference(),
3334
leader :: pid(),
34-
message_counters :: atomics:atomics_ref()}).
35+
message_counters :: atomics:atomics_ref(),
36+
%% use to distinguish a stale publisher from a live publisher with the same ID
37+
%% used only for publishers without a reference (dedup off)
38+
internal_id :: internal_id()}).
3539
-record(consumer_configuration,
3640
{socket :: rabbit_net:socket(), %% ranch_transport:socket(),
3741
member_pid :: pid(),
@@ -96,7 +100,9 @@
96100
deliver_version :: rabbit_stream_core:command_version(),
97101
request_timeout :: pos_integer(),
98102
outstanding_requests_timer :: undefined | erlang:reference(),
99-
filtering_supported :: boolean()}).
103+
filtering_supported :: boolean(),
104+
%% internal sequence used for publishers
105+
internal_sequence = 0 :: integer()}).
100106
-record(configuration,
101107
{initial_credits :: integer(),
102108
credits_required_for_unblocking :: integer(),
@@ -1034,16 +1040,17 @@ open(cast,
10341040
config = Configuration} =
10351041
StatemData) ->
10361042
ByPublisher =
1037-
lists:foldr(fun({PublisherId, PublishingId}, Acc) ->
1038-
case maps:is_key(PublisherId, Publishers) of
1039-
true ->
1043+
lists:foldr(fun({PublisherId, InternalId, PublishingId}, Acc) ->
1044+
case Publishers of
1045+
#{PublisherId := #publisher{internal_id = InternalId}} ->
10401046
case maps:get(PublisherId, Acc, undefined) of
10411047
undefined ->
10421048
Acc#{PublisherId => [PublishingId]};
10431049
Ids ->
10441050
Acc#{PublisherId => [PublishingId | Ids]}
10451051
end;
1046-
false -> Acc
1052+
_ ->
1053+
Acc
10471054
end
10481055
end,
10491056
#{}, CorrelationList),
@@ -1773,7 +1780,8 @@ handle_frame_post_auth(Transport,
17731780
{Connection0, State};
17741781
{ClusterLeader,
17751782
#stream_connection{publishers = Publishers0,
1776-
publisher_to_ids = RefIds0} =
1783+
publisher_to_ids = RefIds0,
1784+
internal_sequence = InternalSequence} =
17771785
Connection1} ->
17781786
{PublisherReference, RefIds1} =
17791787
case WriterRef of
@@ -1791,7 +1799,8 @@ handle_frame_post_auth(Transport,
17911799
leader = ClusterLeader,
17921800
message_counters =
17931801
atomics:new(3,
1794-
[{signed, false}])},
1802+
[{signed, false}]),
1803+
internal_id = InternalSequence},
17951804
response(Transport,
17961805
Connection0,
17971806
declare_publisher,
@@ -1802,12 +1811,10 @@ handle_frame_post_auth(Transport,
18021811
Connection1),
18031812
PublisherId,
18041813
PublisherReference),
1805-
{Connection1#stream_connection{publishers =
1806-
Publishers0#{PublisherId
1807-
=>
1808-
Publisher},
1809-
publisher_to_ids =
1810-
RefIds1},
1814+
{Connection1#stream_connection{
1815+
publishers = Publishers0#{PublisherId => Publisher},
1816+
publisher_to_ids = RefIds1,
1817+
internal_sequence = InternalSequence + 1},
18111818
State}
18121819
end;
18131820
{_, _} ->
@@ -1881,6 +1888,7 @@ handle_frame_post_auth(Transport,
18811888
#{PublisherId := Publisher} ->
18821889
#publisher{stream = Stream,
18831890
reference = Reference,
1891+
internal_id = InternalId,
18841892
leader = Leader,
18851893
message_counters = Counters} =
18861894
Publisher,
@@ -1891,6 +1899,7 @@ handle_frame_post_auth(Transport,
18911899
rabbit_stream_utils:write_messages(Version, Leader,
18921900
Reference,
18931901
PublisherId,
1902+
InternalId,
18941903
Messages),
18951904
sub_credits(Credits, MessageCount),
18961905
{Connection, State};

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
%% API
2222
-export([enforce_correct_name/1,
23-
write_messages/5,
23+
write_messages/6,
2424
parse_map/2,
2525
auth_mechanisms/1,
2626
auth_mechanism_to_module/2,
@@ -60,21 +60,23 @@ check_name(<<"">>) ->
6060
check_name(_Name) ->
6161
ok.
6262

63-
write_messages(_Version, _ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
63+
write_messages(_Version, _ClusterLeader, _PublisherRef, _PublisherId, _InternalId, <<>>) ->
6464
ok;
6565
write_messages(?VERSION_1 = V, ClusterLeader,
6666
PublisherRef,
6767
PublisherId,
68+
InternalId,
6869
<<PublishingId:64,
6970
0:1,
7071
MessageSize:31,
7172
Message:MessageSize/binary,
7273
Rest/binary>>) ->
73-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId,
74+
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
7475
PublishingId, Message, Rest);
7576
write_messages(?VERSION_1 = V, ClusterLeader,
7677
PublisherRef,
7778
PublisherId,
79+
InternalId,
7880
<<PublishingId:64,
7981
1:1,
8082
CompressionType:3,
@@ -85,38 +87,45 @@ write_messages(?VERSION_1 = V, ClusterLeader,
8587
Batch:BatchSize/binary,
8688
Rest/binary>>) ->
8789
Data = {batch, MessageCount, CompressionType, UncompressedSize, Batch},
88-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId,
90+
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
8991
PublishingId, Data, Rest);
9092
write_messages(?VERSION_2 = V, ClusterLeader,
9193
PublisherRef,
9294
PublisherId,
95+
InternalId,
9396
<<PublishingId:64,
9497
-1:16/signed,
9598
0:1,
9699
MessageSize:31,
97100
Message:MessageSize/binary,
98101
Rest/binary>>) ->
99-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId,
102+
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
100103
PublishingId, Message, Rest);
101104
write_messages(?VERSION_2 = V, ClusterLeader,
102105
PublisherRef,
103106
PublisherId,
107+
InternalId,
104108
<<PublishingId:64,
105109
FilterValueLength:16, FilterValue:FilterValueLength/binary,
106110
0:1,
107111
MessageSize:31,
108112
Message:MessageSize/binary,
109113
Rest/binary>>) ->
110-
write_messages0(V, ClusterLeader, PublisherRef, PublisherId,
114+
write_messages0(V, ClusterLeader, PublisherRef, PublisherId, InternalId,
111115
PublishingId, {FilterValue, Message}, Rest).
112116

113-
write_messages0(Vsn, ClusterLeader, PublisherRef, PublisherId, PublishingId, Data, Rest) ->
117+
write_messages0(Vsn, ClusterLeader, PublisherRef, PublisherId, InternalId, PublishingId, Data, Rest) ->
114118
Corr = case PublisherRef of
115-
undefined -> {PublisherId, PublishingId};
116-
_ -> PublishingId
119+
undefined ->
120+
%% we add the internal ID to detect late confirms from a stale publisher
121+
{PublisherId, InternalId, PublishingId};
122+
_ ->
123+
%% we cannot add the internal ID because the correlation ID must be an integer
124+
%% when deduplication is activated.
125+
PublishingId
117126
end,
118127
ok = osiris:write(ClusterLeader, PublisherRef, Corr, Data),
119-
write_messages(Vsn, ClusterLeader, PublisherRef, PublisherId, Rest).
128+
write_messages(Vsn, ClusterLeader, PublisherRef, PublisherId, InternalId, Rest).
120129

121130
parse_map(<<>>, _Count) ->
122131
{#{}, <<>>};

0 commit comments

Comments
 (0)