@@ -240,7 +240,7 @@ consume(Q, Spec,
240
240
# stream_client {filtering_supported = FilteringSupported } = QState0 ) when ? amqqueue_is_stream (Q ) ->
241
241
% % Messages should include the offset as a custom header.
242
242
case check_queue_exists_in_local_node (Q ) of
243
- ok ->
243
+ { ok , LocalPid } ->
244
244
#{no_ack := NoAck ,
245
245
channel_pid := ChPid ,
246
246
prefetch_count := ConsumerPrefetchCount ,
@@ -253,21 +253,20 @@ consume(Q, Spec,
253
253
{error , _ } = Err ->
254
254
Err ;
255
255
{ok , OffsetSpec } ->
256
- _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
257
- rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
258
- not NoAck , QName ,
259
- ConsumerPrefetchCount , false ,
260
- up , Args ),
261
- % % FIXME: reply needs to be sent before the stream begins sending
262
- % % really it should be sent by the stream queue process like classic queues
263
- % % do
264
- maybe_send_reply (ChPid , OkMsg ),
265
256
FilterSpec = filter_spec (Args ),
266
257
case {FilterSpec , FilteringSupported } of
267
258
{#{filter_spec := _ }, false } ->
268
259
{protocol_error , precondition_failed , " Filtering is not supported" , []};
269
260
_ ->
270
- begin_stream (QState0 , ConsumerTag , OffsetSpec ,
261
+ QState = QState0 # stream_client {local_pid = LocalPid },
262
+ rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
263
+ not NoAck , QName ,
264
+ ConsumerPrefetchCount , false ,
265
+ up , Args ),
266
+ % % reply needs to be sent before the stream begins sending
267
+ maybe_send_reply (ChPid , OkMsg ),
268
+ _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
269
+ begin_stream (QState , ConsumerTag , OffsetSpec ,
271
270
ConsumerPrefetchCount , FilterSpec )
272
271
end
273
272
end ;
@@ -410,7 +409,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
410
409
true ->
411
410
case Readers1 of
412
411
#{CTag := # stream {credit = Credit1 } = Str2 } ->
413
- {Readers0 #{CTag => Str2 # stream {credit = 0 }}, [{send_drained , {CTag , Credit1 }}]};
412
+ {Readers0 #{CTag => Str2 # stream {credit = 0 }},
413
+ [{send_drained , {CTag , Credit1 }}]};
414
414
_ ->
415
415
{Readers1 , []}
416
416
end ;
@@ -997,7 +997,7 @@ check_queue_exists_in_local_node(Q) ->
997
997
#{name := StreamId } = amqqueue :get_type_state (Q ),
998
998
case rabbit_stream_coordinator :local_pid (StreamId ) of
999
999
{ok , Pid } when is_pid (Pid ) ->
1000
- ok ;
1000
+ { ok , Pid } ;
1001
1001
_ ->
1002
1002
{protocol_error , precondition_failed ,
1003
1003
" queue '~ts ' does not have a replica on the local node" ,
0 commit comments