Skip to content

Commit 27ced41

Browse files
committed
Fail InstallPlan on bundle unpack timeout
The InstallPlan sync can stay stalled on the Installing phase if the bundle cannot be successfully unpacked. Adding a configurable timeout for the duration of the bundle unpack Job helps identify if an unpack Job is stalled, and the InstallPlan is then transitioned to Failed with the unpack Job's failure condition propagated to the InstallPlan condition.
1 parent 44a00ee commit 27ced41

File tree

5 files changed

+233
-32
lines changed

5 files changed

+233
-32
lines changed

cmd/catalog/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ var (
7171
"profiling", false, "serve profiling data (on port 8080)")
7272

7373
installPlanTimeout = flag.Duration("install-plan-retry-timeout", 1*time.Minute, "time since first attempt at which plan execution errors are considered fatal")
74+
// TODO(haseeb): Change to a more realistic timeout for bundle unpack jobs
75+
bundleUnpackTimeout = flag.Duration("bundle-unpack-timeout", 60*time.Second, "The time duration after which the bundle unpack Job for an installplan will be aborted and the installplan will be Failed. 0 is considered as having no timeout.")
7476
)
7577

7678
func init() {
@@ -175,7 +177,7 @@ func main() {
175177
}
176178

177179
// Create a new instance of the operator.
178-
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *installPlanTimeout)
180+
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *installPlanTimeout, *bundleUnpackTimeout)
179181
if err != nil {
180182
log.Panicf("error configuring operator: %s", err.Error())
181183
}

pkg/controller/bundle/bundle_unpacker.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"time"
78

89
"github.com/operator-framework/operator-registry/pkg/api"
910
"github.com/operator-framework/operator-registry/pkg/configmap"
@@ -165,6 +166,12 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
165166
job.SetName(cmRef.Name)
166167
job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)})
167168

169+
// Don't set a timeout if it is 0
170+
if c.unpackTimeout != time.Duration(0) {
171+
t := int64(c.unpackTimeout.Seconds())
172+
job.Spec.ActiveDeadlineSeconds = &t
173+
}
174+
168175
return job
169176
}
170177

@@ -175,16 +182,17 @@ type Unpacker interface {
175182
}
176183

177184
type ConfigMapUnpacker struct {
178-
opmImage string
179-
utilImage string
180-
client kubernetes.Interface
181-
csLister listersoperatorsv1alpha1.CatalogSourceLister
182-
cmLister listerscorev1.ConfigMapLister
183-
jobLister listersbatchv1.JobLister
184-
roleLister listersrbacv1.RoleLister
185-
rbLister listersrbacv1.RoleBindingLister
186-
loader *configmap.BundleLoader
187-
now func() metav1.Time
185+
opmImage string
186+
utilImage string
187+
client kubernetes.Interface
188+
csLister listersoperatorsv1alpha1.CatalogSourceLister
189+
cmLister listerscorev1.ConfigMapLister
190+
jobLister listersbatchv1.JobLister
191+
roleLister listersrbacv1.RoleLister
192+
rbLister listersrbacv1.RoleBindingLister
193+
loader *configmap.BundleLoader
194+
now func() metav1.Time
195+
unpackTimeout time.Duration
188196
}
189197

190198
type ConfigMapUnpackerOption func(*ConfigMapUnpacker)
@@ -202,6 +210,12 @@ func NewConfigmapUnpacker(options ...ConfigMapUnpackerOption) (*ConfigMapUnpacke
202210
return unpacker, nil
203211
}
204212

213+
func WithUnpackTimeout(timeout time.Duration) ConfigMapUnpackerOption {
214+
return func(unpacker *ConfigMapUnpacker) {
215+
unpacker.unpackTimeout = timeout
216+
}
217+
}
218+
205219
func WithOPMImage(opmImage string) ConfigMapUnpackerOption {
206220
return func(unpacker *ConfigMapUnpacker) {
207221
unpacker.opmImage = opmImage
@@ -361,7 +375,22 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup)
361375
return
362376
}
363377

