Skip to content

Commit 0636faf

Browse files
author
Ankita Thomas
committed
Move server side apply patch code to library, use ssa to update catalog source pods
1 parent 63a809b commit 0636faf

File tree

14 files changed

+955
-169
lines changed

14 files changed

+955
-169
lines changed

cmd/catalog/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/client_golang/prometheus/promhttp"
1414
log "github.com/sirupsen/logrus"
1515
utilclock "k8s.io/apimachinery/pkg/util/clock"
16+
k8sscheme "k8s.io/client-go/kubernetes/scheme"
1617
"k8s.io/client-go/tools/clientcmd"
1718

1819
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client"
@@ -172,7 +173,7 @@ func main() {
172173
}
173174

174175
// Create a new instance of the operator.
175-
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace)
176+
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme)
176177
if err != nil {
177178
log.Panicf("error configuring operator: %s", err.Error())
178179
}

pkg/controller/operators/catalog/operator.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"k8s.io/apimachinery/pkg/runtime"
89
"reflect"
910
"sort"
1011
"strings"
@@ -49,6 +50,7 @@ import (
4950
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
5051
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
5152
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
53+
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
5254
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/event"
5355
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
5456
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
@@ -74,6 +76,7 @@ const (
7476
generatedByKey = "olm.generated-by"
7577
maxInstallPlanCount = 5
7678
maxDeletesPerSweep = 5
79+
RegistryFieldManager = "olm.registry"
7780
)
7881

7982
// Operator represents a Kubernetes operator that executes InstallPlans by
@@ -107,7 +110,7 @@ type Operator struct {
107110
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
108111

109112
// NewOperator creates a new Catalog Operator.
110-
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string) (*Operator, error) {
113+
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme) (*Operator, error) {
111114
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
112115
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
113116
if err != nil {
@@ -142,6 +145,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
142145
return nil, err
143146
}
144147

148+
ssaClient, err := controllerclient.NewForConfig(config, scheme, RegistryFieldManager)
149+
if err != nil {
150+
return nil, err
151+
}
152+
145153
// Allocate the new instance of an Operator.
146154
op := &Operator{
147155
Operator: queueOperator,
@@ -162,9 +170,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
162170
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient, dynamicClient),
163171
}
164172
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
165-
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
173+
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
166174
res := resolver.NewOperatorStepResolver(lister, crClient, opClient.KubernetesInterface(), operatorNamespace, op.sources, logger)
167-
op.resolver = resolver.NewOperatorStepResolver(lister, crClient, opClient.KubernetesInterface(), operatorNamespace, op.sources, logger)
168175
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)
169176

170177
// Wire OLM CR sharedIndexInformers

pkg/controller/operators/catalog/operator_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
5151
"github.com/operator-framework/operator-lifecycle-manager/pkg/fakes"
5252
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/clientfake"
53+
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
5354
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
5455
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
5556
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
@@ -1247,7 +1248,14 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
12471248
}
12481249
op.sources = grpc.NewSourceStore(config.logger, 1*time.Second, 5*time.Second, op.syncSourceState)
12491250
if op.reconciler == nil {
1250-
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now)
1251+
s := runtime.NewScheme()
1252+
err := k8sfake.AddToScheme(s)
1253+
if err != nil {
1254+
return nil, err
1255+
}
1256+
applier := controllerclient.NewFakeApplier(s, "testowner")
1257+
1258+
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now, applier)
12511259
}
12521260

12531261
op.RunInformers(ctx)

pkg/controller/registry/reconciler/grpc.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package reconciler
33
import (
44
"context"
55
"fmt"
6-
76
"github.com/operator-framework/api/pkg/operators/v1alpha1"
7+
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
88
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
99
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
1010
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
@@ -70,9 +70,10 @@ func (s *grpcCatalogSourceDecorator) Pod() *v1.Pod {
7070
}
7171

7272
type GrpcRegistryReconciler struct {
73-
now nowFunc
74-
Lister operatorlister.OperatorLister
75-
OpClient operatorclient.ClientInterface
73+
now nowFunc
74+
Lister operatorlister.OperatorLister
75+
OpClient operatorclient.ClientInterface
76+
SSAClient *controllerclient.ServerSideApplier
7677
}
7778

7879
var _ RegistryReconciler = &GrpcRegistryReconciler{}
@@ -194,16 +195,17 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat
194195
logrus.WithField("CatalogSource", source.GetName()).Info("detect image update for catalogsource pod")
195196

196197
updateFlag = true
197-
updatePod.Labels[CatalogSourceLabelKey] = source.GetName()
198-
updatePod.Labels[CatalogSourceUpdateKey] = ""
199198

200199
// Update the update pod to promote it to serving pod
201-
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Update(context.TODO(), updatePod, metav1.UpdateOptions{})
200+
err := c.SSAClient.Apply(context.TODO(), updatePod, func(p *v1.Pod) error {
201+
p.Labels[CatalogSourceLabelKey] = source.GetName()
202+
p.Labels[CatalogSourceUpdateKey] = ""
203+
return nil
204+
})()
205+
202206
if err != nil {
203-
return errors.Wrapf(err, "error creating new pod: %s", source.Pod().GetName())
207+
return errors.Wrapf(err, "error updating catalog source pod: %s", source.Pod().GetName())
204208
}
205-
206-
break
207209
}
208210
}
209211

