Skip to content

Commit d71ecdc

Browse files
committed
Fail InstallPlan on bundle unpack timeout
The InstallPlan sync can stay stalled on the Installing phase if the bundle cannot be successfully unpacked. Adding a configurable timeout for the duration of the bundle unpack Job helps identify if an unpack Job is stalled, and the InstallPlan is then transitioned to Failed with the unpack Job's failure condition propagated to the InstallPlan condition.
1 parent b11215a commit d71ecdc

File tree

5 files changed

+234
-32
lines changed

5 files changed

+234
-32
lines changed

cmd/catalog/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ var (
6969

7070
profiling = flag.Bool(
7171
"profiling", false, "serve profiling data (on port 8080)")
72+
73+
// TODO(haseeb): Change to a more realistic timeout for bundle unpack jobs
74+
bundleUnpackTimeout = flag.Duration("bundle-unpack-timeout", 60*time.Second, "The time duration after which the bundle unpack Job for an installplan will be aborted and the installplan will be Failed. 0 is considered as having no timeout.")
7275
)
7376

7477
func init() {
@@ -173,7 +176,7 @@ func main() {
173176
}
174177

175178
// Create a new instance of the operator.
176-
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme)
179+
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *bundleUnpackTimeout)
177180
if err != nil {
178181
log.Panicf("error configuring operator: %s", err.Error())
179182
}

pkg/controller/bundle/bundle_unpacker.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"time"
78

89
"github.com/operator-framework/operator-registry/pkg/api"
910
"github.com/operator-framework/operator-registry/pkg/configmap"
@@ -146,6 +147,12 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
146147
job.SetName(cmRef.Name)
147148
job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)})
148149

150+
// Don't set a timeout if it is 0
151+
if c.unpackTimeout != time.Duration(0) {
152+
t := int64(c.unpackTimeout.Seconds())
153+
job.Spec.ActiveDeadlineSeconds = &t
154+
}
155+
149156
return job
150157
}
151158

@@ -156,16 +163,17 @@ type Unpacker interface {
156163
}
157164

158165
type ConfigMapUnpacker struct {
159-
opmImage string
160-
utilImage string
161-
client kubernetes.Interface
162-
csLister listersoperatorsv1alpha1.CatalogSourceLister
163-
cmLister listerscorev1.ConfigMapLister
164-
jobLister listersbatchv1.JobLister
165-
roleLister listersrbacv1.RoleLister
166-
rbLister listersrbacv1.RoleBindingLister
167-
loader *configmap.BundleLoader
168-
now func() metav1.Time
166+
opmImage string
167+
utilImage string
168+
client kubernetes.Interface
169+
csLister listersoperatorsv1alpha1.CatalogSourceLister
170+
cmLister listerscorev1.ConfigMapLister
171+
jobLister listersbatchv1.JobLister
172+
roleLister listersrbacv1.RoleLister
173+
rbLister listersrbacv1.RoleBindingLister
174+
loader *configmap.BundleLoader
175+
now func() metav1.Time
176+
unpackTimeout time.Duration
169177
}
170178

171179
type ConfigMapUnpackerOption func(*ConfigMapUnpacker)
@@ -183,6 +191,12 @@ func NewConfigmapUnpacker(options ...ConfigMapUnpackerOption) (*ConfigMapUnpacke
183191
return unpacker, nil
184192
}
185193

194+
func WithUnpackTimeout(timeout time.Duration) ConfigMapUnpackerOption {
195+
return func(unpacker *ConfigMapUnpacker) {
196+
unpacker.unpackTimeout = timeout
197+
}
198+
}
199+
186200
func WithOPMImage(opmImage string) ConfigMapUnpackerOption {
187201
return func(unpacker *ConfigMapUnpacker) {
188202
unpacker.opmImage = opmImage
@@ -342,7 +356,22 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup)
342356
return
343357
}
344358

