Skip to content

Commit f1c36b1

Browse files
committed
⚠️ Add ability for the delegating client to avoid caching objects
A long standing issue in controller runtime, the delegating client is the default client that the manager creates. Whenever a user calls Get/List on a typed object, the internal cache spins up an informer and start watching *all* objects for that group-version-kind. This change introduces the ability for the delegating client to take a list of objects that should always hit the live api-server, and bypass the cache. We also offer the ability to build a manager.NewClientFunc with a new builder that exposes the ability to mark which objects we want to be uncached. Signed-off-by: Vince Prignano <[email protected]>
1 parent af24f3b commit f1c36b1

File tree

5 files changed

+116
-28
lines changed

5 files changed

+116
-28
lines changed

pkg/client/client_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3065,10 +3065,11 @@ var _ = Describe("DelegatingClient", func() {
30653065
cachedReader := &fakeReader{}
30663066
cl, err := client.New(cfg, client.Options{})
30673067
Expect(err).NotTo(HaveOccurred())
3068-
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
3068+
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
30693069
CacheReader: cachedReader,
30703070
Client: cl,
30713071
})
3072+
Expect(err).NotTo(HaveOccurred())
30723073
var actual appsv1.Deployment
30733074
key := client.ObjectKey{Namespace: "ns", Name: "name"}
30743075
Expect(dReader.Get(context.TODO(), key, &actual)).To(Succeed())
@@ -3079,10 +3080,11 @@ var _ = Describe("DelegatingClient", func() {
30793080
cachedReader := &fakeReader{}
30803081
cl, err := client.New(cfg, client.Options{})
30813082
Expect(err).NotTo(HaveOccurred())
3082-
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
3083+
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
30833084
CacheReader: cachedReader,
30843085
Client: cl,
30853086
})
3087+
Expect(err).NotTo(HaveOccurred())
30863088
dep := &appsv1.Deployment{
30873089
ObjectMeta: metav1.ObjectMeta{
30883090
Name: "deployment1",
@@ -3123,10 +3125,11 @@ var _ = Describe("DelegatingClient", func() {
31233125
cachedReader := &fakeReader{}
31243126
cl, err := client.New(cfg, client.Options{})
31253127
Expect(err).NotTo(HaveOccurred())
3126-
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
3128+
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
31273129
CacheReader: cachedReader,
31283130
Client: cl,
31293131
})
3132+
Expect(err).NotTo(HaveOccurred())
31303133
var actual appsv1.DeploymentList
31313134
Expect(dReader.List(context.Background(), &actual)).To(Succeed())
31323135
Expect(1).To(Equal(cachedReader.Called))
@@ -3136,10 +3139,11 @@ var _ = Describe("DelegatingClient", func() {
31363139
cachedReader := &fakeReader{}
31373140
cl, err := client.New(cfg, client.Options{})
31383141
Expect(err).NotTo(HaveOccurred())
3139-
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
3142+
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
31403143
CacheReader: cachedReader,
31413144
Client: cl,
31423145
})
3146+
Expect(err).NotTo(HaveOccurred())
31433147

31443148
actual := &unstructured.UnstructuredList{}
31453149
actual.SetGroupVersionKind(schema.GroupVersionKind{

pkg/client/split.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,44 @@ import (
2222
"k8s.io/apimachinery/pkg/api/meta"
2323
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2424
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/runtime/schema"
26+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
2527
)
2628

2729
// NewDelegatingClientInput encapsulates the input parameters to create a new delegating client.
2830
type NewDelegatingClientInput struct {
29-
CacheReader Reader
30-
Client Client
31+
CacheReader Reader
32+
Client Client
33+
UncachedObjects []runtime.Object
3134
}
3235

3336
// NewDelegatingClient creates a new delegating client.
3437
//
3538
// A delegating client forms a Client by composing separate reader, writer and
3639
// statusclient interfaces. This way, you can have an Client that reads from a
3740
// cache and writes to the API server.
38-
func NewDelegatingClient(in NewDelegatingClientInput) Client {
41+
func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
42+
uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
43+
for _, obj := range in.UncachedObjects {
44+
gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
45+
if err != nil {
46+
return nil, err
47+
}
48+
uncachedGVKs[gvk] = struct{}{}
49+
}
50+
3951
return &delegatingClient{
4052
scheme: in.Client.Scheme(),
4153
mapper: in.Client.RESTMapper(),
4254
Reader: &delegatingReader{
4355
CacheReader: in.CacheReader,
4456
ClientReader: in.Client,
57+
scheme: in.Client.Scheme(),
58+
uncachedGVKs: uncachedGVKs,
4559
},
4660
Writer: in.Client,
4761
StatusClient: in.Client,
48-
}
62+
}, nil
4963
}
5064

5165
type delegatingClient struct {
@@ -75,21 +89,37 @@ func (d *delegatingClient) RESTMapper() meta.RESTMapper {
7589
type delegatingReader struct {
7690
CacheReader Reader
7791
ClientReader Reader
92+
93+
uncachedGVKs map[schema.GroupVersionKind]struct{}
94+
scheme *runtime.Scheme
95+
}
96+
97+
func (d *delegatingReader) isUncached(obj runtime.Object) (bool, error) {
98+
gvk, err := apiutil.GVKForObject(obj, d.scheme)
99+
if err != nil {
100+
return false, err
101+
}
102+
_, isUncached := d.uncachedGVKs[gvk]
103+
_, isUnstructured := obj.(*unstructured.Unstructured)
104+
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
105+
return isUncached || isUnstructured || isUnstructuredList, nil
78106
}
79107

80108
// Get retrieves an obj for a given object key from the Kubernetes Cluster.
81109
func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object) error {
82-
_, isUnstructured := obj.(*unstructured.Unstructured)
83-
if isUnstructured {
110+
if isUncached, err := d.isUncached(obj); err != nil {
111+
return err
112+
} else if isUncached {
84113
return d.ClientReader.Get(ctx, key, obj)
85114
}
86115
return d.CacheReader.Get(ctx, key, obj)
87116
}
88117

89118
// List retrieves list of objects for a given namespace and list options.
90119
func (d *delegatingReader) List(ctx context.Context, list ObjectList, opts ...ListOption) error {
91-
_, isUnstructured := list.(*unstructured.UnstructuredList)
92-
if isUnstructured {
120+
if isUncached, err := d.isUncached(list); err != nil {
121+
return err
122+
} else if isUncached {
93123
return d.ClientReader.List(ctx, list, opts...)
94124
}
95125
return d.CacheReader.List(ctx, list, opts...)

pkg/manager/client.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package manager
18+
19+
import (
20+
"k8s.io/apimachinery/pkg/runtime"
21+
"k8s.io/client-go/rest"
22+
"sigs.k8s.io/controller-runtime/pkg/cache"
23+
"sigs.k8s.io/controller-runtime/pkg/client"
24+
)
25+
26+
// DefaultNewClient creates the default caching client
27+
var DefaultNewClient = NewClientFuncBuilder().Build()
28+
29+
// NewClientBuilder builder is the interface for the builder for manager.NewClientFunc.
30+
type NewClientBuilder interface {
31+
// WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache
32+
// for this client. This function can be called multiple times, it should append to an internal slice.
33+
WithUncached(objs ...runtime.Object)
34+
35+
// Build returns the NewClientFunc that can be used when creating a new manager.
36+
Build() NewClientFunc
37+
}
38+
39+
// NewClientFuncBuilder returns a builder to create a new manager.NewClientFunc to be passed when creating a Manager.
40+
func NewClientFuncBuilder() NewClientBuilder {
41+
return &newClientBuilder{}
42+
}
43+
44+
type newClientBuilder struct {
45+
uncached []runtime.Object
46+
}
47+
48+
func (n *newClientBuilder) WithUncached(objs ...runtime.Object) {
49+
n.uncached = append(n.uncached, objs...)
50+
}
51+
52+
func (n *newClientBuilder) Build() NewClientFunc {
53+
return func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
54+
// Create the Client for Write operations.
55+
c, err := client.New(config, options)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
return client.NewDelegatingClient(client.NewDelegatingClientInput{
61+
CacheReader: cache,
62+
Client: c,
63+
UncachedObjects: n.uncached,
64+
})
65+
}
66+
}

pkg/manager/manager.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -503,20 +503,6 @@ func (o Options) setLeaderElectionConfig(obj v1alpha1.ControllerManagerConfigura
503503
return o
504504
}
505505

506-
// DefaultNewClient creates the default caching client
507-
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
508-
// Create the Client for Write operations.
509-
c, err := client.New(config, options)
510-
if err != nil {
511-
return nil, err
512-
}
513-
514-
return client.NewDelegatingClient(client.NewDelegatingClientInput{
515-
CacheReader: cache,
516-
Client: c,
517-
}), nil
518-
}
519-
520506
// defaultHealthProbeListener creates the default health probes listener bound to the given address
521507
func defaultHealthProbeListener(addr string) (net.Listener, error) {
522508
if addr == "" || addr == "0" {

pkg/runtime/inject/inject_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ var _ = Describe("runtime inject", func() {
8787
})
8888

8989
It("should set client", func() {
90-
client := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
90+
client, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
91+
Expect(err).NotTo(HaveOccurred())
9192

9293
By("Validating injecting client")
9394
res, err := ClientInto(client, instance)
@@ -152,7 +153,8 @@ var _ = Describe("runtime inject", func() {
152153
})
153154

154155
It("should set api reader", func() {
155-
apiReader := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
156+
apiReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
157+
Expect(err).NotTo(HaveOccurred())
156158

157159
By("Validating injecting client")
158160
res, err := APIReaderInto(apiReader, instance)

0 commit comments

Comments
 (0)