pkg/controller/registry/reconciler/reconciler.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package reconciler
33

44
import (
55
"github.com/operator-framework/api/pkg/operators/v1alpha1"
6+
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
67
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
78
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
89
v1 "k8s.io/api/core/v1"
@@ -46,6 +47,7 @@ type registryReconcilerFactory struct {
4647
Lister operatorlister.OperatorLister
4748
OpClient operatorclient.ClientInterface
4849
ConfigMapServerImage string
50+
SSAClient *controllerclient.ServerSideApplier
4951
}
5052

5153
// ReconcilerForSource returns a RegistryReconciler based on the configuration of the given CatalogSource.
@@ -62,9 +64,10 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
6264
case v1alpha1.SourceTypeGrpc:
6365
if source.Spec.Image != "" {
6466
return &GrpcRegistryReconciler{
65-
now: r.now,
66-
Lister: r.Lister,
67-
OpClient: r.OpClient,
67+
now: r.now,
68+
Lister: r.Lister,
69+
OpClient: r.OpClient,
70+
SSAClient: r.SSAClient,
6871
}
6972
} else if source.Spec.Address != "" {
7073
return &GrpcAddressRegistryReconciler{
@@ -76,12 +79,13 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
7679
}
7780

7881
// NewRegistryReconcilerFactory returns an initialized RegistryReconcilerFactory.
79-
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc) RegistryReconcilerFactory {
82+
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc, ssaClient *controllerclient.ServerSideApplier) RegistryReconcilerFactory {
8083
return &registryReconcilerFactory{
8184
now: now,
8285
Lister: lister,
8386
OpClient: opClient,
8487
ConfigMapServerImage: configMapServerImage,
88+
SSAClient: ssaClient,
8589
}
8690
}
8791

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"k8s.io/apimachinery/pkg/runtime"
6+
"k8s.io/apimachinery/pkg/types"
7+
k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client"
8+
fakecontrollerclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
9+
)
10+
11+
// FakeApplier provides a wrapper around the fake k8s controller client to convert the unsupported apply-type patches to merge patches.
12+
func NewFakeApplier(scheme *runtime.Scheme, owner string, objs ...runtime.Object) *ServerSideApplier {
13+
return &ServerSideApplier{
14+
client: &fakeApplier{fakecontrollerclient.NewFakeClientWithScheme(scheme, objs...)},
15+
Scheme: scheme,
16+
Owner: k8scontrollerclient.FieldOwner(owner),
17+
}
18+
}
19+
20+
type fakeApplier struct {
21+
k8scontrollerclient.Client
22+
}
23+
24+
func (c *fakeApplier) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
25+
patch, opts = convertApplyToMergePatch(patch, opts...)
26+
return c.Client.Patch(ctx, obj, patch, opts...)
27+
}
28+
29+
func (c *fakeApplier) Status() k8scontrollerclient.StatusWriter {
30+
return fakeStatusWriter{c.Client.Status()}
31+
}
32+
33+
type fakeStatusWriter struct {
34+
k8scontrollerclient.StatusWriter
35+
}
36+
37+
func (c fakeStatusWriter) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
38+
patch, opts = convertApplyToMergePatch(patch, opts...)
39+
return c.StatusWriter.Patch(ctx, obj, patch, opts...)
40+
}
41+
42+
func convertApplyToMergePatch(patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) (k8scontrollerclient.Patch, []k8scontrollerclient.PatchOption) {
43+
// Apply patch type is not supported on the fake controller
44+
if patch.Type() == types.ApplyPatchType {
45+
patch = k8scontrollerclient.Merge
46+
patchOptions := make([]k8scontrollerclient.PatchOption, 0, len(opts))
47+
for _, opt := range opts {
48+
if opt == k8scontrollerclient.ForceOwnership {
49+
continue
50+
}
51+
patchOptions = append(patchOptions, opt)
52+
}
53+
opts = patchOptions
54+
}
55+
return patch, opts
56+
}

0 commit comments

Comments
 (0)