Skip to content

Commit 7932558

Browse files
committed
add crd storage migration to safely deprecate old storage versions
1 parent 227f9dc commit 7932558

File tree

3 files changed

+183
-5
lines changed

3 files changed

+183
-5
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,11 +1372,11 @@ func validateV1Beta1CRDCompatibility(dynamicClient dynamic.Interface, oldCRD *ap
13721372
}
13731373

13741374
func validateExistingCRs(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, newCRD *apiextensions.CustomResourceDefinition) error {
1375-
// make dynamic client
1376-
crList, err := dynamicClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
1375+
crList, err := listExistingCRs(dynamicClient, gvr)
13771376
if err != nil {
1378-
return fmt.Errorf("error listing resources in GroupVersionResource %#v: %s", gvr, err)
1377+
return err
13791378
}
1379+
13801380
for _, cr := range crList.Items {
13811381
validator, _, err := validation.NewSchemaValidator(newCRD.Spec.Validation)
13821382
if err != nil {
@@ -1390,6 +1390,24 @@ func validateExistingCRs(dynamicClient dynamic.Interface, gvr schema.GroupVersio
13901390
return nil
13911391
}
13921392

1393+
func listExistingCRs(client dynamic.Interface, gvr schema.GroupVersionResource) (*unstructured.UnstructuredList, error) {
1394+
crList, err := client.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
1395+
if err != nil {
1396+
return nil, fmt.Errorf("error listing resources in GroupVersionResource %#v: %s", gvr, err)
1397+
}
1398+
return crList, nil
1399+
}
1400+
1401+
func writeExistingCRs(client dynamic.Interface, gvr schema.GroupVersionResource, crList *unstructured.UnstructuredList) error {
1402+
list := *crList
1403+
for _, cr := range list.Items {
1404+
_, err := client.Resource(gvr).Update(context.TODO(), cr.DeepCopy(), metav1.UpdateOptions{})
1405+
if err != nil {
1406+
return fmt.Errorf("writing custom resource %s to backend", gvr.String())
1407+
}
1408+
}
1409+
return nil
1410+
}
13931411
// ExecutePlan applies a planned InstallPlan to a namespace.
13941412
func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
13951413
if plan.Status.Phase != v1alpha1.InstallPlanPhaseInstalling {

pkg/controller/operators/catalog/step.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package catalog
33
import (
44
"context"
55
"fmt"
6+
"k8s.io/apimachinery/pkg/runtime/schema"
67
"k8s.io/client-go/dynamic"
78
"k8s.io/client-go/tools/cache"
89

@@ -133,13 +134,41 @@ func (b *builder) NewCRDV1Step(client apiextensionsv1client.ApiextensionsV1Inter
133134
logger.Debugf("Found one owner for CRD %v", crd)
134135
} else if len(matchedCSV) > 1 {
135136
logger.Debugf("Found multiple owners for CRD %v", crd)
136-
137137
if err = validateV1CRDCompatibility(b.dynamicClient, currentCRD, crd); err != nil {
138138
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "error validating existing CRs agains new CRD's schema: %s", step.Resource.Name)
139139
}
140140
}
141141

142-
// TODO ensure stored version compatibility
142+
// (optional) run migration to new stored version to ensure no data loss during CRD upgrade
143+
if crdlib.RunStorageMigration(currentCRD, crd) {
144+
// add new storage version to current CRD
145+
currentCRD.Status.StoredVersions = append(currentCRD.Status.StoredVersions, crdlib.GetNewStorageVersion(crd))
146+
_, err := client.CustomResourceDefinitions().UpdateStatus(context.TODO(), currentCRD, metav1.UpdateOptions{})
147+
if err != nil {
148+
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "error updating CRD status: %s", step.Resource.Name)
149+
}
150+
// list all CRs corresponding to the CRD at the new storage version
151+
gvr := schema.GroupVersionResource{Group: currentCRD.Spec.Group, Version: crdlib.GetNewStorageVersion(crd), Resource: currentCRD.Spec.Names.Plural}
152+
b.logger.Infof("updating %s", gvr.String())
153+
crList, err := listExistingCRs(b.dynamicClient, gvr)
154+
if err != nil {
155+
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "listing CRs from gvr: %s", step.Resource.Name, gvr.String())
156+
}
157+
// write the CRs back to the backend - this forces the backend to update the CRs to the new version in storage
158+
err = writeExistingCRs(b.dynamicClient, gvr, crList)
159+
if err != nil {
160+
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "writing CRs from gvr: %s", step.Resource.Name, gvr.String())
161+
}
162+
163+
// remove old storage version from CRD
164+
deprecated := crdlib.GetDeprecatedStorageVersion(currentCRD, crd)
165+
currentCRD.Status.StoredVersions = crdlib.RemoveStorageVersion(currentCRD.Status.StoredVersions, deprecated)
166+
_, err = client.CustomResourceDefinitions().UpdateStatus(context.TODO(), currentCRD, metav1.UpdateOptions{})
167+
if err != nil {
168+
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "error updating CRD status after storage migration: %s", step.Resource.Name)
169+
}
170+
}
171+
143172
// Update CRD to new version
144173
_, err = client.CustomResourceDefinitions().Update(context.TODO(), crd, metav1.UpdateOptions{})
145174
if err != nil {

pkg/lib/crd/version.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ package crd
22

33
import (
44
"fmt"
5+
"k8s.io/apimachinery/pkg/runtime"
56
"strings"
67

78
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
89
"k8s.io/apimachinery/pkg/util/yaml"
10+
11+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
12+
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
913
)
1014

1115
// V1Beta1 refers to the deprecated v1beta1 APIVersion of CRD objects
@@ -39,3 +43,130 @@ func Version(manifest *string) (string, error) {
3943
return v, nil
4044
}
4145

46+
// Versions returns all resource versions present in the CRD. Compatible with both v1beta1 and v1 CRDs.
47+
func ResourceVersions(obj runtime.Object) (map[string]struct{}, error) {
48+
versions := make(map[string]struct{})
49+
50+
switch crd := obj.(type) {
51+
case *apiextensionsv1.CustomResourceDefinition:
52+
for _, version := range crd.Spec.Versions {
53+
versions[version.Name] = struct{}{}
54+
}
55+
return versions, nil
56+
case *apiextensionsv1beta1.CustomResourceDefinition:
57+
for _, version := range crd.Spec.Versions {
58+
versions[version.Name] = struct{}{}
59+
}
60+
if crd.Spec.Version != "" {
61+
versions[crd.Spec.Version] = struct{}{}
62+
}
63+
return versions, nil
64+
default:
65+
return nil, fmt.Errorf("could not find all versions present in CRD")
66+
}
67+
}
68+
69+
func StoredVersions(obj runtime.Object) []string {
70+
switch crd := obj.(type) {
71+
case *apiextensionsv1.CustomResourceDefinition:
72+
return crd.Status.StoredVersions
73+
case *apiextensionsv1beta1.CustomResourceDefinition:
74+
return crd.Status.StoredVersions
75+
}
76+
return nil
77+
}
78+
79+
// RunStorageMigration determines whether the new CRD changes the storage version of the existing CRD.
80+
// If true, OLM must run a migration process to ensure all CRs can be stored at the new version.
81+
// See https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definition-versioning/#upgrade-existing-objects-to-a-new-stored-version
82+
func RunStorageMigration(oldCRD runtime.Object, newCRD runtime.Object) bool {
83+
newStoredVersions, oldStoredVersions := getStoredVersions(oldCRD, newCRD)
84+
85+
for name := range oldStoredVersions {
86+
if _, ok := newStoredVersions[name]; ok {
87+
// new storage version exists in old CRD present on the cluster
88+
// no need to run migration
89+
return false
90+
}
91+
}
92+
return true
93+
}
94+
95+
func getStoredVersions(oldCRD runtime.Object, newCRD runtime.Object) (newStoredVersions map[string]struct{}, oldStoredVersions map[string]struct{}) {
96+
oldStoredVersions = make(map[string]struct{})
97+
newStoredVersions = make(map[string]struct{})
98+
99+
// find old storage versions by inspect the status field of the existing on-cluster CRD
100+
switch crd := oldCRD.(type) {
101+
case *apiextensionsv1.CustomResourceDefinition:
102+
for _, version := range crd.Status.StoredVersions {
103+
oldStoredVersions[version] = struct{}{}
104+
}
105+
case *apiextensionsv1beta1.CustomResourceDefinition:
106+
for _, version := range crd.Status.StoredVersions {
107+
oldStoredVersions[version] = struct{}{}
108+
}
109+
}
110+
111+
switch crd := newCRD.(type) {
112+
case *apiextensionsv1.CustomResourceDefinition:
113+
for _, version := range crd.Spec.Versions {
114+
if version.Storage {
115+
newStoredVersions[version.Name] = struct{}{}
116+
}
117+
}
118+
case *apiextensionsv1beta1.CustomResourceDefinition:
119+
for _, version := range crd.Spec.Versions {
120+
if version.Storage {
121+
newStoredVersions[version.Name] = struct{}{}
122+
}
123+
}
124+
}
125+
126+
return newStoredVersions, oldStoredVersions
127+
}
128+
129+
// GetNewStorageVersion returns the storage version defined in the CRD.
130+
// Only one version may be specified as the storage version.
131+
func GetNewStorageVersion(crd runtime.Object) string {
132+
switch crd := crd.(type) {
133+
case *apiextensionsv1.CustomResourceDefinition:
134+
for _, version := range crd.Spec.Versions {
135+
if version.Storage {
136+
return version.Name
137+
}
138+
}
139+
case *apiextensionsv1beta1.CustomResourceDefinition:
140+
for _, version := range crd.Spec.Versions {
141+
if version.Storage {
142+
return version.Name
143+
}
144+
}
145+
}
146+
return ""
147+
}
148+
149+
// GetDeprecatedStorageVersion returns the storage version that is being deprecated
150+
func GetDeprecatedStorageVersion(oldCRD runtime.Object, newCRD runtime.Object) string {
151+
newStoredVersions, oldStoredVersions := getStoredVersions(oldCRD, newCRD)
152+
153+
for name := range oldStoredVersions {
154+
if _, ok := newStoredVersions[name]; !ok {
155+
// old storage version does not exist in new CRD - this is the deprecated version
156+
return name
157+
}
158+
}
159+
160+
return ""
161+
}
162+
163+
func RemoveStorageVersion(versions []string, deprecated string) []string {
164+
for i, v := range versions {
165+
if v == deprecated {
166+
return append(versions[:i], versions[i+1:]...)
167+
}
168+
return versions
169+
}
170+
return nil
171+
}
172+

0 commit comments

Comments
 (0)