Skip to content

✨ Backport release-0.5: Support metadata-only watches #1256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -155,20 +156,42 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return blder.ctrl, nil
}

func (blder *Builder) project(obj runtime.Object) (runtime.Object, error) {
switch o := obj.(type) {
case *onlyMetadataWrapper:
metaObj := &metav1.PartialObjectMetadata{}
gvk, err := getGvk(o.Object, blder.mgr.GetScheme())
if err != nil {
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
}
metaObj.SetGroupVersionKind(gvk)
return metaObj, nil
default:
return obj, nil
}
}

func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.apiType}
hdler := &handler.EnqueueRequestForObject{}
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
apiType, err := blder.project(blder.apiType)
if err != nil {
return err
}
src := &source.Kind{Type: apiType}
hdler := &handler.EnqueueRequestForObject{}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
return err
}

// Watches the managed types
for _, obj := range blder.managedObjects {
src := &source.Kind{Type: obj}
typeForSrc, err := blder.project(obj)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.apiType,
OwnerType: apiType,
IsController: true,
}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
Expand All @@ -178,10 +201,17 @@ func (blder *Builder) doWatch() error {

// Do the watch requests
for _, w := range blder.watchRequest {
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
return err
}

}
return nil
}
Expand All @@ -196,7 +226,11 @@ func (blder *Builder) getControllerName() (string, error) {
if blder.name != "" {
return blder.name, nil
}
gvk, err := getGvk(blder.apiType, blder.mgr.GetScheme())
obj, err := blder.project(blder.apiType)
if err != nil {
return "", err
}
gvk, err := getGvk(obj, blder.mgr.GetScheme())
if err != nil {
return "", err
}
Expand Down
109 changes: 105 additions & 4 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -203,8 +205,107 @@ var _ = Describe("application", func() {
close(done)
}, 10)
})

Describe("watching with projections", func() {
var mgr manager.Manager
BeforeEach(func() {
// use a cache that intercepts requests for fully typed objects to
// ensure we use the projected versions
var err error
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
Expect(err).NotTo(HaveOccurred())
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

bldr := ControllerManagedBy(mgr).
For(OnlyMetadata(&appsv1.Deployment{})).
Owns(OnlyMetadata(&appsv1.ReplicaSet{})).
Watches(&source.Kind{Type: OnlyMetadata(&appsv1.StatefulSet{})},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(o handler.MapObject) []reconcile.Request {
ometa := o.Object.(*metav1.PartialObjectMetadata)
statefulSetMaps <- ometa
return nil
}),
})

doReconcileTest("8", stop, bldr, mgr, true)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
err := mgr.GetClient().Create(context.TODO(), set)
Expect(err).NotTo(HaveOccurred())

By("Checking that the mapping function has been called")
Eventually(func() bool {
metaSet := <-statefulSetMaps
Expect(metaSet.Name).To(Equal(set.Name))
Expect(metaSet.Namespace).To(Equal(set.Namespace))
Expect(metaSet.Labels).To(Equal(set.Labels))
return true
}).Should(BeTrue())
})
})
})

// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
// returning an error if normal, typed objects have informers requested.
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
normalCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}
return &nonTypedOnlyCache{
Cache: normalCache,
}, nil
}

// nonTypedOnlyCache is a cache.Cache that only provides metadata &
// unstructured informers.
type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(obj runtime.Object) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(obj)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix
Expand Down Expand Up @@ -267,8 +368,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the Deployment Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

By("Creating a ReplicaSet")
// Expect a Reconcile when an Owned object is managedObjects.
Expand Down Expand Up @@ -297,8 +398,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the ReplicaSet Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

}

Expand Down
55 changes: 55 additions & 0 deletions pkg/builder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -34,6 +35,60 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func ExampleBuilder_metadata_only() {
logf.SetLogger(zap.New())

var log = logf.Log.WithName("builder-examples")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

cl := mgr.GetClient()
err = builder.
ControllerManagedBy(mgr). // Create the ControllerManagedBy
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
Owns(builder.OnlyMetadata(&corev1.Pod{})). // ReplicaSet owns Pods created by it, and caches them as metadata only
Complete(reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Read the ReplicaSet
rs := &appsv1.ReplicaSet{}
err := cl.Get(ctx, req.NamespacedName, rs)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// List the Pods matching the PodTemplate Labels, but only their metadata
var podsMeta metav1.PartialObjectMetadataList
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// Update the ReplicaSet
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
err = cl.Update(ctx, rs)
if err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}))
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
Expand Down
36 changes: 36 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package builder

import "k8s.io/apimachinery/pkg/runtime"

// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
// lots of objects, really big objects, or objects for which you only know
// the the GVK, but not the structure. You'll need to pass
// metav1.PartialObjectMetadata to the client when fetching objects in your
// reconciler, otherwise you'll end up with a duplicate structured or
// unstructured cache.
func OnlyMetadata(obj runtime.Object) runtime.Object {
return &onlyMetadataWrapper{obj}
}

type onlyMetadataWrapper struct {
runtime.Object
}

// }}}
Loading