Skip to content

Commit 4ff31a4

Browse files
Merge pull request #947 from perdasilva/workqueue-fix-416
[release-4.16] OCPBUGS-48661: Fix concurrent namespace resolution
2 parents 1b927a4 + f0b6089 commit 4ff31a4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+802
-1153
lines changed

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/runtime/schema"
3232
"k8s.io/apimachinery/pkg/selection"
33+
"k8s.io/apimachinery/pkg/types"
3334
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3435
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3536
"k8s.io/apimachinery/pkg/util/sets"
@@ -269,7 +270,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
269270
// Wire InstallPlans
270271
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
271272
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
272-
ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ips")
273+
ipQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
274+
workqueue.RateLimitingQueueConfig{
275+
Name: "ips",
276+
})
273277
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
274278
ipQueueInformer, err := queueinformer.NewQueueInformer(
275279
ctx,
@@ -288,7 +292,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
288292

289293
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
290294
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
291-
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ogs")
295+
ogQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
296+
workqueue.RateLimitingQueueConfig{
297+
Name: "ogs",
298+
})
292299
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
293300
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
294301
ctx,
@@ -307,15 +314,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
307314
// Wire CatalogSources
308315
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
309316
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
310-
catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "catsrcs")
317+
catsrcQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
318+
workqueue.RateLimitingQueueConfig{
319+
Name: "catsrcs",
320+
})
311321
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
312322
catsrcQueueInformer, err := queueinformer.NewQueueInformer(
313323
ctx,
314324
queueinformer.WithMetricsProvider(metrics.NewMetricsCatalogSource(op.lister.OperatorsV1alpha1().CatalogSourceLister())),
315325
queueinformer.WithLogger(op.logger),
316326
queueinformer.WithQueue(catsrcQueue),
317327
queueinformer.WithInformer(catsrcInformer.Informer()),
318-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncerWithDelete(op.handleCatSrcDeletion)),
328+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()),
329+
queueinformer.WithDeletionHandler(op.handleCatSrcDeletion),
319330
)
320331
if err != nil {
321332
return nil, err
@@ -333,7 +344,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
333344
subIndexer := subInformer.Informer().GetIndexer()
334345
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer
335346

336-
subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "subs")
347+
subQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
348+
workqueue.RateLimitingQueueConfig{
349+
Name: "subs",
350+
})
337351
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
338352
subSyncer, err := subscription.NewSyncer(
339353
ctx,
@@ -344,7 +358,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
344358
subscription.WithCatalogInformer(catsrcInformer.Informer()),
345359
subscription.WithInstallPlanInformer(ipInformer.Informer()),
346360
subscription.WithSubscriptionQueue(subQueue),
347-
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
361+
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
348362
subscription.WithRegistryReconcilerFactory(op.reconciler),
349363
subscription.WithGlobalCatalogNamespace(op.namespace),
350364
subscription.WithSourceProvider(op.sourceInvalidator),
@@ -659,13 +673,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
659673
}
660674

661675
// Generate and register QueueInformers for k8s resources
662-
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
676+
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()
663677
for _, informer := range sharedIndexInformers {
664678
queueInformer, err := queueinformer.NewQueueInformer(
665679
ctx,
666680
queueinformer.WithLogger(op.logger),
667681
queueinformer.WithInformer(informer),
668682
queueinformer.WithSyncer(k8sSyncer),
683+
queueinformer.WithDeletionHandler(op.handleDeletion),
669684
)
670685
if err != nil {
671686
return nil, err
@@ -713,7 +728,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
713728
ctx,
714729
queueinformer.WithLogger(op.logger),
715730
queueinformer.WithInformer(crdInformer),
716-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
731+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()),
732+
queueinformer.WithDeletionHandler(op.handleDeletion),
717733
)
718734
if err != nil {
719735
return nil, err
@@ -734,7 +750,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
734750
// Namespace sync for resolving subscriptions
735751
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
736752
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
737-
op.nsResolveQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
753+
op.nsResolveQueue = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
754+
workqueue.RateLimitingQueueConfig{
755+
Name: "resolve",
756+
})
738757
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
739758
ctx,
740759
queueinformer.WithLogger(op.logger),
@@ -773,12 +792,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
773792

774793
if err == nil {
775794
for ns := range namespaces {
776-
o.nsResolveQueue.Add(ns)
795+
o.nsResolveQueue.Add(types.NamespacedName{Name: ns})
777796
}
778797
}
779798
}
780799

