@@ -2,19 +2,22 @@ package reconciler
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
7
+
6
8
"strings"
7
9
"time"
8
10
9
11
"github.com/google/go-cmp/cmp"
10
12
"github.com/operator-framework/api/pkg/operators/v1alpha1"
11
- "github.com/pkg/errors"
13
+ pkgerrors "github.com/pkg/errors"
12
14
"github.com/sirupsen/logrus"
13
15
corev1 "k8s.io/api/core/v1"
14
16
apierrors "k8s.io/apimachinery/pkg/api/errors"
15
17
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
18
"k8s.io/apimachinery/pkg/labels"
17
19
"k8s.io/apimachinery/pkg/util/intstr"
20
+ "k8s.io/utils/ptr"
18
21
19
22
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
20
23
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
@@ -262,7 +265,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
262
265
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
263
266
sa , err := c .ensureSA (source )
264
267
if err != nil && ! apierrors .IsAlreadyExists (err ) {
265
- return errors .Wrapf (err , "error ensuring service account: %s" , source .GetName ())
268
+ return pkgerrors .Wrapf (err , "error ensuring service account: %s" , source .GetName ())
266
269
}
267
270
268
271
sa , err = c .OpClient .GetServiceAccount (sa .GetNamespace (), sa .GetName ())
@@ -285,20 +288,20 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
285
288
return err
286
289
}
287
290
if err := c .ensurePod (logger , source , sa , overwritePod ); err != nil {
288
- return errors .Wrapf (err , "error ensuring pod: %s" , pod .GetName ())
291
+ return pkgerrors .Wrapf (err , "error ensuring pod: %s" , pod .GetName ())
289
292
}
290
293
if err := c .ensureUpdatePod (logger , sa , source ); err != nil {
291
294
if _ , ok := err .(UpdateNotReadyErr ); ok {
292
295
return err
293
296
}
294
- return errors .Wrapf (err , "error ensuring updated catalog source pod: %s" , pod .GetName ())
297
+ return pkgerrors .Wrapf (err , "error ensuring updated catalog source pod: %s" , pod .GetName ())
295
298
}
296
299
service , err := source .Service ()
297
300
if err != nil {
298
301
return err
299
302
}
300
303
if err := c .ensureService (source , overwrite ); err != nil {
301
- return errors .Wrapf (err , "error ensuring service: %s" , service .GetName ())
304
+ return pkgerrors .Wrapf (err , "error ensuring service: %s" , service .GetName ())
302
305
}
303
306
304
307
if overwritePod {
@@ -338,16 +341,41 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err
338
341
}
339
342
340
343
func (c * GrpcRegistryReconciler ) ensurePod (logger * logrus.Entry , source grpcCatalogSourceDecorator , serviceAccount * corev1.ServiceAccount , overwrite bool ) error {
341
- // currentLivePods refers to the currently live instances of the catalog source
342
- currentLivePods := c .currentPods (logger , source )
343
- if len (currentLivePods ) > 0 {
344
+ // currentPods refers to the current pod instances of the catalog source
345
+ currentPods := c .currentPods (logger , source )
346
+
347
+ var forceDeleteErrs []error
348
+ // Remove dead pods from the slice without allocating a new slice
349
+ // See https://go.dev/wiki/SliceTricks#filtering-without-allocating
350
+ tmpSlice := currentPods [:0 ]
351
+ for _ , pod := range currentPods {
352
+ if ! isPodDead (pod ) {
353
+ logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : pod .GetName ()}).Debug ("pod is alive" )
354
+ tmpSlice = append (tmpSlice , pod )
355
+ continue
356
+ }
357
+
358
+ logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : pod .GetName ()}).Info ("force deleting dead pod" )
359
+ if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), pod .GetName (), metav1.DeleteOptions {
360
+ GracePeriodSeconds : ptr.To [int64 ](0 ),
361
+ }); err != nil && ! apierrors .IsNotFound (err ) {
362
+ forceDeleteErrs = append (forceDeleteErrs , pkgerrors .Wrapf (err , "error deleting old pod: %s" , pod .GetName ()))
363
+ }
364
+ }
365
+ currentPods = tmpSlice
366
+
367
+ if len (forceDeleteErrs ) > 0 {
368
+ return errors .Join (forceDeleteErrs ... )
369
+ }
370
+
371
+ if len (currentPods ) > 0 {
344
372
if ! overwrite {
345
373
return nil
346
374
}
347
- for _ , p := range currentLivePods {
375
+ for _ , p := range currentPods {
348
376
logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : p .GetName ()}).Info ("deleting current pod" )
349
377
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
350
- return errors .Wrapf (err , "error deleting old pod: %s" , p .GetName ())
378
+ return pkgerrors .Wrapf (err , "error deleting old pod: %s" , p .GetName ())
351
379
}
352
380
}
353
381
}
@@ -358,7 +386,7 @@ func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCata
358
386
logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : desiredPod .Namespace }).Info ("deleting current pod" )
359
387
_ , err = c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Create (context .TODO (), desiredPod , metav1.CreateOptions {})
360
388
if err != nil {
361
- return errors .Wrapf (err , "error creating new pod: %s" , desiredPod .GetGenerateName ())
389
+ return pkgerrors .Wrapf (err , "error creating new pod: %s" , desiredPod .GetGenerateName ())
362
390
}
363
391
364
392
return nil
@@ -378,7 +406,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
378
406
logger .Infof ("catalog update required at %s" , time .Now ().String ())
379
407
pod , err := c .createUpdatePod (source , serviceAccount )
380
408
if err != nil {
381
- return errors .Wrapf (err , "creating update catalog source pod" )
409
+ return pkgerrors .Wrapf (err , "creating update catalog source pod" )
382
410
}
383
411
source .SetLastUpdateTime ()
384
412
return UpdateNotReadyErr {catalogName : source .GetName (), podName : pod .GetName ()}
@@ -410,7 +438,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
410
438
for _ , p := range currentLivePods {
411
439
logger .WithFields (logrus.Fields {"live-pod.namespace" : source .GetNamespace (), "live-pod.name" : p .Name }).Info ("deleting current live pods" )
412
440
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
413
- return errors .Wrapf (errors .Wrapf (err , "error deleting pod: %s" , p .GetName ()), "detected imageID change: error deleting old catalog source pod" )
441
+ return pkgerrors .Wrapf (pkgerrors .Wrapf (err , "error deleting pod: %s" , p .GetName ()), "detected imageID change: error deleting old catalog source pod" )
414
442
}
415
443
}
416
444
// done syncing
@@ -420,7 +448,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
420
448
// delete update pod right away, since the digest match, to prevent long-lived duplicate catalog pods
421
449
logger .WithFields (logrus.Fields {"update-pod.namespace" : updatePod .Namespace , "update-pod.name" : updatePod .Name }).Debug ("catalog polling result: no update; removing duplicate update pod" )
422
450
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), updatePod .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
423
- return errors .Wrapf (errors .Wrapf (err , "error deleting pod: %s" , updatePod .GetName ()), "duplicate catalog polling pod" )
451
+ return pkgerrors .Wrapf (pkgerrors .Wrapf (err , "error deleting pod: %s" , updatePod .GetName ()), "duplicate catalog polling pod" )
424
452
}
425
453
}
426
454
@@ -523,6 +551,29 @@ func imageChanged(logger *logrus.Entry, updatePod *corev1.Pod, servingPods []*co
523
551
return false
524
552
}
525
553
554
+ func isPodDead (pod * corev1.Pod ) bool {
555
+ for _ , check := range []func (* corev1.Pod ) bool {
556
+ isPodDeletedByTaintManager ,
557
+ } {
558
+ if check (pod ) {
559
+ return true
560
+ }
561
+ }
562
+ return false
563
+ }
564
+
565
+ func isPodDeletedByTaintManager (pod * corev1.Pod ) bool {
566
+ if pod .DeletionTimestamp == nil {
567
+ return false
568
+ }
569
+ for _ , condition := range pod .Status .Conditions {
570
+ if condition .Type == corev1 .DisruptionTarget && condition .Reason == "DeletionByTaintManager" && condition .Status == corev1 .ConditionTrue {
571
+ return true
572
+ }
573
+ }
574
+ return false
575
+ }
576
+
526
577
// imageID returns the ImageID of the primary catalog source container or an empty string if the image ID isn't available yet.
527
578
// Note: the pod must be running and the container in a ready status to return a valid ImageID.
528
579
func imageID (pod * corev1.Pod ) string {
@@ -545,7 +596,7 @@ func imageID(pod *corev1.Pod) string {
545
596
func (c * GrpcRegistryReconciler ) removePods (pods []* corev1.Pod , namespace string ) error {
546
597
for _ , p := range pods {
547
598
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (namespace ).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
548
- return errors .Wrapf (err , "error deleting pod: %s" , p .GetName ())
599
+ return pkgerrors .Wrapf (err , "error deleting pod: %s" , p .GetName ())
549
600
}
550
601
}
551
602
return nil
@@ -623,7 +674,7 @@ func (c *GrpcRegistryReconciler) podFailed(pod *corev1.Pod) (bool, error) {
623
674
logrus .WithField ("UpdatePod" , pod .GetName ()).Infof ("catalog polling result: update pod %s failed to start" , pod .GetName ())
624
675
err := c .removePods ([]* corev1.Pod {pod }, pod .GetNamespace ())
625
676
if err != nil {
626
- return true , errors .Wrapf (err , "error deleting failed catalog polling pod: %s" , pod .GetName ())
677
+ return true , pkgerrors .Wrapf (err , "error deleting failed catalog polling pod: %s" , pod .GetName ())
627
678
}
628
679
return true , nil
629
680
}
0 commit comments