@@ -41,6 +41,13 @@ const (
41
41
// e.g 1m30s
42
42
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
43
43
BundleUnpackPodLabel = "job-name"
44
+
45
+ // BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before
46
+ // attempting to recreate a failed unpack job for a bundle.
47
+ BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval"
48
+
49
+ // bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle.
50
+ bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref"
44
51
)
45
52
46
53
type BundleUnpackResult struct {
@@ -239,6 +246,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
239
246
}
240
247
job .SetNamespace (cmRef .Namespace )
241
248
job .SetName (cmRef .Name )
249
+ job .SetLabels (map [string ]string {bundleUnpackRefLabel : cmRef .Name })
242
250
job .SetOwnerReferences ([]metav1.OwnerReference {ownerRef (cmRef )})
243
251
if c .runAsUser > 0 {
244
252
job .Spec .Template .Spec .SecurityContext .RunAsUser = & c .runAsUser
@@ -279,7 +287,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
279
287
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker
280
288
281
289
type Unpacker interface {
282
- UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout time.Duration ) (result * BundleUnpackResult , err error )
290
+ UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout , retryInterval time.Duration ) (result * BundleUnpackResult , err error )
283
291
}
284
292
285
293
type ConfigMapUnpacker struct {
@@ -440,7 +448,7 @@ const (
440
448
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
441
449
)
442
450
443
- func (c * ConfigMapUnpacker ) UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout time.Duration ) (result * BundleUnpackResult , err error ) {
451
+ func (c * ConfigMapUnpacker ) UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout , retryInterval time.Duration ) (result * BundleUnpackResult , err error ) {
444
452
result = newBundleUnpackResult (lookup )
445
453
446
454
// if bundle lookup failed condition already present, then there is nothing more to do
@@ -502,7 +510,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
502
510
secrets = append (secrets , corev1.LocalObjectReference {Name : secretName })
503
511
}
504
512
var job * batchv1.Job
505
- job , err = c .ensureJob (cmRef , result .Path , secrets , timeout )
513
+ job , err = c .ensureJob (cmRef , result .Path , secrets , timeout , retryInterval )
506
514
if err != nil || job == nil {
507
515
// ensureJob can return nil if the job present does not match the expected job (spec and ownerRefs)
508
516
// The current job is deleted in that case so UnpackBundle needs to be retried
@@ -641,7 +649,7 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
641
649
return
642
650
}
643
651
644
- func (c * ConfigMapUnpacker ) ensureJob (cmRef * corev1.ObjectReference , bundlePath string , secrets []corev1.LocalObjectReference , timeout time.Duration ) (job * batchv1.Job , err error ) {
652
+ func (c * ConfigMapUnpacker ) ensureJob (cmRef * corev1.ObjectReference , bundlePath string , secrets []corev1.LocalObjectReference , timeout time.Duration , unpackRetryInterval time. Duration ) (job * batchv1.Job , err error ) {
645
653
fresh := c .job (cmRef , bundlePath , secrets , timeout )
646
654
job , err = c .jobLister .Jobs (fresh .GetNamespace ()).Get (fresh .GetName ())
647
655
if err != nil {
@@ -651,13 +659,40 @@ func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath
651
659
652
660
return
653
661
}
654
- // Cleanup old unpacking job and retry
655
- if _ , isFailed := getCondition (job , batchv1 .JobFailed ); isFailed {
656
- err = c .client .BatchV1 ().Jobs (job .GetNamespace ()).Delete (context .TODO (), job .GetName (), metav1.DeleteOptions {})
657
- if err == nil {
658
- job , err = c .client .BatchV1 ().Jobs (fresh .GetNamespace ()).Create (context .TODO (), fresh , metav1.CreateOptions {})
662
+
663
+ // only check for retries if an unpackRetryInterval is specified
664
+ if unpackRetryInterval > 0 {
665
+ if failedCond , isFailed := getCondition (job , batchv1 .JobFailed ); isFailed {
666
+ lastFailureTime := failedCond .LastTransitionTime .Time
667
+ // Look for other unpack jobs for the same bundle
668
+ var jobs []* batchv1.Job
669
+ jobs , err = c .jobLister .Jobs (fresh .GetNamespace ()).List (k8slabels.ValidatedSetSelector {bundleUnpackRefLabel : cmRef .Name })
670
+ if err != nil {
671
+ return
672
+ }
673
+
674
+ var failed bool
675
+ var cond * batchv1.JobCondition
676
+ for _ , j := range jobs {
677
+ cond , failed = getCondition (j , batchv1 .JobFailed )
678
+ if ! failed {
679
+ // found an in-progress unpack attempt
680
+ job = j
681
+ break
682
+ }
683
+ if cond != nil && lastFailureTime .Before (cond .LastTransitionTime .Time ) {
684
+ lastFailureTime = cond .LastTransitionTime .Time
685
+ }
686
+ }
687
+
688
+ if failed {
689
+ if time .Now ().After (lastFailureTime .Add (unpackRetryInterval )) {
690
+ fresh .SetName (fmt .Sprintf ("%s-%d" , fresh .GetName (), len (jobs )))
691
+ job , err = c .client .BatchV1 ().Jobs (fresh .GetNamespace ()).Create (context .TODO (), fresh , metav1.CreateOptions {})
692
+ return
693
+ }
694
+ }
659
695
}
660
- return
661
696
}
662
697
663
698
if equality .Semantic .DeepDerivative (fresh .GetOwnerReferences (), job .GetOwnerReferences ()) && equality .Semantic .DeepDerivative (fresh .Spec , job .Spec ) {
@@ -825,3 +860,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL
825
860
826
861
return d , nil
827
862
}
863
+
864
+ // OperatorGroupBundleUnpackRetryInterval returns bundle unpack retry interval from annotation if specified.
865
+ // If the retry annotation is not set, return retry = 0 which is subsequently ignored. This interval, if > 0,
866
+ // determines the minimum interval between recreating a failed unpack job.
867
+ func OperatorGroupBundleUnpackRetryInterval (ogLister v1listers.OperatorGroupNamespaceLister ) (time.Duration , error ) {
868
+ ogs , err := ogLister .List (k8slabels .Everything ())
869
+ if err != nil {
870
+ return 0 , err
871
+ }
872
+ if len (ogs ) != 1 {
873
+ return 0 , fmt .Errorf ("found %d operatorGroups, expected 1" , len (ogs ))
874
+ }
875
+
876
+ timeoutStr , ok := ogs [0 ].GetAnnotations ()[BundleUnpackRetryMinimumIntervalAnnotationKey ]
877
+ if ! ok {
878
+ return 0 , nil
879
+ }
880
+
881
+ d , err := time .ParseDuration (timeoutStr )
882
+ if err != nil {
883
+ return 0 , fmt .Errorf ("failed to parse unpack timeout annotation(%s: %s): %w" , BundleUnpackRetryMinimumIntervalAnnotationKey , timeoutStr , err )
884
+ }
885
+
886
+ return d , nil
887
+ }
0 commit comments