Skip to content

Commit d1795f1

Browse files
simonkrenger authored and Per Goncalves da Silva committed
catalog-operator: Delete Pods that were evicted (operator-framework#3459)
This change adds another reason why a Pod could be detected as "dead", namely when it was evicted by the kubelet. This can happen when there is resource pressure on the Node. Then the reason will be "TerminationByKubelet". This addresses the issue described in https://issues.redhat.com/browse/OCPBUGS-45490 Signed-off-by: Simon Krenger <[email protected]> Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 3911cac commit d1795f1

File tree

6 files changed

+67
-20
lines changed

6 files changed

+67
-20
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
238238
Namespace: csv.Namespace,
239239
Labels: csv.Labels,
240240
Annotations: csv.Annotations,
241+
UID: csv.UID,
241242
},
242243
Spec: v1alpha1.ClusterServiceVersionSpec{
243244
CustomResourceDefinitions: csv.Spec.CustomResourceDefinitions,
@@ -745,16 +746,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
745746
// Namespace sync for resolving subscriptions
746747
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
747748
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
748-
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
749+
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](
750+
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 30*time.Second),
749751
workqueue.TypedRateLimitingQueueConfig[any]{
750752
Name: "resolve",
751753
})
754+
//op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
755+
// workqueue.TypedRateLimitingQueueConfig[any]{
756+
// Name: "resolve",
757+
// })
752758
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
753759
ctx,
754760
queueinformer.WithLogger(op.logger),
755761
queueinformer.WithQueue(op.nsResolveQueue),
756762
queueinformer.WithInformer(namespaceInformer.Informer()),
757-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()),
763+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncerWithDelete(op.handleDeletion)),
758764
)
759765
if err != nil {
760766
return nil, err
@@ -1313,6 +1319,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13131319
// from users/admins. Resyncing the namespace again is unlikely to resolve
13141320
// not-satisfiable error
13151321
if _, ok := err.(solver.NotSatisfiable); ok {
1322+
if err := o.ResyncInformers(); err != nil {
1323+
logger.WithError(err).Infof("error resyncing informers")
1324+
}
13161325
logger.WithError(err).Debug("resolution failed")
13171326
_, updateErr := o.updateSubscriptionStatuses(
13181327
o.setSubsCond(subs, v1alpha1.SubscriptionCondition{
@@ -1325,7 +1334,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13251334
logger.WithError(updateErr).Debug("failed to update subs conditions")
13261335
return updateErr
13271336
}
1328-
return nil
1337+
return err
13291338
}
13301339

13311340
_, updateErr := o.updateSubscriptionStatuses(
@@ -1736,7 +1745,8 @@ func (o *Operator) setSubsCond(subs []*v1alpha1.Subscription, cond v1alpha1.Subs
17361745

17371746
for _, sub := range subs {
17381747
subCond := sub.Status.GetCondition(cond.Type)
1739-
if subCond.Equals(cond) {
1748+
1749+
if subCond.Type == cond.Type && subCond.Status == cond.Status && subCond.Reason == cond.Reason {
17401750
continue
17411751
}
17421752
sub.Status.LastUpdated = lastUpdated

pkg/controller/registry/reconciler/grpc.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ func imageChanged(logger *logrus.Entry, updatePod *corev1.Pod, servingPods []*co
531531
func isPodDead(pod *corev1.Pod) bool {
532532
for _, check := range []func(*corev1.Pod) bool{
533533
isPodDeletedByTaintManager,
534+
isPodTerminatedByKubelet,
534535
} {
535536
if check(pod) {
536537
return true
@@ -551,6 +552,19 @@ func isPodDeletedByTaintManager(pod *corev1.Pod) bool {
551552
return false
552553
}
553554

555+
// This reason is set when the Pod was evicted due to resource pressure on the Node
556+
func isPodTerminatedByKubelet(pod *corev1.Pod) bool {
557+
if pod.DeletionTimestamp == nil {
558+
return false
559+
}
560+
for _, condition := range pod.Status.Conditions {
561+
if condition.Type == corev1.DisruptionTarget && condition.Reason == "TerminationByKubelet" && condition.Status == corev1.ConditionTrue {
562+
return true
563+
}
564+
}
565+
return false
566+
}
567+
554568
// imageID returns the ImageID of the primary catalog source container or an empty string if the image ID isn't available yet.
555569
// Note: the pod must be running and the container in a ready status to return a valid ImageID.
556570
func imageID(pod *corev1.Pod) string {

pkg/controller/registry/resolver/source_csvs.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]
4040
listSubscriptions: func(ctx context.Context) (*v1alpha1.SubscriptionList, error) {
4141
return csp.client.OperatorsV1alpha1().Subscriptions(namespace).List(ctx, metav1.ListOptions{})
4242
},
43+
//getCSV: func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error) {
44+
// return csp.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).Get(ctx, name, metav1.GetOptions{})
45+
//},
4346
}
4447
break // first ns is assumed to be the target ns, todo: make explicit
4548
}
@@ -54,6 +57,7 @@ type csvSource struct {
5457
logger logrus.StdLogger
5558

5659
listSubscriptions func(context.Context) (*v1alpha1.SubscriptionList, error)
60+
// getCSV func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error)
5761
}
5862

5963
func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
@@ -93,19 +97,26 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
9397
continue
9498
}
9599

96-
if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
97-
// we might be in an incoherent state, so let's check with live clients to make sure
98-
realSubscriptions, err := s.listSubscriptions(ctx)
99-
if err != nil {
100-
return nil, fmt.Errorf("failed to list subscriptions: %w", err)
101-
}
102-
for _, realSubscription := range realSubscriptions.Items {
103-
if realSubscription.Status.InstalledCSV == csv.Name {
104-
// oops, live cluster state is coherent
105-
return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
106-
}
107-
}
108-
}
100+
//if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
101+
// // we might be in an incoherent state, so let's check with live clients to make sure
102+
// realSubscriptions, err := s.listSubscriptions(ctx)
103+
// if err != nil {
104+
// return nil, fmt.Errorf("failed to list subscriptions: %w", err)
105+
// }
106+
// for _, realSubscription := range realSubscriptions.Items {
107+
// if realSubscription.Status.InstalledCSV == csv.Name {
108+
// // oops, live cluster state is coherent
109+
// return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
110+
// }
111+
// }
112+
// realCsv, err := s.getCSV(ctx, csv.Namespace, csv.Name)
113+
// if err != nil {
114+
// return nil, fmt.Errorf("lister caches might be incoherent for CSV %s/%s: %w", csv.Namespace, csv.Name, err)
115+
// }
116+
// if realCsv.GetUID() != csv.GetUID() {
117+
// return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s: differing UIDs (%s != %s)", csv.Namespace, csv.Name, csv.UID)
118+
// }
119+
//}
109120

110121
if failForwardEnabled {
111122
replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs))

pkg/controller/registry/resolver/step_resolver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
9191
}
9292

9393
func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
94-
subs, err := r.listSubscriptions(namespace)
94+
subs, err := r.subLister.Subscriptions(namespace).List(labels.Everything())
9595
if err != nil {
9696
return nil, nil, nil, err
9797
}

pkg/lib/queueinformer/queueinformer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package queueinformer
22

33
import (
44
"context"
5-
65
"github.com/pkg/errors"
76
"github.com/sirupsen/logrus"
87
"k8s.io/client-go/tools/cache"

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type Operator interface {
5454
// RunInformers starts the Operator's underlying Informers.
5555
RunInformers(ctx context.Context)
5656

57+
ResyncInformers() error
58+
5759
// Run starts the Operator and its underlying Informers.
5860
Run(ctx context.Context)
5961
}
@@ -177,6 +179,17 @@ func (o *operator) RunInformers(ctx context.Context) {
177179
})
178180
}
179181

182+
func (o *operator) ResyncInformers() error {
183+
o.mu.Lock()
184+
defer o.mu.Unlock()
185+
for _, informer := range o.informers {
186+
if err := informer.GetStore().Resync(); err != nil {
187+
return err
188+
}
189+
}
190+
return nil
191+
}
192+
180193
// Run starts the operator's control loops.
181194
func (o *operator) Run(ctx context.Context) {
182195
o.reconcileOnce.Do(func() {
@@ -314,7 +327,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
314327

315328
// Sync and requeue on error (throw out failed deletion syncs)
316329
err := loop.Sync(ctx, event)
317-
if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted {
330+
if requeues := queue.NumRequeues(item); err != nil && requeues < 15 && event.Type() != kubestate.ResourceDeleted {
318331
logger.WithField("requeues", requeues).Trace("requeuing with rate limiting")
319332
utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item)))
320333
queue.AddRateLimited(item)

0 commit comments

Comments
 (0)