Skip to content

Commit 7fc9110

Browse files
authored
Merge pull request #1248 from ncdc/0.6/metadata-only-watch
✨ Backport support for metadata-only watches to release-0.6
2 parents b3d7cf1 + fa91e1b commit 7fc9110

File tree

11 files changed

+1646
-341
lines changed

11 files changed

+1646
-341
lines changed

pkg/builder/controller.go

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strings"
2222

2323
"github.com/go-logr/logr"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425
"k8s.io/apimachinery/pkg/runtime"
2526
"k8s.io/apimachinery/pkg/runtime/schema"
2627
"k8s.io/client-go/rest"
@@ -37,6 +38,17 @@ import (
3738
var newController = controller.New
3839
var getGvk = apiutil.GVKForObject
3940

41+
// project represents other forms that the we can use to
42+
// send/receive a given resource (metadata-only, unstructured, etc)
43+
type objectProjection int
44+
45+
const (
46+
// projectAsNormal doesn't change the object from the form given
47+
projectAsNormal objectProjection = iota
48+
// projectAsMetadata turns this into an metadata-only watch
49+
projectAsMetadata
50+
)
51+
4052
// Builder builds a Controller.
4153
type Builder struct {
4254
forInput ForInput
@@ -68,8 +80,9 @@ func (blder *Builder) ForType(apiType runtime.Object) *Builder {
6880

6981
// ForInput represents the information set by For method.
7082
type ForInput struct {
71-
object runtime.Object
72-
predicates []predicate.Predicate
83+
object runtime.Object
84+
predicates []predicate.Predicate
85+
objectProjection objectProjection
7386
}
7487

7588
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
@@ -88,8 +101,9 @@ func (blder *Builder) For(object runtime.Object, opts ...ForOption) *Builder {
88101

89102
// OwnsInput represents the information set by Owns method.
90103
type OwnsInput struct {
91-
object runtime.Object
92-
predicates []predicate.Predicate
104+
object runtime.Object
105+
predicates []predicate.Predicate
106+
objectProjection objectProjection
93107
}
94108

95109
// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
@@ -107,9 +121,10 @@ func (blder *Builder) Owns(object runtime.Object, opts ...OwnsOption) *Builder {
107121

108122
// WatchesInput represents the information set by Watches method.
109123
type WatchesInput struct {
110-
src source.Source
111-
eventhandler handler.EventHandler
112-
predicates []predicate.Predicate
124+
src source.Source
125+
eventhandler handler.EventHandler
126+
predicates []predicate.Predicate
127+
objectProjection objectProjection
113128
}
114129

115130
// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
@@ -195,19 +210,43 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
195210
return blder.ctrl, nil
196211
}
197212

213+
func (blder *Builder) project(obj runtime.Object, proj objectProjection) (runtime.Object, error) {
214+
switch proj {
215+
case projectAsNormal:
216+
return obj, nil
217+
case projectAsMetadata:
218+
metaObj := &metav1.PartialObjectMetadata{}
219+
gvk, err := getGvk(obj, blder.mgr.GetScheme())
220+
if err != nil {
221+
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
222+
}
223+
metaObj.SetGroupVersionKind(gvk)
224+
return metaObj, nil
225+
default:
226+
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
227+
}
228+
}
229+
198230
func (blder *Builder) doWatch() error {
199231
// Reconcile type
200-
src := &source.Kind{Type: blder.forInput.object}
232+
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
233+
if err != nil {
234+
return err
235+
}
236+
src := &source.Kind{Type: typeForSrc}
201237
hdler := &handler.EnqueueRequestForObject{}
202238
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
203-
err := blder.ctrl.Watch(src, hdler, allPredicates...)
204-
if err != nil {
239+
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
205240
return err
206241
}
207242

208243
// Watches the managed types
209244
for _, own := range blder.ownsInput {
210-
src := &source.Kind{Type: own.object}
245+
typeForSrc, err := blder.project(own.object, own.objectProjection)
246+
if err != nil {
247+
return err
248+
}
249+
src := &source.Kind{Type: typeForSrc}
211250
hdler := &handler.EnqueueRequestForOwner{
212251
OwnerType: blder.forInput.object,
213252
IsController: true,
@@ -223,10 +262,19 @@ func (blder *Builder) doWatch() error {
223262
for _, w := range blder.watchesInput {
224263
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
225264
allPredicates = append(allPredicates, w.predicates...)
265+
266+
// If the source of this watch is of type *source.Kind, project it.
267+
if srckind, ok := w.src.(*source.Kind); ok {
268+
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
269+
if err != nil {
270+
return err
271+
}
272+
srckind.Type = typeForSrc
273+
}
274+
226275
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
227276
return err
228277
}
229-
230278
}
231279
return nil
232280
}

pkg/builder/controller_test.go

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424

2525
. "github.com/onsi/ginkgo"
2626
. "github.com/onsi/gomega"
27+
"k8s.io/client-go/rest"
28+
"sigs.k8s.io/controller-runtime/pkg/cache"
2729

2830
appsv1 "k8s.io/api/apps/v1"
2931
corev1 "k8s.io/api/core/v1"
@@ -294,8 +296,107 @@ var _ = Describe("application", func() {
294296
})
295297
})
296298

299+
Describe("watching with projections", func() {
300+
var mgr manager.Manager
301+
BeforeEach(func() {
302+
// use a cache that intercepts requests for fully typed objects to
303+
// ensure we use the projected versions
304+
var err error
305+
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
306+
Expect(err).NotTo(HaveOccurred())
307+
})
308+
309+
It("should support watching For, Owns, and Watch as metadata", func() {
310+
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)
311+
312+
bldr := ControllerManagedBy(mgr).
313+
For(&appsv1.Deployment{}, OnlyMetadata).
314+
Owns(&appsv1.ReplicaSet{}, OnlyMetadata).
315+
Watches(&source.Kind{Type: &appsv1.StatefulSet{}},
316+
&handler.EnqueueRequestsFromMapFunc{
317+
ToRequests: handler.ToRequestsFunc(func(o handler.MapObject) []reconcile.Request {
318+
ometa := o.Object.(*metav1.PartialObjectMetadata)
319+
statefulSetMaps <- ometa
320+
return nil
321+
}),
322+
},
323+
OnlyMetadata)
324+
325+
doReconcileTest("8", stop, bldr, mgr, true)
326+
327+
By("Creating a new stateful set")
328+
set := &appsv1.StatefulSet{
329+
ObjectMeta: metav1.ObjectMeta{
330+
Namespace: "default",
331+
Name: "test1",
332+
Labels: map[string]string{
333+
"foo": "bar",
334+
},
335+
},
336+
Spec: appsv1.StatefulSetSpec{
337+
Selector: &metav1.LabelSelector{
338+
MatchLabels: map[string]string{"foo": "bar"},
339+
},
340+
Template: corev1.PodTemplateSpec{
341+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
342+
Spec: corev1.PodSpec{
343+
Containers: []corev1.Container{
344+
{
345+
Name: "nginx",
346+
Image: "nginx",
347+
},
348+
},
349+
},
350+
},
351+
},
352+
}
353+
err := mgr.GetClient().Create(context.TODO(), set)
354+
Expect(err).NotTo(HaveOccurred())
355+
356+
By("Checking that the mapping function has been called")
357+
Eventually(func() bool {
358+
metaSet := <-statefulSetMaps
359+
Expect(metaSet.Name).To(Equal(set.Name))
360+
Expect(metaSet.Namespace).To(Equal(set.Namespace))
361+
Expect(metaSet.Labels).To(Equal(set.Labels))
362+
return true
363+
}).Should(BeTrue())
364+
})
365+
})
297366
})
298367

368+
// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
369+
// returning an error if normal, typed objects have informers requested.
370+
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
371+
normalCache, err := cache.New(config, opts)
372+
if err != nil {
373+
return nil, err
374+
}
375+
return &nonTypedOnlyCache{
376+
Cache: normalCache,
377+
}, nil
378+
}
379+
380+
// nonTypedOnlyCache is a cache.Cache that only provides metadata &
381+
// unstructured informers.
382+
type nonTypedOnlyCache struct {
383+
cache.Cache
384+
}
385+
386+
func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj runtime.Object) (cache.Informer, error) {
387+
switch obj.(type) {
388+
case (*metav1.PartialObjectMetadata):
389+
return c.Cache.GetInformer(ctx, obj)
390+
default:
391+
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
392+
}
393+
}
394+
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
395+
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
396+
}
397+
398+
// TODO(directxman12): this function has too many arguments, and the whole
399+
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
299400
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
300401
deployName := "deploy-name-" + nameSuffix
301402
rsName := "rs-name-" + nameSuffix
@@ -358,8 +459,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
358459
Expect(err).NotTo(HaveOccurred())
359460

