Skip to content

Commit 01af7a3

Browse files
Merge pull request #2093 from hasbro17/bundle-lookup-timeout-on-IP-status
Fail InstallPlan on bundle unpack timeout
2 parents 7529f75 + 447f30e commit 01af7a3

File tree

6 files changed

+1022
-73
lines changed

6 files changed

+1022
-73
lines changed

cmd/catalog/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ var (
7070
profiling = flag.Bool(
7171
"profiling", false, "serve profiling data (on port 8080)")
7272

73-
installPlanTimeout = flag.Duration("install-plan-retry-timeout", 1*time.Minute, "time since first attempt at which plan execution errors are considered fatal")
73+
installPlanTimeout = flag.Duration("install-plan-retry-timeout", 1*time.Minute, "time since first attempt at which plan execution errors are considered fatal")
74+
bundleUnpackTimeout = flag.Duration("bundle-unpack-timeout", 10*time.Minute, "The time limit for bundle unpacking, after which InstallPlan execution is considered to have failed. 0 is considered as having no timeout.")
7475
)
7576

7677
func init() {
@@ -175,7 +176,7 @@ func main() {
175176
}
176177

177178
// Create a new instance of the operator.
178-
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *installPlanTimeout)
179+
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *installPlanTimeout, *bundleUnpackTimeout)
179180
if err != nil {
180181
log.Panicf("error configuring operator: %s", err.Error())
181182
}

pkg/controller/bundle/bundle_unpacker.go

Lines changed: 182 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"strings"
8+
"time"
79

810
"github.com/operator-framework/operator-registry/pkg/api"
911
"github.com/operator-framework/operator-registry/pkg/configmap"
@@ -14,6 +16,7 @@ import (
1416
apierrors "k8s.io/apimachinery/pkg/api/errors"
1517
"k8s.io/apimachinery/pkg/api/resource"
1618
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
k8slabels "k8s.io/apimachinery/pkg/labels"
1720
"k8s.io/client-go/kubernetes"
1821
listersbatchv1 "k8s.io/client-go/listers/batch/v1"
1922
listerscorev1 "k8s.io/client-go/listers/core/v1"
@@ -25,6 +28,19 @@ import (
2528
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection"
2629
)
2730

31+
const (
32+
// TODO: Move to operator-framework/api/pkg/operators/v1alpha1
33+
// BundleLookupFailed describes conditions types for when BundleLookups fail
34+
BundleLookupFailed operatorsv1alpha1.BundleLookupConditionType = "BundleLookupFailed"
35+
36+
// TODO: This can be a spec field
37+
// BundleUnpackTimeoutAnnotationKey allows setting a bundle unpack timeout per InstallPlan
38+
// and overrides the default specified by the --bundle-unpack-timeout flag
39+
// The time duration should be in the same format as accepted by time.ParseDuration()
40+
// e.g 1m30s
41+
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
42+
)
43+
2844
type BundleUnpackResult struct {
2945
*operatorsv1alpha1.BundleLookup
3046

@@ -66,7 +82,7 @@ func newBundleUnpackResult(lookup *operatorsv1alpha1.BundleLookup) *BundleUnpack
6682
}
6783
}
6884

