Skip to content

Commit 8e5973b

Browse files
committed
Support x-stream-match-unfiltered in Stomp
1 parent 02d1d86 commit 8e5973b

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
2929
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
3030
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
31+
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
3132
-define(HEADER_PRIORITY, "priority").
3233
-define(HEADER_RECEIPT, "receipt").
3334
-define(HEADER_REDELIVERED, "redelivered").

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,9 @@ do_subscribe(Destination, DestHdr, Frame,
717717
end.
718718

719719
subscribe_arguments(Frame) ->
720-
subscribe_arguments([?HEADER_X_STREAM_OFFSET, ?HEADER_X_STREAM_FILTER], Frame, []).
720+
subscribe_arguments([?HEADER_X_STREAM_OFFSET,
721+
?HEADER_X_STREAM_FILTER,
722+
?HEADER_X_STREAM_MATCH_UNFILTERED], Frame, []).
721723

722724
subscribe_arguments([], _Frame , Acc) ->
723725
Acc;
@@ -731,15 +733,23 @@ subscribe_argument(?HEADER_X_STREAM_OFFSET, Frame, Acc) ->
731733
not_found ->
732734
Acc;
733735
{OffsetType, OffsetValue} ->
734-
[{<<"x-stream-offset">>, OffsetType, OffsetValue}] ++ Acc
736+
[{list_to_binary(?HEADER_X_STREAM_OFFSET), OffsetType, OffsetValue}] ++ Acc
735737
end;
736738
subscribe_argument(?HEADER_X_STREAM_FILTER, Frame, Acc) ->
737739
StreamFilter = rabbit_stomp_frame:stream_filter_header(Frame),
738740
case StreamFilter of
739741
not_found ->
740742
Acc;
741743
{FilterType, FilterValue} ->
742-
[{<<"x-stream-filter">>, FilterType, FilterValue}] ++ Acc
744+
[{list_to_binary(?HEADER_X_STREAM_FILTER), FilterType, FilterValue}] ++ Acc
745+
end;
746+
subscribe_argument(?HEADER_X_STREAM_MATCH_UNFILTERED, Frame, Acc) ->
747+
MatchUnfiltered = rabbit_stomp_frame:boolean_header(Frame, ?HEADER_X_STREAM_MATCH_UNFILTERED),
748+
case MatchUnfiltered of
749+
{ok, MU} ->
750+
[{list_to_binary(?HEADER_X_STREAM_MATCH_UNFILTERED), bool, MU}] ++ Acc;
751+
not_found ->
752+
Acc
743753
end.
744754

745755
check_subscription_access(Destination = {topic, _Topic},

deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def test_stream_queue(self):
2828
'x-max-age' : '10h',
2929
'x-stream-max-segment-size-bytes' : 1048576,
3030
'x-stream-filter-size-bytes' : 32,
31+
'x-stream-match-unfiltered' : True,
3132
'durable': True,
3233
'auto-delete': False,
3334
'id': 1234,

0 commit comments

Comments
 (0)