364-
if !jobConditionTrue(job, batchv1.JobComplete) && (cond.Status != corev1.ConditionTrue || cond.Reason != JobIncompleteReason) {
378+
// Check if bundle unpack job has failed due a timeout
379+
// Return a BundleJobError so we can mark the InstallPlan as Failed
380+
isFailed, jobCond := jobConditionTrue(job, batchv1.JobFailed)
381+
if isFailed {
382+
cond.Status = corev1.ConditionTrue
383+
cond.Reason = jobCond.Reason
384+
cond.Message = jobCond.Message
385+
cond.LastTransitionTime = &now
386+
result.SetCondition(cond)
387+
388+
err = NewBundleJobError(fmt.Sprintf("Bundle extract Job failed with Reason: %v, and Message: %v", jobCond.Reason, jobCond.Message))
389+
return
390+
}
391+
392+
isComplete, _ := jobConditionTrue(job, batchv1.JobComplete)
393+
if !isComplete && (cond.Status != corev1.ConditionTrue || cond.Reason != JobIncompleteReason) {
365394
cond.Status = corev1.ConditionTrue
366395
cond.Reason = JobIncompleteReason
367396
cond.Message = JobIncompleteMessage
@@ -541,15 +570,16 @@ func ownerRef(ref *corev1.ObjectReference) metav1.OwnerReference {
541570
}
542571

543572
// jobConditionTrue returns true if the given job has the given condition with the given condition type true, and returns false otherwise.
544-
func jobConditionTrue(job *batchv1.Job, conditionType batchv1.JobConditionType) bool {
573+
// Also returns the condition if true
574+
func jobConditionTrue(job *batchv1.Job, conditionType batchv1.JobConditionType) (bool, *batchv1.JobCondition) {
545575
if job == nil {
546-
return false
576+
return false, nil
547577
}
548578

549579
for _, cond := range job.Status.Conditions {
550580
if cond.Type == conditionType && cond.Status == corev1.ConditionTrue {
551-
return true
581+
return true, &cond
552582
}
553583
}
554-
return false
584+
return false, nil
555585
}

pkg/controller/bundle/errors.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package bundle
2+
3+
// bundleJobError is an error type returned by UnpackBundle() when the
4+
// bundle unpack job fails (e.g due to a timeout)
5+
type bundleJobError struct {
6+
s string
7+
}
8+
9+
func NewBundleJobError(s string) error {
10+
return bundleJobError{s: s}
11+
}
12+
13+
func (e bundleJobError) Error() string {
14+
return e.s
15+
}
16+
17+
func (e bundleJobError) IsBundleJobError() bool {
18+
return true
19+
}
20+
21+
// IsBundleJobError checks if an error is an error due to the bundle extract job failing.
22+
func IsBundleJobError(err error) bool {
23+
type bundleJobError interface {
24+
IsBundleJobError() bool
25+
}
26+
ogErr, ok := err.(bundleJobError)
27+
return ok && ogErr.IsBundleJobError()
28+
}

pkg/controller/operators/catalog/operator.go

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,13 @@ type Operator struct {
106106
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
107107
bundleUnpacker bundle.Unpacker
108108
installPlanTimeout time.Duration
109+
bundleUnpackTimeout time.Duration
109110
}
110111

111112
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
112113

113114
// NewOperator creates a new Catalog Operator.
114-
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme, installPlanTimeout time.Duration) (*Operator, error) {
115+
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme, installPlanTimeout time.Duration, bundleUnpackTimeout time.Duration) (*Operator, error) {
115116
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
116117
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
117118
if err != nil {
@@ -170,6 +171,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
170171
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
171172
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient, dynamicClient),
172173
installPlanTimeout: installPlanTimeout,
174+
bundleUnpackTimeout: bundleUnpackTimeout,
173175
}
174176
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
175177
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
@@ -343,6 +345,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
343345
bundle.WithOPMImage(configmapRegistryImage),
344346
bundle.WithUtilImage(utilImage),
345347
bundle.WithNow(op.now),
348+
bundle.WithUnpackTimeout(op.bundleUnpackTimeout),
346349
)
347350
if err != nil {
348351
return nil, err
@@ -1184,6 +1187,10 @@ func (o *Operator) unpackBundles(plan *v1alpha1.InstallPlan) (bool, *v1alpha1.In
11841187
lookup := out.Status.BundleLookups[i]
11851188
res, err := o.bundleUnpacker.UnpackBundle(&lookup)
11861189
if err != nil {
1190+
// If the bundle unpack job fails abort early to fail the InstallPlan
1191+
if bundle.IsBundleJobError(err) {
1192+
return false, nil, err
1193+
}
11871194
errs = append(errs, err)
11881195
continue
11891196
}
@@ -1351,20 +1358,10 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
13511358
// Mark the InstallPlan as failed for a fatal Operator Group related error
13521359
logger.Infof("attenuated service account query failed - %v", err)
13531360
ipFailError := fmt.Errorf("invalid operator group - %v", err)
1354-
now := o.now()
1355-
out := plan.DeepCopy()
1356-
out.Status.SetCondition(v1alpha1.ConditionFailed(v1alpha1.InstallPlanInstalled,
1357-
v1alpha1.InstallPlanReasonInstallCheckFailed, ipFailError.Error(), &now))
1358-
out.Status.Phase = v1alpha1.InstallPlanPhaseFailed
1359-
1360-
logger.Info("transitioning InstallPlan to failed")
1361-
if _, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{}); err != nil {
1362-
updateErr := errors.New("error updating InstallPlan status: " + err.Error())
1363-
logger = logger.WithField("updateError", updateErr)
1364-
logger.Errorf("error transitioning InstallPlan to failed")
1365-
1366-
// retry sync with error to update InstallPlan status
1367-
syncError = fmt.Errorf("InstallPlan failed: %s and error updating InstallPlan status as failed: %s", ipFailError, updateErr)
1361+
1362+
if err := o.transitionInstallPlanToFailed(plan, logger, v1alpha1.InstallPlanReasonInstallCheckFailed, ipFailError.Error()); err != nil {
1363+
// retry for failure to update status
1364+
syncError = err
13681365
return
13691366
}
13701367

@@ -1394,7 +1391,23 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
13941391
if len(plan.Status.BundleLookups) > 0 {
13951392
unpacked, out, err := o.unpackBundles(plan)
13961393
if err != nil {
1397-
syncError = fmt.Errorf("bundle unpacking failed: %v", err)
1394+
// Retry sync if non-fatal error
1395+
if !bundle.IsBundleJobError(err) {
1396+
syncError = fmt.Errorf("bundle unpacking failed: %v", err)
1397+
return
1398+
}
1399+
1400+
// Mark the InstallPlan as failed for a fatal bundle unpack error
1401+
logger.Infof("bundle unpacking failed: %v", err)
1402+
1403+
if err := o.transitionInstallPlanToFailed(plan, logger, v1alpha1.InstallPlanReasonInstallCheckFailed, err.Error()); err != nil {
1404+
// retry for failure to update status
1405+
syncError = err
1406+
return
1407+
}
1408+
1409+
// Requeue subscription to propagate SubscriptionInstallPlanFailed condtion to subscription
1410+
o.requeueSubscriptionForInstallPlan(plan, logger)
13981411
return
13991412
}
14001413

@@ -1447,6 +1460,28 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
14471460
return
14481461
}
14491462

1463+
func (o *Operator) transitionInstallPlanToFailed(plan *v1alpha1.InstallPlan, logger *logrus.Entry, reason v1alpha1.InstallPlanConditionReason, message string) (syncError error) {
1464+
now := o.now()
1465+
out := plan.DeepCopy()
1466+
out.Status.SetCondition(v1alpha1.ConditionFailed(v1alpha1.InstallPlanInstalled,
1467+
reason, message, &now))
1468+
out.Status.Phase = v1alpha1.InstallPlanPhaseFailed
1469+
1470+
logger.Info("transitioning InstallPlan to failed")
1471+
_, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
1472+
if err == nil {
1473+
return
1474+
}
1475+
1476+
updateErr := errors.New("error updating InstallPlan status: " + err.Error())
1477+
logger = logger.WithField("updateError", updateErr)
1478+
logger.Errorf("error transitioning InstallPlan to failed")
1479+
1480+
// retry sync with error to update InstallPlan status
1481+
syncError = fmt.Errorf("InstallPlan failed: %s and error updating InstallPlan status as failed: %s", message, updateErr)
1482+
return
1483+
}
1484+
14501485
func (o *Operator) requeueSubscriptionForInstallPlan(plan *v1alpha1.InstallPlan, logger *logrus.Entry) {
14511486
// Notify subscription loop of installplan changes
14521487
owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind)

test/e2e/installplan_e2e_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636

3737
opver "github.com/operator-framework/api/pkg/lib/version"
3838
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
39+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
3940
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
4041
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
4142
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog"
@@ -2998,6 +2999,111 @@ var _ = Describe("Install Plan", func() {
29982999
ctx.Ctx().Logf(fmt.Sprintf("Install plan %s fetched with conditions %+v", fetchedInstallPlan.GetName(), fetchedInstallPlan.Status.Conditions))
29993000
})
30003001

3002+
It("should fail an InstallPlan when an InstallPlan has a bundle unpack job timeout", func() {
3003+
ns := &corev1.Namespace{}
3004+
ns.SetName(genName("ns-"))
3005+
3006+
c := newKubeClient()
3007+
crc := newCRClient()
3008+
now := metav1.Now()
3009+
3010+
// Create a namespace
3011+
ns, err := c.KubernetesInterface().CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
3012+
Expect(err).NotTo(HaveOccurred())
3013+
deleteOpts := &metav1.DeleteOptions{}
3014+
defer func() {
3015+
err = c.KubernetesInterface().CoreV1().Namespaces().Delete(context.TODO(), ns.GetName(), *deleteOpts)
3016+
Expect(err).ToNot(HaveOccurred())
3017+
}()
3018+
3019+
// Create the single (kiali) bundle catalog source
3020+
catsrc := &operatorsv1alpha1.CatalogSource{
3021+
ObjectMeta: metav1.ObjectMeta{
3022+
Name: genName("kiali-"),
3023+
Namespace: ns.GetName(),
3024+
Labels: map[string]string{"olm.catalogSource": "kaili-catalog"},
3025+
},
3026+
Spec: operatorsv1alpha1.CatalogSourceSpec{
3027+
Image: "quay.io/olmtest/single-bundle-index:1.0.0",
3028+
SourceType: operatorsv1alpha1.SourceTypeGrpc,
3029+
},
3030+
}
3031+
catsrc, err = crc.OperatorsV1alpha1().CatalogSources(catsrc.GetNamespace()).Create(context.TODO(), catsrc, metav1.CreateOptions{})
3032+
Expect(err).ToNot(HaveOccurred())
3033+
3034+
// Wait for the CatalogSource to be ready
3035+
catsrc, err = fetchCatalogSourceOnStatus(crc, catsrc.GetName(), catsrc.GetNamespace(), catalogSourceRegistryPodSynced)
3036+
Expect(err).ToNot(HaveOccurred())
3037+
3038+
// Create the OperatorGroup
3039+
og := &operatorsv1.OperatorGroup{
3040+
ObjectMeta: metav1.ObjectMeta{
3041+
Name: "og",
3042+
},
3043+
Spec: operatorsv1.OperatorGroupSpec{
3044+
TargetNamespaces: []string{ns.GetName()},
3045+
},
3046+
}
3047+
_, err = crc.OperatorsV1().OperatorGroups(ns.GetName()).Create(context.TODO(), og, metav1.CreateOptions{})
3048+
Expect(err).ToNot(HaveOccurred())
3049+
3050+
// Create an InstallPlan in the Planning phase with status.bundleLookups specified for a non-existing bundle
3051+
ip := &operatorsv1alpha1.InstallPlan{
3052+
ObjectMeta: metav1.ObjectMeta{
3053+
Name: "ip",
3054+
Namespace: ns.GetName(),
3055+
},
3056+
Spec: operatorsv1alpha1.InstallPlanSpec{
3057+
ClusterServiceVersionNames: []string{"foobar"},
3058+
Approval: v1alpha1.ApprovalAutomatic,
3059+
Approved: true,
3060+
Generation: 1,
3061+
},
3062+
Status: operatorsv1alpha1.InstallPlanStatus{
3063+
Phase: operatorsv1alpha1.InstallPlanPhasePlanning,
3064+
CatalogSources: []string{},
3065+
BundleLookups: []operatorsv1alpha1.BundleLookup{
3066+
{
3067+
Path: "quay.io/foo/bar:v0.0.1",
3068+
Identifier: "foobar.v0.0.1",
3069+
CatalogSourceRef: &corev1.ObjectReference{
3070+
Namespace: ns.GetName(),
3071+
Name: catsrc.GetName(),
3072+
},
3073+
Conditions: []operatorsv1alpha1.BundleLookupCondition{
3074+
{
3075+
Type: operatorsv1alpha1.BundleLookupPending,
3076+
Status: corev1.ConditionTrue,
3077+
Reason: "JobIncomplete",
3078+
Message: "unpack job not completed",
3079+
LastTransitionTime: &now,
3080+
},
3081+
},
3082+
},
3083+
},
3084+
},
3085+
}
3086+
3087+
outIP, err := crc.OperatorsV1alpha1().InstallPlans(ns.GetName()).Create(context.TODO(), ip, metav1.CreateOptions{})
3088+
Expect(err).NotTo(HaveOccurred())
3089+
Expect(outIP).NotTo(BeNil())
3090+
3091+
// The status gets ignored on create so we need to update it else the InstallPlan sync ignores
3092+
// InstallPlans without any steps or bundle lookups
3093+
outIP.Status = ip.Status
3094+
_, err = crc.OperatorsV1alpha1().InstallPlans(ns.GetName()).UpdateStatus(context.TODO(), outIP, metav1.UpdateOptions{})
3095+
Expect(err).NotTo(HaveOccurred())
3096+
3097+
// The InstallPlan should be Failed due to the bundle unpack job timeout
3098+
// TODO(haseeb): Use Eventually() to set a Duration just over the default bundle-unpack-timeout duration. Else this could poll for a long time.
3099+
fetchedInstallPlan, err := fetchInstallPlanWithNamespace(GinkgoT(), crc, ip.Name, ns.GetName(), buildInstallPlanPhaseCheckFunc(operatorsv1alpha1.InstallPlanPhaseFailed))
3100+
Expect(err).NotTo(HaveOccurred())
3101+
Expect(fetchedInstallPlan).NotTo(BeNil())
3102+
ctx.Ctx().Logf(fmt.Sprintf("Install plan %s fetched with phase %s", fetchedInstallPlan.GetName(), fetchedInstallPlan.Status.Phase))
3103+
ctx.Ctx().Logf(fmt.Sprintf("Install plan %s fetched with conditions %+v", fetchedInstallPlan.GetName(), fetchedInstallPlan.Status.Conditions))
3104+
3105+
})
3106+
30013107
It("compresses installplan step resource manifests to configmap references", func() {
30023108
// Test ensures that all steps for index-based catalogs are references to configmaps. This avoids the problem
30033109
// of installplans growing beyond the etcd size limit when manifests are written to the ip status.

0 commit comments

Comments
 (0)