Skip to content

Bug 1853601: use server-side-apply for catalog source pod update #1624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/catalog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
utilclock "k8s.io/apimachinery/pkg/util/clock"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client"
Expand Down Expand Up @@ -172,7 +173,7 @@ func main() {
}

// Create a new instance of the operator.
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace)
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme)
if err != nil {
log.Panicf("error configuring operator: %s", err.Error())
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/event"
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
Expand All @@ -74,6 +76,7 @@ const (
generatedByKey = "olm.generated-by"
maxInstallPlanCount = 5
maxDeletesPerSweep = 5
RegistryFieldManager = "olm.registry"
)

// Operator represents a Kubernetes operator that executes InstallPlans by
Expand Down Expand Up @@ -107,7 +110,7 @@ type Operator struct {
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)

// NewOperator creates a new Catalog Operator.
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string) (*Operator, error) {
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme) (*Operator, error) {
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
Expand Down Expand Up @@ -142,6 +145,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

ssaClient, err := controllerclient.NewForConfig(config, scheme, RegistryFieldManager)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
Expand All @@ -162,9 +170,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient, dynamicClient),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
res := resolver.NewOperatorStepResolver(lister, crClient, opClient.KubernetesInterface(), operatorNamespace, op.sources, logger)
op.resolver = resolver.NewOperatorStepResolver(lister, crClient, opClient.KubernetesInterface(), operatorNamespace, op.sources, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
"github.com/operator-framework/operator-lifecycle-manager/pkg/fakes"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/clientfake"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand Down Expand Up @@ -1247,7 +1248,14 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
}
op.sources = grpc.NewSourceStore(config.logger, 1*time.Second, 5*time.Second, op.syncSourceState)
if op.reconciler == nil {
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now)
s := runtime.NewScheme()
err := k8sfake.AddToScheme(s)
if err != nil {
return nil, err
}
applier := controllerclient.NewFakeApplier(s, "testowner")

op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now, applier)
}

op.RunInformers(ctx)
Expand Down
22 changes: 12 additions & 10 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package reconciler
import (
"context"
"fmt"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand Down Expand Up @@ -70,9 +70,10 @@ func (s *grpcCatalogSourceDecorator) Pod() *v1.Pod {
}

type GrpcRegistryReconciler struct {
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
SSAClient *controllerclient.ServerSideApplier
}

var _ RegistryReconciler = &GrpcRegistryReconciler{}
Expand Down Expand Up @@ -194,16 +195,17 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat
logrus.WithField("CatalogSource", source.GetName()).Info("detect image update for catalogsource pod")

updateFlag = true
updatePod.Labels[CatalogSourceLabelKey] = source.GetName()
updatePod.Labels[CatalogSourceUpdateKey] = ""

// Update the update pod to promote it to serving pod
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Update(context.TODO(), updatePod, metav1.UpdateOptions{})
err := c.SSAClient.Apply(context.TODO(), updatePod, func(p *v1.Pod) error {
p.Labels[CatalogSourceLabelKey] = source.GetName()
p.Labels[CatalogSourceUpdateKey] = ""
return nil
})()

if err != nil {
return errors.Wrapf(err, "error creating new pod: %s", source.Pod().GetName())
return errors.Wrapf(err, "error updating catalog source pod: %s", source.Pod().GetName())
}

break
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/registry/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reconciler

import (
"github.com/operator-framework/api/pkg/operators/v1alpha1"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -46,6 +47,7 @@ type registryReconcilerFactory struct {
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
ConfigMapServerImage string
SSAClient *controllerclient.ServerSideApplier
}

// ReconcilerForSource returns a RegistryReconciler based on the configuration of the given CatalogSource.
Expand All @@ -62,9 +64,10 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
case v1alpha1.SourceTypeGrpc:
if source.Spec.Image != "" {
return &GrpcRegistryReconciler{
now: r.now,
Lister: r.Lister,
OpClient: r.OpClient,
now: r.now,
Lister: r.Lister,
OpClient: r.OpClient,
SSAClient: r.SSAClient,
}
} else if source.Spec.Address != "" {
return &GrpcAddressRegistryReconciler{
Expand All @@ -76,12 +79,13 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
}

// NewRegistryReconcilerFactory returns an initialized RegistryReconcilerFactory.
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc) RegistryReconcilerFactory {
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc, ssaClient *controllerclient.ServerSideApplier) RegistryReconcilerFactory {
return &registryReconcilerFactory{
now: now,
Lister: lister,
OpClient: opClient,
ConfigMapServerImage: configMapServerImage,
SSAClient: ssaClient,
}
}

Expand Down
56 changes: 56 additions & 0 deletions pkg/lib/controller-runtime/client/fake_ssa.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package client

import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client"
fakecontrollerclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// FakeApplier provides a wrapper around the fake k8s controller client to convert the unsupported apply-type patches to merge patches.
func NewFakeApplier(scheme *runtime.Scheme, owner string, objs ...runtime.Object) *ServerSideApplier {
return &ServerSideApplier{
client: &fakeApplier{fakecontrollerclient.NewFakeClientWithScheme(scheme, objs...)},
Scheme: scheme,
Owner: k8scontrollerclient.FieldOwner(owner),
}
}

type fakeApplier struct {
k8scontrollerclient.Client
}

func (c *fakeApplier) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
patch, opts = convertApplyToMergePatch(patch, opts...)
return c.Client.Patch(ctx, obj, patch, opts...)
}

func (c *fakeApplier) Status() k8scontrollerclient.StatusWriter {
return fakeStatusWriter{c.Client.Status()}
}

type fakeStatusWriter struct {
k8scontrollerclient.StatusWriter
}

func (c fakeStatusWriter) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
patch, opts = convertApplyToMergePatch(patch, opts...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure an apply patch can be directly converted to a merge patch (since I think you need both the old and new resource to generate the delta for a merge patch), but if this is working, I think it might be okay.

I'm a little concerned that we're swapping SSA out for another patch type in our unit tests, but at least it lets us test the rest of the logic in our unit tests.

return c.StatusWriter.Patch(ctx, obj, patch, opts...)
}

func convertApplyToMergePatch(patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) (k8scontrollerclient.Patch, []k8scontrollerclient.PatchOption) {
// Apply patch type is not supported on the fake controller
if patch.Type() == types.ApplyPatchType {
patch = k8scontrollerclient.Merge
patchOptions := make([]k8scontrollerclient.PatchOption, 0, len(opts))
for _, opt := range opts {
if opt == k8scontrollerclient.ForceOwnership {
continue
}
patchOptions = append(patchOptions, opt)
}
opts = patchOptions
}
return patch, opts
}
Loading