@@ -237,10 +237,11 @@ consume(Q, #{limiter_active := true}, _State)
237
237
when ? amqqueue_is_stream (Q ) ->
238
238
{error , global_qos_not_supported_for_queue_type };
239
239
consume (Q , Spec ,
240
- # stream_client {filtering_supported = FilteringSupported } = QState0 ) when ? amqqueue_is_stream (Q ) ->
240
+ # stream_client {filtering_supported = FilteringSupported } = QState0 )
241
+ when ? amqqueue_is_stream (Q ) ->
241
242
% % Messages should include the offset as a custom header.
242
- case check_queue_exists_in_local_node ( Q ) of
243
- ok ->
243
+ case get_local_pid ( QState0 ) of
244
+ { LocalPid , QState } when is_pid ( LocalPid ) ->
244
245
#{no_ack := NoAck ,
245
246
channel_pid := ChPid ,
246
247
prefetch_count := ConsumerPrefetchCount ,
@@ -249,30 +250,34 @@ consume(Q, Spec,
249
250
args := Args ,
250
251
ok_msg := OkMsg } = Spec ,
251
252
QName = amqqueue :get_name (Q ),
252
- case parse_offset_arg (rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
253
+ case parse_offset_arg (
254
+ rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
253
255
{error , _ } = Err ->
254
256
Err ;
255
257
{ok , OffsetSpec } ->
256
- _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
257
- rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
258
- not NoAck , QName ,
259
- ConsumerPrefetchCount , false ,
260
- up , Args ),
261
- % % FIXME: reply needs to be sent before the stream begins sending
262
- % % really it should be sent by the stream queue process like classic queues
263
- % % do
264
- maybe_send_reply (ChPid , OkMsg ),
265
258
FilterSpec = filter_spec (Args ),
266
259
case {FilterSpec , FilteringSupported } of
267
260
{#{filter_spec := _ }, false } ->
268
- {protocol_error , precondition_failed , " Filtering is not supported" , []};
261
+ {protocol_error , precondition_failed ,
262
+ " Filtering is not supported" , []};
269
263
_ ->
270
- begin_stream (QState0 , ConsumerTag , OffsetSpec ,
264
+ rabbit_core_metrics :consumer_created (ChPid , ConsumerTag ,
265
+ ExclusiveConsume ,
266
+ not NoAck , QName ,
267
+ ConsumerPrefetchCount ,
268
+ false , up , Args ),
269
+ % % reply needs to be sent before the stream
270
+ % % begins sending
271
+ maybe_send_reply (ChPid , OkMsg ),
272
+ _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
273
+ begin_stream (QState , ConsumerTag , OffsetSpec ,
271
274
ConsumerPrefetchCount , FilterSpec )
272
275
end
273
276
end ;
274
- Err ->
275
- Err
277
+ {undefined , _ } ->
278
+ {protocol_error , precondition_failed ,
279
+ " queue '~ts ' does not have a running replica on the local node" ,
280
+ [rabbit_misc :rs (amqqueue :get_name (Q ))]}
276
281
end .
277
282
278
283
-spec parse_offset_arg (undefined |
@@ -338,8 +343,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
338
343
get_local_pid (# stream_client {leader = Pid } = State )
339
344
when is_pid (Pid ) andalso node (Pid ) == node () ->
340
345
{Pid , State # stream_client {local_pid = Pid }};
341
- get_local_pid (# stream_client {stream_id = StreamId ,
342
- local_pid = undefined } = State ) ->
346
+ get_local_pid (# stream_client {stream_id = StreamId } = State ) ->
343
347
% % query local coordinator to get pid
344
348
case rabbit_stream_coordinator :local_pid (StreamId ) of
345
349
{ok , Pid } ->
@@ -348,34 +352,30 @@ get_local_pid(#stream_client{stream_id = StreamId,
348
352
{undefined , State }
349
353
end .
350
354
351
- begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
352
- Tag , Offset , Max , Options ) ->
353
- {LocalPid , State } = get_local_pid (State0 ),
354
- case LocalPid of
355
- undefined ->
356
- {error , no_local_stream_replica_available };
357
- _ ->
358
- CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
359
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
360
- NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
361
- osiris :register_offset_listener (LocalPid , NextOffset ),
362
- % % TODO: avoid double calls to the same process
363
- StartOffset = case Offset of
364
- first -> NextOffset ;
365
- last -> NextOffset ;
366
- next -> NextOffset ;
367
- {timestamp , _ } -> NextOffset ;
368
- _ -> Offset
369
- end ,
370
- Str0 = # stream {credit = Max ,
371
- start_offset = StartOffset ,
372
- listening_offset = NextOffset ,
373
- log = Seg0 ,
374
- max = Max ,
375
- reader_options = Options },
376
- {ok , State # stream_client {local_pid = LocalPid ,
377
- readers = Readers0 #{Tag => Str0 }}}
378
- end .
355
+ begin_stream (# stream_client {name = QName ,
356
+ readers = Readers0 ,
357
+ local_pid = LocalPid } = State ,
358
+ Tag , Offset , Max , Options )
359
+ when is_pid (LocalPid ) ->
360
+ CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
361
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
362
+ NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
363
+ osiris :register_offset_listener (LocalPid , NextOffset ),
364
+ StartOffset = case Offset of
365
+ first -> NextOffset ;
366
+ last -> NextOffset ;
367
+ next -> NextOffset ;
368
+ {timestamp , _ } -> NextOffset ;
369
+ _ -> Offset
370
+ end ,
371
+ Str0 = # stream {credit = Max ,
372
+ start_offset = StartOffset ,
373
+ listening_offset = NextOffset ,
374
+ log = Seg0 ,
375
+ max = Max ,
376
+ reader_options = Options },
377
+ {ok , State # stream_client {local_pid = LocalPid ,
378
+ readers = Readers0 #{Tag => Str0 }}}.
379
379
380
380
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
381
381
name = QName } = State ) ->
@@ -395,8 +395,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
395
395
end .
396
396
397
397
credit (QName , CTag , Credit , Drain , # stream_client {readers = Readers0 ,
398
- name = Name ,
399
- local_pid = LocalPid } = State ) ->
398
+ name = Name ,
399
+ local_pid = LocalPid } = State ) ->
400
400
{Readers1 , Msgs } = case Readers0 of
401
401
#{CTag := # stream {credit = Credit0 } = Str0 } ->
402
402
Str1 = Str0 # stream {credit = Credit0 + Credit },
@@ -410,7 +410,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
410
410
true ->
411
411
case Readers1 of
412
412
#{CTag := # stream {credit = Credit1 } = Str2 } ->
413
- {Readers0 #{CTag => Str2 # stream {credit = 0 }}, [{send_drained , {CTag , Credit1 }}]};
413
+ {Readers0 #{CTag => Str2 # stream {credit = 0 }},
414
+ [{send_drained , {CTag , Credit1 }}]};
414
415
_ ->
415
416
{Readers1 , []}
416
417
end ;
@@ -443,7 +444,7 @@ deliver0(MsgId, Msg,
443
444
soft_limit = SftLmt ,
444
445
slow = Slow0 ,
445
446
filtering_supported = FilteringSupported } = State ,
446
- Actions0 ) ->
447
+ Actions0 ) ->
447
448
ok = osiris :write (LeaderPid , WriterId , Seq ,
448
449
stream_message (Msg , FilteringSupported )),
449
450
Correlation = case MsgId of
@@ -993,17 +994,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
993
994
recover (Q ) ->
994
995
{ok , Q }.
995
996
996
- check_queue_exists_in_local_node (Q ) ->
997
- #{name := StreamId } = amqqueue :get_type_state (Q ),
998
- case rabbit_stream_coordinator :local_pid (StreamId ) of
999
- {ok , Pid } when is_pid (Pid ) ->
1000
- ok ;
1001
- _ ->
1002
- {protocol_error , precondition_failed ,
1003
- " queue '~ts ' does not have a replica on the local node" ,
1004
- [rabbit_misc :rs (amqqueue :get_name (Q ))]}
1005
- end .
1006
-
1007
997
maybe_send_reply (_ChPid , undefined ) -> ok ;
1008
998
maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
1009
999
0 commit comments