Skip to content

Commit 5b963ba

Browse files
committed
Add Qpid JMS client message selector tests
1 parent 82c4cde commit 5b963ba

File tree

11 files changed

+752
-50
lines changed

11 files changed

+752
-50
lines changed

deps/rabbit/src/rabbit_amqp_util.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@
1414
jms_header_to_amqp_field_name/1
1515
]).
1616

17+
-type field_name() :: durable | priority |
18+
message_id | user_id | to | subject | reply_to |
19+
correlation_id | content_type | content_encoding |
20+
absolute_expiry_time | creation_time | group_id |
21+
group_sequence | reply_to_group_id.
22+
23+
-export_type([field_name/0]).
24+
1725
-spec protocol_error(term(), io:format(), [term()]) ->
1826
no_return().
1927
protocol_error(Condition, Msg, Args) ->
@@ -30,12 +38,13 @@ capabilities(Capabilities) ->
3038
Caps = [{symbol, C} || C <- Capabilities],
3139
{array, symbol, Caps}.
3240

41+
-spec section_field_name_to_atom(binary()) -> field_name().
3342
%% header section
3443
section_field_name_to_atom(<<"durable">>) -> durable;
3544
section_field_name_to_atom(<<"priority">>) -> priority;
36-
section_field_name_to_atom(<<"ttl">>) -> ttl;
37-
section_field_name_to_atom(<<"first-acquirer">>) -> first_acquirer;
38-
section_field_name_to_atom(<<"delivery-count">>) -> delivery_count;
45+
%% ttl, first-acquirer, and delivery-count are unsupported
46+
%% because setting a JMS message selector on these fields is invalid.
47+
3948
%% properties section
4049
section_field_name_to_atom(<<"message-id">>) -> message_id;
4150
section_field_name_to_atom(<<"user-id">>) -> user_id;
@@ -49,9 +58,10 @@ section_field_name_to_atom(<<"absolute-expiry-time">>) -> absolute_expiry_time;
4958
section_field_name_to_atom(<<"creation-time">>) -> creation_time;
5059
section_field_name_to_atom(<<"group-id">>) -> group_id;
5160
section_field_name_to_atom(<<"group-sequence">>) -> group_sequence;
52-
section_field_name_to_atom(<<"reply-to-group-id">>) -> reply_to_group_id.
53-
61+
section_field_name_to_atom(<<"reply-to-group-id">>) -> reply_to_group_id;
62+
section_field_name_to_atom(Other) -> erlang:error({unsupported_field_name, Other}).
5463

64+
-spec jms_header_to_amqp_field_name(binary()) -> field_name() | binary().
5565
%% "Message header field references are restricted to
5666
%% JMSDeliveryMode, JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and JMSType."
5767
%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax
@@ -64,7 +74,6 @@ jms_header_to_amqp_field_name(<<"JMSCorrelationID">>) -> correlation_id;
6474
jms_header_to_amqp_field_name(<<"JMSType">>) -> subject;
6575
%% amqp-bindmap-jms-v1.0-wd10 § 3.2.2 JMS-defined ’JMSX’ Properties
6676
jms_header_to_amqp_field_name(<<"JMSXUserID">>) -> user_id;
67-
jms_header_to_amqp_field_name(<<"JMSXDeliveryCount">>) -> delivery_count;
6877
jms_header_to_amqp_field_name(<<"JMSXGroupID">>) -> group_id;
6978
jms_header_to_amqp_field_name(<<"JMSXGroupSeq">>) -> group_sequence;
7079
jms_header_to_amqp_field_name(Other) -> Other.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@
4444
-define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
4545
%% controls the timer for closing cached segments
4646
-define(CACHE_SEG_TIMEOUT, 5000).
47+
-define(DEFAULT_MSG_PRIORITY, 4). %% defined in both AMQP and JMS
4748

4849
-type seq() :: non_neg_integer().
4950
-type milliseconds() :: non_neg_integer().
50-
-type filter() :: none | [atom()].
51+
-type filter() :: none | [rabbit_amqp_util:field_name()].
5152