69-
func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference) *batchv1.Job {
85+
func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, annotationUnpackTimeout time.Duration) *batchv1.Job {
7086
job := &batchv1.Job{
7187
Spec: batchv1.JobSpec{
7288
//ttlSecondsAfterFinished: 0 // can use in the future to not have to clean up job
@@ -75,7 +91,12 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
7591
Name: cmRef.Name,
7692
},
7793
Spec: corev1.PodSpec{
78-
RestartPolicy: corev1.RestartPolicyOnFailure,
94+
// With restartPolicy = "OnFailure" when the spec.backoffLimit is reached, the job controller will delete all
95+
// the job's pods to stop them from crashlooping forever.
96+
// By setting restartPolicy = "Never" the pods don't get cleaned up since they're not running after a failure.
97+
// Keeping the pods around after failures helps in inspecting the logs of a failed bundle unpack job.
98+
// See: https://kubernetes.io/docs/concepts/workloads/controllers/job/#pod-backoff-failure-policy
99+
RestartPolicy: corev1.RestartPolicyNever,
79100
ImagePullSecrets: secrets,
80101
Containers: []corev1.Container{
81102
{
@@ -165,26 +186,57 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
165186
job.SetName(cmRef.Name)
166187
job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)})
167188

189+
// By default the BackoffLimit is set to 6 which with exponential backoff 10s + 20s + 40s ...
190+
// translates to ~10m of waiting time.
191+
// We want to fail faster than that when we have repeated failures from the bundle unpack pod
192+
// so we set it to 3 which is ~1m of waiting time
193+
// See: https://kubernetes.io/docs/concepts/workloads/controllers/job/#pod-backoff-failure-policy
194+
backOffLimit := int32(3)
195+
job.Spec.BackoffLimit = &backOffLimit
196+
197+
// Set ActiveDeadlineSeconds as the unpack timeout
198+
// Don't set a timeout if it is 0
199+
if c.unpackTimeout != time.Duration(0) {
200+
t := int64(c.unpackTimeout.Seconds())
201+
job.Spec.ActiveDeadlineSeconds = &t
202+
}
203+
204+
// Check annotationUnpackTimeout which is the annotation override for the default unpack timeout
205+
// A negative timeout means the annotation was unset or malformed so we ignore it
206+
if annotationUnpackTimeout < time.Duration(0) {
207+
return job
208+
}
209+
// // 0 means no timeout so we unset ActiveDeadlineSeconds
210+
if annotationUnpackTimeout == time.Duration(0) {
211+
job.Spec.ActiveDeadlineSeconds = nil
212+
return job
213+
}
214+
215+
timeoutSeconds := int64(annotationUnpackTimeout.Seconds())
216+
job.Spec.ActiveDeadlineSeconds = &timeoutSeconds
217+
168218
return job
169219
}
170220

171221
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker
172222

173223
type Unpacker interface {
174-
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup) (result *BundleUnpackResult, err error)
224+
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error)
175225
}
176226

177227
type ConfigMapUnpacker struct {
178-
opmImage string
179-
utilImage string
180-
client kubernetes.Interface
181-
csLister listersoperatorsv1alpha1.CatalogSourceLister
182-
cmLister listerscorev1.ConfigMapLister
183-
jobLister listersbatchv1.JobLister
184-
roleLister listersrbacv1.RoleLister
185-
rbLister listersrbacv1.RoleBindingLister
186-
loader *configmap.BundleLoader
187-
now func() metav1.Time
228+
opmImage string
229+
utilImage string
230+
client kubernetes.Interface
231+
csLister listersoperatorsv1alpha1.CatalogSourceLister
232+
cmLister listerscorev1.ConfigMapLister
233+
jobLister listersbatchv1.JobLister
234+
podLister listerscorev1.PodLister
235+
roleLister listersrbacv1.RoleLister
236+
rbLister listersrbacv1.RoleBindingLister
237+
loader *configmap.BundleLoader
238+
now func() metav1.Time
239+
unpackTimeout time.Duration
188240
}
189241

190242
type ConfigMapUnpackerOption func(*ConfigMapUnpacker)
@@ -202,6 +254,12 @@ func NewConfigmapUnpacker(options ...ConfigMapUnpackerOption) (*ConfigMapUnpacke
202254
return unpacker, nil
203255
}
204256

