@@ -219,10 +219,16 @@ consume(Q, #{no_ack := true}, _)
219
219
consume (Q , #{limiter_active := true }, _State )
220
220
when ? amqqueue_is_stream (Q ) ->
221
221
{error , global_qos_not_supported_for_queue_type };
222
+ <<<<<<< HEAD
222
223
consume (Q , Spec , QState0 ) when ? amqqueue_is_stream (Q ) ->
224
+ =======
225
+ consume (Q , Spec ,
226
+ # stream_client {filtering_supported = FilteringSupported } = QState0 )
227
+ when ? amqqueue_is_stream (Q ) ->
228
+ >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
223
229
% % Messages should include the offset as a custom header.
224
- case check_queue_exists_in_local_node ( Q ) of
225
- ok ->
230
+ case get_local_pid ( QState0 ) of
231
+ { LocalPid , QState } when is_pid ( LocalPid ) ->
226
232
#{no_ack := NoAck ,
227
233
channel_pid := ChPid ,
228
234
prefetch_count := ConsumerPrefetchCount ,
@@ -231,10 +237,12 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
231
237
args := Args ,
232
238
ok_msg := OkMsg } = Spec ,
233
239
QName = amqqueue :get_name (Q ),
234
- case parse_offset_arg (rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
240
+ case parse_offset_arg (
241
+ rabbit_misc :table_lookup (Args , <<" x-stream-offset" >>)) of
235
242
{error , _ } = Err ->
236
243
Err ;
237
244
{ok , OffsetSpec } ->
245
+ <<<<<<< HEAD
238
246
_ = rabbit_stream_coordinator :register_local_member_listener (Q ),
239
247
rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
240
248
not NoAck , QName ,
@@ -245,9 +253,31 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
245
253
% % do
246
254
maybe_send_reply (ChPid , OkMsg ),
247
255
begin_stream (QState0 , ConsumerTag , OffsetSpec , ConsumerPrefetchCount )
256
+ =======
257
+ FilterSpec = filter_spec (Args ),
258
+ case {FilterSpec , FilteringSupported } of
259
+ {#{filter_spec := _ }, false } ->
260
+ {protocol_error , precondition_failed ,
261
+ " Filtering is not supported" , []};
262
+ _ ->
263
+ rabbit_core_metrics :consumer_created (ChPid , ConsumerTag ,
264
+ ExclusiveConsume ,
265
+ not NoAck , QName ,
266
+ ConsumerPrefetchCount ,
267
+ false , up , Args ),
268
+ % % reply needs to be sent before the stream
269
+ % % begins sending
270
+ maybe_send_reply (ChPid , OkMsg ),
271
+ _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
272
+ begin_stream (QState , ConsumerTag , OffsetSpec ,
273
+ ConsumerPrefetchCount , FilterSpec )
274
+ end
275
+ >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
248
276
end ;
249
- Err ->
250
- Err
277
+ {undefined , _ } ->
278
+ {protocol_error , precondition_failed ,
279
+ " queue '~ts ' does not have a running replica on the local node" ,
280
+ [rabbit_misc :rs (amqqueue :get_name (Q ))]}
251
281
end .
252
282
253
283
-spec parse_offset_arg (undefined |
@@ -284,8 +314,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
284
314
get_local_pid (# stream_client {leader = Pid } = State )
285
315
when is_pid (Pid ) andalso node (Pid ) == node () ->
286
316
{Pid , State # stream_client {local_pid = Pid }};
287
- get_local_pid (# stream_client {stream_id = StreamId ,
288
- local_pid = undefined } = State ) ->
317
+ get_local_pid (# stream_client {stream_id = StreamId } = State ) ->
289
318
% % query local coordinator to get pid
290
319
case rabbit_stream_coordinator :local_pid (StreamId ) of
291
320
{ok , Pid } ->
@@ -294,6 +323,7 @@ get_local_pid(#stream_client{stream_id = StreamId,
294
323
{undefined , State }
295
324
end .
296
325
326
+ <<<<<<< HEAD
297
327
begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
298
328
Tag , Offset , Max ) ->
299
329
{LocalPid , State } = get_local_pid (State0 ),
@@ -321,6 +351,32 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
321
351
{ok , State # stream_client {local_pid = LocalPid ,
322
352
readers = Readers0 #{Tag => Str0 }}}
323
353
end .
354
+ =======
355
+ begin_stream (# stream_client {name = QName ,
356
+ readers = Readers0 ,
357
+ local_pid = LocalPid } = State ,
358
+ Tag , Offset , Max , Options )
359
+ when is_pid (LocalPid ) ->
360
+ CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
361
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
362
+ NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
363
+ osiris :register_offset_listener (LocalPid , NextOffset ),
364
+ StartOffset = case Offset of
365
+ first -> NextOffset ;
366
+ last -> NextOffset ;
367
+ next -> NextOffset ;
368
+ {timestamp , _ } -> NextOffset ;
369
+ _ -> Offset
370
+ end ,
371
+ Str0 = # stream {credit = Max ,
372
+ start_offset = StartOffset ,
373
+ listening_offset = NextOffset ,
374
+ log = Seg0 ,
375
+ max = Max ,
376
+ reader_options = Options },
377
+ {ok , State # stream_client {local_pid = LocalPid ,
378
+ readers = Readers0 #{Tag => Str0 }}}.
379
+ >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
324
380
325
381
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
326
382
name = QName } = State ) ->
@@ -340,8 +396,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
340
396
end .
341
397
342
398
credit (QName , CTag , Credit , Drain , # stream_client {readers = Readers0 ,
343
- name = Name ,
344
- local_pid = LocalPid } = State ) ->
399
+ name = Name ,
400
+ local_pid = LocalPid } = State ) ->
345
401
{Readers1 , Msgs } = case Readers0 of
346
402
#{CTag := # stream {credit = Credit0 } = Str0 } ->
347
403
Str1 = Str0 # stream {credit = Credit0 + Credit },
@@ -355,7 +411,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
355
411
true ->
356
412
case Readers1 of
357
413
#{CTag := # stream {credit = Credit1 } = Str2 } ->
358
- {Readers0 #{CTag => Str2 # stream {credit = 0 }}, [{send_drained , {CTag , Credit1 }}]};
414
+ {Readers0 #{CTag => Str2 # stream {credit = 0 }},
415
+ [{send_drained , {CTag , Credit1 }}]};
359
416
_ ->
360
417
{Readers1 , []}
361
418
end ;
@@ -376,6 +433,7 @@ deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) ->
376
433
{[{Q , S } | Qs ], As ++ Actions }
377
434
end , {[], []}, QSs ).
378
435
436
+ <<<<<<< HEAD
379
437
deliver (_Confirm , # delivery {message = Msg , msg_seq_no = MsgId },
380
438
# stream_client {name = Name ,
381
439
leader = LeaderPid ,
@@ -385,6 +443,20 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
385
443
soft_limit = SftLmt ,
386
444
slow = Slow0 } = State ) ->
387
445
ok = osiris :write (LeaderPid , WriterId , Seq , msg_to_iodata (Msg )),
446
+ =======
447
+ deliver0 (MsgId , Msg ,
448
+ # stream_client {name = Name ,
449
+ leader = LeaderPid ,
450
+ writer_id = WriterId ,
451
+ next_seq = Seq ,
452
+ correlation = Correlation0 ,
453
+ soft_limit = SftLmt ,
454
+ slow = Slow0 ,
455
+ filtering_supported = FilteringSupported } = State ,
456
+ Actions0 ) ->
457
+ ok = osiris :write (LeaderPid , WriterId , Seq ,
458
+ stream_message (Msg , FilteringSupported )),
459
+ >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
388
460
Correlation = case MsgId of
389
461
undefined ->
390
462
Correlation0 ;
@@ -923,17 +995,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
923
995
recover (Q ) ->
924
996
{ok , Q }.
925
997
926
- check_queue_exists_in_local_node (Q ) ->
927
- #{name := StreamId } = amqqueue :get_type_state (Q ),
928
- case rabbit_stream_coordinator :local_pid (StreamId ) of
929
- {ok , Pid } when is_pid (Pid ) ->
930
- ok ;
931
- _ ->
932
- {protocol_error , precondition_failed ,
933
- " queue '~ts ' does not have a replica on the local node" ,
934
- [rabbit_misc :rs (amqqueue :get_name (Q ))]}
935
- end .
936
-
937
998
maybe_send_reply (_ChPid , undefined ) -> ok ;
938
999
maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
939
1000
0 commit comments