@@ -369,8 +369,10 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
369
369
[] ->
370
370
{ok , no_route };
371
371
Routes ->
372
- % % FIXME filter non-stream resources
373
- {ok , [Stream || # resource {name = Stream } <- Routes ]}
372
+ {ok ,
373
+ [Stream
374
+ || # resource {name = Stream } = R <- Routes ,
375
+ is_resource_stream_queue (R )]}
374
376
end
375
377
catch
376
378
exit :Error ->
@@ -383,10 +385,11 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
383
385
ExchangeName = rabbit_misc :r (VirtualHost , exchange , SuperStream ),
384
386
Res = try
385
387
rabbit_exchange :lookup_or_die (ExchangeName ),
386
- % % FIXME make sure queue is a stream
387
- % % TODO bindings could be sorted by partition number, by using a binding argument
388
- % % this would make the spreading of messages stable
389
- UnorderedBindings = rabbit_binding :list_for_source (ExchangeName ),
388
+ UnorderedBindings =
389
+ [Binding
390
+ || Binding = # binding {destination = D }
391
+ <- rabbit_binding :list_for_source (ExchangeName ),
392
+ is_resource_stream_queue (D )],
390
393
OrderedBindings =
391
394
rabbit_stream_utils :sort_partitions (UnorderedBindings ),
392
395
{ok ,
@@ -448,3 +451,13 @@ is_stream_queue(Q) ->
448
451
_ ->
449
452
false
450
453
end .
454
+
455
+ is_resource_stream_queue (# resource {kind = queue } = Resource ) ->
456
+ case rabbit_amqqueue :lookup (Resource ) of
457
+ {ok , Q } ->
458
+ is_stream_queue (Q );
459
+ _ ->
460
+ false
461
+ end ;
462
+ is_resource_stream_queue (_ ) ->
463
+ false .
0 commit comments