Skip to content

Commit 17f58eb

Browse files
committed
cleanup old jobs
Signed-off-by: Ankita Thomas <[email protected]> Upstream-repository: operator-lifecycle-manager Upstream-commit: 4fc64d230962fa9cd69a1dc2bc3e4244af452e13
1 parent 8c3a7b9 commit 17f58eb

File tree

7 files changed

+219
-78
lines changed

7 files changed

+219
-78
lines changed

staging/operator-lifecycle-manager/pkg/controller/bundle/bundle_unpacker.go

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"sort"
78
"strings"
89
"time"
910

@@ -657,47 +658,35 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
657658

658659
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
659660
fresh := c.job(cmRef, bundlePath, secrets, timeout)
660-
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
661+
var jobs, toDelete []*batchv1.Job
662+
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
661663
if err != nil {
662-
if apierrors.IsNotFound(err) {
663-
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
664-
}
665-
666664
return
667665
}
666+
if len(jobs) == 0 {
667+
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
668+
return
669+
}
670+
671+
maxRetainedJobs := 5 // TODO: make this configurable
672+
job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose latest or on-failed job attempt
668673

669674
// only check for retries if an unpackRetryInterval is specified
670675
if unpackRetryInterval > 0 {
671-
if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
672-
lastFailureTime := failedCond.LastTransitionTime.Time
676+
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
673677
// Look for other unpack jobs for the same bundle
674-
var jobs []*batchv1.Job
675-
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
676-
if err != nil {
677-
return
678-
}
679-
680-
var failed bool
681-
var cond *batchv1.JobCondition
682-
for _, j := range jobs {
683-
cond, failed = getCondition(j, batchv1.JobFailed)
684-
if !failed {
685-
// found an in-progress unpack attempt
686-
job = j
687-
break
688-
}
689-
if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
690-
lastFailureTime = cond.LastTransitionTime.Time
691-
}
692-
}
693-
694-
if failed {
695-
if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
678+
if cond, failed := getCondition(job, batchv1.JobFailed); failed {
679+
if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
696680
fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
697681
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
698682
}
699-
return
700683
}
684+
685+
// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
686+
for _, j := range toDelete {
687+
_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
688+
}
689+
return
701690
}
702691
}
703692

@@ -840,6 +829,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
840829
return
841830
}
842831

832+
func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
833+
if len(jobs) == 0 {
834+
return
835+
}
836+
// sort jobs so that latest job is first
837+
// with preference for non-failed jobs
838+
sort.Slice(jobs, func(i, j int) bool {
839+
condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
840+
condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
841+
if failedI != failedJ {
842+
return !failedI // non-failed job goes first
843+
}
844+
return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
845+
})
846+
latest = jobs[0]
847+
if len(jobs) <= maxRetainedJobs {
848+
return
849+
}
850+
if maxRetainedJobs == 0 {
851+
toDelete = jobs[1:]
852+
return
853+
}
854+
855+
// cleanup old failed jobs, n-1 recent jobs and the oldest job
856+
for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
857+
toDelete = append(toDelete, jobs[maxRetainedJobs+i])
858+
}
859+
860+
return
861+
}
862+
843863
// OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
844864
// If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
845865
// This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis.

