17
17
% % * rabbit_event
18
18
-behaviour (rabbit_tracking ).
19
19
20
- -export ([boot /0 ,
21
- update_tracked /1 ,
20
+ -export ([update_tracked /1 ,
22
21
handle_cast /1 ,
23
22
register_tracked /1 ,
24
23
unregister_tracked /1 ,
25
24
count_tracked_items_in /1 ,
26
- clear_tracking_tables /0 ,
27
25
shutdown_tracked_items /2 ]).
28
26
29
27
-export ([list /0 , list_of_user /1 , list_on_node /1 ,
34
32
35
33
-export ([count_local_tracked_items_of_user /1 ]).
36
34
37
- -ifdef (TEST ).
38
- -export ([get_all_tracked_channel_table_names_for_node /1 ]).
39
- -endif .
40
-
41
35
-include_lib (" rabbit_common/include/rabbit.hrl" ).
42
36
43
37
-import (rabbit_misc , [pget /2 ]).
49
43
% % API
50
44
% %
51
45
52
- % % Sets up and resets channel tracking tables for this node.
53
- -spec boot () -> ok .
54
-
55
- boot () ->
56
- ensure_tracked_channels_table_for_this_node (),
57
- ensure_per_user_tracked_channels_table_for_node (),
58
- clear_tracking_tables (),
59
- ok .
60
-
61
46
-spec update_tracked (term ()) -> ok .
62
47
63
48
update_tracked (Event ) ->
@@ -115,30 +100,13 @@ handle_cast({user_deleted, Details}) ->
115
100
% % Schedule user entry deletion, allowing time for connections to close
116
101
_ = timer :apply_after (? TRACKING_EXECUTION_TIMEOUT , ? MODULE ,
117
102
delete_tracked_channel_user_entry , [Username ]),
118
- ok ;
119
- handle_cast ({node_deleted , Details }) ->
120
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
121
- true ->
122
- ok ;
123
- false ->
124
- Node = pget (node , Details ),
125
- rabbit_log_channel :info (
126
- " Node '~ts ' was removed from the cluster, deleting"
127
- " its channel tracking tables..." , [Node ]),
128
- delete_tracked_channels_table_for_node (Node ),
129
- delete_per_user_tracked_channels_table_for_node (Node )
130
- end .
103
+ ok .
131
104
132
105
-spec register_tracked (rabbit_types :tracked_channel ()) -> ok .
133
106
-dialyzer ([{nowarn_function , [register_tracked / 1 ]}]).
134
107
135
- register_tracked (TrackedCh = # tracked_channel {node = Node }) when Node == node () ->
136
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
137
- true -> register_tracked_ets (TrackedCh );
138
- false -> register_tracked_mnesia (TrackedCh )
139
- end .
140
-
141
- register_tracked_ets (TrackedCh = # tracked_channel {pid = ChPid , username = Username }) ->
108
+ register_tracked (TrackedCh = # tracked_channel {pid = ChPid , username = Username ,
109
+ node = Node }) when Node == node () ->
142
110
case ets :lookup (? TRACKED_CHANNEL_TABLE , ChPid ) of
143
111
[] ->
144
112
ets :insert (? TRACKED_CHANNEL_TABLE , TrackedCh ),
@@ -149,85 +117,31 @@ register_tracked_ets(TrackedCh = #tracked_channel{pid = ChPid, username = Userna
149
117
end ,
150
118
ok .
151
119
152
- register_tracked_mnesia (TrackedCh =
153
- # tracked_channel {node = Node , name = Name , username = Username }) ->
154
- ChId = rabbit_tracking :id (Node , Name ),
155
- TableName = tracked_channel_table_name_for (Node ),
156
- PerUserChTableName = tracked_channel_per_user_table_name_for (Node ),
157
- case mnesia :dirty_read (TableName , ChId ) of
158
- [] ->
159
- mnesia :dirty_write (TableName , TrackedCh ),
160
- mnesia :dirty_update_counter (PerUserChTableName , Username , 1 ),
161
- ok ;
162
- [# tracked_channel {}] ->
163
- ok
164
- end ,
165
- ok .
166
-
167
120
-spec unregister_tracked_by_pid (pid ()) -> any ().
168
121
unregister_tracked_by_pid (ChPid ) when node (ChPid ) == node () ->
169
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
170
- true -> unregister_tracked_by_pid_ets (ChPid );
171
- false -> unregister_tracked_by_pid_mnesia (ChPid )
172
- end .
173
-
174
- unregister_tracked_by_pid_ets (ChPid ) ->
175
122
case ets :lookup (? TRACKED_CHANNEL_TABLE , ChPid ) of
176
123
[] -> ok ;
177
124
[# tracked_channel {username = Username }] ->
178
125
ets :update_counter (? TRACKED_CHANNEL_TABLE_PER_USER , Username , - 1 ),
179
126
ets :delete (? TRACKED_CHANNEL_TABLE , ChPid )
180
127
end .
181
128
182
- unregister_tracked_by_pid_mnesia (ChPid ) ->
183
- case get_tracked_channel_by_pid_mnesia (ChPid ) of
184
- [] -> ok ;
185
- [# tracked_channel {id = ChId , node = Node , username = Username }] ->
186
- TableName = tracked_channel_table_name_for (Node ),
187
- PerUserChannelTableName = tracked_channel_per_user_table_name_for (Node ),
188
-
189
- mnesia :dirty_update_counter (PerUserChannelTableName , Username , - 1 ),
190
- mnesia :dirty_delete (TableName , ChId )
191
- end .
192
-
193
129
% % @doc This function is exported and implements a rabbit_tracking
194
130
% % callback, however it is not used in rabbitmq-server any more. It is
195
131
% % only kept for backwards compatibility if 3rd-party code would rely
196
132
% % on it.
197
133
-spec unregister_tracked (rabbit_types :tracked_channel_id ()) -> ok .
198
134
unregister_tracked (ChId = {Node , _Name }) when Node == node () ->
199
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
200
- true -> unregister_tracked_ets (ChId );
201
- false -> unregister_tracked_mnesia (ChId )
202
- end .
203
-
204
- unregister_tracked_ets (ChId ) ->
205
- case get_tracked_channel_by_id_ets (ChId ) of
135
+ case get_tracked_channel_by_id (ChId ) of
206
136
[] -> ok ;
207
137
[# tracked_channel {pid = ChPid , username = Username }] ->
208
138
ets :update_counter (? TRACKED_CHANNEL_TABLE_PER_USER , Username , - 1 ),
209
139
ets :delete (? TRACKED_CHANNEL_TABLE , ChPid )
210
140
end .
211
141
212
- unregister_tracked_mnesia (ChId = {Node , _Name }) when Node =:= node () ->
213
- TableName = tracked_channel_table_name_for (Node ),
214
- PerUserChannelTableName = tracked_channel_per_user_table_name_for (Node ),
215
- case mnesia :dirty_read (TableName , ChId ) of
216
- [] -> ok ;
217
- [# tracked_channel {username = Username }] ->
218
- mnesia :dirty_update_counter (PerUserChannelTableName , Username , - 1 ),
219
- mnesia :dirty_delete (TableName , ChId )
220
- end .
221
-
222
142
-spec count_tracked_items_in ({atom (), rabbit_types :username ()}) -> non_neg_integer ().
223
143
224
- count_tracked_items_in (Type ) ->
225
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
226
- true -> count_tracked_items_in_ets (Type );
227
- false -> count_tracked_items_in_mnesia (Type )
228
- end .
229
-
230
- count_tracked_items_in_ets ({user , Username }) ->
144
+ count_tracked_items_in ({user , Username }) ->
231
145
rabbit_tracking :count_on_all_nodes (
232
146
? MODULE , count_local_tracked_items_of_user , [Username ],
233
147
[" channels of user " , Username ]).
@@ -236,20 +150,6 @@ count_tracked_items_in_ets({user, Username}) ->
236
150
count_local_tracked_items_of_user (Username ) ->
237
151
rabbit_tracking :read_ets_counter (? TRACKED_CHANNEL_TABLE_PER_USER , Username ).
238
152
239
- count_tracked_items_in_mnesia ({user , Username }) ->
240
- rabbit_tracking :count_tracked_items_mnesia (
241
- fun tracked_channel_per_user_table_name_for /1 ,
242
- # tracked_channel_per_user .channel_count , Username ,
243
- " channels of user" ).
244
-
245
- -spec clear_tracking_tables () -> ok .
246
-
247
- clear_tracking_tables () ->
248
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
249
- true -> ok ;
250
- false -> clear_tracked_channel_tables_for_this_node ()
251
- end .
252
-
253
153
-spec shutdown_tracked_items (list (), term ()) -> ok .
254
154
255
155
shutdown_tracked_items (TrackedItems , _Args ) ->
@@ -267,50 +167,18 @@ list() ->
267
167
-spec list_of_user (rabbit_types :username ()) -> [rabbit_types :tracked_channel ()].
268
168
269
169
list_of_user (Username ) ->
270
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
271
- true -> list_of_user_ets (Username );
272
- false -> list_of_user_mnesia (Username )
273
- end .
274
-
275
- list_of_user_ets (Username ) ->
276
- rabbit_tracking :match_tracked_items_ets (
170
+ rabbit_tracking :match_tracked_items (
277
171
? TRACKED_CHANNEL_TABLE ,
278
172
# tracked_channel {username = Username , _ = '_' }).
279
173
280
- list_of_user_mnesia (Username ) ->
281
- rabbit_tracking :match_tracked_items_mnesia (
282
- fun tracked_channel_table_name_for /1 ,
283
- # tracked_channel {username = Username , _ = '_' }).
284
-
285
174
-spec list_on_node (node ()) -> [rabbit_types :tracked_channel ()].
175
+ list_on_node (Node ) when Node == node () ->
176
+ ets :tab2list (? TRACKED_CHANNEL_TABLE );
286
177
list_on_node (Node ) ->
287
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
288
- true when Node == node () ->
289
- list_on_node_ets ();
290
- true ->
291
- case rabbit_misc :rpc_call (Node , ? MODULE , list_on_node , [Node ]) of
292
- List when is_list (List ) ->
293
- List ;
294
- _ ->
295
- []
296
- end ;
297
- false ->
298
- list_on_node_mnesia (Node )
299
- end .
300
-
301
- list_on_node_ets () ->
302
- ets :tab2list (? TRACKED_CHANNEL_TABLE ).
303
-
304
- list_on_node_mnesia (Node ) ->
305
- try mnesia :dirty_match_object (
306
- tracked_channel_table_name_for (Node ),
307
- # tracked_channel {_ = '_' })
308
- catch exit :{aborted , {no_exists , _ }} ->
309
- % % The table might not exist yet (or is already gone)
310
- % % between the time rabbit_nodes:list_running() runs and
311
- % % returns a specific node, and
312
- % % mnesia:dirty_match_object() is called for that node's
313
- % % table.
178
+ case rabbit_misc :rpc_call (Node , ? MODULE , list_on_node , [Node ]) of
179
+ List when is_list (List ) ->
180
+ List ;
181
+ _ ->
314
182
[]
315
183
end .
316
184
@@ -326,118 +194,32 @@ tracked_channel_per_user_table_name_for(Node) ->
326
194
" tracked_channel_table_per_user_on_node_~ts " , [Node ])).
327
195
328
196
ensure_tracked_tables_for_this_node () ->
329
- _ = ensure_tracked_channels_table_for_this_node_ets (),
330
- _ = ensure_per_user_tracked_channels_table_for_this_node_ets (),
197
+ _ = ensure_tracked_channels_table_for_this_node (),
198
+ _ = ensure_per_user_tracked_channels_table_for_this_node (),
331
199
ok .
332
200
333
- % % internal
334
- ensure_tracked_channels_table_for_this_node () ->
335
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
336
- true ->
337
- ok ;
338
- false ->
339
- ensure_tracked_channels_table_for_this_node_mnesia ()
340
- end .
341
-
342
- ensure_per_user_tracked_channels_table_for_node () ->
343
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
344
- true ->
345
- ok ;
346
- false ->
347
- ensure_per_user_tracked_channels_table_for_this_node_mnesia ()
348
- end .
349
-
350
201
% % Create tables
351
- ensure_tracked_channels_table_for_this_node_ets () ->
202
+ ensure_tracked_channels_table_for_this_node () ->
352
203
rabbit_log :info (" Setting up a table for channel tracking on this node: ~tp " ,
353
204
[? TRACKED_CHANNEL_TABLE ]),
354
205
ets :new (? TRACKED_CHANNEL_TABLE , [named_table , public , {write_concurrency , true },
355
206
{keypos , # tracked_channel .pid }]).
356
207
357
- ensure_tracked_channels_table_for_this_node_mnesia () ->
358
- Node = node (),
359
- TableName = tracked_channel_table_name_for (Node ),
360
- case mnesia :create_table (TableName , [{record_name , tracked_channel },
361
- {attributes , record_info (fields , tracked_channel )}]) of
362
- {atomic , ok } ->
363
- rabbit_log :info (" Setting up a table for channel tracking on this node: ~tp " ,
364
- [TableName ]),
365
- ok ;
366
- {aborted , {already_exists , _ }} ->
367
- rabbit_log :info (" Setting up a table for channel tracking on this node: ~tp " ,
368
- [TableName ]),
369
- ok ;
370
- {aborted , Error } ->
371
- rabbit_log :error (" Failed to create a tracked channel table for node ~tp : ~tp " , [Node , Error ]),
372
- ok
373
- end .
374
-
375
- ensure_per_user_tracked_channels_table_for_this_node_ets () ->
208
+ ensure_per_user_tracked_channels_table_for_this_node () ->
376
209
rabbit_log :info (" Setting up a table for channel tracking on this node: ~tp " ,
377
210
[? TRACKED_CHANNEL_TABLE_PER_USER ]),
378
211
ets :new (? TRACKED_CHANNEL_TABLE_PER_USER , [named_table , public , {write_concurrency , true }]).
379
212
380
- ensure_per_user_tracked_channels_table_for_this_node_mnesia () ->
381
- Node = node (),
382
- TableName = tracked_channel_per_user_table_name_for (Node ),
383
- case mnesia :create_table (TableName , [{record_name , tracked_channel_per_user },
384
- {attributes , record_info (fields , tracked_channel_per_user )}]) of
385
- {atomic , ok } ->
386
- rabbit_log :info (" Setting up a table for channel tracking on this node: ~tp " ,
387
- [TableName ]),
388
- ok ;
389
- {aborted , {already_exists , _ }} ->
390
- rabbit_log :info (" Setting up a table for channel tracking on this node: ~tp " ,
391
- [TableName ]),
392
- ok ;
393
- {aborted , Error } ->
394
- rabbit_log :error (" Failed to create a per-user tracked channel table for node ~tp : ~tp " , [Node , Error ]),
395
- ok
396
- end .
397
-
398
- clear_tracked_channel_tables_for_this_node () ->
399
- [rabbit_tracking :clear_tracking_table (T )
400
- || T <- get_all_tracked_channel_table_names_for_node (node ())].
401
-
402
- delete_tracked_channels_table_for_node (Node ) ->
403
- TableName = tracked_channel_table_name_for (Node ),
404
- rabbit_tracking :delete_tracking_table (TableName , Node , " tracked channel" ).
405
-
406
- delete_per_user_tracked_channels_table_for_node (Node ) ->
407
- TableName = tracked_channel_per_user_table_name_for (Node ),
408
- rabbit_tracking :delete_tracking_table (TableName , Node ,
409
- " per-user tracked channels" ).
410
-
411
- get_all_tracked_channel_table_names_for_node (Node ) ->
412
- [tracked_channel_table_name_for (Node ),
413
- tracked_channel_per_user_table_name_for (Node )].
414
-
415
213
get_tracked_channels_by_connection_pid (ConnPid ) ->
416
- case rabbit_feature_flags :is_enabled (tracking_records_in_ets ) of
417
- true -> get_tracked_channels_by_connection_pid_ets (ConnPid );
418
- false -> get_tracked_channels_by_connection_pid_mnesia (ConnPid )
419
- end .
420
-
421
- get_tracked_channels_by_connection_pid_ets (ConnPid ) ->
422
214
rabbit_tracking :match_tracked_items_local (
423
- ? TRACKED_CHANNEL_TABLE ,
424
- # tracked_channel {connection = ConnPid , _ = '_' }).
425
-
426
- get_tracked_channels_by_connection_pid_mnesia (ConnPid ) ->
427
- rabbit_tracking :match_tracked_items_mnesia (
428
- fun tracked_channel_table_name_for /1 ,
429
- # tracked_channel {connection = ConnPid , _ = '_' }).
215
+ ? TRACKED_CHANNEL_TABLE ,
216
+ # tracked_channel {connection = ConnPid , _ = '_' }).
430
217
431
- get_tracked_channel_by_id_ets (ChId ) ->
432
- rabbit_tracking :match_tracked_items_ets (
218
+ get_tracked_channel_by_id (ChId ) ->
219
+ rabbit_tracking :match_tracked_items (
433
220
? TRACKED_CHANNEL_TABLE ,
434
221
# tracked_channel {id = ChId , _ = '_' }).
435
222
436
- get_tracked_channel_by_pid_mnesia (ChPid ) ->
437
- rabbit_tracking :match_tracked_items_mnesia (
438
- fun tracked_channel_table_name_for /1 ,
439
- # tracked_channel {pid = ChPid , _ = '_' }).
440
-
441
223
delete_tracked_channel_user_entry (Username ) ->
442
224
rabbit_tracking :delete_tracked_entry (
443
225
{rabbit_auth_backend_internal , exists , [Username ]},
0 commit comments