257+
func WithUnpackTimeout(timeout time.Duration) ConfigMapUnpackerOption {
258+
return func(unpacker *ConfigMapUnpacker) {
259+
unpacker.unpackTimeout = timeout
260+
}
261+
}
262+
205263
func WithOPMImage(opmImage string) ConfigMapUnpackerOption {
206264
return func(unpacker *ConfigMapUnpacker) {
207265
unpacker.opmImage = opmImage
@@ -238,6 +296,12 @@ func WithJobLister(jobLister listersbatchv1.JobLister) ConfigMapUnpackerOption {
238296
}
239297
}
240298

299+
func WithPodLister(podLister listerscorev1.PodLister) ConfigMapUnpackerOption {
300+
return func(unpacker *ConfigMapUnpacker) {
301+
unpacker.podLister = podLister
302+
}
303+
}
304+
241305
func WithRoleLister(roleLister listersrbacv1.RoleLister) ConfigMapUnpackerOption {
242306
return func(unpacker *ConfigMapUnpacker) {
243307
unpacker.roleLister = roleLister
@@ -276,6 +340,8 @@ func (c *ConfigMapUnpacker) validate() (err error) {
276340
err = fmt.Errorf("configmap lister is nil")
277341
case c.jobLister == nil:
278342
err = fmt.Errorf("job lister is nil")
343+
case c.podLister == nil:
344+
err = fmt.Errorf("pod lister is nil")
279345
case c.roleLister == nil:
280346
err = fmt.Errorf("role lister is nil")
281347
case c.rbLister == nil:
@@ -292,6 +358,8 @@ func (c *ConfigMapUnpacker) validate() (err error) {
292358
const (
293359
CatalogSourceMissingReason = "CatalogSourceMissing"
294360
CatalogSourceMissingMessage = "referenced catalogsource not found"
361+
JobFailedReason = "JobFailed"
362+
JobFailedMessage = "unpack job has failed"
295363
JobIncompleteReason = "JobIncomplete"
296364
JobIncompleteMessage = "unpack job not completed"
297365
JobNotStartedReason = "JobNotStarted"
@@ -300,25 +368,32 @@ const (
300368
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
301369
)
302370

303-
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup) (result *BundleUnpackResult, err error) {
371+
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) {
372+
304373
result = newBundleUnpackResult(lookup)
305374

306-
// if pending condition is missing, bundle has already been unpacked
307-
cond := result.GetCondition(operatorsv1alpha1.BundleLookupPending)
308-
if cond.Status == corev1.ConditionUnknown {
375+
// if bundle lookup failed condition already present, then there is nothing more to do
376+
failedCond := result.GetCondition(BundleLookupFailed)
377+
if failedCond.Status == corev1.ConditionTrue {
378+
return result, nil
379+
}
380+
381+
// if pending condition is not true then bundle has already been unpacked(unknown)
382+
pendingCond := result.GetCondition(operatorsv1alpha1.BundleLookupPending)
383+
if pendingCond.Status != corev1.ConditionTrue {
309384
return result, nil
310385
}
311386

312387
now := c.now()
313388

314389
var cs *operatorsv1alpha1.CatalogSource
315390
if cs, err = c.csLister.CatalogSources(result.CatalogSourceRef.Namespace).Get(result.CatalogSourceRef.Name); err != nil {
316-
if apierrors.IsNotFound(err) && cond.Reason != CatalogSourceMissingReason {
317-
cond.Status = corev1.ConditionTrue
318-
cond.Reason = CatalogSourceMissingReason
319-
cond.Message = CatalogSourceMissingMessage
320-
cond.LastTransitionTime = &now
321-
result.SetCondition(cond)
391+
if apierrors.IsNotFound(err) && pendingCond.Reason != CatalogSourceMissingReason {
392+
pendingCond.Status = corev1.ConditionTrue
393+
pendingCond.Reason = CatalogSourceMissingReason
394+
pendingCond.Message = CatalogSourceMissingMessage
395+
pendingCond.LastTransitionTime = &now
396+
result.SetCondition(pendingCond)
322397
err = nil
323398
}
324399

@@ -356,17 +431,50 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup)
356431
secrets = append(secrets, corev1.LocalObjectReference{Name: secretName})
357432
}
358433
var job *batchv1.Job
359-
job, err = c.ensureJob(cmRef, result.Path, secrets)
360-
if err != nil {
434+
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout)
435+
if err != nil || job == nil {
436+
// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
437+
// The current job is deleted in that case so UnpackBundle needs to be retried
438+
return
439+
}
440+
441+
// Check if bundle unpack job has failed due a timeout
442+
// Return a BundleJobError so we can mark the InstallPlan as Failed
443+
if jobCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
444+
// Add the BundleLookupFailed condition with the message and reason from the job failure
445+
failedCond.Status = corev1.ConditionTrue
446+
failedCond.Reason = jobCond.Reason
447+
failedCond.Message = jobCond.Message
448+
failedCond.LastTransitionTime = &now
449+
result.SetCondition(failedCond)
450+
361451
return
362452
}
363453

364-
if !jobConditionTrue(job, batchv1.JobComplete) && (cond.Status != corev1.ConditionTrue || cond.Reason != JobIncompleteReason) {
365-
cond.Status = corev1.ConditionTrue
366-
cond.Reason = JobIncompleteReason
367-
cond.Message = JobIncompleteMessage
368-
cond.LastTransitionTime = &now
369-
result.SetCondition(cond)
454+
if _, isComplete := getCondition(job, batchv1.JobComplete); !isComplete {
455+
// In the case of an image pull failure for a non-existent image the bundle unpack job
456+
// can stay pending until the ActiveDeadlineSeconds timeout ~10m
457+
// To indicate why it's pending we inspect the container statuses of the
458+
// unpack Job pods to surface that information on the bundle lookup conditions
459+
pendingMessage := JobIncompleteMessage
460+
var pendingContainerStatusMsgs string
461+
pendingContainerStatusMsgs, err = c.pendingContainerStatusMessages(job)
462+
if err != nil {
463+
return
464+
}
465+
466+
if pendingContainerStatusMsgs != "" {
467+
pendingMessage = pendingMessage + ": " + pendingContainerStatusMsgs
468+
}
469+
470+
// Update BundleLookupPending condition if there are any changes
471+
if pendingCond.Status != corev1.ConditionTrue || pendingCond.Reason != JobIncompleteReason || pendingCond.Message != pendingMessage {
472+
pendingCond.Status = corev1.ConditionTrue
473+
pendingCond.Reason = JobIncompleteReason
474+
pendingCond.Message = pendingMessage
475+
pendingCond.LastTransitionTime = &now
476+
result.SetCondition(pendingCond)
477+
}
370478

371479
return
372480
}
@@ -394,6 +502,39 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup)
394502
return
395503
}
396504

505+
func (c *ConfigMapUnpacker) pendingContainerStatusMessages(job *batchv1.Job) (string, error) {
506+
containerStatusMessages := []string{}
507+
// List pods for unpack job
508+
podLabel := map[string]string{"job-name": job.GetName()}
509+
pods, listErr := c.podLister.Pods(job.GetNamespace()).List(k8slabels.SelectorFromSet(podLabel))
510+
if listErr != nil {
511+
return "", fmt.Errorf("Failed to list pods for job(%s): %v", job.GetName(), listErr)
512+
}
513+
514+
// Ideally there should be just 1 pod running but inspect all pods in the pending phase
515+
// to see if any are stuck on an ImagePullBackOff or ErrImagePull error
516+
for _, pod := range pods {
517+
if pod.Status.Phase != corev1.PodPending {
518+
// skip status check for non-pending pods
519+
continue
520+
}
521+
522+
for _, ic := range pod.Status.InitContainerStatuses {
523+
if ic.Ready {
524+
// only check non-ready containers for their waiting reasons
525+
continue
526+
}
527+
528+
// Aggregate the wait reasons for all pending containers
529+
containerStatusMessages = append(containerStatusMessages,
530+
fmt.Sprintf("Unpack pod(%s/%s) container(%s) is pending. Reason: %s, Message: %s",
531+
pod.Namespace, pod.Name, ic.Name, ic.State.Waiting.Reason, ic.State.Waiting.Message))
532+
}
533+
}
534+
535+
return strings.Join(containerStatusMessages, " | "), nil
536+
}
537+
397538
func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name string) (cm *corev1.ConfigMap, err error) {
398539
fresh := &corev1.ConfigMap{}
399540
fresh.SetNamespace(csRef.Namespace)
@@ -408,8 +549,8 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
408549
return
409550
}
410551

411-
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference) (job *batchv1.Job, err error) {
412-
fresh := c.job(cmRef, bundlePath, secrets)
552+
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) {
553+
fresh := c.job(cmRef, bundlePath, secrets, timeout)
413554
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
414555
if err != nil {
415556
if apierrors.IsNotFound(err) {
@@ -540,16 +681,19 @@ func ownerRef(ref *corev1.ObjectReference) metav1.OwnerReference {
540681
}
541682
}
542683

543-
// jobConditionTrue returns true if the given job has the given condition with the given condition type true, and returns false otherwise.
544-
func jobConditionTrue(job *batchv1.Job, conditionType batchv1.JobConditionType) bool {
684+
// getCondition returns true if the given job has the given condition with the given condition type true, and returns false otherwise.
685+
// Also returns the condition if true
686+
func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (condition *batchv1.JobCondition, isTrue bool) {
545687
if job == nil {
546-
return false
688+
return
547689
}
548690

549691
for _, cond := range job.Status.Conditions {
550692
if cond.Type == conditionType && cond.Status == corev1.ConditionTrue {
551-
return true
693+
condition = &cond
694+
isTrue = true
695+
return
552696
}
553697
}
554-
return false
698+
return
555699
}

0 commit comments

Comments
 (0)