Skip to content

Commit 5e5b11f

Browse files
Merge pull request #1661 from njhale/ext-adopt
feat(operator): adopt referenced installplans
2 parents 83e1f76 + d68c913 commit 5e5b11f

File tree

7 files changed

+263
-65
lines changed

7 files changed

+263
-65
lines changed

pkg/controller/operators/adoption_controller.go

Lines changed: 126 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package operators
22

33
import (
44
"context"
5-
"fmt"
65
"sync"
76

87
"github.com/go-logr/logr"
@@ -54,6 +53,7 @@ func (r *AdoptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
5453
err := ctrl.NewControllerManagedBy(mgr).
5554
For(&operatorsv1alpha1.Subscription{}).
5655
Watches(&source.Kind{Type: &operatorsv1alpha1.ClusterServiceVersion{}}, enqueueSub).
56+
Watches(&source.Kind{Type: &operatorsv1alpha1.InstallPlan{}}, enqueueSub).
5757
Complete(reconcile.Func(r.ReconcileSubscription))
5858
if err != nil {
5959
return err
@@ -77,7 +77,6 @@ func (r *AdoptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
7777
Watches(&source.Kind{Type: &apiextensionsv1.CustomResourceDefinition{}}, enqueueCSV).
7878
Watches(&source.Kind{Type: &apiregistrationv1.APIService{}}, enqueueCSV).
7979
Watches(&source.Kind{Type: &operatorsv1alpha1.Subscription{}}, enqueueCSV).
80-
Watches(&source.Kind{Type: &operatorsv1alpha1.InstallPlan{}}, enqueueCSV).
8180
Complete(reconcile.Func(r.ReconcileClusterServiceVersion))
8281
if err != nil {
8382
return err
@@ -107,8 +106,6 @@ func NewAdoptionReconciler(cli client.Client, log logr.Logger, scheme *runtime.S
107106
}, nil
108107
}
109108

110-
var fieldOwner = client.FieldOwner("olm")
111-
112109
// ReconcileSubscription labels the CSVs installed by a Subscription as components of an operator named after the subscribed package and install namespace.
113110
func (r *AdoptionReconciler) ReconcileSubscription(req ctrl.Request) (reconcile.Result, error) {
114111
// Set up a convenient log object so we don't have to type request over and over again
@@ -141,51 +138,30 @@ func (r *AdoptionReconciler) ReconcileSubscription(req ctrl.Request) (reconcile.
141138
return reconcile.Result{}, nil
142139
}
143140

144-
// Ensure the subscription is adopted
141+
// Adopt the Subscription
145142
var errs []error
146-
out := in.DeepCopy()
147-
adopted, err := operator.AdoptComponent(out)
148-
if err != nil {
143+
if err := r.adopt(ctx, operator, in); err != nil {
144+
log.Error(err, "Error adopting Subscription")
149145
errs = append(errs, err)
150146
}
151-
if adopted {
152-
if err := r.Patch(ctx, out, client.MergeFrom(in)); err != nil {
153-
log.Error(err, "Error adopting Subscription")
154-
errs = append(errs, err)
155-
}
156-
}
157147

158-
// Find the Subscription's CSVs and apply the component label if necessary
159-
adoptCSV := func(name string) error {
148+
// Adopt the Subscription's installed CSV
149+
if name := in.Status.InstalledCSV; name != "" {
160150
csv := &operatorsv1alpha1.ClusterServiceVersion{}
161-
if err := r.Get(ctx, types.NamespacedName{Namespace: in.GetNamespace(), Name: name}, csv); err != nil {
162-
if apierrors.IsNotFound(err) {
163-
err = nil
164-
}
165-
166-
return err
167-
}
168-
log.Info("found CSV")
169-
170-
candidate := csv.DeepCopy()
171-
adopted, err := operator.AdoptComponent(csv)
172-
if err != nil {
173-
return err
174-
}
175-
176-
if adopted {
177-
// Only update the CSV if freshly adopted
178-
if err := r.Patch(ctx, csv, client.MergeFrom(candidate)); err != nil {
179-
return err
180-
}
151+
csv.SetNamespace(in.GetNamespace())
152+
csv.SetName(name)
153+
if err := r.adopt(ctx, operator, csv); err != nil {
154+
log.Error(err, "Error adopting installed CSV")
155+
errs = append(errs, err)
181156
}
182-
183-
return nil
184157
}
185158

186-
if name := in.Status.InstalledCSV; name != "" {
187-
if err := adoptCSV(name); err != nil {
188-
log.Error(err, "Error adopting installed CSV")
159+
// Adopt the Subscription's latest InstallPlan and Disown all others in the same namespace
160+
if ref := in.Status.InstallPlanRef; ref != nil {
161+
ip := &operatorsv1alpha1.InstallPlan{}
162+
ip.SetNamespace(ref.Namespace)
163+
ip.SetName(ref.Name)
164+
if err := r.adoptInstallPlan(ctx, operator, ip); err != nil {
189165
errs = append(errs, err)
190166
}
191167
}
@@ -204,12 +180,12 @@ func (r *AdoptionReconciler) ReconcileClusterServiceVersion(req ctrl.Request) (r
204180
in := &operatorsv1alpha1.ClusterServiceVersion{}
205181
if err := r.Get(ctx, req.NamespacedName, in); err != nil {
206182
if apierrors.IsNotFound(err) {
207-
log.Info("Could not find ClusterServiceVersion")
183+
err = nil
208184
} else {
209185
log.Error(err, "Error finding ClusterServiceVersion")
210186
}
211187

212-
return reconcile.Result{}, nil
188+
return reconcile.Result{}, err
213189
}
214190

215191
// Adopt all resources owned by the CSV if necessary
@@ -239,38 +215,90 @@ func (r *AdoptionReconciler) adoptComponents(ctx context.Context, csv *operators
239215
}
240216

241217
// Label (adopt) prospective components
242-
var errs []error
243-
// TODO(njhale): parallelize
218+
var (
219+
errs []error
220+
mu sync.Mutex
221+
wg sync.WaitGroup
222+
)
244223
for _, operator := range operators {
245224
components, err := r.adoptees(ctx, operator, csv)
246225
if err != nil {
247-
errs = append(errs, err)
226+
func() {
227+
mu.Lock()
228+
defer mu.Unlock()
229+
errs = append(errs, err)
230+
}()
248231
continue
249232
}
250233

251234
for _, component := range components {
252-
candidate := component.DeepCopyObject()
253-
adopted, err := operator.AdoptComponent(component)
254-
if err != nil {
255-
errs = append(errs, err)
256-
continue
257-
}
235+
var (
236+
// Copy variables into iteration scope
237+
operator = operator
238+
component = component
239+
)
240+
wg.Add(1)
241+
242+
go func() {
243+
defer wg.Done()
244+
if err := r.adopt(ctx, &operator, component); err != nil {
245+
mu.Lock()
246+
defer mu.Unlock()
247+
errs = append(errs, err)
248+
}
249+
}()
250+
}
251+
}
252+
wg.Wait()
258253

259-
if !adopted {
260-
// Don't update since we didn't adopt
261-
// This shouldn't occur since we already filtered candidates
262-
r.log.Error(fmt.Errorf("failed to adopt component candidate"), "candidate not adopted", "candidate", component)
263-
continue
264-
}
254+
return utilerrors.NewAggregate(errs)
255+
}
265256

266-
// Patch the component to adopt
267-
if err = r.Patch(ctx, component, client.MergeFrom(candidate)); err != nil {
268-
errs = append(errs, err)
269-
}
257+
func (r *AdoptionReconciler) adopt(ctx context.Context, operator *decorators.Operator, component runtime.Object) error {
258+
m, err := meta.Accessor(component)
259+
if err != nil {
260+
return nil
261+
}
262+
263+
if err := r.Get(ctx, types.NamespacedName{Namespace: m.GetNamespace(), Name: m.GetName()}, component); err != nil {
264+
if apierrors.IsNotFound(err) {
265+
r.log.Error(err, "component not found")
266+
err = nil
270267
}
268+
269+
return err
271270
}
271+
candidate := component.DeepCopyObject()
272272

273-
return utilerrors.NewAggregate(errs)
273+
adopted, err := operator.AdoptComponent(candidate)
274+
if err != nil {
275+
return err
276+
}
277+
278+
if adopted {
279+
// Only update if freshly adopted
280+
r.log.Info("component adopted", "component", candidate)
281+
return r.Patch(ctx, candidate, client.MergeFrom(component))
282+
}
283+
284+
return nil
285+
}
286+
287+
func (r *AdoptionReconciler) disown(ctx context.Context, operator *decorators.Operator, component runtime.Object) error {
288+
candidate := component.DeepCopyObject()
289+
disowned, err := operator.DisownComponent(candidate)
290+
if err != nil {
291+
return err
292+
}
293+
294+
if !disowned {
295+
// Wasn't a component
296+
return nil
297+
}
298+
299+
// Only update if freshly disowned
300+
r.log.V(4).Info("component disowned", "component", candidate)
301+
return r.Patch(ctx, candidate, client.MergeFrom(component))
274302
}
275303

276304
func (r *AdoptionReconciler) adoptees(ctx context.Context, operator decorators.Operator, csv *operatorsv1alpha1.ClusterServiceVersion) ([]runtime.Object, error) {
@@ -350,6 +378,41 @@ func (r *AdoptionReconciler) adoptees(ctx context.Context, operator decorators.O
350378
return components, nil
351379
}
352380

381+
func (r *AdoptionReconciler) adoptInstallPlan(ctx context.Context, operator *decorators.Operator, latest *operatorsv1alpha1.InstallPlan) error {
382+
// Adopt the latest InstallPlan
383+
if err := r.adopt(ctx, operator, latest); err != nil {
384+
return err
385+
}
386+
387+
// Disown older InstallPlans
388+
selector, err := operator.ComponentSelector()
389+
if err != nil {
390+
return err
391+
}
392+
393+
var (
394+
ips = &operatorsv1alpha1.InstallPlanList{}
395+
opt = client.MatchingLabelsSelector{Selector: selector}
396+
)
397+
if err := r.List(ctx, ips, opt, client.InNamespace(latest.GetNamespace())); err != nil {
398+
return err
399+
}
400+
401+
var errs []error
402+
for _, ip := range ips.Items {
403+
if ip.GetName() == latest.GetName() {
404+
// Don't disown latest
405+
continue
406+
}
407+
408+
if err := r.disown(ctx, operator, &ip); err != nil {
409+
errs = append(errs, err)
410+
}
411+
}
412+
413+
return utilerrors.NewAggregate(errs)
414+
}
415+
353416
func (r *AdoptionReconciler) mapToSubscriptions(obj handler.MapObject) (requests []reconcile.Request) {
354417
if obj.Meta == nil {
355418
return
@@ -367,7 +430,6 @@ func (r *AdoptionReconciler) mapToSubscriptions(obj handler.MapObject) (requests
367430
nsn := types.NamespacedName{Namespace: sub.GetNamespace(), Name: sub.GetName()}
368431
requests = append(requests, reconcile.Request{NamespacedName: nsn})
369432
}
370-
r.log.Info("requeueing subscriptions", "requests", requests)
371433

372434
return
373435
}

pkg/controller/operators/adoption_controller_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
apierrors "k8s.io/apimachinery/pkg/api/errors"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1515
"k8s.io/apimachinery/pkg/runtime"
16+
"k8s.io/client-go/tools/reference"
1617
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
1718

1819
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
@@ -106,6 +107,81 @@ var _ = Describe("Adoption Controller", func() {
106107
componentLabelKey = fmt.Sprintf("%s%s.%s", decorators.ComponentLabelKeyPrefix, sub.Spec.Package, sub.GetNamespace())
107108
})
108109

110+
Context("that references an existing installplan", func() {
111+
var (
112+
ip *operatorsv1alpha1.InstallPlan
113+
)
114+
115+
BeforeEach(func() {
116+
ip = fixtures.Fill(&operatorsv1alpha1.InstallPlan{}).(*operatorsv1alpha1.InstallPlan)
117+
ip.SetNamespace(sub.GetNamespace())
118+
ip.SetName(genName("poultry-"))
119+
120+
Eventually(func() error {
121+
owned := testobj.WithOwner(sub, ip)
122+
return k8sClient.Create(ctx, owned)
123+
}).Should(Succeed())
124+
created = append(created, ip)
125+
126+
ref, err := reference.GetReference(scheme, ip)
127+
Expect(err).ToNot(HaveOccurred())
128+
129+
// Set the Subscription's status separately
130+
status := sub.DeepCopy().Status
131+
status.InstallPlanRef = ref
132+
Eventually(func() error {
133+
if err := k8sClient.Get(ctx, testobj.NamespacedName(sub), sub); err != nil {
134+
return err
135+
}
136+
sub.Status = status
137+
138+
return k8sClient.Status().Update(ctx, sub)
139+
}).Should(Succeed())
140+
})
141+
142+
Context("and has other, non-latest, adopted installplans", func() {
143+
var (
144+
ips []*operatorsv1alpha1.InstallPlan
145+
)
146+
147+
BeforeEach(func() {
148+
for i := 0; i < 4; i++ {
149+
ip := fixtures.Fill(&operatorsv1alpha1.InstallPlan{}).(*operatorsv1alpha1.InstallPlan)
150+
ip.SetNamespace(sub.GetNamespace())
151+
ip.SetName(genName(""))
152+
ip.SetLabels(map[string]string{
153+
componentLabelKey: "",
154+
})
155+
156+
Eventually(func() error {
157+
return k8sClient.Create(ctx, ip)
158+
}).Should(Succeed())
159+
160+
created = append(created, ip)
161+
ips = append(ips, ip)
162+
}
163+
164+
})
165+
166+
Specify("correct component labels", func() {
167+
installPlan := ip.DeepCopy()
168+
Eventually(func() (map[string]string, error) {
169+
err := k8sClient.Get(ctx, testobj.NamespacedName(ip), installPlan)
170+
return installPlan.GetLabels(), err
171+
}).Should(HaveKey(componentLabelKey))
172+
173+
for _, ip := range ips {
174+
Eventually(func() (map[string]string, error) {
175+
err := k8sClient.Get(ctx, testobj.NamespacedName(ip), ip)
176+
return ip.GetLabels(), err
177+
}, timeout, interval).ShouldNot(HaveKey(componentLabelKey))
178+
}
179+
180+
})
181+
182+
})
183+
})
184+
109185
Context("that has an existing installed csv", func() {
110186

111187
BeforeEach(func() {

0 commit comments

Comments
 (0)