@@ -237,10 +237,11 @@ consume(Q, #{limiter_active := true}, _State)
237
237
when ? amqqueue_is_stream (Q ) ->
238
238
{error , global_qos_not_supported_for_queue_type };
239
239
consume (Q , Spec ,
240
- # stream_client {filtering_supported = FilteringSupported } = QState0 ) when ? amqqueue_is_stream (Q ) ->
240
+ # stream_client {filtering_supported = FilteringSupported } = QState0 )
241
+ when ? amqqueue_is_stream (Q ) ->
241
242
% % Messages should include the offset as a custom header.
242
- case check_queue_exists_in_local_node ( Q ) of
243
- ok ->
243
+ case get_local_pid ( QState0 ) of
244
+ { LocalPid , QState } when is_pid ( LocalPid ) ->
244
245
#{no_ack := NoAck ,
245
246
channel_pid := ChPid ,
246
247
prefetch_count := ConsumerPrefetchCount ,
@@ -249,30 +250,32 @@ consume(Q, Spec,
249
250
args := Args ,
250
251
ok_msg := OkMsg } = Spec ,
251
252
QName = amqqueue :get_name (Q ),
252
- case parse_offset_arg (rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
253
+ case parse_offset_arg (
254
+ rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
253
255
{error , _ } = Err ->
254
256
Err ;
255
257
{ok , OffsetSpec } ->
256
- _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
257
- rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
258
- not NoAck , QName ,
259
- ConsumerPrefetchCount , false ,
260
- up , Args ),
261
- % % FIXME: reply needs to be sent before the stream begins sending
262
- % % really it should be sent by the stream queue process like classic queues
263
- % % do
264
- maybe_send_reply (ChPid , OkMsg ),
265
258
FilterSpec = filter_spec (Args ),
266
259
case {FilterSpec , FilteringSupported } of
267
260
{#{filter_spec := _ }, false } ->
268
- {protocol_error , precondition_failed , " Filtering is not supported" , []};
261
+ {protocol_error , precondition_failed ,
262
+ " Filtering is not supported" , []};
269
263
_ ->
270
- begin_stream (QState0 , ConsumerTag , OffsetSpec ,
264
+ rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
265
+ not NoAck , QName ,
266
+ ConsumerPrefetchCount , false ,
267
+ up , Args ),
268
+ % % reply needs to be sent before the stream begins sending
269
+ maybe_send_reply (ChPid , OkMsg ),
270
+ _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
271
+ begin_stream (QState , ConsumerTag , OffsetSpec ,
271
272
ConsumerPrefetchCount , FilterSpec )
272
273
end
273
274
end ;
274
- Err ->
275
- Err
275
+ {undefined , _ } ->
276
+ {protocol_error , precondition_failed ,
277
+ " queue '~ts ' does not have a running replica on the local node" ,
278
+ [rabbit_misc :rs (amqqueue :get_name (Q ))]}
276
279
end .
277
280
278
281
-spec parse_offset_arg (undefined |
@@ -338,8 +341,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
338
341
get_local_pid (# stream_client {leader = Pid } = State )
339
342
when is_pid (Pid ) andalso node (Pid ) == node () ->
340
343
{Pid , State # stream_client {local_pid = Pid }};
341
- get_local_pid (# stream_client {stream_id = StreamId ,
342
- local_pid = undefined } = State ) ->
344
+ get_local_pid (# stream_client {stream_id = StreamId } = State ) ->
343
345
% % query local coordinator to get pid
344
346
case rabbit_stream_coordinator :local_pid (StreamId ) of
345
347
{ok , Pid } ->
@@ -348,34 +350,31 @@ get_local_pid(#stream_client{stream_id = StreamId,
348
350
{undefined , State }
349
351
end .
350
352
351
- begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
352
- Tag , Offset , Max , Options ) ->
353
- {LocalPid , State } = get_local_pid (State0 ),
354
- case LocalPid of
355
- undefined ->
356
- {error , no_local_stream_replica_available };
357
- _ ->
358
- CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
359
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
360
- NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
361
- osiris :register_offset_listener (LocalPid , NextOffset ),
362
- % % TODO: avoid double calls to the same process
363
- StartOffset = case Offset of
364
- first -> NextOffset ;
365
- last -> NextOffset ;
366
- next -> NextOffset ;
367
- {timestamp , _ } -> NextOffset ;
368
- _ -> Offset
369
- end ,
370
- Str0 = # stream {credit = Max ,
371
- start_offset = StartOffset ,
372
- listening_offset = NextOffset ,
373
- log = Seg0 ,
374
- max = Max ,
375
- reader_options = Options },
376
- {ok , State # stream_client {local_pid = LocalPid ,
377
- readers = Readers0 #{Tag => Str0 }}}
378
- end .
353
+ begin_stream (# stream_client {name = QName ,
354
+ readers = Readers0 ,
355
+ local_pid = LocalPid } = State ,
356
+ Tag , Offset , Max , Options )
357
+ when is_pid (LocalPid ) ->
358
+ CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
359
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
360
+ NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
361
+ osiris :register_offset_listener (LocalPid , NextOffset ),
362
+ % % TODO: avoid double calls to the same process
363
+ StartOffset = case Offset of
364
+ first -> NextOffset ;
365
+ last -> NextOffset ;
366
+ next -> NextOffset ;
367
+ {timestamp , _ } -> NextOffset ;
368
+ _ -> Offset
369
+ end ,
370
+ Str0 = # stream {credit = Max ,
371
+ start_offset = StartOffset ,
372
+ listening_offset = NextOffset ,
373
+ log = Seg0 ,
374
+ max = Max ,
375
+ reader_options = Options },
376
+ {ok , State # stream_client {local_pid = LocalPid ,
377
+ readers = Readers0 #{Tag => Str0 }}}.
379
378
380
379
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
381
380
name = QName } = State ) ->
@@ -395,8 +394,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
395
394
end .
396
395
397
396
credit (QName , CTag , Credit , Drain , # stream_client {readers = Readers0 ,
398
- name = Name ,
399
- local_pid = LocalPid } = State ) ->
397
+ name = Name ,
398
+ local_pid = LocalPid } = State ) ->
400
399
{Readers1 , Msgs } = case Readers0 of
401
400
#{CTag := # stream {credit = Credit0 } = Str0 } ->
402
401
Str1 = Str0 # stream {credit = Credit0 + Credit },
@@ -410,7 +409,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
410
409
true ->
411
410
case Readers1 of
412
411
#{CTag := # stream {credit = Credit1 } = Str2 } ->
413
- {Readers0 #{CTag => Str2 # stream {credit = 0 }}, [{send_drained , {CTag , Credit1 }}]};
412
+ {Readers0 #{CTag => Str2 # stream {credit = 0 }},
413
+ [{send_drained , {CTag , Credit1 }}]};
414
414
_ ->
415
415
{Readers1 , []}
416
416
end ;
@@ -443,7 +443,7 @@ deliver0(MsgId, Msg,
443
443
soft_limit = SftLmt ,
444
444
slow = Slow0 ,
445
445
filtering_supported = FilteringSupported } = State ,
446
- Actions0 ) ->
446
+ Actions0 ) ->
447
447
ok = osiris :write (LeaderPid , WriterId , Seq ,
448
448
stream_message (Msg , FilteringSupported )),
449
449
Correlation = case MsgId of
@@ -993,17 +993,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
993
993
recover (Q ) ->
994
994
{ok , Q }.
995
995
996
- check_queue_exists_in_local_node (Q ) ->
997
- #{name := StreamId } = amqqueue :get_type_state (Q ),
998
- case rabbit_stream_coordinator :local_pid (StreamId ) of
999
- {ok , Pid } when is_pid (Pid ) ->
1000
- ok ;
1001
- _ ->
1002
- {protocol_error , precondition_failed ,
1003
- " queue '~ts ' does not have a replica on the local node" ,
1004
- [rabbit_misc :rs (amqqueue :get_name (Q ))]}
1005
- end .
1006
-
1007
996
maybe_send_reply (_ChPid , undefined ) -> ok ;
1008
997
maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
1009
998
0 commit comments