81
81
% % in enqueue messages. Used to ensure ordering of messages send from the
82
82
% % same process
83
83
84
- -type msg_header () :: #{delivery_count => non_neg_integer ()}.
84
+ -type msg_header () :: #{size := msg_size (),
85
+ delivery_count => non_neg_integer ()}.
85
86
% % The message header map:
86
87
% % delivery_count: the number of unsuccessful delivery attempts.
87
88
% % A non-zero value indicates a previous attempt.
94
95
95
96
-type indexed_msg () :: {ra_index (), msg ()}.
96
97
97
- -type prefix_msg () :: {'$prefix_msg' , msg_size ()}.
98
+ -type prefix_msg () :: {'$prefix_msg' , msg_header ()}.
98
99
99
100
-type delivery_msg () :: {msg_id (), msg ()}.
100
101
% % A tuple consisting of the message id and the headered message.
242
243
% % overflow calculations).
243
244
% % This is done so that consumers are still served in a deterministic
244
245
% % order on recovery.
245
- prefix_msgs = {[], []} :: {Return :: [msg_size ()],
246
- PrefixMsgs :: [msg_size ()]},
246
+ prefix_msgs = {[], []} :: {Return :: [msg_header ()],
247
+ PrefixMsgs :: [msg_header ()]},
247
248
msg_bytes_enqueue = 0 :: non_neg_integer (),
248
249
msg_bytes_checkout = 0 :: non_neg_integer (),
249
250
max_length :: maybe (non_neg_integer ()),
@@ -970,11 +971,9 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
970
971
end .
971
972
972
973
apply_enqueue (#{index := RaftIdx } = Meta , From , Seq , RawMsg , State0 ) ->
973
- Bytes = message_size (RawMsg ),
974
974
case maybe_enqueue (RaftIdx , From , Seq , RawMsg , [], State0 ) of
975
975
{ok , State1 , Effects1 } ->
976
- State2 = append_to_master_index (RaftIdx ,
977
- add_bytes_enqueue (Bytes , State1 )),
976
+ State2 = append_to_master_index (RaftIdx , State1 ),
978
977
{State , ok , Effects } = checkout (Meta , State2 , Effects1 ),
979
978
{maybe_store_dehydrated_state (RaftIdx , State ), ok , Effects };
980
979
{duplicate , State , Effects } ->
@@ -991,7 +990,7 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
991
990
Effects = dead_letter_effects (maxlen , maps :put (none , FullMsg , #{}),
992
991
State , Effects0 ),
993
992
{State , Effects };
994
- {{'$prefix_msg' , Bytes }, State1 } ->
993
+ {{'$prefix_msg' , #{ size : = Bytes } }, State1 } ->
995
994
State = add_bytes_drop (Bytes , State1 ),
996
995
{State , Effects0 };
997
996
empty ->
@@ -1001,12 +1000,14 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
1001
1000
enqueue (RaftIdx , RawMsg , # state {messages = Messages ,
1002
1001
low_msg_num = LowMsgNum ,
1003
1002
next_msg_num = NextMsgNum } = State0 ) ->
1004
- Msg = {RaftIdx , {#{}, RawMsg }}, % indexed message with header map
1005
- State0 # state {messages = Messages #{NextMsgNum => Msg },
1006
- % this is probably only done to record it when low_msg_num
1007
- % is undefined
1008
- low_msg_num = min (LowMsgNum , NextMsgNum ),
1009
- next_msg_num = NextMsgNum + 1 }.
1003
+ Size = message_size (RawMsg ),
1004
+ Msg = {RaftIdx , {#{size => Size }, RawMsg }}, % indexed message with header map
1005
+ State = add_bytes_enqueue (Size , State0 ),
1006
+ State # state {messages = Messages #{NextMsgNum => Msg },
1007
+ % this is probably only done to record it when low_msg_num
1008
+ % is undefined
1009
+ low_msg_num = min (LowMsgNum , NextMsgNum ),
1010
+ next_msg_num = NextMsgNum + 1 }.
1010
1011
1011
1012
append_to_master_index (RaftIdx ,
1012
1013
# state {ra_indexes = Indexes0 } = State0 ) ->
@@ -1088,11 +1089,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
1088
1089
credit = increase_credit (Con0 , length (MsgNumMsgs ))},
1089
1090
{Cons , SQ , Effects1 } = update_or_remove_sub (ConsumerId , Con , Cons0 ,
1090
1091
SQ0 , Effects0 ),
1091
- {State1 , Effects2 } = lists :foldl (fun ({'$prefix_msg' , _ } = Msg , {S0 , E0 }) ->
1092
- return_one (0 , Msg , S0 , E0 , ConsumerId , Con );
1093
- ({MsgNum , Msg }, {S0 , E0 }) ->
1094
- return_one (MsgNum , Msg , S0 , E0 , ConsumerId , Con )
1095
- end , {State0 , Effects1 }, MsgNumMsgs ),
1092
+ {State1 , Effects2 } = lists :foldl (
1093
+ fun ({'$prefix_msg' , _ } = Msg , {S0 , E0 }) ->
1094
+ return_one (0 , Msg , S0 , E0 ,
1095
+ ConsumerId , Con );
1096
+ ({MsgNum , Msg }, {S0 , E0 }) ->
1097
+ return_one (MsgNum , Msg , S0 , E0 ,
1098
+ ConsumerId , Con )
1099
+ end , {State0 , Effects1 }, MsgNumMsgs ),
1096
1100
checkout (Meta , State1 # state {consumers = Cons ,
1097
1101
service_queue = SQ },
1098
1102
Effects2 ).
@@ -1152,9 +1156,8 @@ dead_letter_effects(_Reason, _Discarded,
1152
1156
Effects ;
1153
1157
dead_letter_effects (Reason , Discarded ,
1154
1158
# state {dead_letter_handler = {Mod , Fun , Args }}, Effects ) ->
1155
- DeadLetters = maps :fold (fun (_ , {_ , {_ , {_Header , Msg }}},
1156
- % MsgId, MsgIdID, RaftId, Header
1157
- Acc ) -> [{Reason , Msg } | Acc ]
1159
+ DeadLetters = maps :fold (fun (_ , {_ , {_ , {_Header , Msg }}}, Acc ) ->
1160
+ [{Reason , Msg } | Acc ]
1158
1161
end , [], Discarded ),
1159
1162
[{mod_call , Mod , Fun , Args ++ [DeadLetters ]} | Effects ].
1160
1163
@@ -1200,24 +1203,44 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
1200
1203
{Potential , Cursors0 }
1201
1204
end .
1202
1205
1203
- return_one (0 , {'$prefix_msg' , _ } = Msg ,
1204
- # state {returns = Returns } = State0 , Effects , _ConsumerId , _Con ) ->
1205
- {add_bytes_return (Msg ,
1206
- State0 # state {returns = lqueue :in (Msg , Returns )}), Effects };
1206
+ return_one (0 , {'$prefix_msg' , Header0 },
1207
+ # state {returns = Returns ,
1208
+ delivery_limit = DeliveryLimit } = State0 , Effects0 ,
1209
+ ConsumerId , Con ) ->
1210
+ Header = maps :update_with (delivery_count ,
1211
+ fun (C ) -> C + 1 end ,
1212
+ 1 , Header0 ),
1213
+ Msg = {'$prefix_msg' , Header },
1214
+ case maps :get (delivery_count , Header ) of
1215
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
1216
+ Checked = Con # consumer .checked_out ,
1217
+ {State1 , Effects } = complete (ConsumerId , [], 1 , Con , Checked ,
1218
+ Effects0 , State0 ),
1219
+ {add_bytes_settle (Msg , State1 ), Effects };
1220
+ _ ->
1221
+ % % this should not affect the release cursor in any way
1222
+ {add_bytes_return (Msg ,
1223
+ State0 # state {returns = lqueue :in (Msg , Returns )}),
1224
+ Effects0 }
1225
+ end ;
1207
1226
return_one (MsgNum , {RaftId , {Header0 , RawMsg }},
1208
1227
# state {returns = Returns ,
1209
- delivery_limit = DeliveryLimit } = State0 , Effects0 , ConsumerId , Con ) ->
1228
+ delivery_limit = DeliveryLimit } = State0 ,
1229
+ Effects0 , ConsumerId , Con ) ->
1210
1230
Header = maps :update_with (delivery_count ,
1211
1231
fun (C ) -> C + 1 end ,
1212
1232
1 , Header0 ),
1233
+ Msg = {RaftId , {Header , RawMsg }},
1213
1234
case maps :get (delivery_count , Header ) of
1214
1235
DeliveryCount when DeliveryCount > DeliveryLimit ->
1215
- Effects = dead_letter_effects (rejected , maps :put (none , {MsgNum , {RaftId , {Header , RawMsg }}}, #{}), State0 , Effects0 ),
1216
- Checked = maps :without ([MsgNum ], Con # consumer .checked_out ),
1217
- {State1 , Effects1 } = complete (ConsumerId , [RaftId ], 1 , Con , Checked , Effects , State0 ),
1236
+ DlMsg = {MsgNum , Msg },
1237
+ Effects = dead_letter_effects (rejected , maps :put (none , DlMsg , #{}),
1238
+ State0 , Effects0 ),
1239
+ Checked = Con # consumer .checked_out ,
1240
+ {State1 , Effects1 } = complete (ConsumerId , [RaftId ], 1 , Con , Checked ,
1241
+ Effects , State0 ),
1218
1242
{add_bytes_settle (RawMsg , State1 ), Effects1 };
1219
1243
_ ->
1220
- Msg = {RaftId , {Header , RawMsg }},
1221
1244
% % this should not affect the release cursor in any way
1222
1245
{add_bytes_return (RawMsg ,
1223
1246
State0 # state {returns = lqueue :in ({MsgNum , Msg }, Returns )}), Effects0 }
@@ -1293,9 +1316,9 @@ append_send_msg_effects(Effects0, AccMap) ->
1293
1316
% %
1294
1317
% % When we return it is always done to the current return queue
1295
1318
% % for both prefix messages and current messages
1296
- take_next_msg (# state {prefix_msgs = {[Bytes | Rem ], P }} = State ) ->
1319
+ take_next_msg (# state {prefix_msgs = {[Header | Rem ], P }} = State ) ->
1297
1320
% % there are prefix returns, these should be served first
1298
- {{'$prefix_msg' , Bytes },
1321
+ {{'$prefix_msg' , Header },
1299
1322
State # state {prefix_msgs = {Rem , P }}};
1300
1323
take_next_msg (# state {returns = Returns ,
1301
1324
low_msg_num = Low0 ,
@@ -1325,9 +1348,9 @@ take_next_msg(#state{returns = Returns,
1325
1348
end
1326
1349
end ;
1327
1350
empty ->
1328
- [Bytes | Rem ] = P ,
1351
+ [Header | Rem ] = P ,
1329
1352
% % There are prefix msgs
1330
- {{'$prefix_msg' , Bytes },
1353
+ {{'$prefix_msg' , Header },
1331
1354
State # state {prefix_msgs = {R , Rem }}}
1332
1355
end .
1333
1356
@@ -1486,15 +1509,15 @@ dehydrate_state(#state{messages = Messages,
1486
1509
returns = Returns ,
1487
1510
prefix_msgs = {PrefRet0 , PrefMsg0 }} = State ) ->
1488
1511
% % TODO: optimise this function as far as possible
1489
- PrefRet = lists :foldl (fun ({'$prefix_msg' , Bytes }, Acc ) ->
1490
- [Bytes | Acc ];
1491
- ({_ , {_ , {_ , Raw }}}, Acc ) ->
1492
- [message_size ( Raw ) | Acc ]
1512
+ PrefRet = lists :foldl (fun ({'$prefix_msg' , Header }, Acc ) ->
1513
+ [Header | Acc ];
1514
+ ({_ , {_ , {Header , _ }}}, Acc ) ->
1515
+ [Header | Acc ]
1493
1516
end ,
1494
1517
lists :reverse (PrefRet0 ),
1495
1518
lqueue :to_list (Returns )),
1496
- PrefMsgs = lists :foldl (fun ({_ , {_RaftIdx , {_H , Raw }}}, Acc ) ->
1497
- [message_size ( Raw ) | Acc ]
1519
+ PrefMsgs = lists :foldl (fun ({_ , {_RaftIdx , {Header , _ }}}, Acc ) ->
1520
+ [Header | Acc ]
1498
1521
end ,
1499
1522
lists :reverse (PrefMsg0 ),
1500
1523
lists :sort (maps :to_list (Messages ))),
@@ -1512,8 +1535,8 @@ dehydrate_state(#state{messages = Messages,
1512
1535
dehydrate_consumer (# consumer {checked_out = Checked0 } = Con ) ->
1513
1536
Checked = maps :map (fun (_ , {'$prefix_msg' , _ } = M ) ->
1514
1537
M ;
1515
- (_ , {_ , {_ , {_ , Raw }}}) ->
1516
- {'$prefix_msg' , message_size ( Raw ) }
1538
+ (_ , {_ , {_ , {Header , _ }}}) ->
1539
+ {'$prefix_msg' , Header }
1517
1540
end , Checked0 ),
1518
1541
Con # consumer {checked_out = Checked }.
1519
1542
@@ -1591,7 +1614,7 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
1591
1614
message_size (# basic_message {content = Content }) ->
1592
1615
# content {payload_fragments_rev = PFR } = Content ,
1593
1616
iolist_size (PFR );
1594
- message_size ({'$prefix_msg' , B }) ->
1617
+ message_size ({'$prefix_msg' , #{ size : = B } }) ->
1595
1618
B ;
1596
1619
message_size (B ) when is_binary (B ) ->
1597
1620
byte_size (B );
0 commit comments