@@ -366,60 +366,54 @@ record_death(Reason, SourceQueue,
366
366
? ANN_ROUTING_KEYS := RoutingKeys } = Anns0 ,
367
367
Timestamp = os :system_time (millisecond ),
368
368
Ttl = maps :get (ttl , Anns0 , undefined ),
369
- DeathAnns = rabbit_misc :maps_put_truthy (ttl , Ttl , #{first_time => Timestamp ,
370
- last_time => Timestamp }),
371
- case Anns0 of
372
- #{deaths := Deaths = # deaths {records = Rs0 }} ->
373
- Rs = if is_list (Rs0 ) ->
374
- % % records are ordered by recency
375
- case lists :keytake (Key , 1 , Rs0 ) of
376
- {value , {Key , D0 }, Rs1 } ->
377
- D = update_death (D0 , Timestamp ),
378
- [{Key , D } | Rs1 ];
369
+ DeathAnns = rabbit_misc :maps_put_truthy (
370
+ ttl , Ttl , #{first_time => Timestamp ,
371
+ last_time => Timestamp }),
372
+ NewDeath = # death {exchange = Exchange ,
373
+ routing_keys = RoutingKeys ,
374
+ count = 1 ,
375
+ anns = DeathAnns },
376
+ Anns = case Anns0 of
377
+ #{deaths := Deaths = # deaths {records = Rs0 }} ->
378
+ Rs = case is_list (Rs0 ) of
379
+ true ->
380
+ % % records are ordered by recency
381
+ case lists :keytake (Key , 1 , Rs0 ) of
382
+ {value , {Key , D0 }, Rs1 } ->
383
+ D = update_death (D0 , Timestamp ),
384
+ [{Key , D } | Rs1 ];
385
+ false ->
386
+ [{Key , NewDeath } | Rs0 ]
387
+ end ;
379
388
false ->
380
- [{Key , # death {exchange = Exchange ,
381
- routing_keys = RoutingKeys ,
382
- count = 1 ,
383
- anns = DeathAnns }} | Rs0 ]
384
- end ;
385
- is_map (Rs0 ) ->
386
- case Rs0 of
387
- #{Key := Death } ->
388
- Rs0 #{Key := update_death (Death , Timestamp )};
389
- _ ->
390
- Rs0 #{Key => # death {exchange = Exchange ,
391
- routing_keys = RoutingKeys ,
392
- count = 1 ,
393
- anns = DeathAnns }}
394
- end
395
- end ,
396
- Anns = Anns0 #{<<" x-last-death-reason" >> := atom_to_binary (Reason ),
389
+ maps :update_with (
390
+ Key ,
391
+ fun (Death ) -> update_death (Death , Timestamp ) end ,
392
+ NewDeath ,
393
+ Rs0 )
394
+ end ,
395
+ Anns0 #{<<" x-last-death-reason" >> := atom_to_binary (Reason ),
397
396
<<" x-last-death-queue" >> := SourceQueue ,
398
397
<<" x-last-death-exchange" >> := Exchange ,
399
398
deaths := Deaths # deaths {last = Key ,
400
- records = Rs }},
401
- State #? MODULE {annotations = Anns };
402
- _ ->
403
- Death = # death {exchange = Exchange ,
404
- routing_keys = RoutingKeys ,
405
- count = 1 ,
406
- anns = DeathAnns },
407
- Rs = case rabbit_feature_flags :is_enabled (message_containers_deaths_v2 ) of
408
- true -> [{Key , Death }];
409
- false -> #{Key => Death }
410
- end ,
411
- ReasonBin = atom_to_binary (Reason ),
412
- Anns = Anns0 #{<<" x-first-death-reason" >> => ReasonBin ,
399
+ records = Rs }};
400
+ _ ->
401
+ Rs = case rabbit_feature_flags :is_enabled (message_containers_deaths_v2 ) of
402
+ true -> [{Key , NewDeath }];
403
+ false -> #{Key => NewDeath }
404
+ end ,
405
+ ReasonBin = atom_to_binary (Reason ),
406
+ Anns0 #{<<" x-first-death-reason" >> => ReasonBin ,
413
407
<<" x-first-death-queue" >> => SourceQueue ,
414
408
<<" x-first-death-exchange" >> => Exchange ,
415
409
<<" x-last-death-reason" >> => ReasonBin ,
416
410
<<" x-last-death-queue" >> => SourceQueue ,
417
411
<<" x-last-death-exchange" >> => Exchange ,
418
412
deaths => # deaths {last = Key ,
419
413
first = Key ,
420
- records = Rs }},
421
- State # ? MODULE { annotations = Anns }
422
- end ;
414
+ records = Rs }}
415
+ end ,
416
+ State # ? MODULE { annotations = Anns } ;
423
417
record_death (Reason , SourceQueue , BasicMsg ) ->
424
418
mc_compat :record_death (Reason , SourceQueue , BasicMsg ).
425
419
@@ -429,27 +423,27 @@ update_death(#death{count = Count,
429
423
anns = DeathAnns #{last_time := Timestamp }}.
430
424
431
425
-spec is_death_cycle (rabbit_misc :resource_name (), state ()) -> boolean ().
432
- is_death_cycle (TargetQueue , #? MODULE {annotations = #{deaths := # deaths {records = Recs }}})
433
- when is_list (Recs ) ->
434
- is_cycle_v2 (TargetQueue , Recs );
435
- is_death_cycle ( TargetQueue , # ? MODULE { annotations = #{ deaths : = # deaths { records = Recs }}})
436
- when is_map ( Recs ) ->
437
- is_cycle_v1 ( TargetQueue , maps : keys ( Recs )) ;
426
+ is_death_cycle (TargetQueue , #? MODULE {annotations = #{deaths := # deaths {records = Rs }}}) ->
427
+ if is_list (Rs ) ->
428
+ is_cycle_v2 (TargetQueue , Rs );
429
+ is_map ( Rs ) ->
430
+ is_cycle_v1 ( TargetQueue , maps : keys ( Rs ))
431
+ end ;
438
432
is_death_cycle (_TargetQueue , #? MODULE {}) ->
439
433
false ;
440
434
is_death_cycle (TargetQueue , BasicMsg ) ->
441
435
mc_compat :is_death_cycle (TargetQueue , BasicMsg ).
442
436
443
437
% % Returns death queue names ordered by recency.
444
438
-spec death_queue_names (state ()) -> [rabbit_misc :resource_name ()].
445
- death_queue_names (#? MODULE {annotations = #{deaths := # deaths {records = Records }}})
446
- when is_list (Records ) ->
447
- lists :map (fun ({{Queue , _Reason }, _Death }) ->
448
- Queue
449
- end , Records );
450
- death_queue_names (# ? MODULE { annotations = #{ deaths : = # deaths { records = Records }}})
451
- when is_map ( Records ) ->
452
- proplists : get_keys ( maps : keys ( Records )) ;
439
+ death_queue_names (#? MODULE {annotations = #{deaths := # deaths {records = Rs }}}) ->
440
+ if is_list (Rs ) ->
441
+ lists :map (fun ({{Queue , _Reason }, _Death }) ->
442
+ Queue
443
+ end , Rs );
444
+ is_map ( Rs ) ->
445
+ proplists : get_keys ( maps : keys ( Rs ))
446
+ end ;
453
447
death_queue_names (#? MODULE {}) ->
454
448
[];
455
449
death_queue_names (BasicMsg ) ->
@@ -474,7 +468,7 @@ is_cycle_v2(TargetQueue, Deaths) ->
474
468
% % There is a cycle, but we only want to drop the message
475
469
% % if the cycle is "fully automatic", i.e. without a client
476
470
% % expliclity rejecting the message somewhere in the cycle.
477
- lists :all (fun ({{_SourceQueue , Reason }, # death {} }) ->
471
+ lists :all (fun ({{_SourceQueue , Reason }, _Death }) ->
478
472
Reason =/= rejected
479
473
end , [H | L ])
480
474
end .
0 commit comments