@@ -219,14 +219,8 @@ consume(Q, #{no_ack := true}, _)
219
219
consume (Q , #{limiter_active := true }, _State )
220
220
when ? amqqueue_is_stream (Q ) ->
221
221
{error , global_qos_not_supported_for_queue_type };
222
- <<<<<<< HEAD
223
222
consume (Q , Spec , QState0 ) when ? amqqueue_is_stream (Q ) ->
224
- =======
225
- consume (Q , Spec ,
226
- # stream_client {filtering_supported = FilteringSupported } = QState0 )
227
- when ? amqqueue_is_stream (Q ) ->
228
- >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
229
- % % Messages should include the offset as a custom header.
223
+ % % Messages should include the offset as a custom header.
230
224
case get_local_pid (QState0 ) of
231
225
{LocalPid , QState } when is_pid (LocalPid ) ->
232
226
#{no_ack := NoAck ,
@@ -242,7 +236,6 @@ consume(Q, Spec,
242
236
{error , _ } = Err ->
243
237
Err ;
244
238
{ok , OffsetSpec } ->
245
- <<<<<<< HEAD
246
239
_ = rabbit_stream_coordinator :register_local_member_listener (Q ),
247
240
rabbit_core_metrics :consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
248
241
not NoAck , QName ,
@@ -252,28 +245,8 @@ consume(Q, Spec,
252
245
% % really it should be sent by the stream queue process like classic queues
253
246
% % do
254
247
maybe_send_reply (ChPid , OkMsg ),
255
- begin_stream (QState0 , ConsumerTag , OffsetSpec , ConsumerPrefetchCount )
256
- =======
257
- FilterSpec = filter_spec (Args ),
258
- case {FilterSpec , FilteringSupported } of
259
- {#{filter_spec := _ }, false } ->
260
- {protocol_error , precondition_failed ,
261
- " Filtering is not supported" , []};
262
- _ ->
263
- rabbit_core_metrics :consumer_created (ChPid , ConsumerTag ,
264
- ExclusiveConsume ,
265
- not NoAck , QName ,
266
- ConsumerPrefetchCount ,
267
- false , up , Args ),
268
- % % reply needs to be sent before the stream
269
- % % begins sending
270
- maybe_send_reply (ChPid , OkMsg ),
271
- _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
272
- begin_stream (QState , ConsumerTag , OffsetSpec ,
273
- ConsumerPrefetchCount , FilterSpec )
274
- end
275
- >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
276
- end ;
248
+ begin_stream (QState , ConsumerTag , OffsetSpec , ConsumerPrefetchCount )
249
+ end ;
277
250
{undefined , _ } ->
278
251
{protocol_error , precondition_failed ,
279
252
" queue '~ts ' does not have a running replica on the local node" ,
@@ -323,42 +296,13 @@ get_local_pid(#stream_client{stream_id = StreamId} = State) ->
323
296
{undefined , State }
324
297
end .
325
298
326
- <<<<<<< HEAD
327
- begin_stream (# stream_client {name = QName , readers = Readers0 } = State0 ,
328
- Tag , Offset , Max ) ->
329
- {LocalPid , State } = get_local_pid (State0 ),
330
- case LocalPid of
331
- undefined ->
332
- {error , no_local_stream_replica_available };
333
- _ ->
334
- CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
335
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec ),
336
- NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
337
- osiris :register_offset_listener (LocalPid , NextOffset ),
338
- % % TODO: avoid double calls to the same process
339
- StartOffset = case Offset of
340
- first -> NextOffset ;
341
- last -> NextOffset ;
342
- next -> NextOffset ;
343
- {timestamp , _ } -> NextOffset ;
344
- _ -> Offset
345
- end ,
346
- Str0 = # stream {credit = Max ,
347
- start_offset = StartOffset ,
348
- listening_offset = NextOffset ,
349
- log = Seg0 ,
350
- max = Max },
351
- {ok , State # stream_client {local_pid = LocalPid ,
352
- readers = Readers0 #{Tag => Str0 }}}
353
- end .
354
- =======
355
299
begin_stream (# stream_client {name = QName ,
356
300
readers = Readers0 ,
357
301
local_pid = LocalPid } = State ,
358
- Tag , Offset , Max , Options )
302
+ Tag , Offset , Max )
359
303
when is_pid (LocalPid ) ->
360
304
CounterSpec = {{? MODULE , QName , Tag , self ()}, []},
361
- {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec , Options ),
305
+ {ok , Seg0 } = osiris :init_reader (LocalPid , Offset , CounterSpec ),
362
306
NextOffset = osiris_log :next_offset (Seg0 ) - 1 ,
363
307
osiris :register_offset_listener (LocalPid , NextOffset ),
364
308
StartOffset = case Offset of
@@ -372,11 +316,9 @@ begin_stream(#stream_client{name = QName,
372
316
start_offset = StartOffset ,
373
317
listening_offset = NextOffset ,
374
318
log = Seg0 ,
375
- max = Max ,
376
- reader_options = Options },
319
+ max = Max },
377
320
{ok , State # stream_client {local_pid = LocalPid ,
378
321
readers = Readers0 #{Tag => Str0 }}}.
379
- >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
380
322
381
323
cancel (_Q , ConsumerTag , OkMsg , ActingUser , # stream_client {readers = Readers0 ,
382
324
name = QName } = State ) ->
@@ -433,7 +375,6 @@ deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) ->
433
375
{[{Q , S } | Qs ], As ++ Actions }
434
376
end , {[], []}, QSs ).
435
377
436
- <<<<<<< HEAD
437
378
deliver (_Confirm , # delivery {message = Msg , msg_seq_no = MsgId },
438
379
# stream_client {name = Name ,
439
380
leader = LeaderPid ,
@@ -443,21 +384,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
443
384
soft_limit = SftLmt ,
444
385
slow = Slow0 } = State ) ->
445
386
ok = osiris :write (LeaderPid , WriterId , Seq , msg_to_iodata (Msg )),
446
- =======
447
- deliver0 (MsgId , Msg ,
448
- # stream_client {name = Name ,
449
- leader = LeaderPid ,
450
- writer_id = WriterId ,
451
- next_seq = Seq ,
452
- correlation = Correlation0 ,
453
- soft_limit = SftLmt ,
454
- slow = Slow0 ,
455
- filtering_supported = FilteringSupported } = State ,
456
- Actions0 ) ->
457
- ok = osiris :write (LeaderPid , WriterId , Seq ,
458
- stream_message (Msg , FilteringSupported )),
459
- >>>>>>> 19 ab8824ce (Osiris 1.6 .6 )
460
- Correlation = case MsgId of
387
+ Correlation = case MsgId of
461
388
undefined ->
462
389
Correlation0 ;
463
390
_ when is_number (MsgId ) ->
0 commit comments