Skip to content

Commit 1f5f396

Browse files
committed
fix(queues): requeue events against all-namespace queue if present
1 parent 9ffa1fd commit 1f5f396

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

pkg/controller/operators/olm/operator.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -702,13 +702,17 @@ func (a *Operator) syncObject(obj interface{}) (syncError error) {
702702
a.requeueOwnerCSVs(metaObj)
703703
} else {
704704
switch metaObj.(type) {
705-
case *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding:
705+
case *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding, *admissionregistrationv1.MutatingWebhookConfiguration, *admissionregistrationv1.ValidatingWebhookConfiguration:
706706
resourceEvent := kubestate.NewResourceEvent(
707707
kubestate.ResourceUpdated,
708708
metaObj,
709709
)
710-
syncError = a.objGCQueueSet.RequeueEvent(ns, resourceEvent)
711-
logger.Debugf("syncObject - requeued update event for %v, res=%v", resourceEvent, syncError)
710+
if syncError = a.objGCQueueSet.RequeueEvent(ns, resourceEvent); syncError != nil {
711+
logger.WithError(syncError).Warnf("failed to requeue gc event: %v", resourceEvent)
712+
}
713+
// if syncError = a.objGCQueueSet.RequeueEvent("", resourceEvent); syncError != nil {
714+
// logger.WithError(syncError).Warnf("failed to requeue gc event: %v", resourceEvent)
715+
// }
712716
return
713717
}
714718
}
@@ -950,17 +954,19 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
950954
logger.WithError(err).Warn("cannot list cluster role bindings")
951955
}
952956
for _, crb := range crbs {
953-
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, crb))
954-
logger.Debugf("handleCSVdeletion - requeued update event for %v, res=%v", crb, syncError)
957+
if err := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, crb)); err != nil {
958+
logger.WithError(err).Warnf("failed to requeue gc event: %v", crb)
959+
}
955960
}
956961

957962
crs, err := a.lister.RbacV1().ClusterRoleLister().List(ownerSelector)
958963
if err != nil {
959964
logger.WithError(err).Warn("cannot list cluster roles")
960965
}
961966
for _, cr := range crs {
962-
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, cr))
963-
logger.Debugf("handleCSVdeletion - requeued update event for %v, res=%v", cr, syncError)
967+
if err := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, cr)); err != nil {
968+
logger.WithError(err).Warnf("failed to requeue gc event: %v", cr)
969+
}
964970
}
965971

966972
webhookSelector := labels.SelectorFromSet(ownerutil.OwnerLabel(clusterServiceVersion, v1alpha1.ClusterServiceVersionKind)).String()
@@ -969,17 +975,21 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
969975
logger.WithError(err).Warn("cannot list MutatingWebhookConfigurations")
970976
}
971977
for _, webhook := range mWebhooks.Items {
972-
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, &webhook))
973-
logger.Debugf("handleCSVdeletion - requeued update event for %v, res=%v", webhook, syncError)
978+
w := webhook
979+
if err := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, &w)); err != nil {
980+
logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook)
981+
}
974982
}
975983

976984
vWebhooks, err := a.opClient.KubernetesInterface().AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), metav1.ListOptions{LabelSelector: webhookSelector})
977985
if err != nil {
978986
logger.WithError(err).Warn("cannot list ValidatingWebhookConfigurations")
979987
}
980988
for _, webhook := range vWebhooks.Items {
981-
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, &webhook))
982-
logger.Debugf("handleCSVdeletion - requeued update event for %v, res=%v", webhook, syncError)
989+
w := webhook
990+
if err := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, &w)); err != nil {
991+
logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook)
992+
}
983993
}
984994
}
985995

pkg/lib/queueinformer/resourcequeue.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ func (r *ResourceQueueSet) RequeueEvent(namespace string, resourceEvent kubestat
3939
r.mutex.RLock()
4040
defer r.mutex.RUnlock()
4141

42+
if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok {
43+
queue.AddRateLimited(resourceEvent)
44+
return nil
45+
}
46+
4247
if queue, ok := r.queueSet[namespace]; ok {
4348
queue.AddRateLimited(resourceEvent)
4449
return nil

0 commit comments

Comments
 (0)