-
Notifications
You must be signed in to change notification settings - Fork 154
Optimize publication path when messages are expiring #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,7 @@ | |
-define(SERVER, ?MODULE). | ||
-define(TABLE_NAME, append_to_atom(?MODULE, node())). | ||
-define(INDEX_TABLE_NAME, append_to_atom(?TABLE_NAME, "_index")). | ||
-define(IDLE_DELAY_MS, 60 * 60 * 1000). | ||
|
||
-record(state, {timer}). | ||
|
||
|
@@ -127,14 +128,14 @@ handle_call({delay_message, Exchange, Delivery, Delay}, | |
handle_call(_Req, _From, State) -> | ||
{reply, unknown_request, State}. | ||
|
||
handle_cast(go, State = #state{timer = CurrTimer}) -> | ||
maybe_delay_first(CurrTimer), | ||
{noreply, State}; | ||
handle_cast(go, State = #state{}) -> | ||
delay_first(), | ||
{noreply, State#state{timer = delay_first()}}; | ||
handle_cast(_C, State) -> | ||
{noreply, State}. | ||
|
||
handle_info({timeout, _TimerRef, {deliver, Key}}, | ||
State = #state{timer = CurrTimer}) -> | ||
State = #state{}) -> | ||
case mnesia:dirty_read(?TABLE_NAME, Key) of | ||
[] -> | ||
ok; | ||
|
@@ -144,7 +145,7 @@ handle_info({timeout, _TimerRef, {deliver, Key}}, | |
mnesia:dirty_delete(?INDEX_TABLE_NAME, Key) | ||
end, | ||
|
||
{noreply, State#state{timer = maybe_delay_first(CurrTimer)}}; | ||
{noreply, State#state{timer = delay_first()}}; | ||
handle_info(_I, State) -> | ||
{noreply, State}. | ||
|
||
|
@@ -155,16 +156,17 @@ code_change(_, State, _) -> {ok, State}. | |
|
||
%%-------------------------------------------------------------------- | ||
|
||
maybe_delay_first(CurrTimer) -> | ||
delay_first() -> | ||
case mnesia:dirty_first(?INDEX_TABLE_NAME) of | ||
%% destructuring to prevent matching '$end_of_table' | ||
#delay_key{timestamp = FirstTS} = Key2 -> | ||
%% there are messages that expired and need to be delivered | ||
Now = time_compat:erlang_system_time(milli_seconds), | ||
start_timer(FirstTS - Now, Key2); | ||
_ -> | ||
%% nothing to do | ||
CurrTimer | ||
%% To avoid ugly special-cases on the enqueue path, always | ||
%% keep a timer. We set it far in the future to keep it cheap. | ||
start_timer(?IDLE_DELAY_MS, 'idle_timeout') | ||
end. | ||
|
||
route(#delay_key{exchange = Ex}, Deliveries) -> | ||
|
@@ -185,18 +187,7 @@ internal_delay_message(CurrTimer, Exchange, Delivery, Delay) -> | |
make_delay(DelayTS, Exchange, Delivery)), | ||
case erlang:read_timer(CurrTimer) of | ||
false -> | ||
%% last timer expired, we set a new timer for | ||
%% the next message to be delivered | ||
case mnesia:dirty_first(?INDEX_TABLE_NAME) of | ||
%% destructuring to prevent matching '$end_of_table' | ||
#delay_key{timestamp = FirstTS} = Key | ||
when FirstTS < DelayTS -> | ||
%% there are messages that expired and need to be delivered | ||
{ok, start_timer(FirstTS - Now, Key)}; | ||
_ -> | ||
%% empty table or DelayTS <= FirstTS | ||
{ok, start_timer(Delay, make_key(DelayTS, Exchange))} | ||
end; | ||
{ok, CurrTimer}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If for some reason (e.g. supervisor restart) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm probably missing something, but I don't see how that would work. As I understand it, there are two cases where we hit this code path. The first case is where no timer has ever been started and we need to start one to get the system moving. I thought that my changes to the The second case is where there was a timer, but it has expired because the message(s) at the head of the queue are ready for publication. The timer will be restarted when the timeout handler is called, but in the meantime there could be many requests to publish messages. I want to avoid starting a timer in that case, since it makes the publication code path very slow. Also, those extra timers probably aren't good for performance, either. My PR was intended to fix the second case while not breaking the first. Based on your comment, it sounds like I have actually broken the first. Your suggestion might help fix the first case, but I don't see how it applies to the second. Perhaps I need to take the more complicated approach I had considered earlier. We could store an extra boolean in the state that tells us whether or not a timer has been started. That would help differentiate the two cases so we could handle them differently, though it is a little more complicated. It's not my favorite solution, but I think it could work. Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I filed #53 to explore this approach. When we track the unset timer explicitly, we no longer need the idle timer, so the change looks quite different from what's proposed in this PR. |
||
CurrMS when Delay < CurrMS -> | ||
%% Current timer lasts longer that new message delay | ||
erlang:cancel_timer(CurrTimer), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two
delay_first
in a row. Is that on purpose?