Skip to content

feat(operator): adopt referenced installplans #1661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 126 additions & 64 deletions pkg/controller/operators/adoption_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package operators

import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -54,6 +53,7 @@ func (r *AdoptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&operatorsv1alpha1.Subscription{}).
Watches(&source.Kind{Type: &operatorsv1alpha1.ClusterServiceVersion{}}, enqueueSub).
Watches(&source.Kind{Type: &operatorsv1alpha1.InstallPlan{}}, enqueueSub).
Complete(reconcile.Func(r.ReconcileSubscription))
if err != nil {
return err
Expand All @@ -77,7 +77,6 @@ func (r *AdoptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&source.Kind{Type: &apiextensionsv1.CustomResourceDefinition{}}, enqueueCSV).
Watches(&source.Kind{Type: &apiregistrationv1.APIService{}}, enqueueCSV).
Watches(&source.Kind{Type: &operatorsv1alpha1.Subscription{}}, enqueueCSV).
Watches(&source.Kind{Type: &operatorsv1alpha1.InstallPlan{}}, enqueueCSV).
Complete(reconcile.Func(r.ReconcileClusterServiceVersion))
if err != nil {
return err
Expand Down Expand Up @@ -107,8 +106,6 @@ func NewAdoptionReconciler(cli client.Client, log logr.Logger, scheme *runtime.S
}, nil
}

var fieldOwner = client.FieldOwner("olm")

// ReconcileSubscription labels the CSVs installed by a Subscription as components of an operator named after the subscribed package and install namespace.
func (r *AdoptionReconciler) ReconcileSubscription(req ctrl.Request) (reconcile.Result, error) {
// Set up a convenient log object so we don't have to type request over and over again
Expand Down Expand Up @@ -141,51 +138,30 @@ func (r *AdoptionReconciler) ReconcileSubscription(req ctrl.Request) (reconcile.
return reconcile.Result{}, nil
}

// Ensure the subscription is adopted
// Adopt the Subscription
var errs []error
out := in.DeepCopy()
adopted, err := operator.AdoptComponent(out)
if err != nil {
if err := r.adopt(ctx, operator, in); err != nil {
log.Error(err, "Error adopting Subscription")
errs = append(errs, err)
}
if adopted {
if err := r.Patch(ctx, out, client.MergeFrom(in)); err != nil {
log.Error(err, "Error adopting Subscription")
errs = append(errs, err)
}
}

// Find the Subscription's CSVs and apply the component label if necessary
adoptCSV := func(name string) error {
// Adopt the Subscription's installed CSV
if name := in.Status.InstalledCSV; name != "" {
csv := &operatorsv1alpha1.ClusterServiceVersion{}
if err := r.Get(ctx, types.NamespacedName{Namespace: in.GetNamespace(), Name: name}, csv); err != nil {
if apierrors.IsNotFound(err) {
err = nil
}

return err
}
log.Info("found CSV")

candidate := csv.DeepCopy()
adopted, err := operator.AdoptComponent(csv)
if err != nil {
return err
}

if adopted {
// Only update the CSV if freshly adopted
if err := r.Patch(ctx, csv, client.MergeFrom(candidate)); err != nil {
return err
}
csv.SetNamespace(in.GetNamespace())
csv.SetName(name)
if err := r.adopt(ctx, operator, csv); err != nil {
log.Error(err, "Error adopting installed CSV")
errs = append(errs, err)
}

return nil
}

if name := in.Status.InstalledCSV; name != "" {
if err := adoptCSV(name); err != nil {
log.Error(err, "Error adopting installed CSV")
// Adopt the Subscription's latest InstallPlan and Disown all others in the same namespace
if ref := in.Status.InstallPlanRef; ref != nil {
ip := &operatorsv1alpha1.InstallPlan{}
ip.SetNamespace(ref.Namespace)
ip.SetName(ref.Name)
if err := r.adoptInstallPlan(ctx, operator, ip); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -204,12 +180,12 @@ func (r *AdoptionReconciler) ReconcileClusterServiceVersion(req ctrl.Request) (r
in := &operatorsv1alpha1.ClusterServiceVersion{}
if err := r.Get(ctx, req.NamespacedName, in); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Could not find ClusterServiceVersion")
err = nil
} else {
log.Error(err, "Error finding ClusterServiceVersion")
}

return reconcile.Result{}, nil
return reconcile.Result{}, err
}

// Adopt all resources owned by the CSV if necessary
Expand Down Expand Up @@ -239,38 +215,90 @@ func (r *AdoptionReconciler) adoptComponents(ctx context.Context, csv *operators
}

// Label (adopt) prospective components
var errs []error
// TODO(njhale): parallelize
var (
errs []error
mu sync.Mutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not an error channel instead of a mutex + err slice?

wg sync.WaitGroup
)
for _, operator := range operators {
components, err := r.adoptees(ctx, operator, csv)
if err != nil {
errs = append(errs, err)
func() {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}()
continue
}

for _, component := range components {
candidate := component.DeepCopyObject()
adopted, err := operator.AdoptComponent(component)
if err != nil {
errs = append(errs, err)
continue
}
var (
// Copy variables into iteration scope
operator = operator
component = component
)
wg.Add(1)

go func() {
defer wg.Done()
if err := r.adopt(ctx, &operator, component); err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
}()
}
}
wg.Wait()

if !adopted {
// Don't update since we didn't adopt
// This shouldn't occur since we already filtered candidates
r.log.Error(fmt.Errorf("failed to adopt component candidate"), "candidate not adopted", "candidate", component)
continue
}
return utilerrors.NewAggregate(errs)
}

// Patch the component to adopt
if err = r.Patch(ctx, component, client.MergeFrom(candidate)); err != nil {
errs = append(errs, err)
}
func (r *AdoptionReconciler) adopt(ctx context.Context, operator *decorators.Operator, component runtime.Object) error {
m, err := meta.Accessor(component)
if err != nil {
return nil
}

if err := r.Get(ctx, types.NamespacedName{Namespace: m.GetNamespace(), Name: m.GetName()}, component); err != nil {
if apierrors.IsNotFound(err) {
r.log.Error(err, "component not found")
err = nil
}

return err
}
candidate := component.DeepCopyObject()

return utilerrors.NewAggregate(errs)
adopted, err := operator.AdoptComponent(candidate)
if err != nil {
return err
}

if adopted {
// Only update if freshly adopted
r.log.Info("component adopted", "component", candidate)
return r.Patch(ctx, candidate, client.MergeFrom(component))
}

return nil
}

func (r *AdoptionReconciler) disown(ctx context.Context, operator *decorators.Operator, component runtime.Object) error {
candidate := component.DeepCopyObject()
disowned, err := operator.DisownComponent(candidate)
if err != nil {
return err
}

if !disowned {
// Wasn't a component
return nil
}

// Only update if freshly disowned
r.log.V(4).Info("component disowned", "component", candidate)
return r.Patch(ctx, candidate, client.MergeFrom(component))
}

func (r *AdoptionReconciler) adoptees(ctx context.Context, operator decorators.Operator, csv *operatorsv1alpha1.ClusterServiceVersion) ([]runtime.Object, error) {
Expand Down Expand Up @@ -350,6 +378,41 @@ func (r *AdoptionReconciler) adoptees(ctx context.Context, operator decorators.O
return components, nil
}

func (r *AdoptionReconciler) adoptInstallPlan(ctx context.Context, operator *decorators.Operator, latest *operatorsv1alpha1.InstallPlan) error {
// Adopt the latest InstallPlan
if err := r.adopt(ctx, operator, latest); err != nil {
return err
}

// Disown older InstallPlans
selector, err := operator.ComponentSelector()
if err != nil {
return err
}

var (
ips = &operatorsv1alpha1.InstallPlanList{}
opt = client.MatchingLabelsSelector{Selector: selector}
)
if err := r.List(ctx, ips, opt, client.InNamespace(latest.GetNamespace())); err != nil {
return err
}

var errs []error
for _, ip := range ips.Items {
if ip.GetName() == latest.GetName() {
// Don't disown latest
continue
}

if err := r.disown(ctx, operator, &ip); err != nil {
errs = append(errs, err)
}
}

return utilerrors.NewAggregate(errs)
}

func (r *AdoptionReconciler) mapToSubscriptions(obj handler.MapObject) (requests []reconcile.Request) {
if obj.Meta == nil {
return
Expand All @@ -367,7 +430,6 @@ func (r *AdoptionReconciler) mapToSubscriptions(obj handler.MapObject) (requests
nsn := types.NamespacedName{Namespace: sub.GetNamespace(), Name: sub.GetName()}
requests = append(requests, reconcile.Request{NamespacedName: nsn})
}
r.log.Info("requeueing subscriptions", "requests", requests)

return
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/controller/operators/adoption_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/reference"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"

operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
Expand Down Expand Up @@ -106,6 +107,81 @@ var _ = Describe("Adoption Controller", func() {
componentLabelKey = fmt.Sprintf("%s%s.%s", decorators.ComponentLabelKeyPrefix, sub.Spec.Package, sub.GetNamespace())
})

Context("that references an existing installplan", func() {
var (
ip *operatorsv1alpha1.InstallPlan
)

BeforeEach(func() {
ip = fixtures.Fill(&operatorsv1alpha1.InstallPlan{}).(*operatorsv1alpha1.InstallPlan)
ip.SetNamespace(sub.GetNamespace())
ip.SetName(genName("poultry-"))

Eventually(func() error {
owned := testobj.WithOwner(sub, ip)
return k8sClient.Create(ctx, owned)
}).Should(Succeed())
created = append(created, ip)

ref, err := reference.GetReference(scheme, ip)
Expect(err).ToNot(HaveOccurred())

// Set the Subscription's status separately
status := sub.DeepCopy().Status
status.InstallPlanRef = ref
Eventually(func() error {
if err := k8sClient.Get(ctx, testobj.NamespacedName(sub), sub); err != nil {
return err
}
sub.Status = status

return k8sClient.Status().Update(ctx, sub)
}).Should(Succeed())
})

Context("and has other, non-latest, adopted installplans", func() {
var (
ips []*operatorsv1alpha1.InstallPlan
)

BeforeEach(func() {
for i := 0; i < 4; i++ {
ip := fixtures.Fill(&operatorsv1alpha1.InstallPlan{}).(*operatorsv1alpha1.InstallPlan)
ip.SetNamespace(sub.GetNamespace())
ip.SetName(genName(""))
ip.SetLabels(map[string]string{
componentLabelKey: "",
})

Eventually(func() error {
return k8sClient.Create(ctx, ip)
}).Should(Succeed())

created = append(created, ip)
ips = append(ips, ip)
}

})

Specify("correct component labels", func() {
installPlan := ip.DeepCopy()
Eventually(func() (map[string]string, error) {
err := k8sClient.Get(ctx, testobj.NamespacedName(ip), installPlan)
return installPlan.GetLabels(), err
}).Should(HaveKey(componentLabelKey))

for _, ip := range ips {
Eventually(func() (map[string]string, error) {
err := k8sClient.Get(ctx, testobj.NamespacedName(ip), ip)
return ip.GetLabels(), err
}, timeout, interval).ShouldNot(HaveKey(componentLabelKey))
}

})

})
})

Context("that has an existing installed csv", func() {

BeforeEach(func() {
Expand Down
Loading