@@ -238,10 +238,11 @@ consume(Q, #{limiter_active := true}, _State)
238
238
when ? amqqueue_is_stream (Q ) ->
239
239
{error , global_qos_not_supported_for_queue_type };
240
240
consume (Q , Spec ,
241
- # stream_client {filtering_supported = FilteringSupported } = QState0 ) when ? amqqueue_is_stream (Q ) ->
241
+ # stream_client {filtering_supported = FilteringSupported } = QState0 )
242
+ when ? amqqueue_is_stream (Q ) ->
242
243
% % Messages should include the offset as a custom header.
243
- case check_queue_exists_in_local_node ( Q ) of
244
- ok ->
244
+ case get_local_pid ( QState0 ) of
245
+ { LocalPid , QState } when is_pid ( LocalPid ) ->
245
246
#{no_ack := NoAck ,
246
247
channel_pid := ChPid ,
247
248
prefetch_count := ConsumerPrefetchCount ,
@@ -250,30 +251,34 @@ consume(Q, Spec,
250
251
args := Args ,
251
252
ok_msg := OkMsg } = Spec ,
252
253
QName = amqqueue :get_name (Q ),
253
- case parse_offset_arg (rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
254
+ case parse_offset_arg (
255
+ rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
254
256
{error , _ } = Err ->
255
257
Err ;
256
258
{ok , OffsetSpec } ->
257
- _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
258
- rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
259
- not NoAck , QName ,
260
- ConsumerPrefetchCount , false ,
261
- up , Args ),
262
- % % FIXME: reply needs to be sent before the stream begins sending
263
- % % really it should be sent by the stream queue process like classic queues
264
- % % do
265
- maybe_send_reply (ChPid , OkMsg ),
266
259
FilterSpec = filter_spec (Args ),
267
260
case {FilterSpec , FilteringSupported } of
268
261
{#{filter_spec := _ }, false } ->
269
- {protocol_error , precondition_failed , " Filtering is not supported" , []};
262
+ {protocol_error , precondition_failed ,
263
+ " Filtering is not supported" , []};
270
264
_ ->
271
- begin_stream (QState0 , ConsumerTag , OffsetSpec ,
265
+ rabbit_core_metrics :consumer_created (ChPid , ConsumerTag ,
266
+ ExclusiveConsume ,
267
+ not NoAck , QName ,
268
+ ConsumerPrefetchCount ,
269
+ false , up , Args ),
270
+ % % reply needs to be sent before the stream
271
+ % % begins sending
272
+ maybe_send_reply (ChPid , OkMsg ),
273
+ _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
274
+ begin_stream (QState , ConsumerTag , OffsetSpec ,
272
275
ConsumerPrefetchCount , FilterSpec )
273
276
end
274
277
end ;
275
- Err ->
276
- Err
278
+ {undefined , _ } ->
279
+ {protocol_error , precondition_failed ,
280
+ " queue '~ts ' does not have a running replica on the local node" ,
281
+ [rabbit_misc :rs (amqqueue :get_name (Q ))]}
277
282
end .
278
283
279
284
-spec parse_offset_arg (undefined |
@@ -339,8 +344,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
339
344
get_local_pid (# stream_client {leader = Pid } = State )
340
345
when is_pid (Pid ) andalso node (Pid ) == node () ->
341
346
{Pid , State # stream_client {local_pid = Pid }};
342
- get_local_pid (# stream_client {stream_id = StreamId ,
343
- local_pid = undefined } = State ) ->
347
+ get_local_pid (# stream_client {stream_id = StreamId } = State ) ->
344
348
% % query local coordinator to get pid
345
349
case rabbit_stream_coordinator :local_pid (StreamId ) of
346
350
{ok , Pid } ->
@@ -349,34 +353,30 @@ get_local_pid(#stream_client{stream_id = StreamId,
349
353
{undefined , State }
350
354
end .
351
355
352
- begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
353
- Tag , Offset , Max , Options ) ->
354
- {LocalPid , State } = get_local_pid (State0 ),
355
- case LocalPid of
356
- undefined ->
357
- {error , no_local_stream_replica_available };
358
- _ ->
359
- CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
360
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
361
- NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
362
- osiris :register_offset_listener (LocalPid , NextOffset ),
363
- % % TODO: avoid double calls to the same process
364
- StartOffset = case Offset of
365
- first -> NextOffset ;
366
- last -> NextOffset ;
367
- next -> NextOffset ;
368
- {timestamp , _ } -> NextOffset ;
369
- _ -> Offset
370
- end ,
371
- Str0 = # stream {credit = Max ,
372
- start_offset = StartOffset ,
373
- listening_offset = NextOffset ,
374
- log = Seg0 ,
375
- max = Max ,
376
- reader_options = Options },
377
- {ok , State # stream_client {local_pid = LocalPid ,
378
- readers = Readers0 #{Tag => Str0 }}}
379
- end .
356
+ begin_stream (# stream_client {name = QName ,
357
+ readers = Readers0 ,
358
+ local_pid = LocalPid } = State ,
359
+ Tag , Offset , Max , Options )
360
+ when is_pid (LocalPid ) ->
361
+ CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
362
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
363
+ NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
364
+ osiris :register_offset_listener (LocalPid , NextOffset ),
365
+ StartOffset = case Offset of
366
+ first -> NextOffset ;
367
+ last -> NextOffset ;
368
+ next -> NextOffset ;
369
+ {timestamp , _ } -> NextOffset ;
370
+ _ -> Offset
371
+ end ,
372
+ Str0 = # stream {credit = Max ,
373
+ start_offset = StartOffset ,
374
+ listening_offset = NextOffset ,
375
+ log = Seg0 ,
376
+ max = Max ,
377
+ reader_options = Options },
378
+ {ok , State # stream_client {local_pid = LocalPid ,
379
+ readers = Readers0 #{Tag => Str0 }}}.
380
380
381
381
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
382
382
name = QName } = State ) ->
@@ -396,8 +396,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
396
396
end .
397
397
398
398
credit (QName , CTag , Credit , Drain , # stream_client {readers = Readers0 ,
399
- name = Name ,
400
- local_pid = LocalPid } = State ) ->
399
+ name = Name ,
400
+ local_pid = LocalPid } = State ) ->
401
401
{Readers1 , Msgs } = case Readers0 of
402
402
#{CTag := # stream {credit = Credit0 } = Str0 } ->
403
403
Str1 = Str0 # stream {credit = Credit0 + Credit },
@@ -411,7 +411,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
411
411
true ->
412
412
case Readers1 of
413
413
#{CTag := # stream {credit = Credit1 } = Str2 } ->
414
- {Readers0 #{CTag => Str2 # stream {credit = 0 }}, [{send_drained , {CTag , Credit1 }}]};
414
+ {Readers0 #{CTag => Str2 # stream {credit = 0 }},
415
+ [{send_drained , {CTag , Credit1 }}]};
415
416
_ ->
416
417
{Readers1 , []}
417
418
end ;
@@ -444,7 +445,7 @@ deliver0(MsgId, Msg,
444
445
soft_limit = SftLmt ,
445
446
slow = Slow0 ,
446
447
filtering_supported = FilteringSupported } = State ,
447
- Actions0 ) ->
448
+ Actions0 ) ->
448
449
ok = osiris :write (LeaderPid , WriterId , Seq ,
449
450
stream_message (Msg , FilteringSupported )),
450
451
Correlation = case MsgId of
@@ -1020,17 +1021,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
1020
1021
recover (Q ) ->
1021
1022
{ok , Q }.
1022
1023
1023
- check_queue_exists_in_local_node (Q ) ->
1024
- #{name := StreamId } = amqqueue :get_type_state (Q ),
1025
- case rabbit_stream_coordinator :local_pid (StreamId ) of
1026
- {ok , Pid } when is_pid (Pid ) ->
1027
- ok ;
1028
- _ ->
1029
- {protocol_error , precondition_failed ,
1030
- " queue '~ts ' does not have a replica on the local node" ,
1031
- [rabbit_misc :rs (amqqueue :get_name (Q ))]}
1032
- end .
1033
-
1034
1024
maybe_send_reply (_ChPid , undefined ) -> ok ;
1035
1025
maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
1036
1026
0 commit comments