Skip to content

Optimize publication path when messages are expiring #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dep_rabbitmq_event_exchange = git_rmq rabbitmq-event-exchange $(curren
dep_rabbitmq_federation = git_rmq rabbitmq-federation $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_federation_management = git_rmq rabbitmq-federation-management $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_java_client = git_rmq rabbitmq-java-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_jms_topic_exchange = git_rmq rabbitmq-jms-topic-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_lvc = git_rmq rabbitmq-lvc-plugin $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_agent = git_rmq rabbitmq-management-agent $(current_rmq_ref) $(base_rmq_ref) master
Expand All @@ -62,6 +63,7 @@ dep_rabbitmq_stomp = git_rmq rabbitmq-stomp $(current_rmq_ref
dep_rabbitmq_toke = git_rmq rabbitmq-toke $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_top = git_rmq rabbitmq-top $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_tracing = git_rmq rabbitmq-tracing $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_trust_store = git_rmq rabbitmq-trust-store $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_test = git_rmq rabbitmq-test $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_dispatch = git_rmq rabbitmq-web-dispatch $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_stomp = git_rmq rabbitmq-web-stomp $(current_rmq_ref) $(base_rmq_ref) master
Expand Down Expand Up @@ -99,6 +101,7 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_federation \
rabbitmq_federation_management \
rabbitmq_java_client \
rabbitmq_jms_topic_exchange \
rabbitmq_lvc \
rabbitmq_management \
rabbitmq_management_agent \
Expand All @@ -118,6 +121,7 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_toke \
rabbitmq_top \
rabbitmq_tracing \
rabbitmq_trust_store \
rabbitmq_web_dispatch \
rabbitmq_web_mqtt \
rabbitmq_web_mqtt_examples \
Expand Down
31 changes: 11 additions & 20 deletions src/rabbit_delayed_message.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
-define(SERVER, ?MODULE).
-define(TABLE_NAME, append_to_atom(?MODULE, node())).
-define(INDEX_TABLE_NAME, append_to_atom(?TABLE_NAME, "_index")).
-define(IDLE_DELAY_MS, 60 * 60 * 1000).

-record(state, {timer}).

Expand Down Expand Up @@ -127,14 +128,14 @@ handle_call({delay_message, Exchange, Delivery, Delay},
handle_call(_Req, _From, State) ->
{reply, unknown_request, State}.

handle_cast(go, State = #state{timer = CurrTimer}) ->
maybe_delay_first(CurrTimer),
{noreply, State};
handle_cast(go, State = #state{}) ->
delay_first(),
{noreply, State#state{timer = delay_first()}};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two delay_first in a row. Is that on purpose?

handle_cast(_C, State) ->
{noreply, State}.

handle_info({timeout, _TimerRef, {deliver, Key}},
State = #state{timer = CurrTimer}) ->
State = #state{}) ->
case mnesia:dirty_read(?TABLE_NAME, Key) of
[] ->
ok;
Expand All @@ -144,7 +145,7 @@ handle_info({timeout, _TimerRef, {deliver, Key}},
mnesia:dirty_delete(?INDEX_TABLE_NAME, Key)
end,

{noreply, State#state{timer = maybe_delay_first(CurrTimer)}};
{noreply, State#state{timer = delay_first()}};
handle_info(_I, State) ->
{noreply, State}.

Expand All @@ -155,16 +156,17 @@ code_change(_, State, _) -> {ok, State}.

%%--------------------------------------------------------------------

maybe_delay_first(CurrTimer) ->
delay_first() ->
case mnesia:dirty_first(?INDEX_TABLE_NAME) of
%% destructuring to prevent matching '$end_of_table'
#delay_key{timestamp = FirstTS} = Key2 ->
%% there are messages that expired and need to be delivered
Now = time_compat:erlang_system_time(milli_seconds),
start_timer(FirstTS - Now, Key2);
_ ->
%% nothing to do
CurrTimer
%% To avoid ugly special-cases on the enqueue path, always
%% keep a timer. We set it far in the future to keep it cheap.
start_timer(?IDLE_DELAY_MS, 'idle_timeout')
end.

route(#delay_key{exchange = Ex}, Deliveries) ->
Expand All @@ -185,18 +187,7 @@ internal_delay_message(CurrTimer, Exchange, Delivery, Delay) ->
make_delay(DelayTS, Exchange, Delivery)),
case erlang:read_timer(CurrTimer) of
false ->
%% last timer expired, we set a new timer for
%% the next message to be delivered
case mnesia:dirty_first(?INDEX_TABLE_NAME) of
%% destructuring to prevent matching '$end_of_table'
#delay_key{timestamp = FirstTS} = Key
when FirstTS < DelayTS ->
%% there are messages that expired and need to be delivered
{ok, start_timer(FirstTS - Now, Key)};
_ ->
%% empty table or DelayTS <= FirstTS
{ok, start_timer(Delay, make_key(DelayTS, Exchange))}
end;
{ok, CurrTimer};
Copy link
Contributor

@hairyhum hairyhum Jun 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If for some reason (e.g. supervisor restart) go is never called, a timer will never be started. This can make the plugin completely unusable.
Maybe the long-running timer should be started here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something, but I don't see how that would work.

As I understand it, there are two cases where we hit this code path.

The first case is where no timer has ever been started and we need to start one to get the system moving. I thought that my changes to the go handler would address that. I don't fully understand how the plugin lifecycle works in all scenarios, so I'm willing to take your word for it that my change does not fix it.

The second case is where there was a timer, but it has expired because the message(s) at the head of the queue are ready for publication. The timer will be restarted when the timeout handler is called, but in the meantime there could be many requests to publish messages. I want to avoid starting a timer in that case, since it makes the publication code path very slow. Also, those extra timers probably aren't good for performance, either.

My PR was intended to fix the second case while not breaking the first. Based on your comment, it sounds like I have actually broken the first. Your suggestion might help fix the first case, but I don't see how it applies to the second.


Perhaps I need to take the more complicated approach I had considered earlier. We could store an extra boolean in the state that tells us whether or not a timer has been started. That would help differentiate the two cases so we could handle them differently, though it is a little more complicated.

It's not my favorite solution, but I think it could work. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go is called only on application startup. But gen_server can be restarted any moment (by supervisor) without calling go, which is not the best approach, but it worked before this changes just because timer could be started by publishing message.
When starting a new gen_server, timer in state is being set to make_ref, which mimics expired timer. That also is not a good solution because we don't know if timer is expired already or have never been started.
Instead of adding a new state variable you can set timer to something meaningful for not yet started timer. Since this new version is using timer state field only once in internal_delay_message it can be checked there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed #53 to explore this approach. When we track the unset timer explicitly, we no longer need the idle timer, so the change looks quite different from what's proposed in this PR.

CurrMS when Delay < CurrMS ->
%% Current timer lasts longer that new message delay
erlang:cancel_timer(CurrTimer),
Expand Down