Skip to content

Bug 1828550: safely deprecate stored versions of CRDs #1504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,11 +1372,11 @@ func validateV1Beta1CRDCompatibility(dynamicClient dynamic.Interface, oldCRD *ap
}

func validateExistingCRs(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, newCRD *apiextensions.CustomResourceDefinition) error {
// make dynamic client
crList, err := dynamicClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
crList, err := listExistingCRs(dynamicClient, gvr)
if err != nil {
return fmt.Errorf("error listing resources in GroupVersionResource %#v: %s", gvr, err)
return err
}

for _, cr := range crList.Items {
validator, _, err := validation.NewSchemaValidator(newCRD.Spec.Validation)
if err != nil {
Expand All @@ -1390,6 +1390,24 @@ func validateExistingCRs(dynamicClient dynamic.Interface, gvr schema.GroupVersio
return nil
}

func listExistingCRs(client dynamic.Interface, gvr schema.GroupVersionResource) (*unstructured.UnstructuredList, error) {
crList, err := client.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing resources in GroupVersionResource %#v: %s", gvr, err)
}
return crList, nil
}

func writeExistingCRs(client dynamic.Interface, gvr schema.GroupVersionResource, crList *unstructured.UnstructuredList) error {
list := *crList
for _, cr := range list.Items {
_, err := client.Resource(gvr).Update(context.TODO(), cr.DeepCopy(), metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("writing custom resource %s to backend", gvr.String())
}
}
return nil
}
// ExecutePlan applies a planned InstallPlan to a namespace.
func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
if plan.Status.Phase != v1alpha1.InstallPlanPhaseInstalling {
Expand Down
33 changes: 31 additions & 2 deletions pkg/controller/operators/catalog/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -133,13 +134,41 @@ func (b *builder) NewCRDV1Step(client apiextensionsv1client.ApiextensionsV1Inter
logger.Debugf("Found one owner for CRD %v", crd)
} else if len(matchedCSV) > 1 {
logger.Debugf("Found multiple owners for CRD %v", crd)

if err = validateV1CRDCompatibility(b.dynamicClient, currentCRD, crd); err != nil {
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "error validating existing CRs agains new CRD's schema: %s", step.Resource.Name)
}
}

// TODO ensure stored version compatibility
// (optional) run migration to new stored version to ensure no data loss during CRD upgrade
if crdlib.RunStorageMigration(currentCRD, crd) {
// add new storage version to current CRD
currentCRD.Status.StoredVersions = append(currentCRD.Status.StoredVersions, crdlib.GetNewStorageVersion(crd))
_, err := client.CustomResourceDefinitions().UpdateStatus(context.TODO(), currentCRD, metav1.UpdateOptions{})
if err != nil {
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "error updating CRD status: %s", step.Resource.Name)
}
// list all CRs corresponding to the CRD at the new storage version
gvr := schema.GroupVersionResource{Group: currentCRD.Spec.Group, Version: crdlib.GetNewStorageVersion(crd), Resource: currentCRD.Spec.Names.Plural}
b.logger.Infof("updating %s", gvr.String())
crList, err := listExistingCRs(b.dynamicClient, gvr)
if err != nil {
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "listing CRs from gvr: %s", step.Resource.Name, gvr.String())
}
// write the CRs back to the backend - this forces the backend to update the CRs to the new version in storage
err = writeExistingCRs(b.dynamicClient, gvr, crList)
if err != nil {
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "writing CRs from gvr: %s", step.Resource.Name, gvr.String())
}

// remove old storage version from CRD
deprecated := crdlib.GetDeprecatedStorageVersion(currentCRD, crd)
currentCRD.Status.StoredVersions = crdlib.RemoveStorageVersion(currentCRD.Status.StoredVersions, deprecated)
_, err = client.CustomResourceDefinitions().UpdateStatus(context.TODO(), currentCRD, metav1.UpdateOptions{})
if err != nil {
return v1alpha1.StepStatusUnknown, errorwrap.Wrapf(err, "error updating CRD status after storage migration: %s", step.Resource.Name)
}
}

// Update CRD to new version
_, err = client.CustomResourceDefinitions().Update(context.TODO(), crd, metav1.UpdateOptions{})
if err != nil {
Expand Down
131 changes: 131 additions & 0 deletions pkg/lib/crd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package crd

import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
)

