@@ -108,12 +108,16 @@ warn_file_limit() ->
108
108
ok
109
109
end .
110
110
111
- -spec recover (rabbit_types :vhost ()) -> [amqqueue :amqqueue ()].
111
+ -spec recover (rabbit_types :vhost ()) ->
112
+ {RecoveredClassic :: [amqqueue :amqqueue ()],
113
+ FailedClassic :: [amqqueue :amqqueue ()],
114
+ Quorum :: [amqqueue :amqqueue ()]}.
112
115
113
116
recover (VHost ) ->
114
- Classic = find_local_durable_classic_queues (VHost ),
117
+ AllClassic = find_local_durable_classic_queues (VHost ),
115
118
Quorum = find_local_quorum_queues (VHost ),
116
- recover_classic_queues (VHost , Classic ) ++ rabbit_quorum_queue :recover (Quorum ).
119
+ {RecoveredClassic , FailedClassic } = recover_classic_queues (VHost , AllClassic ),
120
+ {RecoveredClassic , FailedClassic , rabbit_quorum_queue :recover (Quorum )}.
117
121
118
122
recover_classic_queues (VHost , Queues ) ->
119
123
{ok , BQ } = application :get_env (rabbit , backing_queue_module ),
@@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
124
128
BQ :start (VHost , [amqqueue :get_name (Q ) || Q <- Queues ]),
125
129
case rabbit_amqqueue_sup_sup :start_for_vhost (VHost ) of
126
130
{ok , _ } ->
127
- recover_durable_queues (lists :zip (Queues , OrderedRecoveryTerms ));
131
+ RecoveredQs = recover_durable_queues (lists :zip (Queues , OrderedRecoveryTerms )),
132
+ RecoveredNames = [amqqueue :get_name (Q ) || Q <- RecoveredQs ],
133
+ FailedQueues = [Q || Q <- Queues ,
134
+ not lists :member (amqqueue :get_name (Q ), RecoveredNames )],
135
+ {RecoveredQs , FailedQueues };
128
136
{error , Reason } ->
129
137
rabbit_log :error (" Failed to start queue supervisor for vhost '~s ': ~s " , [VHost , Reason ]),
130
138
throw ({error , Reason })
131
139
end .
132
140
133
- filter_per_type (Queues ) ->
134
- lists :partition (fun (Q ) -> amqqueue :is_classic (Q ) end , Queues ).
135
-
136
141
filter_pid_per_type (QPids ) ->
137
142
lists :partition (fun (QPid ) -> ? IS_CLASSIC (QPid ) end , QPids ).
138
143
@@ -156,12 +161,14 @@ stop(VHost) ->
156
161
- spec start ([amqqueue :amqqueue ()]) -> 'ok' .
157
162
158
163
start (Qs ) ->
159
- {Classic , _Quorum } = filter_per_type (Qs ),
160
164
% % At this point all recovered queues and their bindings are
161
165
% % visible to routing, so now it is safe for them to complete
162
166
% % their initialisation (which may involve interacting with other
163
167
% % queues).
164
- _ = [amqqueue :get_pid (Q ) ! {self (), go } || Q <- Classic ],
168
+ _ = [amqqueue :get_pid (Q ) ! {self (), go }
169
+ || Q <- Qs ,
170
+ % % All queues are supposed to be classic here.
171
+ amqqueue :is_classic (Q )],
165
172
ok .
166
173
167
174
mark_local_durable_queues_stopped (VHost ) ->
0 commit comments