Skip to content

Commit 269a2c1

Browse files
Merge pull request #1281 from rabbitmq/rabbitmq-common-208-master
Use delegate:invoke and delegate:invoke_no_result
2 parents 572475e + 1b60636 commit 269a2c1

File tree

2 files changed

+38
-32
lines changed

2 files changed

+38
-32
lines changed

src/rabbit_amqqueue.erl

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -627,12 +627,12 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
627627
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
628628

629629
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
630-
info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
630+
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
631631

632632
info(Q = #amqqueue{ state = crashed }, Items) ->
633633
info_down(Q, Items, crashed);
634634
info(#amqqueue{ pid = QPid }, Items) ->
635-
case delegate:call(QPid, {info, Items}) of
635+
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
636636
{ok, Res} -> Res;
637637
{error, Error} -> throw(Error)
638638
end.
@@ -693,7 +693,8 @@ force_event_refresh(Ref) ->
693693
notify_policy_changed(#amqqueue{pid = QPid}) ->
694694
gen_server2:cast(QPid, policy_changed).
695695

696-
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
696+
consumers(#amqqueue{ pid = QPid }) ->
697+
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
697698

698699
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
699700

@@ -721,7 +722,7 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
721722
AckRequired, Prefetch, Args]) ||
722723
{ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
723724

724-
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
725+
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
725726

726727
pid_of(#amqqueue{pid = Pid}) -> Pid.
727728
pid_of(VHost, QueueName) ->
@@ -739,7 +740,7 @@ delete_immediately(QPids) ->
739740
ok.
740741

741742
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
742-
delegate:call(QPid, {delete, IfUnused, IfEmpty, ActingUser}).
743+
delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
743744

744745
delete_crashed(Q) ->
745746
delete_crashed(Q, ?INTERNAL_USER).
@@ -752,21 +753,24 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) ->
752753
BQ:delete_crashed(Q),
753754
ok = internal_delete(QName, ActingUser).
754755

755-
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
756+
purge(#amqqueue{ pid = QPid }) ->
757+
delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}).
756758

757-
requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
759+
requeue(QPid, MsgIds, ChPid) ->
760+
delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}).
758761

759-
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
762+
ack(QPid, MsgIds, ChPid) ->
763+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}).
760764

761765
reject(QPid, Requeue, MsgIds, ChPid) ->
762-
delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
766+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}).
763767

764768
notify_down_all(QPids, ChPid) ->
765769
notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
766770

767771
notify_down_all(QPids, ChPid, Timeout) ->
768-
case rpc:call(node(), delegate, call,
769-
[QPids, {notify_down, ChPid}], Timeout) of
772+
case rpc:call(node(), delegate, invoke,
773+
[QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of
770774
{badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}};
771775
{badrpc, Reason} -> {error, Reason};
772776
{_, Bads} ->
@@ -782,35 +786,37 @@ notify_down_all(QPids, ChPid, Timeout) ->
782786
end.
783787

784788
activate_limit_all(QPids, ChPid) ->
785-
delegate:cast(QPids, {activate_limit, ChPid}).
789+
delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}).
786790

787791
credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
788-
delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
792+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}).
789793

790794
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
791-
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
795+
delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}).
792796

793797
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
794798
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
795799
ExclusiveConsume, Args, OkMsg, ActingUser) ->
796800
ok = check_consume_arguments(QName, Args),
797-
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
798-
ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
799-
Args, OkMsg, ActingUser}).
801+
delegate:invoke(QPid, {gen_server2, call,
802+
[{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
803+
ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
804+
Args, OkMsg, ActingUser}, infinity]}).
800805

801806
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg, ActingUser) ->
802-
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}).
807+
delegate:invoke(QPid, {gen_server2, call,
808+
[{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}).
803809

804810
notify_decorators(#amqqueue{pid = QPid}) ->
805-
delegate:cast(QPid, notify_decorators).
811+
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
806812

807813
notify_sent(QPid, ChPid) ->
808814
rabbit_amqqueue_common:notify_sent(QPid, ChPid).
809815

810816
notify_sent_queue_down(QPid) ->
811817
rabbit_amqqueue_common:notify_sent_queue_down(QPid).
812818

813-
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
819+
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}).
814820

815821
internal_delete1(QueueName, OnlyDurable) ->
816822
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -907,12 +913,17 @@ set_ram_duration_target(QPid, Duration) ->
907913
set_maximum_since_use(QPid, Age) ->
908914
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
909915

910-
update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring).
916+
update_mirroring(QPid) ->
917+
ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
911918

912-
sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors);
913-
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
914-
cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors);
915-
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
919+
sync_mirrors(#amqqueue{pid = QPid}) ->
920+
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
921+
sync_mirrors(QPid) ->
922+
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
923+
cancel_sync_mirrors(#amqqueue{pid = QPid}) ->
924+
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
925+
cancel_sync_mirrors(QPid) ->
926+
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
916927

917928
is_mirrored(Q) ->
918929
rabbit_mirror_queue_misc:is_mirrored(Q).
@@ -1031,8 +1042,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
10311042
%% done with it.
10321043
MMsg = {deliver, Delivery, false},
10331044
SMsg = {deliver, Delivery, true},
1034-
delegate:cast(MPids, MMsg),
1035-
delegate:cast(SPids, SMsg),
1045+
delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
1046+
delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
10361047
QPids.
10371048

10381049
qpids([]) -> {[], []}; %% optimisation

test/cluster_SUITE.erl

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,6 @@ delegates_async1(_Config, SecondaryNode) ->
123123
ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
124124
await_response(2),
125125

126-
LocalPids = spawn_responders(node(), Responder, 10),
127-
RemotePids = spawn_responders(SecondaryNode, Responder, 10),
128-
ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
129-
await_response(20),
130-
131126
passed.
132127

133128
delegates_sync(Config) ->

0 commit comments

Comments
 (0)