Skip to content

Commit 50020f4

Browse files
authored
Merge pull request #10190 from rabbitmq/mergify/bp/v3.12.x/pr-10158
Add internal incremental ID for stream publishers (backport #10158)
2 parents 61568e6 + 7a3f038 commit 50020f4

File tree

2 files changed

+54
-48
lines changed

2 files changed

+54
-48
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
-type publisher_id() :: byte().
2626
-type publisher_reference() :: binary().
2727
-type subscription_id() :: byte().
28+
-type internal_id() :: integer().
2829

2930
-record(publisher,
3031
{publisher_id :: publisher_id(),
3132
stream :: stream(),
3233
reference :: undefined | publisher_reference(),
3334
leader :: pid(),
34-
message_counters :: atomics:atomics_ref()}).
35+
message_counters :: atomics:atomics_ref(),
36+
%% use to distinguish a stale publisher from a live publisher with the same ID
37+
%% used only for publishers without a reference (dedup off)
38+
internal_id :: internal_id()}).
3539
-record(consumer_configuration,
3640
{socket :: rabbit_net:socket(), %% ranch_transport:socket(),
3741
member_pid :: pid(),
@@ -95,7 +99,9 @@
9599
outstanding_requests :: #{integer() => #request{}},
96100
deliver_version :: rabbit_stream_core:command_version(),
97101
request_timeout :: pos_integer(),
98-
outstanding_requests_timer :: undefined | erlang:reference()}).
102+
outstanding_requests_timer :: undefined | erlang:reference(),
103+
%% internal sequence used for publishers
104+
internal_sequence = 0 :: integer()}).
99105
-record(configuration,
100106
{initial_credits :: integer(),
101107
credits_required_for_unblocking :: integer(),
@@ -1042,16 +1048,17 @@ open(cast,
10421048
config = Configuration} =
10431049
StatemData) ->
10441050
ByPublisher =
1045-
lists:foldr(fun({PublisherId, PublishingId}, Acc) ->
1046-
case maps:is_key(PublisherId, Publishers) of
1047-
true ->
1051+
lists:foldr(fun({PublisherId, InternalId, PublishingId}, Acc) ->
1052+
case Publishers of
1053+
#{PublisherId := #publisher{internal_id = InternalId}} ->
10481054
case maps:get(PublisherId, Acc, undefined) of
10491055
undefined ->
10501056
Acc#{PublisherId => [PublishingId]};
10511057
Ids ->
10521058
Acc#{PublisherId => [PublishingId | Ids]}
10531059
end;
1054-
false -> Acc
1060+
_ ->
1061+
Acc
10551062
end
10561063
end,
10571064
#{}, CorrelationList),
@@ -1680,7 +1687,8 @@ handle_frame_post_auth(Transport,
16801687
{Connection0, State};
16811688
{ClusterLeader,
16821689
#stream_connection{publishers = Publishers0,
1683-
publisher_to_ids = RefIds0} =
1690+
publisher_to_ids = RefIds0,
1691+
internal_sequence = InternalSequence} =
16841692
Connection1} ->
16851693
{PublisherReference, RefIds1} =
16861694
case WriterRef of
@@ -1698,7 +1706,8 @@ handle_frame_post_auth(Transport,
16981706
leader = ClusterLeader,
16991707
message_counters =
17001708
atomics:new(3,
1701-
[{signed, false}])},
1709+
[{signed, false}]),
1710+
internal_id = InternalSequence},
17021711
response(Transport,
17031712
Connection0,
17041713
declare_publisher,
@@ -1709,12 +1718,10 @@ handle_frame_post_auth(Transport,
17091718
Connection1),
17101719
PublisherId,
17111720
PublisherReference),
1712-
{Connection1#stream_connection{publishers =
1713-
Publishers0#{PublisherId
1714-
=>
1715-
Publisher},
1716-
publisher_to_ids =
1717-
RefIds1},
1721+
{Connection1#stream_connection{
1722+
publishers = Publishers0#{PublisherId => Publisher},
1723+
publisher_to_ids = RefIds1,
1724+
internal_sequence = InternalSequence + 1},
17181725
State}
17191726
end;
17201727
{_, _} ->
@@ -1752,6 +1759,7 @@ handle_frame_post_auth(Transport,
17521759
#{PublisherId := Publisher} ->
17531760
#publisher{stream = Stream,
17541761
reference = Reference,
1762+
internal_id = InternalId,
17551763
leader = Leader,
17561764
message_counters = Counters} =
17571765
Publisher,
@@ -1769,6 +1777,7 @@ handle_frame_post_auth(Transport,
17691777
rabbit_stream_utils:write_messages(Leader,
17701778
Reference,
17711779
PublisherId,
1780+
InternalId,
17721781
Messages),
17731782
sub_credits(Credits, MessageCount),
17741783
{Connection, State};

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
%% API
2020
-export([enforce_correct_name/1,
21-
write_messages/4,
21+
write_messages/5,
2222
parse_map/2,
2323
auth_mechanisms/1,
2424
auth_mechanism_to_module/2,
@@ -54,26 +54,26 @@ check_name(<<"">>) ->
5454
check_name(_Name) ->
5555
ok.
5656

57-
write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) ->
57+
write_messages(_ClusterLeader, undefined, _PublisherId, _InternalId, <<>>) ->
5858
ok;
5959
write_messages(ClusterLeader,
6060
undefined,
6161
PublisherId,
62+
InternalId,
6263
<<PublishingId:64,
6364
0:1,
6465
MessageSize:31,
6566
Message:MessageSize/binary,
6667
Rest/binary>>) ->
67-
% FIXME handle write error
68-
ok =
69-
osiris:write(ClusterLeader,
70-
undefined,
71-
{PublisherId, PublishingId},
72-
Message),
73-
write_messages(ClusterLeader, undefined, PublisherId, Rest);
68+
ok = osiris:write(ClusterLeader,
69+
undefined,
70+
{PublisherId, InternalId, PublishingId},
71+
Message),
72+
write_messages(ClusterLeader, undefined, PublisherId, InternalId, Rest);
7473
write_messages(ClusterLeader,
7574
undefined,
7675
PublisherId,
76+
InternalId,
7777
<<PublishingId:64,
7878
1:1,
7979
CompressionType:3,
@@ -83,33 +83,32 @@ write_messages(ClusterLeader,
8383
BatchSize:32,
8484
Batch:BatchSize/binary,
8585
Rest/binary>>) ->
86-
% FIXME handle write error
87-
ok =
88-
osiris:write(ClusterLeader,
89-
undefined,
90-
{PublisherId, PublishingId},
91-
{batch,
92-
MessageCount,
93-
CompressionType,
94-
UncompressedSize,
95-
Batch}),
96-
write_messages(ClusterLeader, undefined, PublisherId, Rest);
97-
write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
86+
ok = osiris:write(ClusterLeader,
87+
undefined,
88+
{PublisherId, InternalId, PublishingId},
89+
{batch,
90+
MessageCount,
91+
CompressionType,
92+
UncompressedSize,
93+
Batch}),
94+
write_messages(ClusterLeader, undefined, PublisherId, InternalId, Rest);
95+
write_messages(_ClusterLeader, _PublisherRef, _PublisherId, _InternalId, <<>>) ->
9896
ok;
9997
write_messages(ClusterLeader,
10098
PublisherRef,
10199
PublisherId,
100+
InternalId,
102101
<<PublishingId:64,
103102
0:1,
104103
MessageSize:31,
105104
Message:MessageSize/binary,
106105
Rest/binary>>) ->
107-
% FIXME handle write error
108106
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
109-
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
107+
write_messages(ClusterLeader, PublisherRef, PublisherId, InternalId, Rest);
110108
write_messages(ClusterLeader,
111109
PublisherRef,
112110
PublisherId,
111+
InternalId,
113112
<<PublishingId:64,
114113
1:1,
115114
CompressionType:3,
@@ -119,17 +118,15 @@ write_messages(ClusterLeader,
119118
BatchSize:32,
120119
Batch:BatchSize/binary,
121120
Rest/binary>>) ->
122-
% FIXME handle write error
123-
ok =
124-
osiris:write(ClusterLeader,
125-
PublisherRef,
126-
PublishingId,
127-
{batch,
128-
MessageCount,
129-
CompressionType,
130-
UncompressedSize,
131-
Batch}),
132-
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
121+
ok = osiris:write(ClusterLeader,
122+
PublisherRef,
123+
PublishingId,
124+
{batch,
125+
MessageCount,
126+
CompressionType,
127+
UncompressedSize,
128+
Batch}),
129+
write_messages(ClusterLeader, PublisherRef, PublisherId, InternalId, Rest).
133130

134131
parse_map(<<>>, _Count) ->
135132
{#{}, <<>>};

0 commit comments

Comments
 (0)