Skip to content

Fail InstallPlan on bundle unpack timeout #2093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/catalog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ var (
profiling = flag.Bool(
"profiling", false, "serve profiling data (on port 8080)")

installPlanTimeout = flag.Duration("install-plan-retry-timeout", 1*time.Minute, "time since first attempt at which plan execution errors are considered fatal")
installPlanTimeout = flag.Duration("install-plan-retry-timeout", 1*time.Minute, "time since first attempt at which plan execution errors are considered fatal")
bundleUnpackTimeout = flag.Duration("bundle-unpack-timeout", 10*time.Minute, "The time limit for bundle unpacking, after which InstallPlan execution is considered to have failed. 0 is considered as having no timeout.")
)

func init() {
Expand Down Expand Up @@ -175,7 +176,7 @@ func main() {
}

// Create a new instance of the operator.
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *installPlanTimeout)
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme, *installPlanTimeout, *bundleUnpackTimeout)
if err != nil {
log.Panicf("error configuring operator: %s", err.Error())
}
Expand Down
220 changes: 182 additions & 38 deletions pkg/controller/bundle/bundle_unpacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"crypto/sha256"
"fmt"
"strings"
"time"

"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/configmap"
Expand All @@ -14,6 +16,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listersbatchv1 "k8s.io/client-go/listers/batch/v1"
listerscorev1 "k8s.io/client-go/listers/core/v1"
Expand All @@ -25,6 +28,19 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection"
)

const (
	// TODO: Move to operator-framework/api/pkg/operators/v1alpha1
	// BundleLookupFailed is the BundleLookup condition type set when unpacking
	// has failed terminally; once present with status True, UnpackBundle
	// short-circuits and the lookup is not retried.
	BundleLookupFailed operatorsv1alpha1.BundleLookupConditionType = "BundleLookupFailed"

	// TODO: This can be a spec field
	// BundleUnpackTimeoutAnnotationKey allows setting a bundle unpack timeout per InstallPlan
	// and overrides the default specified by the --bundle-unpack-timeout flag.
	// The annotation value must be in the format accepted by time.ParseDuration(),
	// e.g. "1m30s". A value of 0 disables the timeout.
	BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
)

type BundleUnpackResult struct {
*operatorsv1alpha1.BundleLookup

Expand Down Expand Up @@ -66,7 +82,7 @@ func newBundleUnpackResult(lookup *operatorsv1alpha1.BundleLookup) *BundleUnpack
}
}

func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference) *batchv1.Job {
func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, annotationUnpackTimeout time.Duration) *batchv1.Job {
job := &batchv1.Job{
Spec: batchv1.JobSpec{
//ttlSecondsAfterFinished: 0 // can use in the future to not have to clean up job
Expand All @@ -75,7 +91,12 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
Name: cmRef.Name,
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
// With restartPolicy = "OnFailure" when the spec.backoffLimit is reached, the job controller will delete all
// the job's pods to stop them from crashlooping forever.
// By setting restartPolicy = "Never" the pods don't get cleaned up since they're not running after a failure.
// Keeping the pods around after failures helps in inspecting the logs of a failed bundle unpack job.
// See: https://kubernetes.io/docs/concepts/workloads/controllers/job/#pod-backoff-failure-policy
RestartPolicy: corev1.RestartPolicyNever,
ImagePullSecrets: secrets,
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -165,26 +186,57 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
job.SetName(cmRef.Name)
job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)})

// By default the BackoffLimit is set to 6 which with exponential backoff 10s + 20s + 40s ...
// translates to ~10m of waiting time.
// We want to fail faster than that when we have repeated failures from the bundle unpack pod
// so we set it to 3 which is ~1m of waiting time
// See: https://kubernetes.io/docs/concepts/workloads/controllers/job/#pod-backoff-failure-policy
backOffLimit := int32(3)
job.Spec.BackoffLimit = &backOffLimit

// Set ActiveDeadlineSeconds as the unpack timeout
// Don't set a timeout if it is 0
if c.unpackTimeout != time.Duration(0) {
t := int64(c.unpackTimeout.Seconds())
job.Spec.ActiveDeadlineSeconds = &t
}

// Check annotationUnpackTimeout which is the annotation override for the default unpack timeout
// A negative timeout means the annotation was unset or malformed so we ignore it
if annotationUnpackTimeout < time.Duration(0) {
return job
}
// // 0 means no timeout so we unset ActiveDeadlineSeconds
if annotationUnpackTimeout == time.Duration(0) {
job.Spec.ActiveDeadlineSeconds = nil
return job
}

timeoutSeconds := int64(annotationUnpackTimeout.Seconds())
job.Spec.ActiveDeadlineSeconds = &timeoutSeconds

return job
}

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker

type Unpacker interface {
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup) (result *BundleUnpackResult, err error)
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error)
}

