@@ -237,10 +237,11 @@ consume(Q, #{limiter_active := true}, _State)
237
237
when ? amqqueue_is_stream (Q ) ->
238
238
{error , global_qos_not_supported_for_queue_type };
239
239
consume (Q , Spec ,
240
- # stream_client {filtering_supported = FilteringSupported } = QState0 ) when ? amqqueue_is_stream (Q ) ->
240
+ # stream_client {filtering_supported = FilteringSupported } = QState0 )
241
+ when ? amqqueue_is_stream (Q ) ->
241
242
% % Messages should include the offset as a custom header.
242
- case check_queue_exists_in_local_node ( Q ) of
243
- {ok , LocalPid } ->
243
+ case get_local_pid ( QState0 ) of
244
+ {LocalPid , QState } when is_pid ( LocalPid ) ->
244
245
#{no_ack := NoAck ,
245
246
channel_pid := ChPid ,
246
247
prefetch_count := ConsumerPrefetchCount ,
@@ -249,16 +250,17 @@ consume(Q, Spec,
249
250
args := Args ,
250
251
ok_msg := OkMsg } = Spec ,
251
252
QName = amqqueue :get_name (Q ),
252
- case parse_offset_arg (rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
253
+ case parse_offset_arg (
254
+ rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
253
255
{error , _ } = Err ->
254
256
Err ;
255
257
{ok , OffsetSpec } ->
256
258
FilterSpec = filter_spec (Args ),
257
259
case {FilterSpec , FilteringSupported } of
258
260
{#{filter_spec := _ }, false } ->
259
- {protocol_error , precondition_failed , " Filtering is not supported" , []};
261
+ {protocol_error , precondition_failed ,
262
+ " Filtering is not supported" , []};
260
263
_ ->
261
- QState = QState0 # stream_client {local_pid = LocalPid },
262
264
rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
263
265
not NoAck , QName ,
264
266
ConsumerPrefetchCount , false ,
@@ -270,8 +272,10 @@ consume(Q, Spec,
270
272
ConsumerPrefetchCount , FilterSpec )
271
273
end
272
274
end ;
273
- Err ->
274
- Err
275
+ {undefined , _ } ->
276
+ {protocol_error , precondition_failed ,
277
+ " queue '~ts ' does not have a running replica on the local node" ,
278
+ [rabbit_misc :rs (amqqueue :get_name (Q ))]}
275
279
end .
276
280
277
281
-spec parse_offset_arg (undefined |
@@ -337,8 +341,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
337
341
get_local_pid (# stream_client {leader = Pid } = State )
338
342
when is_pid (Pid ) andalso node (Pid ) == node () ->
339
343
{Pid , State # stream_client {local_pid = Pid }};
340
- get_local_pid (# stream_client {stream_id = StreamId ,
341
- local_pid = undefined } = State ) ->
344
+ get_local_pid (# stream_client {stream_id = StreamId } = State ) ->
342
345
% % query local coordinator to get pid
343
346
case rabbit_stream_coordinator :local_pid (StreamId ) of
344
347
{ok , Pid } ->
@@ -347,34 +350,31 @@ get_local_pid(#stream_client{stream_id = StreamId,
347
350
{undefined , State }
348
351
end .
349
352
350
- begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
351
- Tag , Offset , Max , Options ) ->
352
- {LocalPid , State } = get_local_pid (State0 ),
353
- case LocalPid of
354
- undefined ->
355
- {error , no_local_stream_replica_available };
356
- _ ->
357
- CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
358
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
359
- NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
360
- osiris :register_offset_listener (LocalPid , NextOffset ),
361
- % % TODO: avoid double calls to the same process
362
- StartOffset = case Offset of
363
- first -> NextOffset ;
364
- last -> NextOffset ;
365
- next -> NextOffset ;
366
- {timestamp , _ } -> NextOffset ;
367
- _ -> Offset
368
- end ,
369
- Str0 = # stream {credit = Max ,
370
- start_offset = StartOffset ,
371
- listening_offset = NextOffset ,
372
- log = Seg0 ,
373
- max = Max ,
374
- reader_options = Options },
375
- {ok , State # stream_client {local_pid = LocalPid ,
376
- readers = Readers0 #{Tag => Str0 }}}
377
- end .
353
+ begin_stream (# stream_client {name = QName ,
354
+ readers = Readers0 ,
355
+ local_pid = LocalPid } = State ,
356
+ Tag , Offset , Max , Options )
357
+ when is_pid (LocalPid ) ->
358
+ CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
359
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
360
+ NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
361
+ osiris :register_offset_listener (LocalPid , NextOffset ),
362
+ % % TODO: avoid double calls to the same process
363
+ StartOffset = case Offset of
364
+ first -> NextOffset ;
365
+ last -> NextOffset ;
366
+ next -> NextOffset ;
367
+ {timestamp , _ } -> NextOffset ;
368
+ _ -> Offset
369
+ end ,
370
+ Str0 = # stream {credit = Max ,
371
+ start_offset = StartOffset ,
372
+ listening_offset = NextOffset ,
373
+ log = Seg0 ,
374
+ max = Max ,
375
+ reader_options = Options },
376
+ {ok , State # stream_client {local_pid = LocalPid ,
377
+ readers = Readers0 #{Tag => Str0 }}}.
378
378
379
379
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
380
380
name = QName } = State ) ->
@@ -394,8 +394,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
394
394
end .
395
395
396
396
credit (QName , CTag , Credit , Drain , # stream_client {readers = Readers0 ,
397
- name = Name ,
398
- local_pid = LocalPid } = State ) ->
397
+ name = Name ,
398
+ local_pid = LocalPid } = State ) ->
399
399
{Readers1 , Msgs } = case Readers0 of
400
400
#{CTag := # stream {credit = Credit0 } = Str0 } ->
401
401
Str1 = Str0 # stream {credit = Credit0 + Credit },
@@ -443,7 +443,7 @@ deliver0(MsgId, Msg,
443
443
soft_limit = SftLmt ,
444
444
slow = Slow0 ,
445
445
filtering_supported = FilteringSupported } = State ,
446
- Actions0 ) ->
446
+ Actions0 ) ->
447
447
ok = osiris :write (LeaderPid , WriterId , Seq ,
448
448
stream_message (Msg , FilteringSupported )),
449
449
Correlation = case MsgId of
@@ -993,17 +993,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
993
993
recover (Q ) ->
994
994
{ok , Q }.
995
995
996
- check_queue_exists_in_local_node (Q ) ->
997
- #{name := StreamId } = amqqueue :get_type_state (Q ),
998
- case rabbit_stream_coordinator :local_pid (StreamId ) of
999
- {ok , Pid } when is_pid (Pid ) ->
1000
- {ok , Pid };
1001
- _ ->
1002
- {protocol_error , precondition_failed ,
1003
- " queue '~ts ' does not have a replica on the local node" ,
1004
- [rabbit_misc :rs (amqqueue :get_name (Q ))]}
1005
- end .
1006
-
1007
996
maybe_send_reply (_ChPid , undefined ) -> ok ;
1008
997
maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
1009
998
0 commit comments