Skip to content

Commit e1a1cae

Browse files
authored
Merge pull request kubernetes-sigs#1256 from vincepri/backport-metadata-only-05
✨ Backport release-0.5: Support metadata-only watches
2 parents 9dc7370 + 8da4f34 commit e1a1cae

File tree

12 files changed

+1637
-73
lines changed

12 files changed

+1637
-73
lines changed

pkg/builder/controller.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strings"
2222

23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2324
"k8s.io/apimachinery/pkg/runtime"
2425
"k8s.io/client-go/rest"
2526
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -155,20 +156,42 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
155156
return blder.ctrl, nil
156157
}
157158

159+
func (blder *Builder) project(obj runtime.Object) (runtime.Object, error) {
160+
switch o := obj.(type) {
161+
case *onlyMetadataWrapper:
162+
metaObj := &metav1.PartialObjectMetadata{}
163+
gvk, err := getGvk(o.Object, blder.mgr.GetScheme())
164+
if err != nil {
165+
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
166+
}
167+
metaObj.SetGroupVersionKind(gvk)
168+
return metaObj, nil
169+
default:
170+
return obj, nil
171+
}
172+
}
173+
158174
func (blder *Builder) doWatch() error {
159175
// Reconcile type
160-
src := &source.Kind{Type: blder.apiType}
161-
hdler := &handler.EnqueueRequestForObject{}
162-
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
176+
apiType, err := blder.project(blder.apiType)
163177
if err != nil {
164178
return err
165179
}
180+
src := &source.Kind{Type: apiType}
181+
hdler := &handler.EnqueueRequestForObject{}
182+
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
183+
return err
184+
}
166185

167186
// Watches the managed types
168187
for _, obj := range blder.managedObjects {
169-
src := &source.Kind{Type: obj}
188+
typeForSrc, err := blder.project(obj)
189+
if err != nil {
190+
return err
191+
}
192+
src := &source.Kind{Type: typeForSrc}
170193
hdler := &handler.EnqueueRequestForOwner{
171-
OwnerType: blder.apiType,
194+
OwnerType: apiType,
172195
IsController: true,
173196
}
174197
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
@@ -178,10 +201,17 @@ func (blder *Builder) doWatch() error {
178201

179202
// Do the watch requests
180203
for _, w := range blder.watchRequest {
204+
// If the source of this watch is of type *source.Kind, project it.
205+
if srckind, ok := w.src.(*source.Kind); ok {
206+
typeForSrc, err := blder.project(srckind.Type)
207+
if err != nil {
208+
return err
209+
}
210+
srckind.Type = typeForSrc
211+
}
181212
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
182213
return err
183214
}
184-
185215
}
186216
return nil
187217
}
@@ -196,7 +226,11 @@ func (blder *Builder) getControllerName() (string, error) {
196226
if blder.name != "" {
197227
return blder.name, nil
198228
}
199-
gvk, err := getGvk(blder.apiType, blder.mgr.GetScheme())
229+
obj, err := blder.project(blder.apiType)
230+
if err != nil {
231+
return "", err
232+
}
233+
gvk, err := getGvk(obj, blder.mgr.GetScheme())
200234
if err != nil {
201235
return "", err
202236
}

pkg/builder/controller_test.go

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323

2424
. "github.com/onsi/ginkgo"
2525
. "github.com/onsi/gomega"
26+
"k8s.io/client-go/rest"
27+
"sigs.k8s.io/controller-runtime/pkg/cache"
2628

2729
appsv1 "k8s.io/api/apps/v1"
2830
corev1 "k8s.io/api/core/v1"
@@ -203,8 +205,107 @@ var _ = Describe("application", func() {
203205
close(done)
204206
}, 10)
205207
})
208+
209+
Describe("watching with projections", func() {
210+
var mgr manager.Manager
211+
BeforeEach(func() {
212+
// use a cache that intercepts requests for fully typed objects to
213+
// ensure we use the projected versions
214+
var err error
215+
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
216+
Expect(err).NotTo(HaveOccurred())
217+
})
218+
219+
It("should support watching For, Owns, and Watch as metadata", func() {
220+
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)
221+
222+
bldr := ControllerManagedBy(mgr).
223+
For(OnlyMetadata(&appsv1.Deployment{})).
224+
Owns(OnlyMetadata(&appsv1.ReplicaSet{})).
225+
Watches(&source.Kind{Type: OnlyMetadata(&appsv1.StatefulSet{})},
226+
&handler.EnqueueRequestsFromMapFunc{
227+
ToRequests: handler.ToRequestsFunc(func(o handler.MapObject) []reconcile.Request {
228+
ometa := o.Object.(*metav1.PartialObjectMetadata)
229+
statefulSetMaps <- ometa
230+
return nil
231+
}),
232+
})
233+
234+
doReconcileTest("8", stop, bldr, mgr, true)
235+
236+
By("Creating a new stateful set")
237+
set := &appsv1.StatefulSet{
238+
ObjectMeta: metav1.ObjectMeta{
239+
Namespace: "default",
240+
Name: "test1",
241+
Labels: map[string]string{
242+
"foo": "bar",
243+
},
244+
},
245+
Spec: appsv1.StatefulSetSpec{
246+
Selector: &metav1.LabelSelector{
247+
MatchLabels: map[string]string{"foo": "bar"},
248+
},
249+
Template: corev1.PodTemplateSpec{
250+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
251+
Spec: corev1.PodSpec{
252+
Containers: []corev1.Container{
253+
{
254+
Name: "nginx",
255+
Image: "nginx",
256+
},
257+
},
258+
},
259+
},
260+
},
261+
}
262+
err := mgr.GetClient().Create(context.TODO(), set)
263+
Expect(err).NotTo(HaveOccurred())
264+
265+
By("Checking that the mapping function has been called")
266+
Eventually(func() bool {
267+
metaSet := <-statefulSetMaps
268+
Expect(metaSet.Name).To(Equal(set.Name))
269+
Expect(metaSet.Namespace).To(Equal(set.Namespace))
270+
Expect(metaSet.Labels).To(Equal(set.Labels))
271+
return true
272+
}).Should(BeTrue())
273+
})
274+
})
206275
})
207276