type ConfigMapUnpacker struct {
opmImage string
utilImage string
client kubernetes.Interface
csLister listersoperatorsv1alpha1.CatalogSourceLister
cmLister listerscorev1.ConfigMapLister
jobLister listersbatchv1.JobLister
roleLister listersrbacv1.RoleLister
rbLister listersrbacv1.RoleBindingLister
loader *configmap.BundleLoader
now func() metav1.Time
opmImage string
utilImage string
client kubernetes.Interface
csLister listersoperatorsv1alpha1.CatalogSourceLister
cmLister listerscorev1.ConfigMapLister
jobLister listersbatchv1.JobLister
podLister listerscorev1.PodLister
roleLister listersrbacv1.RoleLister
rbLister listersrbacv1.RoleBindingLister
loader *configmap.BundleLoader
now func() metav1.Time
unpackTimeout time.Duration
}

type ConfigMapUnpackerOption func(*ConfigMapUnpacker)
Expand All @@ -202,6 +254,12 @@ func NewConfigmapUnpacker(options ...ConfigMapUnpackerOption) (*ConfigMapUnpacke
return unpacker, nil
}

// WithUnpackTimeout configures the unpacker's default bundle unpack timeout.
func WithUnpackTimeout(timeout time.Duration) ConfigMapUnpackerOption {
	return func(u *ConfigMapUnpacker) { u.unpackTimeout = timeout }
}