345-
if !jobConditionTrue(job, batchv1.JobComplete) && (cond.Status != corev1.ConditionTrue || cond.Reason != JobIncompleteReason) {
359+
// Check if bundle unpack job has failed due a timeout
360+
// Return a BundleJobError so we can mark the InstallPlan as Failed
361+
isFailed, jobCond := jobConditionTrue(job, batchv1.JobFailed)
362+
if isFailed {
363+
cond.Status = corev1.ConditionTrue
364+
cond.Reason = jobCond.Reason
365+
cond.Message = jobCond.Message
366+
cond.LastTransitionTime = &now
367+
result.SetCondition(cond)
368+
369+
err = NewBundleJobError(fmt.Sprintf("Bundle extract Job failed with Reason: %v, and Message: %v", jobCond.Reason, jobCond.Message))
370+
return
371+
}
372+
373+
isComplete, _ := jobConditionTrue(job, batchv1.JobComplete)
374+
if !isComplete && (cond.Status != corev1.ConditionTrue || cond.Reason != JobIncompleteReason) {
346375
cond.Status = corev1.ConditionTrue
347376
cond.Reason = JobIncompleteReason
348377
cond.Message = JobIncompleteMessage
@@ -522,15 +551,16 @@ func ownerRef(ref *corev1.ObjectReference) metav1.OwnerReference {
522551
}
523552

524553
// jobConditionTrue returns true if the given job has the given condition with the given condition type true, and returns false otherwise.
525-
func jobConditionTrue(job *batchv1.Job, conditionType batchv1.JobConditionType) bool {
554+
// Also returns the condition if true
555+
func jobConditionTrue(job *batchv1.Job, conditionType batchv1.JobConditionType) (bool, *batchv1.JobCondition) {
526556
if job == nil {
527-
return false
557+
return false, nil
528558
}
529559

530560
for _, cond := range job.Status.Conditions {
531561
if cond.Type == conditionType && cond.Status == corev1.ConditionTrue {
532-
return true
562+
return true, &cond
533563
}
534564
}
535-
return false
565+
return false, nil
536566
}

pkg/controller/bundle/errors.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package bundle
2+
3+
// bundleJobError is an error type returned by UnpackBundle() when the
4+
// bundle unpack job fails (e.g due to a timeout)
5+
type bundleJobError struct {
6+
s string
7+
}
8+
9+
func NewBundleJobError(s string) error {
10+
return bundleJobError{s: s}
11+
}
12+
13+
func (e bundleJobError) Error() string {
14+
return e.s
15+
}
16+
17+
func (e bundleJobError) IsBundleJobError() bool {
18+
return true
19+
}
20+
21+
// IsBundleJobError checks if an error is an error due to the bundle extract job failing.
22+
func IsBundleJobError(err error) bool {
23+
type bundleJobError interface {
24+
IsBundleJobError() bool
25+
}
26+
ogErr, ok := err.(bundleJobError)
27+
return ok && ogErr.IsBundleJobError()
28+
}

pkg/controller/operators/catalog/operator.go

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,13 @@ type Operator struct {
105105
clientAttenuator *scoped.ClientAttenuator
106106
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
107107
bundleUnpacker bundle.Unpacker
108+
bundleUnpackTimeout time.Duration
108109
}
109110

110111
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
111112

112113
// NewOperator creates a new Catalog Operator.
113-
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme) (*Operator, error) {
114+
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme, bundleUnpackTimeout time.Duration) (*Operator, error) {
114115
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
115116
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
116117
if err != nil {
@@ -168,6 +169,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
168169
catalogSubscriberIndexer: map[string]cache.Indexer{},
169170
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
170171
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient, dynamicClient),
172+
bundleUnpackTimeout: bundleUnpackTimeout,
171173
}
172174
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
173175
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
@@ -341,6 +343,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
341343
bundle.WithOPMImage(configmapRegistryImage),
342344
bundle.WithUtilImage(utilImage),
343345
bundle.WithNow(op.now),
346+
bundle.WithUnpackTimeout(op.bundleUnpackTimeout),
344347
)
345348
if err != nil {
346349
return nil, err
@@ -1182,6 +1185,10 @@ func (o *Operator) unpackBundles(plan *v1alpha1.InstallPlan) (bool, *v1alpha1.In
11821185
lookup := out.Status.BundleLookups[i]
11831186
res, err := o.bundleUnpacker.UnpackBundle(&lookup)
11841187
if err != nil {
1188+
// If the bundle unpack job fails abort early to fail the InstallPlan
1189+
if bundle.IsBundleJobError(err) {
1190+
return false, nil, err
1191+
}
11851192
errs = append(errs, err)
11861193
continue
11871194
}
@@ -1349,20 +1356,10 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
13491356
// Mark the InstallPlan as failed for a fatal Operator Group related error
13501357
logger.Infof("attenuated service account query failed - %v", err)
13511358
ipFailError := fmt.Errorf("invalid operator group - %v", err)
1352-
now := o.now()
1353-
out := plan.DeepCopy()
1354-
out.Status.SetCondition(v1alpha1.ConditionFailed(v1alpha1.InstallPlanInstalled,
1355-
v1alpha1.InstallPlanReasonInstallCheckFailed, ipFailError.Error(), &now))
1356-
out.Status.Phase = v1alpha1.InstallPlanPhaseFailed
1357-
1358-
logger.Info("transitioning InstallPlan to failed")
1359-
if _, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{}); err != nil {
1360-
updateErr := errors.New("error updating InstallPlan status: " + err.Error())
1361-
logger = logger.WithField("updateError", updateErr)
1362-
logger.Errorf("error transitioning InstallPlan to failed")
1363-
1364-
// retry sync with error to update InstallPlan status
1365-
syncError = fmt.Errorf("InstallPlan failed: %s and error updating InstallPlan status as failed: %s", ipFailError, updateErr)
1359+
1360+
if err := o.transitionInstallPlanToFailed(plan, logger, v1alpha1.InstallPlanReasonInstallCheckFailed, ipFailError.Error()); err != nil {
1361+
// retry for failure to update status
1362+
syncError = err
13661363
return
13671364
}
13681365

@@ -1392,7 +1389,23 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
13921389
if len(plan.Status.BundleLookups) > 0 {
13931390
unpacked, out, err := o.unpackBundles(plan)
13941391
if err != nil {
1395-
syncError = fmt.Errorf("bundle unpacking failed: %v", err)
1392+
// Retry sync if non-fatal error
1393+
if !bundle.IsBundleJobError(err) {
1394+
syncError = fmt.Errorf("bundle unpacking failed: %v", err)
1395+
return
1396+
}
1397+
1398+
// Mark the InstallPlan as failed for a fatal bundle unpack error
1399+
logger.Infof("bundle unpacking failed: %v", err)
1400+
1401+
if err := o.transitionInstallPlanToFailed(plan, logger, v1alpha1.InstallPlanReasonInstallCheckFailed, err.Error()); err != nil {
1402+
// retry for failure to update status
1403+
syncError = err
1404+
return
1405+
}
1406+
1407+
// Requeue subscription to propagate SubscriptionInstallPlanFailed condtion to subscription
1408+
o.requeueSubscriptionForInstallPlan(plan, logger)
13961409
return
13971410
}
13981411

@@ -1445,6 +1458,28 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
14451458
return
14461459
}
14471460

1461+
func (o *Operator) transitionInstallPlanToFailed(plan *v1alpha1.InstallPlan, logger *logrus.Entry, reason v1alpha1.InstallPlanConditionReason, message string) (syncError error) {
1462+
now := o.now()
1463+
out := plan.DeepCopy()
1464+
out.Status.SetCondition(v1alpha1.ConditionFailed(v1alpha1.InstallPlanInstalled,
1465+
reason, message, &now))
1466+
out.Status.Phase = v1alpha1.InstallPlanPhaseFailed
1467+
1468+
logger.Info("transitioning InstallPlan to failed")
1469+
_, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
1470+
if err == nil {
1471+
return
1472+
}
1473+
1474+
updateErr := errors.New("error updating InstallPlan status: " + err.Error())
1475+
logger = logger.WithField("updateError", updateErr)
1476+
logger.Errorf("error transitioning InstallPlan to failed")
1477+
1478+
// retry sync with error to update InstallPlan status
1479+
syncError = fmt.Errorf("InstallPlan failed: %s and error updating InstallPlan status as failed: %s", message, updateErr)
1480+
return
1481+
}
1482+
14481483
func (o *Operator) requeueSubscriptionForInstallPlan(plan *v1alpha1.InstallPlan, logger *logrus.Entry) {
14491484
// Notify subscription loop of installplan changes
14501485
owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind)

test/e2e/installplan_e2e_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636

3737
opver "github.com/operator-framework/api/pkg/lib/version"
3838
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
39+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
3940
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
4041
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
4142
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog"
@@ -2913,6 +2914,111 @@ var _ = Describe("Install Plan", func() {
29132914
ctx.Ctx().Logf(fmt.Sprintf("Install plan %s fetched with conditions %+v", fetchedInstallPlan.GetName(), fetchedInstallPlan.Status.Conditions))
29142915
})
29152916

2917+
It("should fail an InstallPlan when an InstallPlan has a bundle unpack job timeout", func() {
2918+
ns := &corev1.Namespace{}
2919+
ns.SetName(genName("ns-"))
2920+
2921+
c := newKubeClient()
2922+
crc := newCRClient()
2923+
now := metav1.Now()
2924+
2925+
// Create a namespace
2926+
ns, err := c.KubernetesInterface().CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
2927+
Expect(err).NotTo(HaveOccurred())
2928+
deleteOpts := &metav1.DeleteOptions{}
2929+
defer func() {
2930+
err = c.KubernetesInterface().CoreV1().Namespaces().Delete(context.TODO(), ns.GetName(), *deleteOpts)
2931+
Expect(err).ToNot(HaveOccurred())
2932+
}()
2933+
2934+
// Create the single (kiali) bundle catalog source
2935+
catsrc := &operatorsv1alpha1.CatalogSource{
2936+
ObjectMeta: metav1.ObjectMeta{
2937+
Name: genName("kiali-"),
2938+
Namespace: ns.GetName(),
2939+
Labels: map[string]string{"olm.catalogSource": "kaili-catalog"},
2940+
},
2941+
Spec: operatorsv1alpha1.CatalogSourceSpec{
2942+
Image: "quay.io/olmtest/single-bundle-index:1.0.0",
2943+
SourceType: operatorsv1alpha1.SourceTypeGrpc,
2944+
},
2945+
}
2946+
catsrc, err = crc.OperatorsV1alpha1().CatalogSources(catsrc.GetNamespace()).Create(context.TODO(), catsrc, metav1.CreateOptions{})
2947+
Expect(err).ToNot(HaveOccurred())
2948+
2949+
// Wait for the CatalogSource to be ready
2950+
catsrc, err = fetchCatalogSourceOnStatus(crc, catsrc.GetName(), catsrc.GetNamespace(), catalogSourceRegistryPodSynced)
2951+
Expect(err).ToNot(HaveOccurred())
2952+
2953+
// Create the OperatorGroup
2954+
og := &operatorsv1.OperatorGroup{
2955+
ObjectMeta: metav1.ObjectMeta{
2956+
Name: "og",
2957+
},
2958+
Spec: operatorsv1.OperatorGroupSpec{
2959+
TargetNamespaces: []string{ns.GetName()},
2960+
},
2961+
}
2962+
_, err = crc.OperatorsV1().OperatorGroups(ns.GetName()).Create(context.TODO(), og, metav1.CreateOptions{})
2963+
Expect(err).ToNot(HaveOccurred())
2964+
2965+
// Create an InstallPlan in the Planning phase with status.bundleLookups specified for a non-existing bundle
2966+
ip := &operatorsv1alpha1.InstallPlan{
2967+
ObjectMeta: metav1.ObjectMeta{
2968+
Name: "ip",
2969+
Namespace: ns.GetName(),
2970+
},
2971+
Spec: operatorsv1alpha1.InstallPlanSpec{
2972+
ClusterServiceVersionNames: []string{"foobar"},
2973+
Approval: v1alpha1.ApprovalAutomatic,
2974+
Approved: true,
2975+
Generation: 1,
2976+
},
2977+
Status: operatorsv1alpha1.InstallPlanStatus{
2978+
Phase: operatorsv1alpha1.InstallPlanPhasePlanning,
2979+
CatalogSources: []string{},
2980+
BundleLookups: []operatorsv1alpha1.BundleLookup{
2981+
{
2982+
Path: "quay.io/foo/bar:v0.0.1",
2983+
Identifier: "foobar.v0.0.1",
2984+
CatalogSourceRef: &corev1.ObjectReference{
2985+
Namespace: ns.GetName(),
2986+
Name: catsrc.GetName(),
2987+
},
2988+
Conditions: []operatorsv1alpha1.BundleLookupCondition{
2989+
{
2990+
Type: operatorsv1alpha1.BundleLookupPending,
2991+
Status: corev1.ConditionTrue,
2992+
Reason: "JobIncomplete",
2993+
Message: "unpack job not completed",
2994+
LastTransitionTime: &now,
2995+
},
2996+
},
2997+
},
2998+
},
2999+
},
3000+
}
3001+
3002+
outIP, err := crc.OperatorsV1alpha1().InstallPlans(ns.GetName()).Create(context.TODO(), ip, metav1.CreateOptions{})
3003+
Expect(err).NotTo(HaveOccurred())
3004+
Expect(outIP).NotTo(BeNil())
3005+
3006+
// The status gets ignored on create so we need to update it else the InstallPlan sync ignores
3007+
// InstallPlans without any steps or bundle lookups
3008+
outIP.Status = ip.Status
3009+
_, err = crc.OperatorsV1alpha1().InstallPlans(ns.GetName()).UpdateStatus(context.TODO(), outIP, metav1.UpdateOptions{})
3010+
Expect(err).NotTo(HaveOccurred())
3011+
3012+
// The InstallPlan should be Failed due to the bundle unpack job timeout
3013+
// TODO(haseeb): Use Eventually() to set a Duration just over the default bundle-unpack-timeout duration. Else this could poll for a long time.
3014+
fetchedInstallPlan, err := fetchInstallPlanWithNamespace(GinkgoT(), crc, ip.Name, ns.GetName(), buildInstallPlanPhaseCheckFunc(operatorsv1alpha1.InstallPlanPhaseFailed))
3015+
Expect(err).NotTo(HaveOccurred())
3016+
Expect(fetchedInstallPlan).NotTo(BeNil())
3017+
ctx.Ctx().Logf(fmt.Sprintf("Install plan %s fetched with phase %s", fetchedInstallPlan.GetName(), fetchedInstallPlan.Status.Phase))
3018+
ctx.Ctx().Logf(fmt.Sprintf("Install plan %s fetched with conditions %+v", fetchedInstallPlan.GetName(), fetchedInstallPlan.Status.Conditions))
3019+
3020+
})
3021+
29163022
It("compresses installplan step resource manifests to configmap references", func() {
29173023
// Test ensures that all steps for index-based catalogs are references to configmaps. This avoids the problem
29183024
// of installplans growing beyond the etcd size limit when manifests are written to the ip status.

0 commit comments

Comments
 (0)