staging/operator-lifecycle-manager/pkg/controller/bundle/bundle_unpacker_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ func TestConfigMapUnpacker(t *testing.T) {
438438
ObjectMeta: metav1.ObjectMeta{
439439
Name: digestHash,
440440
Namespace: "ns-a",
441+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash},
441442
OwnerReferences: []metav1.OwnerReference{
442443
{
443444
APIVersion: "v1",
@@ -706,6 +707,7 @@ func TestConfigMapUnpacker(t *testing.T) {
706707
ObjectMeta: metav1.ObjectMeta{
707708
Name: digestHash,
708709
Namespace: "ns-a",
710+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash},
709711
OwnerReferences: []metav1.OwnerReference{
710712
{
711713
APIVersion: "v1",
@@ -968,6 +970,7 @@ func TestConfigMapUnpacker(t *testing.T) {
968970
ObjectMeta: metav1.ObjectMeta{
969971
Name: pathHash,
970972
Namespace: "ns-a",
973+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
971974
OwnerReferences: []metav1.OwnerReference{
972975
{
973976
APIVersion: "v1",
@@ -1200,6 +1203,7 @@ func TestConfigMapUnpacker(t *testing.T) {
12001203
ObjectMeta: metav1.ObjectMeta{
12011204
Name: pathHash,
12021205
Namespace: "ns-a",
1206+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
12031207
OwnerReferences: []metav1.OwnerReference{
12041208
{
12051209
APIVersion: "v1",
@@ -1443,6 +1447,7 @@ func TestConfigMapUnpacker(t *testing.T) {
14431447
ObjectMeta: metav1.ObjectMeta{
14441448
Name: pathHash,
14451449
Namespace: "ns-a",
1450+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
14461451
OwnerReferences: []metav1.OwnerReference{
14471452
{
14481453
APIVersion: "v1",
@@ -1933,3 +1938,101 @@ func TestOperatorGroupBundleUnpackRetryInterval(t *testing.T) {
19331938
})
19341939
}
19351940
}
1941+
1942+
func TestSortUnpackJobs(t *testing.T) {
1943+
// if there is a non-failed job, it should be first
1944+
// otherwise, the latest job should be first
1945+
//first n-1 jobs and oldest job are preserved
1946+
testJob := func(name string, failed bool, ts int64) *batchv1.Job {
1947+
conditions := []batchv1.JobCondition{}
1948+
if failed {
1949+
conditions = append(conditions, batchv1.JobCondition{
1950+
Type: batchv1.JobFailed,
1951+
Status: corev1.ConditionTrue,
1952+
LastTransitionTime: metav1.Time{Time: time.Unix(ts, 0)},
1953+
})
1954+
}
1955+
return &batchv1.Job{
1956+
ObjectMeta: metav1.ObjectMeta{
1957+
Name: name,
1958+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: "test"},
1959+
},
1960+
Status: batchv1.JobStatus{
1961+
Conditions: conditions,
1962+
},
1963+
}
1964+
}
1965+
failedJobs := []*batchv1.Job{
1966+
testJob("f-1", true, 1),
1967+
testJob("f-2", true, 2),
1968+
testJob("f-3", true, 3),
1969+
testJob("f-4", true, 4),
1970+
testJob("f-5", true, 5),
1971+
}
1972+
nonFailedJob := testJob("s-1", false, 1)
1973+
for _, tc := range []struct {
1974+
name string
1975+
jobs []*batchv1.Job
1976+
maxRetained int
1977+
expectedLatest *batchv1.Job
1978+
expectedToDelete []*batchv1.Job
1979+
}{
1980+
{
1981+
name: "no job history",
1982+
maxRetained: 0,
1983+
jobs: []*batchv1.Job{
1984+
failedJobs[1],
1985+
failedJobs[2],
1986+
failedJobs[0],
1987+
},
1988+
expectedLatest: failedJobs[2],
1989+
expectedToDelete: []*batchv1.Job{
1990+
failedJobs[1],
1991+
failedJobs[0],
1992+
},
1993+
}, {
1994+
name: "empty job list",
1995+
maxRetained: 1,
1996+
}, {
1997+
name: "retain oldest",
1998+
maxRetained: 1,
1999+
jobs: []*batchv1.Job{
2000+
failedJobs[2],
2001+
failedJobs[0],
2002+
failedJobs[1],
2003+
},
2004+
expectedToDelete: []*batchv1.Job{
2005+
failedJobs[1],
2006+
},
2007+
expectedLatest: failedJobs[2],
2008+
}, {
2009+
name: "multiple old jobs",
2010+
maxRetained: 2,
2011+
jobs: []*batchv1.Job{
2012+
failedJobs[1],
2013+
failedJobs[0],
2014+
failedJobs[2],
2015+
failedJobs[3],
2016+
failedJobs[4],
2017+
},
2018+
expectedLatest: failedJobs[4],
2019+
expectedToDelete: []*batchv1.Job{
2020+
failedJobs[1],
2021+
failedJobs[2],
2022+
},
2023+
}, {
2024+
name: "select non-failed as latest",
2025+
maxRetained: 3,
2026+
jobs: []*batchv1.Job{
2027+
failedJobs[0],
2028+
failedJobs[1],
2029+
nonFailedJob,
2030+
},
2031+
expectedLatest: nonFailedJob,
2032+
},
2033+
} {
2034+
latest, toDelete := sortUnpackJobs(tc.jobs, tc.maxRetained)
2035+
assert.Equal(t, tc.expectedLatest, latest)
2036+
assert.ElementsMatch(t, tc.expectedToDelete, toDelete)
2037+
}
2038+
}

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ type UnpackedBundleReference struct {
15101510
Properties string `json:"properties"`
15111511
}
15121512

1513-
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
1513+
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, unpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
15141514
unpacked := true
15151515

15161516
outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups))
@@ -1525,7 +1525,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
15251525
var errs []error
15261526
for i := 0; i < len(outBundleLookups); i++ {
15271527
lookup := outBundleLookups[i]
1528-
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval)
1528+
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, unpackRetryInterval)
15291529
if err != nil {
15301530
errs = append(errs, err)
15311531
continue

staging/operator-lifecycle-manager/test/e2e/registry.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func createDockerRegistry(client operatorclient.ClientInterface, namespace strin
5454
Port: int32(5000),
5555
},
5656
},
57-
Type: corev1.ServiceTypeNodePort,
5857
},
5958
}
6059

staging/operator-lifecycle-manager/test/e2e/subscription_e2e_test.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2524,6 +2524,11 @@ var _ = Describe("Subscription", func() {
25242524
})
25252525
When("bundle unpack retries are enabled", func() {
25262526
It("should retry failing unpack jobs", func() {
2527+
if ok, err := inKind(c); ok && err == nil {
2528+
Skip("This spec fails when run using KIND cluster. See https://github.com/operator-framework/operator-lifecycle-manager/issues/2420 for more details")
2529+
} else if err != nil {
2530+
Skip("Could not determine whether running in a kind cluster. Skipping.")
2531+
}
25272532
By("Ensuring a registry to host bundle images")
25282533
local, err := Local(c)
25292534
Expect(err).NotTo(HaveOccurred(), "cannot determine if test running locally or on CI: %s", err)
@@ -2579,20 +2584,14 @@ var _ = Describe("Subscription", func() {
25792584
}
25802585
}
25812586

2582-
// testImage is the name of the image used throughout the test - the image overwritten by skopeo
2583-
// the tag is generated randomly and appended to the end of the testImage
2587+
// The remote image to be copied onto the local registry
25842588
srcImage := "quay.io/olmtest/example-operator-bundle:"
25852589
srcTag := "0.1.0"
2586-
bundleImage := fmt.Sprint(registryURL, "/unpack-retry-bundle", ":")
2590+
2591+
// on-cluster image ref
2592+
bundleImage := registryURL + "/unpack-retry-bundle:"
25872593
bundleTag := genName("x")
2588-
//// hash hashes data with sha256 and returns the hex string.
2589-
//func hash(data string) string {
2590-
// // A SHA256 hash is 64 characters, which is within the 253 character limit for kube resource names
2591-
// h := fmt.Sprintf("%x", sha256.Sum256([]byte(data)))
2592-
//
2593-
// // Make the hash 63 characters instead to comply with the 63 character limit for labels
2594-
// return fmt.Sprintf(h[:len(h)-1])
2595-
//}
2594+
25962595
unpackRetryCatalog := fmt.Sprintf(`
25972596
schema: olm.package
25982597
name: unpack-retry-package
@@ -2656,7 +2655,7 @@ properties:
26562655
setBundleUnpackRetryMinimumIntervalAnnotation(context.Background(), ctx.Ctx().Client(), ogNN, "1s")
26572656

26582657
By("waiting until the subscription has an IP reference")
2659-
subscription, err := fetchSubscription(crc, generatedNamespace.GetName(), unpackRetrySubName, subscriptionHasInstallPlanChecker())
2658+
subscription, err := fetchSubscription(crc, generatedNamespace.GetName(), unpackRetrySubName, subscriptionHasInstallPlanChecker)
26602659
Expect(err).Should(BeNil())
26612660

26622661
By("waiting for the v0.1.0 CSV to report a succeeded phase")
@@ -2692,7 +2691,7 @@ properties:
26922691
createSubscriptionForCatalog(crc, generatedNamespace.GetName(), subName, catalogSourceName, "packageA", stableChannel, "", operatorsv1alpha1.ApprovalAutomatic)
26932692

26942693
By("waiting until the subscription has an IP reference")
2695-
subscription, err := fetchSubscription(crc, generatedNamespace.GetName(), subName, subscriptionHasInstallPlanChecker())
2694+
subscription, err := fetchSubscription(crc, generatedNamespace.GetName(), subName, subscriptionHasInstallPlanChecker)
26962695
Expect(err).Should(BeNil())
26972696

26982697
By("waiting for the v0.1.0 CSV to report a succeeded phase")

0 commit comments

Comments
 (0)