func WithOPMImage(opmImage string) ConfigMapUnpackerOption {
return func(unpacker *ConfigMapUnpacker) {
unpacker.opmImage = opmImage
Expand Down Expand Up @@ -238,6 +296,12 @@ func WithJobLister(jobLister listersbatchv1.JobLister) ConfigMapUnpackerOption {
}
}

// WithPodLister configures the pod lister the unpacker uses to inspect
// the pods of its unpack jobs.
func WithPodLister(podLister listerscorev1.PodLister) ConfigMapUnpackerOption {
	return func(u *ConfigMapUnpacker) { u.podLister = podLister }
}

func WithRoleLister(roleLister listersrbacv1.RoleLister) ConfigMapUnpackerOption {
return func(unpacker *ConfigMapUnpacker) {
unpacker.roleLister = roleLister
Expand Down Expand Up @@ -276,6 +340,8 @@ func (c *ConfigMapUnpacker) validate() (err error) {
err = fmt.Errorf("configmap lister is nil")
case c.jobLister == nil:
err = fmt.Errorf("job lister is nil")
case c.podLister == nil:
err = fmt.Errorf("pod lister is nil")
case c.roleLister == nil:
err = fmt.Errorf("role lister is nil")
case c.rbLister == nil:
Expand All @@ -292,6 +358,8 @@ func (c *ConfigMapUnpacker) validate() (err error) {
const (
CatalogSourceMissingReason = "CatalogSourceMissing"
CatalogSourceMissingMessage = "referenced catalogsource not found"
JobFailedReason = "JobFailed"
JobFailedMessage = "unpack job has failed"
JobIncompleteReason = "JobIncomplete"
JobIncompleteMessage = "unpack job not completed"
JobNotStartedReason = "JobNotStarted"
Expand All @@ -300,25 +368,32 @@ const (
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
)

func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup) (result *BundleUnpackResult, err error) {
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) {

result = newBundleUnpackResult(lookup)

// if pending condition is missing, bundle has already been unpacked
cond := result.GetCondition(operatorsv1alpha1.BundleLookupPending)
if cond.Status == corev1.ConditionUnknown {
// if bundle lookup failed condition already present, then there is nothing more to do
failedCond := result.GetCondition(BundleLookupFailed)
if failedCond.Status == corev1.ConditionTrue {
return result, nil
}

// if pending condition is not true then bundle has already been unpacked(unknown)
pendingCond := result.GetCondition(operatorsv1alpha1.BundleLookupPending)
if pendingCond.Status != corev1.ConditionTrue {
return result, nil
}

now := c.now()

var cs *operatorsv1alpha1.CatalogSource
if cs, err = c.csLister.CatalogSources(result.CatalogSourceRef.Namespace).Get(result.CatalogSourceRef.Name); err != nil {
if apierrors.IsNotFound(err) && cond.Reason != CatalogSourceMissingReason {
cond.Status = corev1.ConditionTrue
cond.Reason = CatalogSourceMissingReason
cond.Message = CatalogSourceMissingMessage
cond.LastTransitionTime = &now
result.SetCondition(cond)
if apierrors.IsNotFound(err) && pendingCond.Reason != CatalogSourceMissingReason {
pendingCond.Status = corev1.ConditionTrue
pendingCond.Reason = CatalogSourceMissingReason
pendingCond.Message = CatalogSourceMissingMessage
pendingCond.LastTransitionTime = &now
result.SetCondition(pendingCond)
err = nil
}

Expand Down Expand Up @@ -356,17 +431,50 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup)
secrets = append(secrets, corev1.LocalObjectReference{Name: secretName})
}
var job *batchv1.Job
job, err = c.ensureJob(cmRef, result.Path, secrets)
if err != nil {
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout)
if err != nil || job == nil {
// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
// The current job is deleted in that case so UnpackBundle needs to be retried
return
}

// Check if bundle unpack job has failed due a timeout
// Return a BundleJobError so we can mark the InstallPlan as Failed
if jobCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
// Add the BundleLookupFailed condition with the message and reason from the job failure
failedCond.Status = corev1.ConditionTrue
failedCond.Reason = jobCond.Reason
failedCond.Message = jobCond.Message
failedCond.LastTransitionTime = &now
result.SetCondition(failedCond)

return
}

if !jobConditionTrue(job, batchv1.JobComplete) && (cond.Status != corev1.ConditionTrue || cond.Reason != JobIncompleteReason) {
cond.Status = corev1.ConditionTrue
cond.Reason = JobIncompleteReason
cond.Message = JobIncompleteMessage
cond.LastTransitionTime = &now
result.SetCondition(cond)
if _, isComplete := getCondition(job, batchv1.JobComplete); !isComplete {
// In the case of an image pull failure for a non-existent image the bundle unpack job
// can stay pending until the ActiveDeadlineSeconds timeout ~10m
// To indicate why it's pending we inspect the container statuses of the
// unpack Job pods to surface that information on the bundle lookup conditions
pendingMessage := JobIncompleteMessage
var pendingContainerStatusMsgs string
pendingContainerStatusMsgs, err = c.pendingContainerStatusMessages(job)
if err != nil {
return
}

if pendingContainerStatusMsgs != "" {
pendingMessage = pendingMessage + ": " + pendingContainerStatusMsgs
}

// Update BundleLookupPending condition if there are any changes
if pendingCond.Status != corev1.ConditionTrue || pendingCond.Reason != JobIncompleteReason || pendingCond.Message != pendingMessage {
pendingCond.Status = corev1.ConditionTrue
pendingCond.Reason = JobIncompleteReason
pendingCond.Message = pendingMessage
pendingCond.LastTransitionTime = &now
result.SetCondition(pendingCond)
}

return
}
Expand Down Expand Up @@ -394,6 +502,39 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup)
return
}

// pendingContainerStatusMessages aggregates the waiting reasons of the given
// unpack job's pending pods so they can be surfaced on the BundleLookup
// conditions (e.g. to explain an ImagePullBackOff while the job waits out its
// ActiveDeadlineSeconds). It returns an empty string when no pod of the job is
// pending, and a non-nil error when the job's pods cannot be listed.
func (c *ConfigMapUnpacker) pendingContainerStatusMessages(job *batchv1.Job) (string, error) {
	// The job controller labels its pods with "job-name"; select on that.
	podLabel := map[string]string{"job-name": job.GetName()}
	pods, listErr := c.podLister.Pods(job.GetNamespace()).List(k8slabels.SelectorFromSet(podLabel))
	if listErr != nil {
		return "", fmt.Errorf("listing pods for job(%s): %w", job.GetName(), listErr)
	}

	// Ideally there should be just 1 pod running but inspect all pods in the
	// pending phase to see if any are stuck on an ImagePullBackOff or
	// ErrImagePull error.
	containerStatusMessages := []string{}
	for _, pod := range pods {
		if pod.Status.Phase != corev1.PodPending {
			// skip status check for non-pending pods
			continue
		}

		for _, ic := range pod.Status.InitContainerStatuses {
			if ic.Ready {
				// only check non-ready containers for their waiting reasons
				continue
			}

			// A non-ready container is not necessarily waiting — it may be
			// running or terminated — so guard against a nil Waiting state to
			// avoid a panic.
			if ic.State.Waiting == nil {
				continue
			}

			// Aggregate the wait reasons for all pending containers
			containerStatusMessages = append(containerStatusMessages,
				fmt.Sprintf("Unpack pod(%s/%s) container(%s) is pending. Reason: %s, Message: %s",
					pod.Namespace, pod.Name, ic.Name, ic.State.Waiting.Reason, ic.State.Waiting.Message))
		}
	}

	return strings.Join(containerStatusMessages, " | "), nil
}

func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name string) (cm *corev1.ConfigMap, err error) {
fresh := &corev1.ConfigMap{}
fresh.SetNamespace(csRef.Namespace)
Expand All @@ -408,8 +549,8 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
return
}

func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference) (job *batchv1.Job, err error) {
fresh := c.job(cmRef, bundlePath, secrets)
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) {
fresh := c.job(cmRef, bundlePath, secrets, timeout)
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
if err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -540,16 +681,19 @@ func ownerRef(ref *corev1.ObjectReference) metav1.OwnerReference {
}
}

// jobConditionTrue returns true if the given job has the given condition with the given condition type true, and returns false otherwise.
func jobConditionTrue(job *batchv1.Job, conditionType batchv1.JobConditionType) bool {
// getCondition returns true if the given job has the given condition with the given condition type true, and returns false otherwise.
// Also returns the condition if true
func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (condition *batchv1.JobCondition, isTrue bool) {
if job == nil {
return false
return
}

for _, cond := range job.Status.Conditions {
if cond.Type == conditionType && cond.Status == corev1.ConditionTrue {
return true
condition = &cond
isTrue = true
return
}
}
return false
return
}
Loading