277+
// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
278+
// returning an error if normal, typed objects have informers requested.
279+
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
280+
normalCache, err := cache.New(config, opts)
281+
if err != nil {
282+
return nil, err
283+
}
284+
return &nonTypedOnlyCache{
285+
Cache: normalCache,
286+
}, nil
287+
}
288+
289+
// nonTypedOnlyCache is a cache.Cache that only provides metadata &
290+
// unstructured informers.
291+
type nonTypedOnlyCache struct {
292+
cache.Cache
293+
}
294+
295+
func (c *nonTypedOnlyCache) GetInformer(obj runtime.Object) (cache.Informer, error) {
296+
switch obj.(type) {
297+
case (*metav1.PartialObjectMetadata):
298+
return c.Cache.GetInformer(obj)
299+
default:
300+
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
301+
}
302+
}
303+
func (c *nonTypedOnlyCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
304+
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
305+
}
306+
307+
// TODO(directxman12): this function has too many arguments, and the whole
308+
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
208309
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
209310
deployName := "deploy-name-" + nameSuffix
210311
rsName := "rs-name-" + nameSuffix
@@ -267,8 +368,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
267368
Expect(err).NotTo(HaveOccurred())
268369

269370
By("Waiting for the Deployment Reconcile")
270-
Expect(<-ch).To(Equal(reconcile.Request{
271-
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
371+
Eventually(ch).Should(Receive(Equal(reconcile.Request{
372+
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))
272373

273374
By("Creating a ReplicaSet")
274375
// Expect a Reconcile when an Owned object is managedObjects.
@@ -297,8 +398,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
297398
Expect(err).NotTo(HaveOccurred())
298399

299400
By("Waiting for the ReplicaSet Reconcile")
300-
Expect(<-ch).To(Equal(reconcile.Request{
301-
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
401+
Eventually(ch).Should(Receive(Equal(reconcile.Request{
402+
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))
302403

303404
}
304405

pkg/builder/example_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"os"
2323

24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425
logf "sigs.k8s.io/controller-runtime/pkg/log"
2526

2627
appsv1 "k8s.io/api/apps/v1"
@@ -34,6 +35,60 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3536
)
3637

38+
func ExampleBuilder_metadata_only() {
39+
logf.SetLogger(zap.New())
40+
41+
var log = logf.Log.WithName("builder-examples")
42+
43+
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
44+
if err != nil {
45+
log.Error(err, "could not create manager")
46+
os.Exit(1)
47+
}
48+
49+
cl := mgr.GetClient()
50+
err = builder.
51+
ControllerManagedBy(mgr). // Create the ControllerManagedBy
52+
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
53+
Owns(builder.OnlyMetadata(&corev1.Pod{})). // ReplicaSet owns Pods created by it, and caches them as metadata only
54+
Complete(reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
55+
ctx, cancel := context.WithCancel(context.Background())
56+
defer cancel()
57+
58+
// Read the ReplicaSet
59+
rs := &appsv1.ReplicaSet{}
60+
err := cl.Get(ctx, req.NamespacedName, rs)
61+
if err != nil {
62+
return reconcile.Result{}, client.IgnoreNotFound(err)
63+
}
64+
65+
// List the Pods matching the PodTemplate Labels, but only their metadata
66+
var podsMeta metav1.PartialObjectMetadataList
67+
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
68+
if err != nil {
69+
return reconcile.Result{}, client.IgnoreNotFound(err)
70+
}
71+
72+
// Update the ReplicaSet
73+
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
74+
err = cl.Update(ctx, rs)
75+
if err != nil {
76+
return reconcile.Result{}, err
77+
}
78+
79+
return reconcile.Result{}, nil
80+
}))
81+
if err != nil {
82+
log.Error(err, "could not create controller")
83+
os.Exit(1)
84+
}
85+
86+
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
87+
log.Error(err, "could not start manager")
88+
os.Exit(1)
89+
}
90+
}
91+
3792
// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
3893
//
3994
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into

pkg/builder/options.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package builder
18+
19+
import "k8s.io/apimachinery/pkg/runtime"
20+
21+
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
22+
// the the API server in metadata-only form. This is useful when watching
23+
// lots of objects, really big objects, or objects for which you only know
24+
// the the GVK, but not the structure. You'll need to pass
25+
// metav1.PartialObjectMetadata to the client when fetching objects in your
26+
// reconciler, otherwise you'll end up with a duplicate structured or
27+
// unstructured cache.
28+
func OnlyMetadata(obj runtime.Object) runtime.Object {
29+
return &onlyMetadataWrapper{obj}
30+
}
31+
32+
type onlyMetadataWrapper struct {
33+
runtime.Object
34+
}
35+
36+
// }}}

0 commit comments

Comments
 (0)