781-
o.nsResolveQueue.Add(state.Key.Namespace)
800+
o.nsResolveQueue.Add(types.NamespacedName{Name: state.Key.Namespace})
782801
}
783802
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
784803
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
@@ -859,18 +878,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
859878
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
860879
catsrc, ok := obj.(metav1.Object)
861880
if !ok {
881+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
862882
if !ok {
863-
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
864-
if !ok {
865-
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
866-
return
867-
}
883+
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
884+
return
885+
}
868886

869-
catsrc, ok = tombstone.Obj.(metav1.Object)
870-
if !ok {
871-
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
872-
return
873-
}
887+
catsrc, ok = tombstone.Obj.(metav1.Object)
888+
if !ok {
889+
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
890+
return
874891
}
875892
}
876893
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
@@ -1397,7 +1414,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13971414
}
13981415

13991416
logger.Info("unpacking is not complete yet, requeueing")
1400-
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
1417+
o.nsResolveQueue.AddAfter(types.NamespacedName{Name: namespace}, 5*time.Second)
14011418
return nil
14021419
}
14031420
}
@@ -1479,7 +1496,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
14791496
return fmt.Errorf("casting Subscription failed")
14801497
}
14811498

1482-
o.nsResolveQueue.Add(sub.GetNamespace())
1499+
o.nsResolveQueue.Add(types.NamespacedName{Name: sub.GetNamespace()})
14831500

14841501
return nil
14851502
}
@@ -1493,7 +1510,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
14931510
return fmt.Errorf("casting OperatorGroup failed")
14941511
}
14951512

1496-
o.nsResolveQueue.Add(og.GetNamespace())
1513+
o.nsResolveQueue.Add(types.NamespacedName{Name: og.GetNamespace()})
14971514

14981515
return nil
14991516
}

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2156,12 +2156,15 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
21562156
client: clientFake,
21572157
lister: lister,
21582158
namespace: namespace,
2159-
nsResolveQueue: workqueue.NewNamedRateLimitingQueue(
2159+
nsResolveQueue: workqueue.NewRateLimitingQueueWithConfig(
21602160
workqueue.NewMaxOfRateLimiter(
21612161
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Second),
21622162
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
21632163
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
2164-
), "resolver"),
2164+
),
2165+
workqueue.RateLimitingQueueConfig{
2166+
Name: "resolver",
2167+
}),
21652168
resolver: config.resolver,
21662169
reconciler: config.reconciler,
21672170
recorder: config.recorder,

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,14 @@ import (
2828

2929
// ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs.
3030
// Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain.
31-
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler {
31+
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler) kubestate.Reconciler {
3232
var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
3333
out = in
3434
switch s := in.(type) {
3535
case SubscriptionExistsState:
3636
if sync != nil {
3737
err = sync(s.Subscription())
3838
}
39-
case SubscriptionDeletedState:
40-
if onDelete != nil {
41-
onDelete(s.Subscription())
42-
}
4339
case SubscriptionState:
4440
if sync != nil {
4541
err = sync(s.Subscription())

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/state.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ type SubscriptionState interface {
2525
Subscription() *v1alpha1.Subscription
2626
Add() SubscriptionExistsState
2727
Update() SubscriptionExistsState
28-
Delete() SubscriptionDeletedState
2928
}
3029

3130
// SubscriptionExistsState describes subscription states in which the subscription exists on the cluster.
@@ -49,13 +48,6 @@ type SubscriptionUpdatedState interface {
4948
isSubscriptionUpdatedState()
5049
}
5150

52-
// SubscriptionDeletedState describes subscription states in which the subscription no longer exists and was deleted from the cluster.
53-
type SubscriptionDeletedState interface {
54-
SubscriptionState
55-
56-
isSubscriptionDeletedState()
57-
}
58-
5951
// CatalogHealthState describes subscription states that represent a subscription with respect to catalog health.
6052
type CatalogHealthState interface {
6153
SubscriptionExistsState
@@ -176,12 +168,6 @@ func (s *subscriptionState) Update() SubscriptionExistsState {
176168
}
177169
}
178170

179-
func (s *subscriptionState) Delete() SubscriptionDeletedState {
180-
return &subscriptionDeletedState{
181-
SubscriptionState: s,
182-
}
183-
}
184-
185171
func NewSubscriptionState(sub *v1alpha1.Subscription) SubscriptionState {
186172
return &subscriptionState{
187173
State: kubestate.NewState(),
@@ -207,12 +193,6 @@ type subscriptionUpdatedState struct {
207193

208194
func (c *subscriptionUpdatedState) isSubscriptionUpdatedState() {}
209195

210-
type subscriptionDeletedState struct {
211-
SubscriptionState
212-
}
213-
214-
func (c *subscriptionDeletedState) isSubscriptionDeletedState() {}
215-
216196
type catalogHealthState struct {
217197
SubscriptionExistsState
218198
}

0 commit comments

Comments
 (0)