// V1Beta1 refers to the deprecated v1beta1 APIVersion of CRD objects
Expand Down Expand Up @@ -39,3 +43,130 @@ func Version(manifest *string) (string, error) {
return v, nil
}

// Versions returns all resource versions present in the CRD. Compatible with both v1beta1 and v1 CRDs.
func ResourceVersions(obj runtime.Object) (map[string]struct{}, error) {
versions := make(map[string]struct{})

switch crd := obj.(type) {
case *apiextensionsv1.CustomResourceDefinition:
for _, version := range crd.Spec.Versions {
versions[version.Name] = struct{}{}
}
return versions, nil
case *apiextensionsv1beta1.CustomResourceDefinition:
for _, version := range crd.Spec.Versions {
versions[version.Name] = struct{}{}
}
if crd.Spec.Version != "" {
versions[crd.Spec.Version] = struct{}{}
}
return versions, nil
default:
return nil, fmt.Errorf("could not find all versions present in CRD")
}
}

func StoredVersions(obj runtime.Object) []string {
switch crd := obj.(type) {
case *apiextensionsv1.CustomResourceDefinition:
return crd.Status.StoredVersions
case *apiextensionsv1beta1.CustomResourceDefinition:
return crd.Status.StoredVersions
}
return nil
}

// RunStorageMigration determines whether the new CRD changes the storage version of the existing CRD.
// If true, OLM must run a migration process to ensure all CRs can be stored at the new version.
// See https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definition-versioning/#upgrade-existing-objects-to-a-new-stored-version
func RunStorageMigration(oldCRD runtime.Object, newCRD runtime.Object) bool {
newStoredVersions, oldStoredVersions := getStoredVersions(oldCRD, newCRD)

for name := range oldStoredVersions {
if _, ok := newStoredVersions[name]; ok {
// new storage version exists in old CRD present on the cluster
// no need to run migration
return false
}
}
return true
}

func getStoredVersions(oldCRD runtime.Object, newCRD runtime.Object) (newStoredVersions map[string]struct{}, oldStoredVersions map[string]struct{}) {
oldStoredVersions = make(map[string]struct{})
newStoredVersions = make(map[string]struct{})

// find old storage versions by inspect the status field of the existing on-cluster CRD
switch crd := oldCRD.(type) {
case *apiextensionsv1.CustomResourceDefinition:
for _, version := range crd.Status.StoredVersions {
oldStoredVersions[version] = struct{}{}
}
case *apiextensionsv1beta1.CustomResourceDefinition:
for _, version := range crd.Status.StoredVersions {
oldStoredVersions[version] = struct{}{}
}
}

switch crd := newCRD.(type) {
case *apiextensionsv1.CustomResourceDefinition:
for _, version := range crd.Spec.Versions {
if version.Storage {
newStoredVersions[version.Name] = struct{}{}
}
}
case *apiextensionsv1beta1.CustomResourceDefinition:
for _, version := range crd.Spec.Versions {
if version.Storage {
newStoredVersions[version.Name] = struct{}{}
}
}
}

return newStoredVersions, oldStoredVersions
}

// GetNewStorageVersion returns the storage version defined in the CRD.
// Only one version may be specified as the storage version.
func GetNewStorageVersion(crd runtime.Object) string {
switch crd := crd.(type) {
case *apiextensionsv1.CustomResourceDefinition:
for _, version := range crd.Spec.Versions {
if version.Storage {
return version.Name
}
}
case *apiextensionsv1beta1.CustomResourceDefinition:
for _, version := range crd.Spec.Versions {
if version.Storage {
return version.Name
}
}
}
return ""
}

// GetDeprecatedStorageVersion returns the storage version that is being deprecated
func GetDeprecatedStorageVersion(oldCRD runtime.Object, newCRD runtime.Object) string {
newStoredVersions, oldStoredVersions := getStoredVersions(oldCRD, newCRD)

for name := range oldStoredVersions {
if _, ok := newStoredVersions[name]; !ok {
// old storage version does not exist in new CRD - this is the deprecated version
return name
}
}

return ""
}

func RemoveStorageVersion(versions []string, deprecated string) []string {
for i, v := range versions {
if v == deprecated {
return append(versions[:i], versions[i+1:]...)
}
return versions
}
return nil
}