Skip to content

Commit 93eccae

Browse files
author
Loïc Hoguin
committed
Cleanup todos and unnecessary comments
1 parent 9b7cbfd commit 93eccae

File tree

2 files changed

+14
-195
lines changed

2 files changed

+14
-195
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 12 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -45,47 +45,31 @@
4545
%% i.e. two pairs, so GC does not go idle when busy
4646
-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).
4747

48+
%% We keep track of flying messages for writes and removes. The idea is that
49+
%% when a remove comes in before we could process the write, we skip the
50+
%% write and send a publisher confirm immediately. We later skip the remove
51+
%% as well since we didn't process the write.
52+
%%
4853
%% Flying events. They get added as things happen. The entry in the table
4954
%% may get removed after the server write or after the server remove. When
5055
%% the value is removed after a write, when the value is added again it
51-
%% will take the same value if the value was never removed.
56+
%% will be as if the value was never removed.
5257
%%
5358
%% So the possible values are only:
5459
%% - 1: client write
5560
%% - 3: client and server write
5661
%% - 7: client and server wrote, client remove before entry could be deleted
5762
%%
58-
%% Values 1 and 7 indicate there is a message in flight.
59-
60-
%% @todo
61-
%% We want to keep track of pending messages, we don't care about what the server has
62-
%% done with them (it will delete the object once it's done, if possible).
63-
%% So we only need two values: a write is in flight and a remove is in flight
63+
%% Values 1 and 7 indicate there is a message in flight: a write or a remove.
6464

65-
-define(FLYING_WRITE, 1). %% ets:insert_new? not really necessary but worth doing yes
66-
-define(FLYING_WRITE_DONE, 2). %% ets:update_counter followed by a tentative remove
67-
-define(FLYING_REMOVE, 4). %% ets:update_counter
65+
-define(FLYING_WRITE, 1). %% Message in transit for writing.
66+
-define(FLYING_WRITE_DONE, 2). %% Message written in the store.
67+
-define(FLYING_REMOVE, 4). %% Message removed from the store.
6868
%% Useful states.
6969
-define(FLYING_IS_WRITTEN, ?FLYING_WRITE + ?FLYING_WRITE_DONE). %% Write was handled.
7070
-define(FLYING_IS_IGNORED, ?FLYING_WRITE + ?FLYING_REMOVE). %% Remove before write was handled.
7171
-define(FLYING_IS_REMOVED, ?FLYING_WRITE + ?FLYING_WRITE_DONE + ?FLYING_REMOVE). %% Remove.
7272

73-
%% @todo OK so on write we can just insert_new and on remove we can just update_counter with a default.
74-
%% @todo On server write we lookup and then later we delete_object with value 3
75-
%% We could do a match of CRef + value 1 to get all flying messages to do multi-write
76-
%% if we wanted to, and group the confirms.
77-
%% @todo On server remove we lookup and then delete by key
78-
%%
79-
%% @todo What to do if a value is not as expected? Can this happen? Probably not?
80-
%% Maybe not on store crash but what about queue crash? It's possible that
81-
%% in that case we get a flying entry but not a message so we could leak
82-
%% that way...
83-
84-
%% We keep track of flying messages for writes and removes. The idea is that
85-
%% when a remove comes in before we could process the write, we skip the
86-
%% write and send a publisher confirm immediately. We later skip the remove
87-
%% as well since we didn't process the write.
88-
8973
%%----------------------------------------------------------------------------
9074

9175
-record(msstate,
@@ -511,20 +495,6 @@ read(MsgId,
511495
{{ok, Msg}, CState}
512496
end.
513497

514-
%% write to cache
515-
%% cast {write,...}
516-
%% some other write makes the file roll over and delete everything from the cache NOPE NOT EVERYTHING IT DOESN'T DELETE IF ENTRY WASN'T WRITTEN YET (or ignored)
517-
%% read from cache: entry not there
518-
%% read from index: {write,...} not processed yet
519-
%% crash NOPE!!! SEE ABOVE
520-
521-
%% So the not_found is impossible unless there's a bug.
522-
523-
%% what we want: the cache shouldn't be completely wiped, only the entries that don't match the current file
524-
%% maybe use a timestamp to allow us to only remove what was fully written and set that timestamp in both cache entry and {write,...} but I don't think that is good enough
525-
%% nope. OK what else can be used. We can track what messages were truly written and only remove those from the cache?
526-
%% but doing that in the main process could be slow, maybe do that in the GC process?
527-
528498
-spec read_many([rabbit_types:msg_id()], client_msstate()) -> #{rabbit_types:msg_id() => msg()}.
529499

530500
%% We disable read_many when the index module is not ETS for the time being.
@@ -684,36 +654,6 @@ client_read3(#msg_location { msg_id = MsgId, file = File },
684654
{{ok, Msg}, CState1}
685655
end.
686656

687-
%% @todo Can we merge the flying behavior with the cur_cache refc stuff?
688-
%% If a message is in cur_cache it will be refc so the first time
689-
%% the {write,...} makes it through we can check cur_cache instead?
690-
%% If value has 0 refc we don't write?
691-
%client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
692-
% client_ref = CRef }) ->
693-
% Key = {MsgId, CRef},
694-
% %% @todo Use ets:update_counter with a default value.
695-
% case ets:insert_new(FlyingEts, {Key, Diff}) of
696-
% true -> ok;
697-
% false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
698-
% 0 -> ok;
699-
% Diff -> ok;
700-
% Err when Err >= 2 ->
701-
% %% The message must be referenced twice in the queue
702-
% %% index. There is a bug somewhere, but we don't want
703-
% %% to take down anything just because of this. Let's
704-
% %% process the message as if the copies were in
705-
% %% different queues (fan-out).
706-
% ok;
707-
% Err -> throw({bad_flying_ets_update, Diff, Err, Key})
708-
% catch error:badarg ->
709-
% %% this is guaranteed to succeed since the
710-
% %% server only removes and updates flying_ets
711-
% %% entries; it never inserts them
712-
% true = ets:insert_new(FlyingEts, {Key, Diff})
713-
% end,
714-
% ok
715-
% end.
716-
717657
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
718658
dying_clients = DyingClients }) ->
719659
State #msstate { cref_to_msg_ids = maps:remove(CRef, CTM),
@@ -895,73 +835,10 @@ handle_cast({client_delete, CRef},
895835
State1 = State #msstate { clients = maps:remove(CRef, Clients) },
896836
noreply(clear_client(CRef, State1));
897837

898-
%% @todo I think we don't need the flying table as long as we use the value from the cache instead.
899-
%% We only need to write the message if it wasn't already written in the current file (keep keys?)
900-
%% and the cache is > 0. We should NOT worry about messages in previous files (?) well we should
901-
%% because the index can only have a single value so writing to another file doesn't work.
902-
%% Can we just update_counter the index and if it doesn't exist then write?
903-
%%
904-
%% client: - write to ets cache OR increase refc if already exists (which one takes priority? write if not fan-out, refc if fan-out, but can't know yet so write)
905-
%% - send {write...}
906-
%% DON'T update_flying anymore! The refc is doing it for us
907-
%% how does the refc work? we probably should +2 the refc on client_write because it will be decreased by both client ack and write?
908-
%%
909-
%% server: - -1 the cache; if result is 0 then we don't need to write (it was already acked)
910-
%% HMMM but we can't figure things out like this if there's fan-out
911-
%% so if fan-out we just increase/decrease the index (assuming non-zero) like we do before,
912-
%% we will just not have the flying optimisation in that case?
913-
%%
914-
%%
915-
%% remove: how do we know it was not written? we just read from the cache and if value is 0... Nope
916-
%% we want to just -1 client-side and only tell the message store the message is gone if the value is 0 ideally or if the value isn't in the cache (of course)
917-
%%
918-
%%
919-
%% when should values be removed from the cache in that case? when the file rolls over I guess? I guess that's why flying and refc are separate
920-
921-
922-
923-
924-
%% we write to cache everytime we do a {write,...}
925-
%% we want to avoid writing to disk if not necessary
926-
927-
%% write: to cache with +1 refc and +1 pending_write
928-
%% {write,...}: +0 refc -1 pending_write
929-
%% if refc == 0 -> don't write
930-
%% do we need to track pending_write at all?
931-
%% what about confirms? maybe never try to do them early?
932-
%% remove: -1 refc
933-
934-
935-
936-
937-
%% write
938-
%% refc+1
939-
%% ignore
940-
%% confirm?
941-
942-
943-
944-
945838
handle_cast({write, CRef, MsgRef, MsgId, Flow},
946839
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
947840
clients = Clients,
948841
credit_disc_bound = CreditDiscBound }) ->
949-
950-
%% @todo Figure out how multi-write would work out with Flow.
951-
%% With noflow, no problem. With flow, we must ack for
952-
%% each message we end up writing?
953-
%%
954-
%% @todo Should we send a message per write? Probably, but
955-
%% the message should say "there is something to write"
956-
%% and we will look at the ets table?
957-
958-
%% @todo How do we know which messages were handled and which weren't? update_flying?
959-
%% Won't a multi-write prevent the flying (write/remove) optimisation?
960-
961-
%% @todo We don't want to multi-write we want to multi-confirm, so the update_flying
962-
%% we do here we instead want to do it on multiple messages if possible. So
963-
%% get all messages from CRef that would end up ignored and do them all at once.
964-
965842
case Flow of
966843
flow -> {CPid, _} = maps:get(CRef, Clients),
967844
%% We are going to process a message sent by the
@@ -989,10 +866,6 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
989866
%% current file then the cache entry will be removed by
990867
%% the normal logic for that in write_message/4 and
991868
%% maybe_roll_to_new_file/2.
992-
%% @todo OK I think this is the core of the issue with
993-
%% the cache. There's got to be a better way that
994-
%% allows keeping the cache while not preventing
995-
%% fast writes.
996869
case index_lookup(MsgId, State1) of
997870
[#msg_location { file = File }]
998871
when File == State1 #msstate.current_file ->
@@ -1162,21 +1035,6 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
11621035
client_confirm(CRef, MsgIds, written, StateN)
11631036
end, State1, CGs).
11641037

1165-
%% ets:match({{_, CRef}, 0})
1166-
%% Delete all these objects.
1167-
%% Check if MsgId is in there. Hmm no we can't assume we process in that case...
1168-
%% OK we NEVER remove the entry before we look in the case of write. So for write we will always have MsgId in there if it has 0.
1169-
%% So if MsgId is in there we delete the objects and we ignore.
1170-
%% But we can't delete the objects randomly since the counter may get updated we have to delete the objects we found explicitly (and only if they still say 0).
1171-
%% OK but how do we avoid sending confirms multiple times then? The write message is coming anyway...
1172-
%% Hmmm....
1173-
%% We need to get rid of gen_server2? Probably too hard right now.
1174-
%% We could keep track of which MsgIds were already processed and ignore the message in that case.
1175-
%% Or since we delete_object we could just update_flying like before? But if the message was
1176-
%% already processed we will ignore it and we don't want to send a confirm again... Tough.
1177-
%% (ignore) But we can do the ets:lookup and then if it's an ignore we match for the other ones?
1178-
%% (ignore) But we'll have MORE ets operations in that case because we still have incoming write/remove...
1179-
11801038
flying_write(Key, #msstate { flying_ets = FlyingEts }) ->
11811039
case ets:lookup(FlyingEts, Key) of
11821040
[{_, ?FLYING_WRITE}] ->
@@ -1200,36 +1058,6 @@ flying_remove(Key, #msstate { flying_ets = FlyingEts }) ->
12001058
true = ets:delete(FlyingEts, Key),
12011059
Res.
12021060

1203-
%update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
1204-
% Key = {MsgId, CRef},
1205-
% NDiff = -Diff,
1206-
% case ets:lookup(FlyingEts, Key) of
1207-
% [] -> ignore; %% @todo How is that possible?! If we have a message we must have an entry...
1208-
% [{_, Diff}] -> ignore; %% [1]
1209-
% [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
1210-
% true = ets:delete_object(FlyingEts, {Key, 0}),
1211-
% process;
1212-
% [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
1213-
% ignore;
1214-
% [{_, Err}] when Err >= 2 ->
1215-
% %% The message must be referenced twice in the queue index. There
1216-
% %% is a bug somewhere, but we don't want to take down anything
1217-
% %% just because of this. Let's process the message as if the
1218-
% %% copies were in different queues (fan-out).
1219-
% ets:update_counter(FlyingEts, Key, {2, Diff}),
1220-
% true = ets:delete_object(FlyingEts, {Key, 0}),
1221-
% process;
1222-
% [{_, Err}] -> throw({bad_flying_ets_record, Diff, Err, Key})
1223-
% end.
1224-
%% [1] We can get here, for example, in the following scenario: There
1225-
%% is a write followed by a remove in flight. The counter will be 0,
1226-
%% so on processing the write the server attempts to delete the
1227-
%% entry. If at that point the client injects another write it will
1228-
%% either insert a new entry, containing +1, or increment the existing
1229-
%% entry to +1, thus preventing its removal. Either way therefore when
1230-
%% the server processes the read, the counter will be +1.
1231-
%% @todo But why would the client insert the same MsgId twice?
1232-
12331061
%% The general idea is that when a client of a transient message store is dying,
12341062
%% we want to avoid writing messages that would end up being removed immediately
12351063
%% after.
@@ -1281,9 +1109,6 @@ write_message(MsgId, Msg, CRef,
12811109
{write, State1} ->
12821110
write_message(MsgId, Msg,
12831111
record_pending_confirm(CRef, MsgId, State1));
1284-
%% @todo Where does the confirm gets sent?
1285-
%% They aren't because those messages will not get written or confirmed
1286-
%% because the queue that sent the messages is shutting down.
12871112
{ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
12881113
State1;
12891114
{ignore, _File, State1} ->
@@ -1377,9 +1202,6 @@ read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
13771202
file:open(form_filename(Dir, filenum_to_name(File)),
13781203
[binary, read, raw])
13791204
end,
1380-
1381-
% {ok, Hdl} = file:open(form_filename(Dir, filenum_to_name(File)),
1382-
% [binary, read, raw]),
13831205
{ok, {MsgId, Msg}} =
13841206
case rabbit_msg_file:pread(Hdl, Offset, TotalSize) of
13851207
{ok, {MsgId, _}} = Obj ->
@@ -1393,7 +1215,6 @@ read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
13931215
{proc_dict, get()}
13941216
]}}
13951217
end,
1396-
% ok = file:close(Hdl),
13971218
{Msg, State#client_msstate{ current_file = {File, Hdl}}}.
13981219

13991220
contains_message(MsgId, From, State) ->
@@ -1506,7 +1327,6 @@ open_file(Dir, FileName, Mode) ->
15061327
mark_handle_open(FileHandlesEts, File, Ref) ->
15071328
%% This is fine to fail (already exists). Note it could fail with
15081329
%% the value being close, and not have it updated to open.
1509-
%% @todo Should it fail? Probably not anymore.
15101330
ets:insert_new(FileHandlesEts, {{Ref, File}, erlang:monotonic_time()}),
15111331
true.
15121332

@@ -1810,9 +1630,8 @@ maybe_roll_to_new_file(
18101630
valid_total_size = 0,
18111631
file_size = 0,
18121632
locked = false }),
1813-
%% @todo We only delete those that have no reference???
1814-
%% Does that mean the cache can grow unbounded? No because we decrease the reference on {write,...}
1815-
%% But we do get potential memory issues if the store is hammered.
1633+
%% We only delete messages from the cache that were written to disk
1634+
%% in the previous file.
18161635
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
18171636
State1 #msstate { current_file_handle = NextHdl,
18181637
current_file = NextFile,

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2361,8 +2361,8 @@ sets_subtract(Set1, Set2) ->
23612361
end.
23622362

23632363
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
2364-
%% @todo Why does this behave like when msgs AND indices are written? indices may not be written yet here?
2365-
%% Right that's because the queue already acked it so it doesn't matter whether it's written to index.
2364+
%% The message was already acked so it doesn't matter if it was never written
2365+
%% to the index, we can process the confirm.
23662366
Callback(?MODULE,
23672367
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
23682368
msgs_written_to_disk(Callback, MsgIdSet, written) ->

0 commit comments

Comments
 (0)