@@ -221,9 +221,9 @@ consume(Q, #{limiter_active := true}, _State)
221
221
when ? amqqueue_is_stream (Q ) ->
222
222
{error , global_qos_not_supported_for_queue_type };
223
223
consume (Q , Spec , QState0 ) when ? amqqueue_is_stream (Q ) ->
224
- % % Messages should include the offset as a custom header.
225
- case check_queue_exists_in_local_node ( Q ) of
226
- ok ->
224
+ % % Messages should include the offset as a custom header.
225
+ case get_local_pid ( QState0 ) of
226
+ { LocalPid , QState } when is_pid ( LocalPid ) ->
227
227
#{no_ack := NoAck ,
228
228
channel_pid := ChPid ,
229
229
prefetch_count := ConsumerPrefetchCount ,
@@ -232,7 +232,8 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
232
232
args := Args ,
233
233
ok_msg := OkMsg } = Spec ,
234
234
QName = amqqueue :get_name (Q ),
235
- case parse_offset_arg (rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
235
+ case parse_offset_arg (
236
+ rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
236
237
{error , _ } = Err ->
237
238
Err ;
238
239
{ok , OffsetSpec } ->
@@ -245,10 +246,12 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
245
246
% % really it should be sent by the stream queue process like classic queues
246
247
% % do
247
248
maybe_send_reply (ChPid , OkMsg ),
248
- begin_stream (QState0 , ConsumerTag , OffsetSpec , ConsumerPrefetchCount )
249
- end ;
250
- Err ->
251
- Err
249
+ begin_stream (QState , ConsumerTag , OffsetSpec , ConsumerPrefetchCount )
250
+ end ;
251
+ {undefined , _ } ->
252
+ {protocol_error , precondition_failed ,
253
+ " queue '~ts ' does not have a running replica on the local node" ,
254
+ [rabbit_misc :rs (amqqueue :get_name (Q ))]}
252
255
end .
253
256
254
257
-spec parse_offset_arg (undefined |
@@ -285,8 +288,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
285
288
get_local_pid (# stream_client {leader = Pid } = State )
286
289
when is_pid (Pid ) andalso node (Pid ) == node () ->
287
290
{Pid , State # stream_client {local_pid = Pid }};
288
- get_local_pid (# stream_client {stream_id = StreamId ,
289
- local_pid = undefined } = State ) ->
291
+ get_local_pid (# stream_client {stream_id = StreamId } = State ) ->
290
292
% % query local coordinator to get pid
291
293
case rabbit_stream_coordinator :local_pid (StreamId ) of
292
294
{ok , Pid } ->
@@ -295,33 +297,29 @@ get_local_pid(#stream_client{stream_id = StreamId,
295
297
{undefined , State }
296
298
end .
297
299
298
- begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
299
- Tag , Offset , Max ) ->
300
- {LocalPid , State } = get_local_pid (State0 ),
301
- case LocalPid of
302
- undefined ->
303
- {error , no_local_stream_replica_available };
304
- _ ->
305
- CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
306
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec ),
307
- NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
308
- osiris :register_offset_listener (LocalPid , NextOffset ),
309
- % % TODO: avoid double calls to the same process
310
- StartOffset = case Offset of
311
- first -> NextOffset ;
312
- last -> NextOffset ;
313
- next -> NextOffset ;
314
- {timestamp , _ } -> NextOffset ;
315
- _ -> Offset
316
- end ,
317
- Str0 = # stream {credit = Max ,
318
- start_offset = StartOffset ,
319
- listening_offset = NextOffset ,
320
- log = Seg0 ,
321
- max = Max },
322
- {ok , State # stream_client {local_pid = LocalPid ,
323
- readers = Readers0 #{Tag => Str0 }}}
324
- end .
300
+ begin_stream (# stream_client {name = QName ,
301
+ readers = Readers0 ,
302
+ local_pid = LocalPid } = State ,
303
+ Tag , Offset , Max )
304
+ when is_pid (LocalPid ) ->
305
+ CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
306
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec ),
307
+ NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
308
+ osiris :register_offset_listener (LocalPid , NextOffset ),
309
+ StartOffset = case Offset of
310
+ first -> NextOffset ;
311
+ last -> NextOffset ;
312
+ next -> NextOffset ;
313
+ {timestamp , _ } -> NextOffset ;
314
+ _ -> Offset
315
+ end ,
316
+ Str0 = # stream {credit = Max ,
317
+ start_offset = StartOffset ,
318
+ listening_offset = NextOffset ,
319
+ log = Seg0 ,
320
+ max = Max },
321
+ {ok , State # stream_client {local_pid = LocalPid ,
322
+ readers = Readers0 #{Tag => Str0 }}}.
325
323
326
324
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
327
325
name = QName } = State ) ->
@@ -341,8 +339,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
341
339
end .
342
340
343
341
credit (QName , CTag , Credit , Drain , # stream_client {readers = Readers0 ,
344
- name = Name ,
345
- local_pid = LocalPid } = State ) ->
342
+ name = Name ,
343
+ local_pid = LocalPid } = State ) ->
346
344
{Readers1 , Msgs } = case Readers0 of
347
345
#{CTag := # stream {credit = Credit0 } = Str0 } ->
348
346
Str1 = Str0 # stream {credit = Credit0 + Credit },
@@ -356,7 +354,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
356
354
true ->
357
355
case Readers1 of
358
356
#{CTag := # stream {credit = Credit1 } = Str2 } ->
359
- {Readers0 #{CTag => Str2 # stream {credit = 0 }}, [{send_drained , {CTag , Credit1 }}]};
357
+ {Readers0 #{CTag => Str2 # stream {credit = 0 }},
358
+ [{send_drained , {CTag , Credit1 }}]};
360
359
_ ->
361
360
{Readers1 , []}
362
361
end ;
@@ -386,7 +385,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
386
385
soft_limit = SftLmt ,
387
386
slow = Slow0 } = State ) ->
388
387
ok = osiris :write (LeaderPid , WriterId , Seq , msg_to_iodata (Msg )),
389
- Correlation = case MsgId of
388
+ Correlation = case MsgId of
390
389
undefined ->
391
390
Correlation0 ;
392
391
_ when is_number (MsgId ) ->
@@ -950,17 +949,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
950
949
recover (Q ) ->
951
950
{ok , Q }.
952
951
953
- check_queue_exists_in_local_node (Q ) ->
954
- #{name := StreamId } = amqqueue :get_type_state (Q ),
955
- case rabbit_stream_coordinator :local_pid (StreamId ) of
956
- {ok , Pid } when is_pid (Pid ) ->
957
- ok ;
958
- _ ->
959
- {protocol_error , precondition_failed ,
960
- " queue '~ts ' does not have a replica on the local node" ,
961
- [rabbit_misc :rs (amqqueue :get_name (Q ))]}
962
- end .
963
-
964
952
maybe_send_reply (_ChPid , undefined ) -> ok ;
965
953
maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
966
954
0 commit comments