360461
By("Waiting for the Deployment Reconcile")
361-
Expect(<-ch).To(Equal(reconcile.Request{
362-
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
462+
Eventually(ch).Should(Receive(Equal(reconcile.Request{
463+
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))
363464

364465
By("Creating a ReplicaSet")
365466
// Expect a Reconcile when an Owned object is managedObjects.
@@ -388,8 +489,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
388489
Expect(err).NotTo(HaveOccurred())
389490

390491
By("Waiting for the ReplicaSet Reconcile")
391-
Expect(<-ch).To(Equal(reconcile.Request{
392-
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
492+
Eventually(ch).Should(Receive(Equal(reconcile.Request{
493+
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))
393494

394495
}
395496

pkg/builder/example_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"os"
2323

24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425
logf "sigs.k8s.io/controller-runtime/pkg/log"
2526

2627
appsv1 "k8s.io/api/apps/v1"
@@ -34,6 +35,60 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3536
)
3637

38+
func ExampleBuilder_metadata_only() {
39+
logf.SetLogger(zap.New())
40+
41+
var log = logf.Log.WithName("builder-examples")
42+
43+
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
44+
if err != nil {
45+
log.Error(err, "could not create manager")
46+
os.Exit(1)
47+
}
48+
49+
cl := mgr.GetClient()
50+
err = builder.
51+
ControllerManagedBy(mgr). // Create the ControllerManagedBy
52+
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
53+
Owns(&corev1.Pod{}, builder.OnlyMetadata). // ReplicaSet owns Pods created by it, and caches them as metadata only
54+
Complete(reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
55+
ctx, cancel := context.WithCancel(context.Background())
56+
defer cancel()
57+
58+
// Read the ReplicaSet
59+
rs := &appsv1.ReplicaSet{}
60+
err := cl.Get(ctx, req.NamespacedName, rs)
61+
if err != nil {
62+
return reconcile.Result{}, client.IgnoreNotFound(err)
63+
}
64+
65+
// List the Pods matching the PodTemplate Labels, but only their metadata
66+
var podsMeta metav1.PartialObjectMetadataList
67+
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
68+
if err != nil {
69+
return reconcile.Result{}, client.IgnoreNotFound(err)
70+
}
71+
72+
// Update the ReplicaSet
73+
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
74+
err = cl.Update(ctx, rs)
75+
if err != nil {
76+
return reconcile.Result{}, err
77+
}
78+
79+
return reconcile.Result{}, nil
80+
}))
81+
if err != nil {
82+
log.Error(err, "could not create controller")
83+
os.Exit(1)
84+
}
85+
86+
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
87+
log.Error(err, "could not start manager")
88+
os.Exit(1)
89+
}
90+
}
91+
3792
// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
3893
//
3994
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into

pkg/builder/options.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,42 @@ var _ OwnsOption = &Predicates{}
7676
var _ WatchesOption = &Predicates{}
7777

7878
// }}}
79+
80+
// {{{ For & Owns Dual-Type options
81+
82+
// asProjection configures the projection (currently only metadata) on the input.
83+
// Currently only metadata is supported. We might want to expand
84+
// this to arbitrary non-special local projections in the future.
85+
type projectAs objectProjection
86+
87+
// ApplyToFor applies this configuration to the given ForInput options.
88+
func (p projectAs) ApplyToFor(opts *ForInput) {
89+
opts.objectProjection = objectProjection(p)
90+
}
91+
92+
// ApplyToOwns applies this configuration to the given OwnsInput options.
93+
func (p projectAs) ApplyToOwns(opts *OwnsInput) {
94+
opts.objectProjection = objectProjection(p)
95+
}
96+
97+
// ApplyToWatches applies this configuration to the given WatchesInput options.
98+
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
99+
opts.objectProjection = objectProjection(p)
100+
}
101+
102+
var (
103+
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
104+
// the the API server in metadata-only form. This is useful when watching
105+
// lots of objects, really big objects, or objects for which you only know
106+
// the the GVK, but not the structure. You'll need to pass
107+
// metav1.PartialObjectMetadata to the client when fetching objects in your
108+
// reconciler, otherwise you'll end up with a duplicate structured or
109+
// unstructured cache.
110+
OnlyMetadata = projectAs(projectAsMetadata)
111+
112+
_ ForOption = OnlyMetadata
113+
_ OwnsOption = OnlyMetadata
114+
_ WatchesOption = OnlyMetadata
115+
)
116+
117+
// }}}

0 commit comments

Comments
 (0)