5253
-record(consumer, {key :: rabbit_fifo:consumer_key(),
5354
% status = up :: up | cancelled,
@@ -1133,12 +1134,31 @@ msg_meta(_Msg, none) ->
11331134
#{};
11341135
msg_meta(Msg, FieldNames) ->
11351136
Fields = lists:filtermap(fun(Name) ->
1136-
case mc:property(Name, Msg) of
1137-
{_Type, Val} ->
1138-
{true, {Name, Val}};
1137+
case get_field_value(Name, Msg) of
11391138
undefined ->
1140-
false
1139+
false;
1140+
Val ->
1141+
{true, {Name, Val}}
11411142
end
11421143
end, FieldNames),
11431144
maps:merge(mc:routing_headers(Msg, []),
11441145
maps:from_list(Fields)).
1146+
1147+
get_field_value(durable, Msg) ->
1148+
mc:is_persistent(Msg);
1149+
get_field_value(priority, Msg) ->
1150+
case mc:priority(Msg) of
1151+
undefined ->
1152+
?DEFAULT_MSG_PRIORITY;
1153+
P ->
1154+
P
1155+
end;
1156+
get_field_value(creation_time, Msg) ->
1157+
mc:timestamp(Msg);
1158+
get_field_value(Name, Msg) ->
1159+
case mc:property(Name, Msg) of
1160+
{_Type, Val} ->
1161+
Val;
1162+
undefined ->
1163+
undefined
1164+
end.

deps/rabbit/src/rabbit_fifo_filter_jms.erl

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,24 @@
99

1010
-export([eval/2]).
1111

12+
%% "When used in a message selector JMSDeliveryMode is treated as having
13+
%% the values 'PERSISTENT' and 'NON_PERSISTENT'."
14+
%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#special-notes
15+
-define(DELIVERY_MODE_PERSISTENT, <<"PERSISTENT">>).
16+
-define(DELIVERY_MODE_NON_PERSISTENT, <<"NON_PERSISTENT">>).
17+
-define(IS_DELIVERY_MODE(Val),
18+
Val =:= ?DELIVERY_MODE_PERSISTENT orelse
19+
Val =:= ?DELIVERY_MODE_NON_PERSISTENT).
20+
21+
%% "For PERSISTENT messages, the durable field of header MUST be set to true.
22+
%% For NON PERSISTENT messages, the durable field of header MUST be either
23+
%% set to false or omitted."
24+
%% amqp-bindmap-jms-v1.0-wd10
25+
is_durable(?DELIVERY_MODE_PERSISTENT) ->
26+
true;
27+
is_durable(?DELIVERY_MODE_NON_PERSISTENT) ->
28+
false.
29+
1230
%% Evaluates a parsed JMS message selector expression against message metadata.
1331
-spec eval(term(), #{atom() | binary() => atom() | binary() | number()}) ->
1432
boolean().
@@ -81,9 +99,9 @@ eval0({'not', Expr}, Headers) ->
8199

82100
%% Comparison operators
83101
eval0({'=' = Op, Expr1, Expr2}, Headers) ->
84-
compare(Op, eval0(Expr1, Headers), eval0(Expr2, Headers));
102+
compare_eq(Op, eval0(Expr1, Headers), eval0(Expr2, Headers));
85103
eval0({'<>' = Op, Expr1, Expr2}, Headers) ->
86-
compare(Op, eval0(Expr1, Headers), eval0(Expr2, Headers));
104+
compare_eq(Op, eval0(Expr1, Headers), eval0(Expr2, Headers));
87105
eval0({'>' = Op, Expr1, Expr2}, Headers) ->
88106
compare(Op, eval0(Expr1, Headers), eval0(Expr2, Headers));
89107
eval0({'<' = Op, Expr1, Expr2}, Headers) ->
@@ -199,6 +217,13 @@ compare(_, _, _) ->
199217
%% the value of the operation is false."
200218
false.
201219

220+
compare_eq(Op, Left, Right) when is_boolean(Left) andalso ?IS_DELIVERY_MODE(Right) ->
221+
compare(Op, Left, is_durable(Right));
222+
compare_eq(Op, Left, Right) when is_boolean(Right) andalso ?IS_DELIVERY_MODE(Left) ->
223+
compare(Op, is_durable(Left), Right);
224+
compare_eq(Op, Left, Right) ->
225+
compare(Op, Left, Right).
226+
202227
arithmetic(_, undefined, _) ->
203228
undefined;
204229
arithmetic(_, _, undefined) ->

deps/rabbit/src/rabbit_jms_selector_parser.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@
66

77
extract_value({_Token, _Line, Value}) -> Value.
88

9+
process_identifier({_Token, Line, <<"JMSXDeliveryCount">>}) ->
10+
%% "A clarification has been added to state that the effect of setting a
11+
%% message selector on a property (such as JMSXDeliveryCount) which is set
12+
%% by the provider on receive is undefined."
13+
return_error(Line, "setting message selector on JMSXDeliveryCount is disallowed");
14+
process_identifier({_Token, _Line, Value}) ->
15+
rabbit_amqp_util:jms_header_to_amqp_field_name(Value).
16+
917
process_like_pattern({string, Line, Value}) ->
1018
case unicode:characters_to_list(Value) of
1119
L when is_list(L) ->
@@ -210,7 +218,7 @@ yecctoken2string1(Other) ->
210218

211219

212220

213-
-file("rabbit_jms_selector_parser.erl", 213).
221+
-file("rabbit_jms_selector_parser.erl", 221).
214222

215223
-dialyzer({nowarn_function, yeccpars2/7}).
216224
-compile({nowarn_unused_function, yeccpars2/7}).
@@ -1499,7 +1507,7 @@ yeccpars2_21_(__Stack0) ->
14991507
[___1 | __Stack] = __Stack0,
15001508
[begin
15011509

1502-
{identifier, rabbit_amqp_util:jms_header_to_amqp_field_name(extract_value(___1))}
1510+
{identifier, process_identifier(___1)}
15031511
end | __Stack].
15041512

15051513
-compile({inline,yeccpars2_22_/1}).
@@ -1827,4 +1835,4 @@ yeccpars2_86_(__Stack0) ->
18271835
end | __Stack].
18281836

18291837

1830-
-file("rabbit_jms_selector_parser.yrl", 141).
1838+
-file("rabbit_jms_selector_parser.yrl", 149).

deps/rabbit/src/rabbit_jms_selector_parser.yrl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ primary -> identifier_expr : '$1'.
111111

112112
%% Identifiers (header fields or property references)
113113
identifier_expr -> identifier :
114-
{identifier, rabbit_amqp_util:jms_header_to_amqp_field_name(extract_value('$1'))}.
114+
{identifier, process_identifier('$1')}.
115115

116116
%% Literals
117117
literal -> integer : {integer, extract_value('$1')}.
@@ -123,6 +123,14 @@ Erlang code.
123123

124124
extract_value({_Token, _Line, Value}) -> Value.
125125

126+
process_identifier({_Token, Line, <<"JMSXDeliveryCount">>}) ->
127+
%% "A clarification has been added to state that the effect of setting a
128+
%% message selector on a property (such as JMSXDeliveryCount) which is set
129+
%% by the provider on receive is undefined."
130+
return_error(Line, "setting message selector on JMSXDeliveryCount is disallowed");
131+
process_identifier({_Token, _Line, Value}) ->
132+
rabbit_amqp_util:jms_header_to_amqp_field_name(Value).
133+
126134
process_like_pattern({string, Line, Value}) ->
127135
case unicode:characters_to_list(Value) of
128136
L when is_list(L) ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,6 @@ consumer_filter(Spec, Args, Q) ->
450450
error
451451
end.
452452

453-
%%TODO map JMS fields to AMQP fields
454453
parse_jms_selector(JmsSelector) ->
455454
String = unicode:characters_to_list(JmsSelector),
456455
case rabbit_jms_selector_lexer:string(String) of
@@ -461,13 +460,13 @@ parse_jms_selector(JmsSelector) ->
461460
% [?MODULE, ?FUNCTION_NAME, JmsSelector, Tokens, _Expr]),
462461
Ok;
463462
{error, Reason} ->
464-
rabbit_log:warning("failed to parse JMS Selector '~s': ~p",
463+
rabbit_log:warning("failed to parse JMS message selector '~s': ~p",
465464
[JmsSelector, Reason]),
466465
error
467466
end;
468467
{error, {_Line, _Mod, ErrDescriptor}, _Locaction} ->
469468
Reason = lists:flatten(leex:format_error(ErrDescriptor)),
470-
rabbit_log:warning("failed to scan JMS selector '~s': ~p",
469+
rabbit_log:warning("failed to scan JMS message selector '~s': ~p",
471470
[JmsSelector, Reason]),
472471
error
473472
end.

deps/rabbit/test/amqp_jms_SUITE.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ groups() ->
4242
message_types_jms_to_jms,
4343
message_types_jms_to_amqp,
4444
temporary_queue_rpc,
45-
temporary_queue_delete
45+
temporary_queue_delete,
46+
message_selector_application_properties,
47+
message_selector_header_fields
4648
]
4749
}].
4850

@@ -134,6 +136,12 @@ temporary_queue_rpc(Config) ->
134136
temporary_queue_delete(Config) ->
135137
ok = run_jms_test(?FUNCTION_NAME, Config).
136138

139+
message_selector_application_properties(Config) ->
140+
ok = run_jms_test(?FUNCTION_NAME, Config).
141+
142+
message_selector_header_fields(Config) ->
143+
ok = run_jms_test(?FUNCTION_NAME, Config).
144+
137145
%% -------------------------------------------------------------------
138146
%% Helpers
139147
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)